This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new cd219d11203 [FLINK-38355][table] Support `CREATE OR ALTER MATERIALIZED TABLE` syntax
cd219d11203 is described below

commit cd219d11203f25d3833f43549238197c0be088ff
Author: Ramin Gharib <[email protected]>
AuthorDate: Tue Nov 18 02:26:09 2025 +0100

    [FLINK-38355][table] Support `CREATE OR ALTER MATERIALIZED TABLE` syntax
---
 .../dev/table/materialized-table/statements.md     |  70 +++++-
 .../dev/table/materialized-table/statements.md     |  70 +++++-
 .../MaterializedTableManager.java                  |  15 +-
 .../service/MaterializedTableStatementITCase.java  |  10 +-
 .../src/main/codegen/data/Parser.tdd               |   2 +-
 .../src/main/codegen/includes/parserImpls.ftl      |  17 +-
 .../src/main/codegen/templates/Parser.jj           |   1 +
 .../sql/parser/ddl/SqlAlterMaterializedTable.java  |   4 +-
 .../ddl/SqlAlterMaterializedTableAsQuery.java      |   2 +
 .../sql/parser/ddl/SqlCreateMaterializedTable.java |  25 +-
 .../ddl/SqlCreateOrAlterMaterializedTable.java     |  77 ++++++
 .../MaterializedTableStatementParserTest.java      | 105 +++++---
 .../flink/table/api/internal/ShowCreateUtil.java   |   2 +-
 .../AlterMaterializedTableAsQueryOperation.java    |   6 +-
 .../table/api/internal/ShowCreateUtilTest.java     |  35 ++-
 .../catalog/CatalogBaseTableResolutionTest.java    |  26 +-
 .../table/catalog/CatalogMaterializedTable.java    |  49 +++-
 .../flink/table/catalog/CatalogPropertiesUtil.java |  18 +-
 .../catalog/DefaultCatalogMaterializedTable.java   |  40 ++-
 .../catalog/ResolvedCatalogMaterializedTable.java  |  15 +-
 .../table/catalog/CatalogPropertiesUtilTest.java   |   3 +-
 .../AbstractAlterMaterializedTableConverter.java   |   3 +-
 .../AbstractCreateMaterializedTableConverter.java  |  25 +-
 .../SqlAlterMaterializedTableAsQueryConverter.java |  51 +---
 .../SqlCreateMaterializedTableConverter.java       | 126 ----------
 ...SqlCreateOrAlterMaterializedTableConverter.java | 278 +++++++++++++++++++++
 .../operations/converters/SqlNodeConverters.java   |   2 +-
 .../planner/utils/MaterializedTableUtils.java      |  52 +++-
 .../operations/SqlDdlToOperationConverterTest.java |   7 +-
 ...erializedTableNodeToOperationConverterTest.java | 167 ++++++++++++-
 .../catalog/TestFileSystemCatalogTest.java         |  12 +-
 31 files changed, 988 insertions(+), 327 deletions(-)

diff --git a/docs/content.zh/docs/dev/table/materialized-table/statements.md b/docs/content.zh/docs/dev/table/materialized-table/statements.md
index d65491c9084..bf6c3b57e4b 100644
--- a/docs/content.zh/docs/dev/table/materialized-table/statements.md
+++ b/docs/content.zh/docs/dev/table/materialized-table/statements.md
@@ -27,14 +27,14 @@ under the License.
 # 物化表语法
 
 Flink SQL 目前支持以下物化表操作:
-- [CREATE MATERIALIZED TABLE](#create-materialized-table)
+- [CREATE [OR ALTER] MATERIALIZED TABLE](#create-or-alter-materialized-table)
 - [ALTER MATERIALIZED TABLE](#alter-materialized-table)
 - [DROP MATERIALIZED TABLE](#drop-materialized-table)
 
-# CREATE MATERIALIZED TABLE
+# CREATE [OR ALTER] MATERIALIZED TABLE
 
 ```
-CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name
+CREATE [OR ALTER] MATERIALIZED TABLE [catalog_name.][db_name.]table_name
 
 [(
     { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
@@ -228,6 +228,30 @@ CREATE MATERIALIZED TABLE my_materialized_table
     AS SELECT * FROM kafka_catalog.db1.kafka_table;
 ```
 
+## OR ALTER
+
+The `OR ALTER` clause provides create-or-update semantics:
+
+- **If the materialized table does not exist**: Creates a new materialized table with the specified options
+- **If the materialized table exists**: Modifies the query definition (behaves like `ALTER MATERIALIZED TABLE AS`)
+
+This is particularly useful in declarative deployment scenarios where you want to define the desired state without checking whether the materialized table already exists.
+
+**Behavior when the materialized table exists:**
+
+The operation updates the materialized table in the same way as [ALTER MATERIALIZED TABLE AS](#as-select_statement-1):
+
+**Full mode:**
+1. Updates the schema and query definition
+2. Refreshes the materialized table using the new query when the next refresh job is triggered
+
+**Continuous mode:**
+1. Pauses the currently running refresh job
+2. Updates the schema and query definition
+3. Starts a new refresh job from the beginning
+
+See [ALTER MATERIALIZED TABLE AS](#as-select_statement-1) for more details.
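+
+For example, a minimal statement (a sketch assuming the `kafka_catalog.db1.events` source table used elsewhere on this page; a full create-then-alter walkthrough appears in the examples further below):
+
+```sql
+CREATE OR ALTER MATERIALIZED TABLE my_materialized_table
+    FRESHNESS = INTERVAL '10' SECOND
+    AS SELECT user_id, COUNT(*) AS event_count
+    FROM kafka_catalog.db1.events
+    GROUP BY user_id;
+```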
+
 ## 示例
 
 假定 `materialized-table.refresh-mode.freshness-threshold` 为 30 分钟。
@@ -313,6 +337,46 @@ It might happen that types of columns are not the same, in that case implicit ca
 If for some of the combinations implicit cast is not supported then there will be validation error thrown.
 Also, it is worth to note that reordering can also be done here. 
 
+Executing the same `CREATE OR ALTER MATERIALIZED TABLE` statement twice:
+
+```sql
+-- First execution: creates the materialized table
+CREATE OR ALTER MATERIALIZED TABLE my_materialized_table
+    FRESHNESS = INTERVAL '10' SECOND
+    AS
+    SELECT
+        user_id,
+        COUNT(*) AS event_count,
+        SUM(amount) AS total_amount
+    FROM
+        kafka_catalog.db1.events
+    WHERE
+        event_type = 'purchase'
+    GROUP BY
+        user_id;
+
+-- Second execution: alters the query definition (adds avg_amount column)
+CREATE OR ALTER MATERIALIZED TABLE my_materialized_table
+    FRESHNESS = INTERVAL '10' SECOND
+    AS
+    SELECT
+        user_id,
+        COUNT(*) AS event_count,
+        SUM(amount) AS total_amount,
+        AVG(amount) AS avg_amount  -- Add a new nullable column at the end
+    FROM
+        kafka_catalog.db1.events
+    WHERE
+        event_type = 'purchase'
+    GROUP BY
+        user_id;
+```
+
+<span class="label label-danger">Note</span>
+- When altering an existing materialized table, schema evolution currently only supports adding `nullable` columns to the end of the original materialized table's schema.
+- In continuous mode, the new refresh job will not restore from the state of the original refresh job when altering.
+- All limitations from both CREATE and ALTER operations apply.
+
 ## 限制
 - Does not support explicitly specifying physical columns which are not used in the query
 - 不支持在 select 查询语句中引用临时表、临时视图或临时函数
diff --git a/docs/content/docs/dev/table/materialized-table/statements.md b/docs/content/docs/dev/table/materialized-table/statements.md
index 6e9c0b153b8..1f0c88106a6 100644
--- a/docs/content/docs/dev/table/materialized-table/statements.md
+++ b/docs/content/docs/dev/table/materialized-table/statements.md
@@ -27,14 +27,14 @@ under the License.
 # Materialized Table Statements
 
 Flink SQL supports the following Materialized Table statements for now:
-- [CREATE MATERIALIZED TABLE](#create-materialized-table)
+- [CREATE [OR ALTER] MATERIALIZED TABLE](#create-or-alter-materialized-table)
 - [ALTER MATERIALIZED TABLE](#alter-materialized-table)
 - [DROP MATERIALIZED TABLE](#drop-materialized-table)
 
-# CREATE MATERIALIZED TABLE
+# CREATE [OR ALTER] MATERIALIZED TABLE
 
 ```
-CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name
+CREATE [OR ALTER] MATERIALIZED TABLE [catalog_name.][db_name.]table_name
 
 [(
     { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
@@ -228,6 +228,30 @@ CREATE MATERIALIZED TABLE my_materialized_table
     AS SELECT * FROM kafka_catalog.db1.kafka_table;
 ```
 
+## OR ALTER
+
+The `OR ALTER` clause provides create-or-update semantics:
+
+- **If the materialized table does not exist**: Creates a new materialized table with the specified options
+- **If the materialized table exists**: Modifies the query definition (behaves like `ALTER MATERIALIZED TABLE AS`)
+
+This is particularly useful in declarative deployment scenarios where you want to define the desired state without checking whether the materialized table already exists.
+
+**Behavior when the materialized table exists:**
+
+The operation updates the materialized table in the same way as [ALTER MATERIALIZED TABLE AS](#as-select_statement-1):
+
+**Full mode:**
+1. Updates the schema and query definition
+2. Refreshes the materialized table using the new query when the next refresh job is triggered
+
+**Continuous mode:**
+1. Pauses the currently running refresh job
+2. Updates the schema and query definition
+3. Starts a new refresh job from the beginning
+
+See [ALTER MATERIALIZED TABLE AS](#as-select_statement-1) for more details.
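+
+For example, a minimal statement (a sketch assuming the `kafka_catalog.db1.events` source table used elsewhere on this page; a full create-then-alter walkthrough appears in the examples further below):
+
+```sql
+CREATE OR ALTER MATERIALIZED TABLE my_materialized_table
+    FRESHNESS = INTERVAL '10' SECOND
+    AS SELECT user_id, COUNT(*) AS event_count
+    FROM kafka_catalog.db1.events
+    GROUP BY user_id;
+```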
+
 ## Examples
 
 Assuming `materialized-table.refresh-mode.freshness-threshold` is 30 minutes.
@@ -311,6 +335,46 @@ It might happen that types of columns are not the same, in that case implicit ca
 If for some of the combinations implicit cast is not supported then there will be validation error thrown.
 Also, it is worth to note that reordering can also be done here.
 
+Executing the same `CREATE OR ALTER MATERIALIZED TABLE` statement twice:
+
+```sql
+-- First execution: creates the materialized table
+CREATE OR ALTER MATERIALIZED TABLE my_materialized_table
+    FRESHNESS = INTERVAL '10' SECOND
+    AS
+    SELECT
+        user_id,
+        COUNT(*) AS event_count,
+        SUM(amount) AS total_amount
+    FROM
+        kafka_catalog.db1.events
+    WHERE
+        event_type = 'purchase'
+    GROUP BY
+        user_id;
+
+-- Second execution: alters the query definition (adds avg_amount column)
+CREATE OR ALTER MATERIALIZED TABLE my_materialized_table
+    FRESHNESS = INTERVAL '10' SECOND
+    AS
+    SELECT
+        user_id,
+        COUNT(*) AS event_count,
+        SUM(amount) AS total_amount,
+        AVG(amount) AS avg_amount  -- Add a new nullable column at the end
+    FROM
+        kafka_catalog.db1.events
+    WHERE
+        event_type = 'purchase'
+    GROUP BY
+        user_id;
+```
+
+<span class="label label-danger">Note</span>
+- When altering an existing materialized table, schema evolution currently only supports adding `nullable` columns to the end of the original materialized table's schema.
+- In continuous mode, the new refresh job will not restore from the state of the original refresh job when altering.
+- All limitations from both CREATE and ALTER operations apply.
+
 ## Limitations
 
 - Does not support explicitly specifying physical columns which are not used in the query
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
index bafb3650711..3c49ca0a59e 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
@@ -265,7 +265,7 @@ public class MaterializedTableManager {
         CreateRefreshWorkflow createRefreshWorkflow =
                 new CreatePeriodicRefreshWorkflow(
                         materializedTableIdentifier,
-                        catalogMaterializedTable.getDefinitionQuery(),
+                        catalogMaterializedTable.getExpandedQuery(),
                         cronExpression,
                         getSessionInitializationConf(operationExecutor),
                         Collections.emptyMap(),
@@ -569,7 +569,7 @@ public class MaterializedTableManager {
         String insertStatement =
                 getInsertStatement(
                         materializedTableIdentifier,
-                        catalogMaterializedTable.getDefinitionQuery(),
+                        catalogMaterializedTable.getExpandedQuery(),
                         dynamicOptions);
 
         JobExecutionResult result =
@@ -651,7 +651,7 @@ public class MaterializedTableManager {
         String insertStatement =
                 getRefreshStatement(
                         materializedTableIdentifier,
-                        materializedTable.getDefinitionQuery(),
+                        materializedTable.getExpandedQuery(),
                         refreshPartitions,
                         dynamicOptions);
 
@@ -868,8 +868,8 @@ public class MaterializedTableManager {
                 LOG.warn(
                         "Failed to start the continuous refresh job for 
materialized table {} using new query {}, rollback to origin query {}.",
                         tableIdentifier,
-                        op.getCatalogMaterializedTable().getDefinitionQuery(),
-                        suspendMaterializedTable.getDefinitionQuery(),
+                        op.getCatalogMaterializedTable().getExpandedQuery(),
+                        suspendMaterializedTable.getExpandedQuery(),
                         e);
 
                 AlterMaterializedTableChangeOperation rollbackChangeOperation =
@@ -891,7 +891,7 @@ public class MaterializedTableManager {
                 throw new SqlExecutionException(
                         String.format(
                                 "Failed to start the continuous refresh job 
using new query %s when altering materialized table %s select query.",
-                                
op.getCatalogMaterializedTable().getDefinitionQuery(),
+                                
op.getCatalogMaterializedTable().getExpandedQuery(),
                                 tableIdentifier),
                         e);
             }
@@ -944,8 +944,7 @@ public class MaterializedTableManager {
                                 oldMaterializedTable.getSerializedRefreshHandler()));
             } else if (tableChange instanceof TableChange.ModifyDefinitionQuery) {
                 rollbackChanges.add(
-                        TableChange.modifyDefinitionQuery(
-                                oldMaterializedTable.getDefinitionQuery()));
+                        TableChange.modifyDefinitionQuery(oldMaterializedTable.getExpandedQuery()));
             } else {
                 throw new ValidationException(
                         String.format(
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
index de73723176b..32f119bd28e 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
@@ -1268,7 +1268,7 @@ class MaterializedTableStatementITCase extends AbstractMaterializedTableStatemen
                 .isEqualTo(
                         Collections.singletonList(
                                 Column.physical("order_amount_sum", DataTypes.INT())));
-        assertThat(newTable.getDefinitionQuery())
+        assertThat(newTable.getExpandedQuery())
                 .isEqualTo(
                         String.format(
                                 "SELECT `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`, COUNT(`tmp`.`order_id`) AS `order_cnt`, SUM(`tmp`.`order_amount`) AS `order_amount_sum`\n"
@@ -1342,7 +1342,7 @@ class MaterializedTableStatementITCase extends AbstractMaterializedTableStatemen
                                         TEST_DEFAULT_DATABASE,
                                         "users_shops"));
 
-        assertThat(newTable.getDefinitionQuery()).isEqualTo(oldTable.getDefinitionQuery());
+        assertThat(newTable.getExpandedQuery()).isEqualTo(oldTable.getExpandedQuery());
 
         // the refresh handler in full mode should be the same as the old one
         assertThat(oldTable.getSerializedRefreshHandler())
@@ -1408,7 +1408,7 @@ class MaterializedTableStatementITCase extends AbstractMaterializedTableStatemen
                 .isEqualTo(
                         Collections.singletonList(
                                 Column.physical("order_amount_sum", DataTypes.INT())));
-        assertThat(newTable.getDefinitionQuery())
+        assertThat(newTable.getExpandedQuery())
                 .isEqualTo(
                         String.format(
                                 "SELECT `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`, COUNT(`tmp`.`order_id`) AS `order_cnt`, SUM(`tmp`.`order_amount`) AS `order_amount_sum`\n"
@@ -1495,7 +1495,7 @@ class MaterializedTableStatementITCase extends AbstractMaterializedTableStatemen
                 .isEqualTo(oldTable.getResolvedSchema().getPrimaryKey());
         assertThat(newTable.getResolvedSchema().getWatermarkSpecs())
                 .isEqualTo(oldTable.getResolvedSchema().getWatermarkSpecs());
-        assertThat(newTable.getDefinitionQuery())
+        assertThat(newTable.getExpandedQuery())
                 .isEqualTo(
                         String.format(
                                 "SELECT COALESCE(`tmp`.`user_id`, CAST(0 AS BIGINT)) AS `user_id`, `tmp`.`shop_id`, COALESCE(`tmp`.`ds`, '') AS `ds`, SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS `pv`\n"
@@ -1594,7 +1594,7 @@ class MaterializedTableStatementITCase extends AbstractMaterializedTableStatemen
 
         assertThat(getAddedColumns(newTable.getResolvedSchema(), oldTable.getResolvedSchema()))
                 .isEqualTo(Collections.singletonList(Column.physical("pv", DataTypes.INT())));
-        assertThat(newTable.getDefinitionQuery())
+        assertThat(newTable.getExpandedQuery())
                 .isEqualTo(
                         String.format(
                                 "SELECT `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`, SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS `pv`\n"
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 12f5a5b6c07..748b0102449 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -76,9 +76,9 @@
     "org.apache.flink.sql.parser.ddl.SqlCreateCatalog"
     "org.apache.flink.sql.parser.ddl.SqlCreateDatabase"
     "org.apache.flink.sql.parser.ddl.SqlCreateFunction"
-    "org.apache.flink.sql.parser.ddl.SqlCreateMaterializedTable"
     "org.apache.flink.sql.parser.ddl.SqlCreateModel"
     "org.apache.flink.sql.parser.ddl.SqlCreateModelAs"
+    "org.apache.flink.sql.parser.ddl.SqlCreateOrAlterMaterializedTable"
     "org.apache.flink.sql.parser.ddl.SqlCreateTable"
     "org.apache.flink.sql.parser.ddl.SqlCreateTable.TableCreationContext"
     "org.apache.flink.sql.parser.ddl.SqlCreateTableAs"
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 3bcaa7314d4..0fcbc543511 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -1852,9 +1852,9 @@ SqlNode SqlReplaceTable() :
 }
 
 /**
-  * Parses a CREATE MATERIALIZED TABLE statement.
+  * Parses a CREATE [OR ALTER] MATERIALIZED TABLE statement.
 */
-SqlCreate SqlCreateMaterializedTable(Span s, boolean replace, boolean isTemporary) :
+SqlCreate SqlCreateOrAlterMaterializedTable(Span s, boolean replace, boolean isTemporary) :
 {
     final SqlParserPos startPos = s.pos();
     SqlIdentifier tableName;
@@ -1870,8 +1870,14 @@ SqlCreate SqlCreateMaterializedTable(Span s, boolean replace, boolean isTemporar
     SqlNode asQuery = null;
     SqlParserPos pos = startPos;
     boolean isColumnsIdentifiersOnly = false;
+    boolean isOrAlter = false;
 }
 {
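+    // Optional OR ALTER prefix; when present, the statement is parsed as CREATE OR ALTER MATERIALIZED TABLE.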
+    [
+      <OR> <ALTER> {
+        isOrAlter = true;
+      }
+    ]
     <MATERIALIZED>
     {
         if (isTemporary) {
@@ -1946,7 +1952,7 @@ SqlCreate SqlCreateMaterializedTable(Span s, boolean replace, boolean isTemporar
     <AS>
     asQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
     {
-        return new SqlCreateMaterializedTable(
+        return new SqlCreateOrAlterMaterializedTable(
             startPos.plus(getPos()),
             tableName,
             columnList,
@@ -1958,7 +1964,8 @@ SqlCreate SqlCreateMaterializedTable(Span s, boolean replace, boolean isTemporar
             propertyList,
             (SqlIntervalLiteral) freshness,
             refreshMode,
-            asQuery);
+            asQuery,
+            isOrAlter);
     }
 }
 
@@ -2646,7 +2653,7 @@ SqlCreate SqlCreateExtended(Span s, boolean replace) :
     (
         create = SqlCreateCatalog(s, replace)
         |
-        create = SqlCreateMaterializedTable(s, replace, isTemporary)
+        create = SqlCreateOrAlterMaterializedTable(s, replace, isTemporary)
         |
         create = SqlCreateTable(s, replace, isTemporary)
         |
diff --git a/flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj b/flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj
index 0de358fd15b..1bb939a46fa 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj
+++ b/flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj
@@ -4353,6 +4353,7 @@ SqlCreate SqlCreate() :
 {
     <CREATE> { s = span(); }
     [
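+        // LOOKAHEAD(2) avoids committing to OR REPLACE when CREATE is followed by OR ALTER (both start with OR).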
+        LOOKAHEAD(2)
         <OR> <REPLACE> {
             replace = true;
         }
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTable.java
index 3380b6a7e20..91095d2944f 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTable.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTable.java
@@ -29,8 +29,8 @@ import org.apache.calcite.sql.parser.SqlParserPos;
 import static java.util.Objects.requireNonNull;
 
 /**
- * Abstract class to describe statements like ALTER MATERIALIZED TABLE [catalogName.]
- * [dataBasesName.]tableName ...
+ * Abstract class to describe statements like ALTER MATERIALIZED TABLE
+ * [catalogName.][dataBasesName.]tableName ...
  */
 public abstract class SqlAlterMaterializedTable extends SqlCall {
 
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableAsQuery.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableAsQuery.java
index 48ace3caee7..69b9d20021d 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableAsQuery.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableAsQuery.java
@@ -52,7 +52,9 @@ public class SqlAlterMaterializedTableAsQuery extends SqlAlterMaterializedTable
     @Override
     public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
         super.unparse(writer, leftPrec, rightPrec);
+        writer.newlineAndIndent();
         writer.keyword("AS");
+        writer.newlineAndIndent();
         asQuery.unparse(writer, leftPrec, rightPrec);
     }
 }
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java
index b1ebd322914..02d3b8509ac 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java
@@ -47,7 +47,7 @@ import static java.util.Objects.requireNonNull;
 /** CREATE MATERIALIZED TABLE DDL sql call. */
 public class SqlCreateMaterializedTable extends SqlCreate implements ExtendedSqlNode {
 
-    public static final SqlSpecialOperator OPERATOR =
+    public static final SqlSpecialOperator CREATE_OPERATOR =
             new SqlSpecialOperator("CREATE MATERIALIZED TABLE", 
SqlKind.CREATE_TABLE);
 
     private final SqlIdentifier tableName;
@@ -56,14 +56,14 @@ public class SqlCreateMaterializedTable extends SqlCreate implements ExtendedSql
 
     private final List<SqlTableConstraint> tableConstraints;
 
+    private final SqlWatermark watermark;
+
     private final @Nullable SqlCharStringLiteral comment;
 
     private final @Nullable SqlDistribution distribution;
 
     private final SqlNodeList partitionKeyList;
 
-    private final SqlWatermark watermark;
-
     private final SqlNodeList propertyList;
 
     private final @Nullable SqlIntervalLiteral freshness;
@@ -73,6 +73,7 @@ public class SqlCreateMaterializedTable extends SqlCreate implements ExtendedSql
     private final SqlNode asQuery;
 
     public SqlCreateMaterializedTable(
+            SqlSpecialOperator operator,
             SqlParserPos pos,
             SqlIdentifier tableName,
             SqlNodeList columnList,
@@ -85,7 +86,7 @@ public class SqlCreateMaterializedTable extends SqlCreate implements ExtendedSql
             @Nullable SqlIntervalLiteral freshness,
             @Nullable SqlRefreshMode refreshMode,
             SqlNode asQuery) {
-        super(OPERATOR, pos, false, false);
+        super(operator, pos, false, false);
         this.tableName = requireNonNull(tableName, "tableName should not be null");
         this.columnList = columnList;
         this.tableConstraints = tableConstraints;
@@ -100,11 +101,6 @@ public class SqlCreateMaterializedTable extends SqlCreate implements ExtendedSql
         this.asQuery = requireNonNull(asQuery, "asQuery should not be null");
     }
 
-    @Override
-    public SqlOperator getOperator() {
-        return OPERATOR;
-    }
-
     @Override
     public List<SqlNode> getOperandList() {
         return ImmutableNullableList.of(
@@ -185,12 +181,15 @@ public class SqlCreateMaterializedTable extends SqlCreate implements ExtendedSql
         // CREATE MATERIALIZED TABLE supports passing only column identifiers in the column list. If
         // the first column in the list is an identifier, then we assume the rest of the
         // columns are identifiers as well.
-        return !getColumnList().isEmpty() && getColumnList().get(0) instanceof SqlIdentifier;
+        return !columnList.isEmpty() && columnList.get(0) instanceof SqlIdentifier;
     }
 
-    @Override
-    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
-        writer.keyword("CREATE MATERIALIZED TABLE");
+    protected void unparseMaterializedTableAs(
+            final SqlOperator operation,
+            final SqlWriter writer,
+            final int leftPrec,
+            final int rightPrec) {
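+        // The operator's name supplies the leading keyword: "CREATE MATERIALIZED TABLE" or "CREATE OR ALTER MATERIALIZED TABLE".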
+        writer.keyword(operation.getName());
         tableName.unparse(writer, leftPrec, rightPrec);
 
         if (!columnList.isEmpty() || !tableConstraints.isEmpty() || watermark != null) {
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateOrAlterMaterializedTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateOrAlterMaterializedTable.java
new file mode 100644
index 00000000000..f8ad28ba732
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateOrAlterMaterializedTable.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlIntervalLiteral;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/** CREATE [OR ALTER] MATERIALIZED TABLE DDL sql call. */
+public class SqlCreateOrAlterMaterializedTable extends SqlCreateMaterializedTable {
+
+    public static final SqlSpecialOperator CREATE_OR_ALTER_OPERATOR =
+            new SqlSpecialOperator("CREATE OR ALTER MATERIALIZED TABLE", SqlKind.OTHER_DDL);
+
+    public SqlCreateOrAlterMaterializedTable(
+            SqlParserPos pos,
+            SqlIdentifier tableName,
+            SqlNodeList columnList,
+            List<SqlTableConstraint> tableConstraints,
+            SqlWatermark watermark,
+            @Nullable SqlCharStringLiteral comment,
+            @Nullable SqlDistribution distribution,
+            SqlNodeList partitionKeyList,
+            SqlNodeList propertyList,
+            @Nullable SqlIntervalLiteral freshness,
+            @Nullable SqlRefreshMode refreshMode,
+            SqlNode asQuery,
+            boolean isOrAlter) {
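+        // The parsed syntax selects the operator, which in turn drives the SqlKind and unparsing.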
+        super(
+                isOrAlter ? CREATE_OR_ALTER_OPERATOR : CREATE_OPERATOR,
+                pos,
+                tableName,
+                columnList,
+                tableConstraints,
+                watermark,
+                comment,
+                distribution,
+                partitionKeyList,
+                propertyList,
+                freshness,
+                refreshMode,
+                asQuery);
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        this.unparseMaterializedTableAs(getOperator(), writer, leftPrec, rightPrec);
+    }
+}
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java
index 29f5a52f049..9758aa9122c 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java
@@ -37,16 +37,23 @@ import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT;
 @Execution(CONCURRENT)
 class MaterializedTableStatementParserTest {
 
+    private static final String CREATE_OPERATION = "CREATE ";
+    private static final String CREATE_OR_ALTER_OPERATION = "CREATE OR ALTER ";
+
     @ParameterizedTest(name = "{index}: {0}")
-    @MethodSource("inputForCreateMaterializedTable")
-    void testCreateMaterializedTable(
+    @MethodSource("inputForMaterializedTable")
+    void testMaterializedTable(
             final String testName, final Map.Entry<String, String> sqlToExpected) {
         final String sql = sqlToExpected.getKey();
         final String expected = sqlToExpected.getValue();
-
         sql(sql).ok(expected);
     }
 
+    private static Stream<Arguments> inputForMaterializedTable() {
+        return Stream.concat(
+                inputForCreateMaterializedTable(), inputForCreateOrAlterMaterializedTable());
+    }
+
     @Test
     void testCreateMaterializedTableWithWrongSchema() {
         final String sql =
@@ -325,7 +332,7 @@ class MaterializedTableStatementParserTest {
     @Test
     void testAlterMaterializedTableAsQuery() {
         final String sql = "ALTER MATERIALIZED TABLE tbl1 AS SELECT * FROM t";
-        final String expected = "ALTER MATERIALIZED TABLE `TBL1` AS SELECT 
*\nFROM `T`";
+        final String expected = "ALTER MATERIALIZED TABLE `TBL1`\nAS\nSELECT 
*\nFROM `T`";
         sql(sql).ok(expected);
 
         final String sql2 = "ALTER MATERIALIZED TABLE tbl1 AS SELECT * FROM t 
A^S^";
@@ -383,18 +390,39 @@ class MaterializedTableStatementParserTest {
 
     private static Stream<Arguments> inputForCreateMaterializedTable() {
         return Stream.of(
-                Arguments.of("Full example", fullExample()),
-                Arguments.of("With columns", withColumns()),
-                Arguments.of("With columns and watermarks", 
withColumnsAndWatermark()),
-                Arguments.of("Without table constraint", 
withoutTableConstraint()),
-                Arguments.of("With primary key", withPrimaryKey()),
-                Arguments.of("Without freshness", withoutFreshness()),
-                Arguments.of("With column identifiers only", 
withColumnsIdentifiersOnly()));
+                Arguments.of("Full example", fullExample(CREATE_OPERATION)),
+                Arguments.of("With columns", withColumns(CREATE_OPERATION)),
+                Arguments.of(
+                        "With columns and watermarks", 
withColumnsAndWatermark(CREATE_OPERATION)),
+                Arguments.of("Without table constraint", 
withoutTableConstraint(CREATE_OPERATION)),
+                Arguments.of("With primary key", 
withPrimaryKey(CREATE_OPERATION)),
+                Arguments.of("Without freshness", 
withoutFreshness(CREATE_OPERATION)),
+                Arguments.of(
+                        "With column identifiers only",
+                        withColumnsIdentifiersOnly(CREATE_OPERATION)));
+    }
+
+    private static Stream<Arguments> inputForCreateOrAlterMaterializedTable() {
+        return Stream.of(
+                Arguments.of("Full example", 
fullExample(CREATE_OR_ALTER_OPERATION)),
+                Arguments.of("With columns", 
withColumns(CREATE_OR_ALTER_OPERATION)),
+                Arguments.of(
+                        "With columns and watermarks",
+                        withColumnsAndWatermark(CREATE_OR_ALTER_OPERATION)),
+                Arguments.of(
+                        "Without table constraint",
+                        withoutTableConstraint(CREATE_OR_ALTER_OPERATION)),
+                Arguments.of("With primary key", 
withPrimaryKey(CREATE_OR_ALTER_OPERATION)),
+                Arguments.of("Without freshness", 
withoutFreshness(CREATE_OR_ALTER_OPERATION)),
+                Arguments.of(
+                        "With column identifiers only",
+                        
withColumnsIdentifiersOnly(CREATE_OR_ALTER_OPERATION)));
     }
 
-    private static Map.Entry<String, String> fullExample() {
+    private static Map.Entry<String, String> fullExample(final String operation) {
         return new AbstractMap.SimpleEntry<>(
-                "CREATE MATERIALIZED TABLE tbl1\n"
+                operation
+                        + "MATERIALIZED TABLE tbl1\n"
                         + "(\n"
                         + "  ts timestamp(3),\n"
                         + "  id varchar,\n"
@@ -410,7 +438,8 @@ class MaterializedTableStatementParserTest {
                         + ")\n"
                         + "FRESHNESS = INTERVAL '3' MINUTES\n"
                         + "AS SELECT a, b, h, t m FROM source",
-                "CREATE MATERIALIZED TABLE `TBL1` (\n"
+                operation
+                        + "MATERIALIZED TABLE `TBL1` (\n"
                         + "  `TS` TIMESTAMP(3),\n"
                         + "  `ID` VARCHAR,\n"
                         + "  PRIMARY KEY (`ID`),\n"
@@ -429,9 +458,10 @@ class MaterializedTableStatementParserTest {
                         + "FROM `SOURCE`");
     }
 
-    private static Map.Entry<String, String> withPrimaryKey() {
+    private static Map.Entry<String, String> withPrimaryKey(final String operation) {
         return new AbstractMap.SimpleEntry<>(
-                "CREATE MATERIALIZED TABLE tbl1\n"
+                operation
+                        + "MATERIALIZED TABLE tbl1\n"
                         + "(\n"
                         + "   PRIMARY KEY (a, b)\n"
                         + ")\n"
@@ -439,7 +469,8 @@ class MaterializedTableStatementParserTest {
                         + "FRESHNESS = INTERVAL '3' MINUTE\n"
                         + "REFRESH_MODE = FULL\n"
                         + "AS SELECT a, b, h, t m FROM source",
-                "CREATE MATERIALIZED TABLE `TBL1` (\n"
+                operation
+                        + "MATERIALIZED TABLE `TBL1` (\n"
                         + "  PRIMARY KEY (`A`, `B`)\n"
                         + ")\n"
                         + "COMMENT 'table comment'\n"
@@ -450,14 +481,16 @@ class MaterializedTableStatementParserTest {
                         + "FROM `SOURCE`");
     }
 
-    private static Map.Entry<String, String> withoutTableConstraint() {
+    private static Map.Entry<String, String> withoutTableConstraint(final String operation) {
         return new AbstractMap.SimpleEntry<>(
-                "CREATE MATERIALIZED TABLE tbl1\n"
+                operation
+                        + "MATERIALIZED TABLE tbl1\n"
                         + "COMMENT 'table comment'\n"
                         + "FRESHNESS = INTERVAL '3' DAYS\n"
                         + "REFRESH_MODE = FULL\n"
                         + "AS SELECT a, b, h, t m FROM source",
-                "CREATE MATERIALIZED TABLE `TBL1`\n"
+                operation
+                        + "MATERIALIZED TABLE `TBL1`\n"
                         + "COMMENT 'table comment'\n"
                         + "FRESHNESS = INTERVAL '3' DAY\n"
                         + "REFRESH_MODE = FULL\n"
@@ -466,9 +499,10 @@ class MaterializedTableStatementParserTest {
                         + "FROM `SOURCE`");
     }
 
-    private static Map.Entry<String, String> withoutFreshness() {
+    private static Map.Entry<String, String> withoutFreshness(final String operation) {
         return new AbstractMap.SimpleEntry<>(
-                "CREATE MATERIALIZED TABLE tbl1\n"
+                operation
+                        + "MATERIALIZED TABLE tbl1\n"
                         + "(\n"
                         + "   PRIMARY KEY (a, b)\n"
                         + ")\n"
@@ -477,7 +511,8 @@ class MaterializedTableStatementParserTest {
                         + "  'kafka.topic' = 'log.test'\n"
                         + ")\n"
                         + "AS SELECT a, b, h, t m FROM source",
-                "CREATE MATERIALIZED TABLE `TBL1` (\n"
+                operation
+                        + "MATERIALIZED TABLE `TBL1` (\n"
                         + "  PRIMARY KEY (`A`, `B`)\n"
                         + ")\n"
                         + "WITH (\n"
@@ -489,9 +524,10 @@ class MaterializedTableStatementParserTest {
                         + "FROM `SOURCE`");
     }
 
-    private static Map.Entry<String, String> withColumns() {
+    private static Map.Entry<String, String> withColumns(final String operation) {
         return new AbstractMap.SimpleEntry<>(
-                "CREATE MATERIALIZED TABLE tbl1\n"
+                operation
+                        + "MATERIALIZED TABLE tbl1\n"
                         + "(\n"
                         + "  a INT, b STRING, h INT, m INT\n"
                         + ")\n"
@@ -504,7 +540,8 @@ class MaterializedTableStatementParserTest {
                         + ")\n"
                         + "FRESHNESS = INTERVAL '3' MINUTE\n"
                         + "AS SELECT a, b, h, t m FROM source",
-                "CREATE MATERIALIZED TABLE `TBL1` (\n"
+                operation
+                        + "MATERIALIZED TABLE `TBL1` (\n"
                         + "  `A` INTEGER,\n"
                         + "  `B` STRING,\n"
                         + "  `H` INTEGER,\n"
@@ -523,9 +560,10 @@ class MaterializedTableStatementParserTest {
                         + "FROM `SOURCE`");
     }
 
-    private static Map.Entry<String, String> withColumnsAndWatermark() {
+    private static Map.Entry<String, String> withColumnsAndWatermark(final String operation) {
         return new AbstractMap.SimpleEntry<>(
-                "CREATE MATERIALIZED TABLE tbl1\n"
+                operation
+                        + "MATERIALIZED TABLE tbl1\n"
                         + "(\n"
                         + "  ts timestamp(3),\n"
                         + "  id varchar,\n"
@@ -540,7 +578,8 @@ class MaterializedTableStatementParserTest {
                         + ")\n"
                         + "FRESHNESS = INTERVAL '3' MINUTE\n"
                         + "AS SELECT a, b, h, t m FROM source",
-                "CREATE MATERIALIZED TABLE `TBL1` (\n"
+                operation
+                        + "MATERIALIZED TABLE `TBL1` (\n"
                         + "  `TS` TIMESTAMP(3),\n"
                         + "  `ID` VARCHAR,\n"
                         + "  WATERMARK FOR `TS` AS (`TS` - INTERVAL '3' 
SECOND)\n"
@@ -558,9 +597,10 @@ class MaterializedTableStatementParserTest {
                         + "FROM `SOURCE`");
     }
 
-    private static Map.Entry<String, String> withColumnsIdentifiersOnly() {
+    private static Map.Entry<String, String> withColumnsIdentifiersOnly(final String operation) {
         return new AbstractMap.SimpleEntry<>(
-                "CREATE MATERIALIZED TABLE tbl1\n"
+                operation
+                        + "MATERIALIZED TABLE tbl1\n"
                         + "(a, b, h, m)\n"
                         + "COMMENT 'table comment'\n"
                         + "DISTRIBUTED BY HASH (a) INTO 4 BUCKETS\n"
@@ -571,7 +611,8 @@ class MaterializedTableStatementParserTest {
                         + ")\n"
                         + "FRESHNESS = INTERVAL '3' MINUTE\n"
                         + "AS SELECT a, b, h, t m FROM source",
-                "CREATE MATERIALIZED TABLE `TBL1` (\n"
+                operation
+                        + "MATERIALIZED TABLE `TBL1` (\n"
                         + "  `A`,\n"
                         + "  `B`,\n"
                         + "  `H`,\n"
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
index 2500bcd7e8c..5cd5035e806 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java
@@ -142,7 +142,7 @@ public class ShowCreateUtil {
                 .append("\n")
                 .append(extractRefreshMode(table))
                 .append("\n");
-        sb.append("AS ").append(table.getDefinitionQuery()).append('\n');
+        sb.append("AS ").append(table.getExpandedQuery()).append('\n');
         return sb.toString();
     }
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java
index 6afd3b6e96a..3df90d461f3 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java
@@ -33,8 +33,8 @@ public class AlterMaterializedTableAsQueryOperation extends AlterMaterializedTab
     public AlterMaterializedTableAsQueryOperation(
             ObjectIdentifier tableIdentifier,
             List<MaterializedTableChange> tableChanges,
-            CatalogMaterializedTable newMaterializedTable) {
-        super(tableIdentifier, tableChanges, newMaterializedTable);
+            CatalogMaterializedTable catalogMaterializedTable) {
+        super(tableIdentifier, tableChanges, catalogMaterializedTable);
     }
 
     @Override
@@ -48,6 +48,6 @@ public class AlterMaterializedTableAsQueryOperation extends AlterMaterializedTab
         return String.format(
                 "ALTER MATERIALIZED TABLE %s AS %s",
                 tableIdentifier.asSummaryString(),
-                getCatalogMaterializedTable().getDefinitionQuery());
+                getCatalogMaterializedTable().getExpandedQuery());
     }
 }
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java
index 840eded48ad..15464afc7e5 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java
@@ -286,6 +286,7 @@ class ShowCreateUtilTest {
                                 null,
                                 IntervalFreshness.ofMinute("1"),
                                 RefreshMode.CONTINUOUS,
+                                "SELECT 1",
                                 "SELECT 1"),
                         "CREATE MATERIALIZED TABLE 
`catalogName`.`dbName`.`materializedTableName` (\n"
                                 + "  `id` INT\n"
@@ -303,6 +304,7 @@ class ShowCreateUtilTest {
                                 null,
                                 IntervalFreshness.ofMinute("1"),
                                 RefreshMode.CONTINUOUS,
+                                "SELECT 1",
                                 "SELECT 1"),
                         "CREATE MATERIALIZED TABLE 
`catalogName`.`dbName`.`materializedTableName` (\n"
                                 + "  `id` INT,\n"
@@ -322,7 +324,8 @@ class ShowCreateUtilTest {
                                 TableDistribution.of(TableDistribution.Kind.HASH, 5, List.of("id")),
                                 IntervalFreshness.ofMinute("3"),
                                 RefreshMode.FULL,
-                                "SELECT id, name FROM tbl_a"),
+                                "SELECT id, name FROM tbl_a",
+                                "SELECT id, name FROM 
`catalogName`.`dbName`.`tbl_a`"),
                         "CREATE MATERIALIZED TABLE 
`catalogName`.`dbName`.`materializedTableName` (\n"
                                 + "  `id` INT,\n"
                                 + "  `name` VARCHAR(2147483647)\n"
@@ -332,7 +335,29 @@ class ShowCreateUtilTest {
                                 + "PARTITIONED BY (`id`)\n"
                                 + "FRESHNESS = INTERVAL '3' MINUTE\n"
                                 + "REFRESH_MODE = FULL\n"
-                                + "AS SELECT id, name FROM tbl_a\n"));
+                                + "AS SELECT id, name FROM 
`catalogName`.`dbName`.`tbl_a`\n"));
+
+        argList.add(
+                Arguments.of(
+                        createResolvedMaterialized(
+                                TWO_COLUMNS_SCHEMA,
+                                "Materialized table comment",
+                                List.of("id"),
+                                TableDistribution.of(TableDistribution.Kind.HASH, 5, List.of("id")),
+                                IntervalFreshness.ofMinute("3"),
+                                RefreshMode.FULL,
+                                "SELECT * FROM tbl_a",
+                                "SELECT id, name FROM `catalogName`.`dbName`.`tbl_a`"),
+                        "CREATE MATERIALIZED TABLE `catalogName`.`dbName`.`materializedTableName` (\n"
+                                + "  `id` INT,\n"
+                                + "  `name` VARCHAR(2147483647)\n"
+                                + ")\n"
+                                + "COMMENT 'Materialized table comment'\n"
+                                + "DISTRIBUTED BY HASH(`id`) INTO 5 BUCKETS\n"
+                                + "PARTITIONED BY (`id`)\n"
+                                + "FRESHNESS = INTERVAL '3' MINUTE\n"
+                                + "REFRESH_MODE = FULL\n"
+                                + "AS SELECT id, name FROM `catalogName`.`dbName`.`tbl_a`\n"));
 
         return argList;
     }
@@ -383,7 +408,8 @@ class ShowCreateUtilTest {
             TableDistribution distribution,
             IntervalFreshness freshness,
             RefreshMode refreshMode,
-            String definitionQuery) {
+            String originalQuery,
+            String expandedQuery) {
         return new ResolvedCatalogMaterializedTable(
                 CatalogMaterializedTable.newBuilder()
                         .comment(comment)
                         .schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build())
.schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build())
                         .freshness(freshness)
                         .refreshMode(refreshMode)
-                        .definitionQuery(definitionQuery)
+                        .originalQuery(originalQuery)
+                        .expandedQuery(expandedQuery)
                         .logicalRefreshMode(LogicalRefreshMode.AUTOMATIC)
                         .refreshStatus(RefreshStatus.ACTIVATED)
                         .build(),
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
index 2ef20f7f80c..8a0e34e9bc0 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
@@ -155,7 +155,8 @@ class CatalogBaseTableResolutionTest {
             new ContinuousRefreshHandler(
                     "remote", "StandaloneClusterId", 
JobID.generate().toHexString());
 
-    private static final String DEFINITION_QUERY =
+    private static final String DEFAULT_ORIGINAL_QUERY = "SELECT id, region, 
county FROM T";
+    private static final String DEFAULT_EXPANDED_QUERY =
             String.format(
                     "SELECT id, region, county FROM %s.%s.T", DEFAULT_CATALOG, 
DEFAULT_DATABASE);
 
@@ -195,8 +196,10 @@ class CatalogBaseTableResolutionTest {
         assertThat(resolvedMaterializedTable.getResolvedSchema())
                 .isEqualTo(RESOLVED_MATERIALIZED_TABLE_SCHEMA);
 
-        assertThat(resolvedMaterializedTable.getDefinitionQuery())
-                .isEqualTo(materializedTable.getDefinitionQuery());
+        assertThat(resolvedMaterializedTable.getOriginalQuery())
+                .isEqualTo(materializedTable.getOriginalQuery());
+        assertThat(resolvedMaterializedTable.getExpandedQuery())
+                .isEqualTo(materializedTable.getExpandedQuery());
         assertThat(resolvedMaterializedTable.getDefinitionFreshness())
                 .isEqualTo(materializedTable.getDefinitionFreshness());
         assertThat(resolvedMaterializedTable.getLogicalRefreshMode())
@@ -245,8 +248,10 @@ class CatalogBaseTableResolutionTest {
                 .isEqualTo(RESOLVED_MATERIALIZED_TABLE_SCHEMA);
         assertThat(resolvedCatalogMaterializedTable.getDefinitionFreshness())
                 .isEqualTo(IntervalFreshness.ofSecond("30"));
-        assertThat(resolvedCatalogMaterializedTable.getDefinitionQuery())
-                .isEqualTo(DEFINITION_QUERY);
+        assertThat(resolvedCatalogMaterializedTable.getOriginalQuery())
+                .isEqualTo(DEFAULT_ORIGINAL_QUERY);
+        assertThat(resolvedCatalogMaterializedTable.getExpandedQuery())
+                .isEqualTo(DEFAULT_EXPANDED_QUERY);
         assertThat(resolvedCatalogMaterializedTable.getLogicalRefreshMode())
                 .isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.CONTINUOUS);
         assertThat(resolvedCatalogMaterializedTable.getRefreshMode())
@@ -421,7 +426,8 @@ class CatalogBaseTableResolutionTest {
         properties.put("logical-refresh-mode", "CONTINUOUS");
         properties.put("refresh-mode", "CONTINUOUS");
         properties.put("refresh-status", "INITIALIZING");
-        properties.put("definition-query", DEFINITION_QUERY);
+        properties.put("original-query", DEFAULT_ORIGINAL_QUERY);
+        properties.put("expanded-query", DEFAULT_EXPANDED_QUERY);
 
         // put refresh handler
         properties.put(
@@ -436,17 +442,13 @@ class CatalogBaseTableResolutionTest {
         final String comment = "This is an example materialized table.";
         final List<String> partitionKeys = Arrays.asList("region", "county");
 
-        final String definitionQuery =
-                String.format(
-                        "SELECT id, region, county FROM %s.%s.T",
-                        DEFAULT_CATALOG, DEFAULT_DATABASE);
-
         CatalogMaterializedTable.Builder builder = CatalogMaterializedTable.newBuilder();
         return builder.schema(MATERIALIZED_TABLE_SCHEMA)
                 .comment(comment)
                 .partitionKeys(partitionKeys)
                 .options(Collections.emptyMap())
-                .definitionQuery(definitionQuery)
+                .originalQuery(DEFAULT_ORIGINAL_QUERY)
+                .expandedQuery(DEFAULT_EXPANDED_QUERY)
                 .freshness(IntervalFreshness.ofSecond("30"))
                 
.logicalRefreshMode(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC)
                 .refreshMode(CatalogMaterializedTable.RefreshMode.CONTINUOUS)
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java
index f0691d6252c..92546ecee3b 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java
@@ -106,6 +106,21 @@ public interface CatalogMaterializedTable extends CatalogBaseTable {
     /** Return the snapshot specified for the table. Return Optional.empty() if not specified. */
     Optional<Long> getSnapshot();
 
+    /**
+     * Original text of the materialized table definition that preserves the original formatting.
+     *
+     * @return the original string literal provided by the user.
+     */
+    String getOriginalQuery();
+
+    /**
+     * Expanded text of the original materialized table definition with resolved identifiers. This
+     * is needed because context such as current DB is lost after the session.
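+     * For example, with current catalog {@code cat1} and current database {@code db1}, an
+     * original query {@code SELECT id FROM T} would be stored expanded as
+     * {@code SELECT id FROM cat1.db1.T}.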
+     *
+     * @return the materialized table definition in expanded text.
+     */
+    String getExpandedQuery();
+
     /**
      * The definition query text of materialized table, text is expanded in contrast to the original
      * SQL. This is needed because the context such as current DB is lost after the session, in
@@ -117,8 +132,13 @@ public interface CatalogMaterializedTable extends CatalogBaseTable {
      * database "default" and has two columns ("name" and "value").
      *
      * @return the materialized table definition in expanded text.
+     * @deprecated The definition query will be removed in future versions, please use {@link
+     *     #getExpandedQuery()} instead.
      */
-    String getDefinitionQuery();
+    @Deprecated
+    default String getDefinitionQuery() {
+        return getExpandedQuery();
+    }
 
     /**
      * Get the definition freshness of materialized table which is used to determine the physical
@@ -209,7 +229,8 @@ public interface CatalogMaterializedTable extends CatalogBaseTable {
         private List<String> partitionKeys = Collections.emptyList();
         private Map<String, String> options = Collections.emptyMap();
         private @Nullable Long snapshot;
-        private String definitionQuery;
+        private String originalQuery;
+        private String expandedQuery;
         private @Nullable IntervalFreshness freshness;
         private LogicalRefreshMode logicalRefreshMode;
         private @Nullable RefreshMode refreshMode;
@@ -245,13 +266,26 @@ public interface CatalogMaterializedTable extends 
CatalogBaseTable {
             return this;
         }
 
-        public Builder definitionQuery(String definitionQuery) {
-            this.definitionQuery =
-                    Preconditions.checkNotNull(
-                            definitionQuery, "Definition query must not be 
null.");
+        public Builder originalQuery(String originalQuery) {
+            this.originalQuery =
+                    Preconditions.checkNotNull(originalQuery, "Original query 
must not be null.");
+            return this;
+        }
+
+        public Builder expandedQuery(String expandedQuery) {
+            this.expandedQuery =
+                    Preconditions.checkNotNull(expandedQuery, "Expanded query 
must not be null.");
             return this;
         }
 
+        /**
+         * @deprecated Use {@link #expandedQuery(String)} instead.
+         */
+        @Deprecated
+        public Builder definitionQuery(String definitionQuery) {
+            return expandedQuery(definitionQuery);
+        }
+
         public Builder freshness(@Nullable IntervalFreshness freshness) {
             this.freshness = freshness;
             return this;
@@ -298,7 +332,8 @@ public interface CatalogMaterializedTable extends 
CatalogBaseTable {
                     partitionKeys,
                     options,
                     snapshot,
-                    definitionQuery,
+                    originalQuery,
+                    expandedQuery,
                     freshness,
                     logicalRefreshMode,
                     refreshMode,
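
A minimal usage sketch of the new builder surface (the schema, freshness, and
query strings below are illustrative, not taken from any test in this commit):

    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.catalog.CatalogMaterializedTable;
    import org.apache.flink.table.catalog.IntervalFreshness;

    // Build a materialized table that carries both the user-written query and
    // its fully resolved expansion, as introduced by this change.
    CatalogMaterializedTable table =
            CatalogMaterializedTable.newBuilder()
                    .schema(Schema.newBuilder().column("id", "INT").build())
                    .originalQuery("SELECT * FROM t1")
                    .expandedQuery("SELECT `t1`.`id` FROM `builtin`.`default`.`t1` AS `t1`")
                    .freshness(IntervalFreshness.ofSecond("30"))
                    .logicalRefreshMode(CatalogMaterializedTable.LogicalRefreshMode.CONTINUOUS)
                    .refreshMode(CatalogMaterializedTable.RefreshMode.CONTINUOUS)
                    .refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
                    .build();

    // The deprecated accessor and builder method forward to the expanded
    // query, so existing callers keep working:
    assert table.getDefinitionQuery().equals(table.getExpandedQuery());
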
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
index a30139445ad..8184797b916 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
@@ -139,7 +139,7 @@ public final class CatalogPropertiesUtil {
                     properties, resolvedMaterializedTable.getResolvedSchema(), 
sqlFactory);
 
             final String comment = resolvedMaterializedTable.getComment();
-            if (comment != null && comment.length() > 0) {
+            if (comment != null && !comment.isEmpty()) {
                 properties.put(COMMENT, comment);
             }
 
@@ -154,7 +154,8 @@ public final class CatalogPropertiesUtil {
 
             properties.putAll(resolvedMaterializedTable.getOptions());
 
-            properties.put(DEFINITION_QUERY, 
resolvedMaterializedTable.getDefinitionQuery());
+            properties.put(ORIGINAL_QUERY, 
resolvedMaterializedTable.getOriginalQuery());
+            properties.put(EXPANDED_QUERY, 
resolvedMaterializedTable.getExpandedQuery());
 
             IntervalFreshness intervalFreshness =
                     resolvedMaterializedTable.getDefinitionFreshness();
@@ -280,7 +281,8 @@ public final class CatalogPropertiesUtil {
 
             final Map<String, String> options = deserializeOptions(properties);
 
-            final String definitionQuery = properties.get(DEFINITION_QUERY);
+            final String originalQuery = properties.get(ORIGINAL_QUERY);
+            final String expandedQuery = properties.get(EXPANDED_QUERY);
 
             final String freshnessInterval = 
properties.get(FRESHNESS_INTERVAL);
             final IntervalFreshness.TimeUnit timeUnit =
@@ -313,7 +315,8 @@ public final class CatalogPropertiesUtil {
                     .distribution(distribution)
                     .options(options)
                     .snapshot(snapshot)
-                    .definitionQuery(definitionQuery)
+                    .originalQuery(originalQuery)
+                    .expandedQuery(expandedQuery)
                     .freshness(freshness)
                     .logicalRefreshMode(logicalRefreshMode)
                     .refreshMode(refreshMode)
@@ -405,7 +408,9 @@ public final class CatalogPropertiesUtil {
 
     private static final String SNAPSHOT = "snapshot";
 
-    private static final String DEFINITION_QUERY = "definition-query";
+    private static final String ORIGINAL_QUERY = "original-query";
+
+    private static final String EXPANDED_QUERY = "expanded-query";
 
     private static final String FRESHNESS_INTERVAL = "freshness-interval";
 
@@ -450,7 +455,8 @@ public final class CatalogPropertiesUtil {
     }
 
     private static boolean isMaterializedTableAttribute(String key) {
-        return key.equals(DEFINITION_QUERY)
+        return key.equals(ORIGINAL_QUERY)
+                || key.equals(EXPANDED_QUERY)
                 || key.equals(FRESHNESS_INTERVAL)
                 || key.equals(FRESHNESS_UNIT)
                 || key.equals(LOGICAL_REFRESH_MODE)
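
To illustrate the serialization change above: a persisted materialized table
now stores two query properties instead of one (keys taken from this diff,
values illustrative), and both are filtered out of the user-facing options by
isMaterializedTableAttribute:

    // What CatalogPropertiesUtil now writes (sketch):
    properties.put("original-query", "SELECT * FROM t1");
    properties.put("expanded-query", "SELECT `t1`.`id` FROM `builtin`.`default`.`t1` AS `t1`");
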
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogMaterializedTable.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogMaterializedTable.java
index 265e16fde93..78eca0ce897 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogMaterializedTable.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogMaterializedTable.java
@@ -44,7 +44,8 @@ public class DefaultCatalogMaterializedTable implements 
CatalogMaterializedTable
 
     private final @Nullable Long snapshot;
 
-    private final String definitionQuery;
+    private final String originalQuery;
+    private final String expandedQuery;
     private final IntervalFreshness freshness;
     private final LogicalRefreshMode logicalRefreshMode;
     private final RefreshMode refreshMode;
@@ -59,7 +60,8 @@ public class DefaultCatalogMaterializedTable implements 
CatalogMaterializedTable
             List<String> partitionKeys,
             Map<String, String> options,
             @Nullable Long snapshot,
-            String definitionQuery,
+            String originalQuery,
+            String expandedQuery,
             @Nullable IntervalFreshness freshness,
             LogicalRefreshMode logicalRefreshMode,
             @Nullable RefreshMode refreshMode,
@@ -72,7 +74,8 @@ public class DefaultCatalogMaterializedTable implements 
CatalogMaterializedTable
         this.partitionKeys = checkNotNull(partitionKeys, "Partition keys must 
not be null.");
         this.options = checkNotNull(options, "Options must not be null.");
         this.snapshot = snapshot;
-        this.definitionQuery = checkNotNull(definitionQuery, "Definition query 
must not be null.");
+        this.originalQuery = checkNotNull(originalQuery, "Original query must 
not be null.");
+        this.expandedQuery = checkNotNull(expandedQuery, "Expanded query must 
not be null.");
         this.freshness = freshness;
         this.logicalRefreshMode =
                 checkNotNull(logicalRefreshMode, "Logical refresh mode must 
not be null.");
@@ -126,7 +129,8 @@ public class DefaultCatalogMaterializedTable implements 
CatalogMaterializedTable
                 partitionKeys,
                 options,
                 snapshot,
-                definitionQuery,
+                originalQuery,
+                expandedQuery,
                 freshness,
                 logicalRefreshMode,
                 refreshMode,
@@ -144,7 +148,8 @@ public class DefaultCatalogMaterializedTable implements 
CatalogMaterializedTable
                 partitionKeys,
                 options,
                 snapshot,
-                definitionQuery,
+                originalQuery,
+                expandedQuery,
                 freshness,
                 logicalRefreshMode,
                 refreshMode,
@@ -165,7 +170,8 @@ public class DefaultCatalogMaterializedTable implements 
CatalogMaterializedTable
                 partitionKeys,
                 options,
                 snapshot,
-                definitionQuery,
+                originalQuery,
+                expandedQuery,
                 freshness,
                 logicalRefreshMode,
                 refreshMode,
@@ -190,8 +196,13 @@ public class DefaultCatalogMaterializedTable implements 
CatalogMaterializedTable
     }
 
     @Override
-    public String getDefinitionQuery() {
-        return definitionQuery;
+    public String getOriginalQuery() {
+        return originalQuery;
+    }
+
+    @Override
+    public String getExpandedQuery() {
+        return expandedQuery;
     }
 
     @Override
@@ -239,7 +250,8 @@ public class DefaultCatalogMaterializedTable implements 
CatalogMaterializedTable
                 && Objects.equals(partitionKeys, that.partitionKeys)
                 && Objects.equals(options, that.options)
                 && Objects.equals(snapshot, that.snapshot)
-                && Objects.equals(definitionQuery, that.definitionQuery)
+                && Objects.equals(originalQuery, that.originalQuery)
+                && Objects.equals(expandedQuery, that.expandedQuery)
                 && Objects.equals(freshness, that.freshness)
                 && logicalRefreshMode == that.logicalRefreshMode
                 && refreshMode == that.refreshMode
@@ -257,7 +269,8 @@ public class DefaultCatalogMaterializedTable implements 
CatalogMaterializedTable
                         partitionKeys,
                         options,
                         snapshot,
-                        definitionQuery,
+                        originalQuery,
+                        expandedQuery,
                         freshness,
                         logicalRefreshMode,
                         refreshMode,
@@ -283,8 +296,11 @@ public class DefaultCatalogMaterializedTable implements 
CatalogMaterializedTable
                 + options
                 + ", snapshot="
                 + snapshot
-                + ", definitionQuery='"
-                + definitionQuery
+                + ", originalQuery='"
+                + originalQuery
+                + '\''
+                + ", expandedQuery='"
+                + expandedQuery
                 + '\''
                 + ", freshness="
                 + freshness
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java
index 6f0bf960b18..b87db09f1f2 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java
@@ -119,6 +119,16 @@ public class ResolvedCatalogMaterializedTable
         return origin.getSnapshot();
     }
 
+    @Override
+    public String getOriginalQuery() {
+        return origin.getOriginalQuery();
+    }
+
+    @Override
+    public String getExpandedQuery() {
+        return origin.getExpandedQuery();
+    }
+
     @Override
     public CatalogMaterializedTable getOrigin() {
         return origin;
@@ -129,11 +139,6 @@ public class ResolvedCatalogMaterializedTable
         return resolvedSchema;
     }
 
-    @Override
-    public String getDefinitionQuery() {
-        return origin.getDefinitionQuery();
-    }
-
     @Override
     public @Nonnull IntervalFreshness getDefinitionFreshness() {
         return freshness;
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java
index adb8a845421..7951662c9fe 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java
@@ -158,7 +158,8 @@ class CatalogPropertiesUtilTest {
                                 .schema(schema)
                                 .comment("some comment")
                                 .distribution(unknownDist)
-                                .definitionQuery("SELECT 1, 'two'")
+                                .originalQuery("SELECT 1, 'two'")
+                                .expandedQuery("SELECT 1, 'two'")
                                 .freshness(IntervalFreshness.ofHour("123"))
                                 .logicalRefreshMode(
                                         
CatalogMaterializedTable.LogicalRefreshMode.CONTINUOUS)
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/AbstractAlterMaterializedTableConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/AbstractAlterMaterializedTableConverter.java
index 6c25a36929b..33f4471c471 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/AbstractAlterMaterializedTableConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/AbstractAlterMaterializedTableConverter.java
@@ -66,7 +66,8 @@ public abstract class 
AbstractAlterMaterializedTableConverter<T extends SqlNode>
                         .comment(oldTable.getComment())
                         .partitionKeys(oldTable.getPartitionKeys())
                         .options(oldTable.getOptions())
-                        .definitionQuery(oldTable.getDefinitionQuery())
+                        .originalQuery(oldTable.getOriginalQuery())
+                        .expandedQuery(oldTable.getExpandedQuery())
                         .distribution(oldTable.getDistribution().orElse(null))
                         .freshness(oldTable.getDefinitionFreshness())
                         .logicalRefreshMode(oldTable.getLogicalRefreshMode())
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/AbstractCreateMaterializedTableConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/AbstractCreateMaterializedTableConverter.java
index 36e3e150915..2bb558935fe 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/AbstractCreateMaterializedTableConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/AbstractCreateMaterializedTableConverter.java
@@ -63,7 +63,9 @@ public abstract class 
AbstractCreateMaterializedTableConverter<T extends SqlCrea
 
         Optional<TableDistribution> getMergedTableDistribution();
 
-        String getMergedDefinitionQuery();
+        String getMergedOriginalQuery();
+
+        String getMergedExpandedQuery();
 
         ResolvedSchema getMergedQuerySchema();
     }
@@ -113,11 +115,17 @@ public abstract class 
AbstractCreateMaterializedTableConverter<T extends SqlCrea
         return 
MaterializedTableUtils.fromLogicalRefreshModeToRefreshMode(logicalRefreshMode);
     }
 
-    protected final String getDerivedDefinitionQuery(
+    protected final String getDerivedOriginalQuery(
+            T sqlCreateMaterializedTable, ConvertContext context) {
+        SqlNode selectQuery = sqlCreateMaterializedTable.getAsQuery();
+        return context.toQuotedSqlString(selectQuery);
+    }
+
+    protected final String getDerivedExpandedQuery(
             T sqlCreateMaterializedTable, ConvertContext context) {
         SqlNode selectQuery = sqlCreateMaterializedTable.getAsQuery();
         SqlNode validatedQuery = 
context.getSqlValidator().validate(selectQuery);
-        return context.toQuotedSqlString(validatedQuery);
+        return 
context.expandSqlIdentifiers(context.toQuotedSqlString(validatedQuery));
     }
 
     protected final String getComment(T sqlCreateMaterializedTable) {
@@ -136,11 +144,17 @@ public abstract class 
AbstractCreateMaterializedTableConverter<T extends SqlCrea
         final TableDistribution distribution =
                 mergeContext.getMergedTableDistribution().orElse(null);
         final String comment = getComment(sqlCreateMaterializedTable);
-        final String definitionQuery = mergeContext.getMergedDefinitionQuery();
+
+        final String originalQuery = mergeContext.getMergedOriginalQuery();
+        final String expandedQuery = mergeContext.getMergedExpandedQuery();
+
         final IntervalFreshness intervalFreshness = 
getDerivedFreshness(sqlCreateMaterializedTable);
+
         final LogicalRefreshMode logicalRefreshMode =
                 getDerivedLogicalRefreshMode(sqlCreateMaterializedTable);
+
         final RefreshMode refreshMode = 
getDerivedRefreshMode(logicalRefreshMode);
+
         return context.getCatalogManager()
                 .resolveCatalogMaterializedTable(
                         CatalogMaterializedTable.newBuilder()
@@ -149,7 +163,8 @@ public abstract class 
AbstractCreateMaterializedTableConverter<T extends SqlCrea
                                 .distribution(distribution)
                                 .partitionKeys(partitionKeys)
                                 .options(tableOptions)
-                                .definitionQuery(definitionQuery)
+                                .originalQuery(originalQuery)
+                                .expandedQuery(expandedQuery)
                                 .freshness(intervalFreshness)
                                 .logicalRefreshMode(logicalRefreshMode)
                                 .refreshMode(refreshMode)
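
The two derivations above differ only in validation and identifier expansion.
As a sketch, for "AS SELECT * FROM t1" with session catalog/database
builtin/default (asQuery stands for the parsed AS query; the resulting strings
are taken from the tests later in this commit):

    // getDerivedOriginalQuery: pretty-printed, exactly as the user wrote it
    String original = context.toQuotedSqlString(asQuery);
    // -> "SELECT *\nFROM `t1`"

    // getDerivedExpandedQuery: validated first, then identifiers fully qualified
    SqlNode validated = context.getSqlValidator().validate(asQuery);
    String expanded = context.expandSqlIdentifiers(context.toQuotedSqlString(validated));
    // -> "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`\nFROM `builtin`.`default`.`t1` AS `t1`"
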
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableAsQueryConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableAsQueryConverter.java
index 7c145b5dcf2..7deda62b12e 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableAsQueryConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableAsQueryConverter.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.planner.operations.converters;
 
 import org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableAsQuery;
 import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogMaterializedTable;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -31,6 +30,7 @@ import 
org.apache.flink.table.catalog.TableChange.MaterializedTableChange;
 import org.apache.flink.table.operations.Operation;
 import 
org.apache.flink.table.operations.materializedtable.AlterMaterializedTableAsQueryOperation;
 import org.apache.flink.table.planner.operations.PlannerQueryOperation;
+import org.apache.flink.table.planner.utils.MaterializedTableUtils;
 
 import org.apache.calcite.sql.SqlNode;
 
@@ -63,13 +63,15 @@ public class SqlAlterMaterializedTableAsQueryConverter
                         identifier,
                         () -> "Only materialized tables support modifying the 
definition query.");
 
+        ResolvedSchema oldSchema = oldTable.getResolvedSchema();
         List<Column> addedColumns =
-                validateAndExtractNewColumns(
-                        oldTable.getResolvedSchema(), 
queryOperation.getResolvedSchema());
+                MaterializedTableUtils.validateAndExtractNewColumns(
+                        oldSchema, queryOperation.getResolvedSchema());
 
         // Build new materialized table and apply changes
         CatalogMaterializedTable updatedTable =
-                buildUpdatedMaterializedTable(oldTable, addedColumns, 
definitionQuery);
+                buildUpdatedMaterializedTable(
+                        oldTable, addedColumns, originalQuery, 
definitionQuery);
         List<MaterializedTableChange> tableChanges = new ArrayList<>();
         addedColumns.forEach(column -> 
tableChanges.add(TableChange.add(column)));
         tableChanges.add(TableChange.modifyDefinitionQuery(definitionQuery));
@@ -80,7 +82,8 @@ public class SqlAlterMaterializedTableAsQueryConverter
     private CatalogMaterializedTable buildUpdatedMaterializedTable(
             ResolvedCatalogMaterializedTable oldTable,
             List<Column> addedColumns,
-            String definitionQuery) {
+            String originalQuery,
+            String expandedQuery) {
         Schema.Builder newSchemaBuilder =
                 
Schema.newBuilder().fromResolvedSchema(oldTable.getResolvedSchema());
         addedColumns.forEach(col -> newSchemaBuilder.column(col.getName(), 
col.getDataType()));
@@ -89,43 +92,7 @@ public class SqlAlterMaterializedTableAsQueryConverter
                 oldTable,
                 builder -> {
                     builder.schema(newSchemaBuilder.build());
-                    builder.definitionQuery(definitionQuery);
+                    
builder.originalQuery(originalQuery).expandedQuery(expandedQuery);
                 });
     }
-
-    private List<Column> validateAndExtractNewColumns(
-            ResolvedSchema oldSchema, ResolvedSchema newSchema) {
-        List<Column> newAddedColumns = new ArrayList<>();
-        int originalColumnSize = oldSchema.getColumns().size();
-        int newColumnSize = newSchema.getColumns().size();
-
-        if (originalColumnSize > newColumnSize) {
-            throw new ValidationException(
-                    String.format(
-                            "Failed to modify query because drop column is 
unsupported. "
-                                    + "When modifying a query, you can only 
append new columns at the end of original schema. "
-                                    + "The original schema has %d columns, but 
the newly derived schema from the query has %d columns.",
-                            originalColumnSize, newColumnSize));
-        }
-
-        for (int i = 0; i < oldSchema.getColumns().size(); i++) {
-            Column oldColumn = oldSchema.getColumns().get(i);
-            Column newColumn = newSchema.getColumns().get(i);
-            if (!oldColumn.equals(newColumn)) {
-                throw new ValidationException(
-                        String.format(
-                                "When modifying the query of a materialized 
table, "
-                                        + "currently only support appending 
columns at the end of original schema, dropping, renaming, and reordering 
columns are not supported.\n"
-                                        + "Column mismatch at position %d: 
Original column is [%s], but new column is [%s].",
-                                i, oldColumn, newColumn));
-            }
-        }
-
-        for (int i = oldSchema.getColumns().size(); i < 
newSchema.getColumns().size(); i++) {
-            Column newColumn = newSchema.getColumns().get(i);
-            
newAddedColumns.add(newColumn.copy(newColumn.getDataType().nullable()));
-        }
-
-        return newAddedColumns;
-    }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
deleted file mode 100644
index 3beb5596a90..00000000000
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.operations.converters;
-
-import org.apache.flink.sql.parser.ddl.SqlCreateMaterializedTable;
-import org.apache.flink.sql.parser.ddl.SqlTableColumn.SqlRegularColumn;
-import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.catalog.ResolvedSchema;
-import org.apache.flink.table.catalog.TableDistribution;
-import org.apache.flink.table.operations.Operation;
-import 
org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
-import 
org.apache.flink.table.planner.operations.converters.table.MergeTableAsUtil;
-
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlNodeList;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-
-/**
- * A converter for {@link SqlCreateMaterializedTable} to {@link 
CreateMaterializedTableOperation}.
- */
-public class SqlCreateMaterializedTableConverter
-        extends 
AbstractCreateMaterializedTableConverter<SqlCreateMaterializedTable> {
-
-    @Override
-    public Operation convertSqlNode(
-            SqlCreateMaterializedTable sqlCreateMaterializedTable, 
ConvertContext context) {
-        return new CreateMaterializedTableOperation(
-                getIdentifier(sqlCreateMaterializedTable, context),
-                
getResolvedCatalogMaterializedTable(sqlCreateMaterializedTable, context));
-    }
-
-    @Override
-    protected MergeContext getMergeContext(
-            SqlCreateMaterializedTable sqlCreateMaterializedTable, 
ConvertContext context) {
-        return new MergeContext() {
-            private final MergeTableAsUtil mergeTableAsUtil = new 
MergeTableAsUtil(context);
-            private final String definitionQuery =
-                    
SqlCreateMaterializedTableConverter.this.getDerivedDefinitionQuery(
-                            sqlCreateMaterializedTable, context);
-            private final ResolvedSchema querySchema =
-                    
SqlCreateMaterializedTableConverter.this.getQueryResolvedSchema(
-                            sqlCreateMaterializedTable, context);
-
-            @Override
-            public Schema getMergedSchema() {
-                final Set<String> querySchemaColumnNames =
-                        new HashSet<>(querySchema.getColumnNames());
-                final SqlNodeList sqlNodeList = 
sqlCreateMaterializedTable.getColumnList();
-                for (SqlNode column : sqlNodeList) {
-                    if (!(column instanceof SqlRegularColumn)) {
-                        continue;
-                    }
-
-                    SqlRegularColumn physicalColumn = (SqlRegularColumn) 
column;
-                    if 
(!querySchemaColumnNames.contains(physicalColumn.getName().getSimple())) {
-                        throw new ValidationException(
-                                String.format(
-                                        "Invalid as physical column '%s' is 
defined in the DDL, but is not used in a query column.",
-                                        physicalColumn.getName().getSimple()));
-                    }
-                }
-                if 
(sqlCreateMaterializedTable.isSchemaWithColumnsIdentifiersOnly()) {
-                    // If only column identifiers are provided, then these are 
used to
-                    // order the columns in the schema.
-                    return mergeTableAsUtil.reorderSchema(sqlNodeList, 
querySchema);
-                } else {
-                    return mergeTableAsUtil.mergeSchemas(
-                            sqlNodeList,
-                            
sqlCreateMaterializedTable.getWatermark().orElse(null),
-                            sqlCreateMaterializedTable.getFullConstraints(),
-                            querySchema);
-                }
-            }
-
-            @Override
-            public Map<String, String> getMergedTableOptions() {
-                return 
SqlCreateMaterializedTableConverter.this.getDerivedTableOptions(
-                        sqlCreateMaterializedTable);
-            }
-
-            @Override
-            public List<String> getMergedPartitionKeys() {
-                return 
SqlCreateMaterializedTableConverter.this.getDerivedPartitionKeys(
-                        sqlCreateMaterializedTable);
-            }
-
-            @Override
-            public Optional<TableDistribution> getMergedTableDistribution() {
-                return 
SqlCreateMaterializedTableConverter.this.getDerivedTableDistribution(
-                        sqlCreateMaterializedTable);
-            }
-
-            @Override
-            public String getMergedDefinitionQuery() {
-                return definitionQuery;
-            }
-
-            @Override
-            public ResolvedSchema getMergedQuerySchema() {
-                return querySchema;
-            }
-        };
-    }
-}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateOrAlterMaterializedTableConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateOrAlterMaterializedTableConverter.java
new file mode 100644
index 00000000000..aae6433758b
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateOrAlterMaterializedTableConverter.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.ddl.SqlCreateOrAlterMaterializedTable;
+import org.apache.flink.sql.parser.ddl.SqlTableColumn.SqlRegularColumn;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import 
org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode;
+import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
+import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshStatus;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.IntervalFreshness;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.TableChange.MaterializedTableChange;
+import org.apache.flink.table.catalog.TableDistribution;
+import org.apache.flink.table.operations.Operation;
+import 
org.apache.flink.table.operations.materializedtable.AlterMaterializedTableAsQueryOperation;
+import 
org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
+import 
org.apache.flink.table.planner.operations.converters.table.MergeTableAsUtil;
+import org.apache.flink.table.planner.utils.MaterializedTableUtils;
+
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * A converter for {@link SqlCreateOrAlterMaterializedTable} to {@link
+ * CreateMaterializedTableOperation} or {@link AlterMaterializedTableAsQueryOperation}.
+ */
+public class SqlCreateOrAlterMaterializedTableConverter
+        extends 
AbstractCreateMaterializedTableConverter<SqlCreateOrAlterMaterializedTable> {
+
+    @Override
+    public Operation convertSqlNode(
+            SqlCreateOrAlterMaterializedTable 
sqlCreateOrAlterMaterializedTable,
+            ConvertContext context) {
+        final ObjectIdentifier identifier =
+                this.getIdentifier(sqlCreateOrAlterMaterializedTable, context);
+
+        if (createOrAlterOperation(sqlCreateOrAlterMaterializedTable)) {
+            return handleCreateOrAlter(sqlCreateOrAlterMaterializedTable, 
context, identifier);
+        }
+        return handleCreate(sqlCreateOrAlterMaterializedTable, context, 
identifier);
+    }
+
+    private Operation handleCreateOrAlter(
+            final SqlCreateOrAlterMaterializedTable 
sqlCreateOrAlterMaterializedTable,
+            final ConvertContext context,
+            final ObjectIdentifier identifier) {
+        final Optional<ResolvedCatalogBaseTable<?>> resolvedBaseTable =
+                context.getCatalogManager().getCatalogBaseTable(identifier);
+        return resolvedBaseTable
+                .map(
+                        oldBaseTable -> {
+                            if (oldBaseTable.getTableKind() != 
TableKind.MATERIALIZED_TABLE) {
+                                throw new ValidationException(
+                                        String.format(
+                                                "Table %s is not a materialized table. Only materialized tables support the CREATE OR ALTER operation.",
+                                                identifier.asSummaryString()));
+                            }
+                            return handleAlter(
+                                    sqlCreateOrAlterMaterializedTable,
+                                    (ResolvedCatalogMaterializedTable) 
oldBaseTable,
+                                    context,
+                                    identifier);
+                        })
+                .orElseGet(
+                        () -> handleCreate(sqlCreateOrAlterMaterializedTable, 
context, identifier));
+    }
+
+    private static boolean createOrAlterOperation(
+            final SqlCreateOrAlterMaterializedTable 
sqlCreateOrAlterMaterializedTable) {
+        return sqlCreateOrAlterMaterializedTable.getOperator()
+                == SqlCreateOrAlterMaterializedTable.CREATE_OR_ALTER_OPERATOR;
+    }
+
+    private Operation handleAlter(
+            final SqlCreateOrAlterMaterializedTable 
sqlCreateOrAlterMaterializedTable,
+            final ResolvedCatalogMaterializedTable oldMaterializedTable,
+            final ConvertContext context,
+            final ObjectIdentifier identifier) {
+        final CatalogMaterializedTable newTable =
+                buildNewCatalogMaterializedTableFromOldTable(
+                        oldMaterializedTable, 
sqlCreateOrAlterMaterializedTable, context);
+
+        List<MaterializedTableChange> tableChanges =
+                buildTableChanges(sqlCreateOrAlterMaterializedTable, 
oldMaterializedTable, context);
+
+        return new AlterMaterializedTableAsQueryOperation(identifier, 
tableChanges, newTable);
+    }
+
+    private Operation handleCreate(
+            final SqlCreateOrAlterMaterializedTable 
sqlCreateOrAlterMaterializedTable,
+            final ConvertContext context,
+            final ObjectIdentifier identifier) {
+        final ResolvedCatalogMaterializedTable 
resolvedCatalogMaterializedTable =
+                this.getResolvedCatalogMaterializedTable(
+                        sqlCreateOrAlterMaterializedTable, context);
+
+        return new CreateMaterializedTableOperation(identifier, 
resolvedCatalogMaterializedTable);
+    }
+
+    private List<MaterializedTableChange> buildTableChanges(
+            final SqlCreateOrAlterMaterializedTable 
sqlCreateOrAlterMaterializedTable,
+            final ResolvedCatalogMaterializedTable oldTable,
+            final ConvertContext context) {
+        List<MaterializedTableChange> changes = new ArrayList<>();
+        final MergeContext mergeContext =
+                this.getMergeContext(sqlCreateOrAlterMaterializedTable, 
context);
+
+        final ResolvedSchema oldSchema = oldTable.getResolvedSchema();
+        final List<Column> newColumns =
+                MaterializedTableUtils.validateAndExtractNewColumns(
+                        oldSchema, mergeContext.getMergedQuerySchema());
+
+        newColumns.forEach(column -> changes.add(TableChange.add(column)));
+        
changes.add(TableChange.modifyDefinitionQuery(mergeContext.getMergedExpandedQuery()));
+
+        return changes;
+    }
+
+    private CatalogMaterializedTable 
buildNewCatalogMaterializedTableFromOldTable(
+            final ResolvedCatalogMaterializedTable oldMaterializedTable,
+            final SqlCreateOrAlterMaterializedTable 
sqlCreateOrAlterMaterializedTable,
+            final ConvertContext context) {
+        final Schema.Builder schemaBuilder =
+                
Schema.newBuilder().fromResolvedSchema(oldMaterializedTable.getResolvedSchema());
+
+        // Append any new columns derived from the updated query to the old schema
+        final ResolvedSchema oldSchema = 
oldMaterializedTable.getResolvedSchema();
+        final MergeContext mergeContext =
+                this.getMergeContext(sqlCreateOrAlterMaterializedTable, 
context);
+        final List<Column> newColumns =
+                MaterializedTableUtils.validateAndExtractNewColumns(
+                        oldSchema, mergeContext.getMergedQuerySchema());
+        newColumns.forEach(col -> schemaBuilder.column(col.getName(), 
col.getDataType()));
+
+        final String comment = 
this.getComment(sqlCreateOrAlterMaterializedTable);
+        final IntervalFreshness freshness =
+                this.getDerivedFreshness(sqlCreateOrAlterMaterializedTable);
+        final LogicalRefreshMode logicalRefreshMode =
+                
this.getDerivedLogicalRefreshMode(sqlCreateOrAlterMaterializedTable);
+        final RefreshMode refreshMode = 
this.getDerivedRefreshMode(logicalRefreshMode);
+
+        CatalogMaterializedTable.Builder builder =
+                CatalogMaterializedTable.newBuilder()
+                        .schema(schemaBuilder.build())
+                        .comment(comment)
+                        
.distribution(mergeContext.getMergedTableDistribution().orElse(null))
+                        .partitionKeys(mergeContext.getMergedPartitionKeys())
+                        .options(mergeContext.getMergedTableOptions())
+                        .originalQuery(mergeContext.getMergedOriginalQuery())
+                        .expandedQuery(mergeContext.getMergedExpandedQuery())
+                        .freshness(freshness)
+                        .logicalRefreshMode(logicalRefreshMode)
+                        .refreshMode(refreshMode)
+                        .refreshStatus(RefreshStatus.INITIALIZING);
+
+        // Preserve refresh handler from old materialized table
+        oldMaterializedTable
+                .getRefreshHandlerDescription()
+                .ifPresent(builder::refreshHandlerDescription);
+        
builder.serializedRefreshHandler(oldMaterializedTable.getSerializedRefreshHandler());
+
+        return builder.build();
+    }
+
+    @Override
+    protected MergeContext getMergeContext(
+            final SqlCreateOrAlterMaterializedTable sqlCreateMaterializedTable,
+            final ConvertContext context) {
+        return new MergeContext() {
+            private final MergeTableAsUtil mergeTableAsUtil = new 
MergeTableAsUtil(context);
+
+            // Cache the original query. If getDerivedExpandedQuery() were called first,
+            // without caching, validation would mutate the SqlNode and getAsQuery()
+            // would from then on return the expanded query.
+            private final String originalQuery =
+                    
SqlCreateOrAlterMaterializedTableConverter.this.getDerivedOriginalQuery(
+                            sqlCreateMaterializedTable, context);
+
+            private final ResolvedSchema querySchema =
+                    
SqlCreateOrAlterMaterializedTableConverter.this.getQueryResolvedSchema(
+                            sqlCreateMaterializedTable, context);
+
+            @Override
+            public Schema getMergedSchema() {
+                final Set<String> querySchemaColumnNames =
+                        new HashSet<>(querySchema.getColumnNames());
+                final SqlNodeList sqlNodeList = 
sqlCreateMaterializedTable.getColumnList();
+                for (SqlNode column : sqlNodeList) {
+                    if (!(column instanceof SqlRegularColumn)) {
+                        continue;
+                    }
+
+                    SqlRegularColumn physicalColumn = (SqlRegularColumn) 
column;
+                    if 
(!querySchemaColumnNames.contains(physicalColumn.getName().getSimple())) {
+                        throw new ValidationException(
+                                String.format(
+                                        "Invalid as physical column '%s' is 
defined in the DDL, but is not used in a query column.",
+                                        physicalColumn.getName().getSimple()));
+                    }
+                }
+                if 
(sqlCreateMaterializedTable.isSchemaWithColumnsIdentifiersOnly()) {
+                    // If only column identifiers are provided, then these are 
used to
+                    // order the columns in the schema.
+                    return mergeTableAsUtil.reorderSchema(sqlNodeList, 
querySchema);
+                } else {
+                    return mergeTableAsUtil.mergeSchemas(
+                            sqlNodeList,
+                            
sqlCreateMaterializedTable.getWatermark().orElse(null),
+                            sqlCreateMaterializedTable.getFullConstraints(),
+                            querySchema);
+                }
+            }
+
+            @Override
+            public Map<String, String> getMergedTableOptions() {
+                return 
SqlCreateOrAlterMaterializedTableConverter.this.getDerivedTableOptions(
+                        sqlCreateMaterializedTable);
+            }
+
+            @Override
+            public List<String> getMergedPartitionKeys() {
+                return 
SqlCreateOrAlterMaterializedTableConverter.this.getDerivedPartitionKeys(
+                        sqlCreateMaterializedTable);
+            }
+
+            @Override
+            public Optional<TableDistribution> getMergedTableDistribution() {
+                return 
SqlCreateOrAlterMaterializedTableConverter.this.getDerivedTableDistribution(
+                        sqlCreateMaterializedTable);
+            }
+
+            @Override
+            public String getMergedOriginalQuery() {
+                return this.originalQuery;
+            }
+
+            @Override
+            public String getMergedExpandedQuery() {
+                return 
SqlCreateOrAlterMaterializedTableConverter.this.getDerivedExpandedQuery(
+                        sqlCreateMaterializedTable, context);
+            }
+
+            @Override
+            public ResolvedSchema getMergedQuerySchema() {
+                return this.querySchema;
+            }
+        };
+    }
+}
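
The dispatch implemented by the new converter, as a simplified sketch (names
taken from the class above; node stands for the parsed statement):

    Optional<ResolvedCatalogBaseTable<?>> existing =
            context.getCatalogManager().getCatalogBaseTable(identifier);
    if (!existing.isPresent()) {
        // Table does not exist yet: behaves exactly like CREATE.
        return handleCreate(node, context, identifier);
    } else if (existing.get().getTableKind() != TableKind.MATERIALIZED_TABLE) {
        // Plain tables and views cannot be targeted by CREATE OR ALTER.
        throw new ValidationException(
                String.format(
                        "Table %s is not a materialized table. Only materialized tables support the CREATE OR ALTER operation.",
                        identifier.asSummaryString()));
    } else {
        // Existing materialized table: ALTER ... AS <query>, appending columns only.
        return handleAlter(
                node, (ResolvedCatalogMaterializedTable) existing.get(), context, identifier);
    }

A plain CREATE (without OR ALTER) still goes through handleCreate
unconditionally.
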
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
index 7d7218f5580..ca305b3dd0a 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
@@ -80,7 +80,7 @@ public class SqlNodeConverters {
         register(new SqlShowCreateCatalogConverter());
         register(new SqlDescribeCatalogConverter());
         register(new SqlDescribeJobConverter());
-        register(new SqlCreateMaterializedTableConverter());
+        register(new SqlCreateOrAlterMaterializedTableConverter());
         register(new SqlCreateModelConverter());
         register(new SqlAlterMaterializedTableRefreshConverter());
         register(new SqlAlterMaterializedTableSuspendConverter());
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
index 2f32aac16f6..92d70036707 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
@@ -23,11 +23,16 @@ import org.apache.flink.sql.parser.ddl.SqlRefreshMode;
 import org.apache.flink.table.api.ValidationException;
 import 
org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode;
 import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
+import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.IntervalFreshness;
+import org.apache.flink.table.catalog.ResolvedSchema;
 
 import org.apache.calcite.sql.SqlIntervalLiteral;
 import org.apache.calcite.sql.type.SqlTypeFamily;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /** The utils for materialized table. */
 @Internal
 public class MaterializedTableUtils {
@@ -77,17 +82,6 @@ public class MaterializedTableUtils {
         }
     }
 
-    public static RefreshMode fromSqlToRefreshMode(SqlRefreshMode 
sqlRefreshMode) {
-        switch (sqlRefreshMode) {
-            case FULL:
-                return RefreshMode.FULL;
-            case CONTINUOUS:
-                return RefreshMode.CONTINUOUS;
-            default:
-                throw new IllegalArgumentException("Unknown refresh mode: " + 
sqlRefreshMode);
-        }
-    }
-
     public static RefreshMode fromLogicalRefreshModeToRefreshMode(
             LogicalRefreshMode logicalRefreshMode) {
         switch (logicalRefreshMode) {
@@ -102,4 +96,40 @@ public class MaterializedTableUtils {
                         "Unknown logical refresh mode: " + logicalRefreshMode);
         }
     }
+
+    public static List<Column> validateAndExtractNewColumns(
+            ResolvedSchema oldSchema, ResolvedSchema newSchema) {
+        List<Column> newAddedColumns = new ArrayList<>();
+        int originalColumnSize = oldSchema.getColumns().size();
+        int newColumnSize = newSchema.getColumns().size();
+
+        if (originalColumnSize > newColumnSize) {
+            throw new ValidationException(
+                    String.format(
+                            "Failed to modify query because drop column is 
unsupported. "
+                                    + "When modifying a query, you can only 
append new columns at the end of original schema. "
+                                    + "The original schema has %d columns, but 
the newly derived schema from the query has %d columns.",
+                            originalColumnSize, newColumnSize));
+        }
+
+        for (int i = 0; i < oldSchema.getColumns().size(); i++) {
+            Column oldColumn = oldSchema.getColumns().get(i);
+            Column newColumn = newSchema.getColumns().get(i);
+            if (!oldColumn.equals(newColumn)) {
+                throw new ValidationException(
+                        String.format(
+                                "When modifying the query of a materialized 
table, "
+                                        + "currently only support appending 
columns at the end of original schema, dropping, renaming, and reordering 
columns are not supported.\n"
+                                        + "Column mismatch at position %d: 
Original column is [%s], but new column is [%s].",
+                                i, oldColumn, newColumn));
+            }
+        }
+
+        for (int i = oldSchema.getColumns().size(); i < 
newSchema.getColumns().size(); i++) {
+            Column newColumn = newSchema.getColumns().get(i);
+            
newAddedColumns.add(newColumn.copy(newColumn.getDataType().nullable()));
+        }
+
+        return newAddedColumns;
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
index 1ddd3a15860..338f215e539 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
@@ -1443,7 +1443,7 @@ class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversionTestBas
                                 + "  `user_id` INT NOT NULL,\n"
                                 + "  CONSTRAINT `PK_user_id` PRIMARY KEY 
(`user_id`) NOT ENFORCED\n"
                                 + "), comment='null', distribution=null, 
partitionKeys=[], "
-                                + "options={format=debezium-json}, 
snapshot=null, definitionQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', "
+                                + "options={format=debezium-json}, 
snapshot=null, originalQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', 
expandedQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', "
                                 + "freshness=INTERVAL '30' SECOND, 
logicalRefreshMode=CONTINUOUS, refreshMode=CONTINUOUS, "
                                 + "refreshStatus=INITIALIZING, 
refreshHandlerDescription='null', serializedRefreshHandler=null}, 
resolvedSchema=(\n"
                                 + "  `shop_id` INT,\n"
@@ -1475,7 +1475,7 @@ class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversionTestBas
                                 + "  `user_id` INT NOT NULL,\n"
                                 + "  CONSTRAINT `PK_user_id` PRIMARY KEY 
(`user_id`) NOT ENFORCED\n"
                                 + "), comment='null', distribution=DISTRIBUTED 
BY HASH(`user_id`) INTO 7 BUCKETS, partitionKeys=[], "
-                                + "options={format=debezium-json}, 
snapshot=null, definitionQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', "
+                                + "options={format=debezium-json}, 
snapshot=null, originalQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', 
expandedQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', "
                                 + "freshness=INTERVAL '30' SECOND, 
logicalRefreshMode=AUTOMATIC, refreshMode=null, "
                                 + "refreshStatus=INITIALIZING, 
refreshHandlerDescription='null', serializedRefreshHandler=null}, 
resolvedSchema=(\n"
                                 + "  `shop_id` INT NOT NULL,\n"
@@ -2858,7 +2858,8 @@ class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversionTestBas
                         .logicalRefreshMode(LogicalRefreshMode.CONTINUOUS)
                         .refreshMode(RefreshMode.CONTINUOUS)
                         .refreshStatus(RefreshStatus.ACTIVATED)
-                        .definitionQuery(query);
+                        .originalQuery(query)
+                        .expandedQuery(query);
 
         if (tableDistribution != null) {
             tableBuilder.distribution(tableDistribution);
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
index 460d8a91c91..63d1d759bfd 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -72,6 +72,9 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
 class SqlMaterializedTableNodeToOperationConverterTest
         extends SqlNodeToOperationConversionTestBase {
 
+    private static final String CREATE_OPERATION = "CREATE ";
+    private static final String CREATE_OR_ALTER_OPERATION = "CREATE OR ALTER ";
+
     @BeforeEach
     void before() throws TableAlreadyExistException, DatabaseNotExistException 
{
         super.before();
@@ -155,7 +158,8 @@ class SqlMaterializedTableNodeToOperationConverterTest
                         
.logicalRefreshMode(CatalogMaterializedTable.LogicalRefreshMode.FULL)
                         .refreshMode(RefreshMode.FULL)
                         
.refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
-                        .definitionQuery(
+                        .originalQuery("SELECT *\nFROM `t1`")
+                        .expandedQuery(
                                 "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, 
`t1`.`d`\n"
                                         + "FROM `builtin`.`default`.`t1` AS 
`t1`")
                         .build();
@@ -208,7 +212,8 @@ class SqlMaterializedTableNodeToOperationConverterTest
                         .logicalRefreshMode(LogicalRefreshMode.FULL)
                         .refreshMode(RefreshMode.FULL)
                         
.refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
-                        .definitionQuery(
+                        .originalQuery("SELECT *\nFROM `t1`")
+                        .expandedQuery(
                                 "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, 
`t1`.`d`\n"
                                         + "FROM `builtin`.`default`.`t1` AS 
`t1`")
                         .build();
@@ -260,7 +265,8 @@ class SqlMaterializedTableNodeToOperationConverterTest
                         .partitionKeys(Arrays.asList("a", "d"))
                         .logicalRefreshMode(LogicalRefreshMode.AUTOMATIC)
                         
.refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
-                        .definitionQuery(
+                        .originalQuery("SELECT *\nFROM `t1`")
+                        .expandedQuery(
                                 "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, 
`t1`.`d`\n"
                                         + "FROM `builtin`.`default`.`t1` AS 
`t1`")
                         .build();
@@ -298,7 +304,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
         CreateMaterializedTableOperation createOperation =
                 (CreateMaterializedTableOperation) operation;
 
-        
assertThat(createOperation.getCatalogMaterializedTable().getDefinitionQuery())
+        
assertThat(createOperation.getCatalogMaterializedTable().getExpandedQuery())
                 .isEqualTo(
                         "SELECT `t1`.`a`, `T`.`f1`, `T`.`f2`\n"
                                 + "FROM `builtin`.`default`.`t1` AS `t1`,\n"
@@ -669,7 +675,8 @@ class SqlMaterializedTableNodeToOperationConverterTest
                 .isEqualTo(newTable.getUnresolvedSchema().getPrimaryKey());
         assertThat(oldTable.getUnresolvedSchema().getWatermarkSpecs())
                 .isEqualTo(newTable.getUnresolvedSchema().getWatermarkSpecs());
-        assertThat(oldTable.getDefinitionQuery()).isNotEqualTo(newTable.getDefinitionQuery());
+        assertThat(oldTable.getOriginalQuery()).isNotEqualTo(newTable.getOriginalQuery());
+        assertThat(oldTable.getExpandedQuery()).isNotEqualTo(newTable.getExpandedQuery());
         assertThat(oldTable.getDefinitionFreshness()).isEqualTo(newTable.getDefinitionFreshness());
         assertThat(oldTable.getRefreshMode()).isEqualTo(newTable.getRefreshMode());
         assertThat(oldTable.getRefreshStatus()).isEqualTo(newTable.getRefreshStatus());
@@ -729,6 +736,131 @@ class SqlMaterializedTableNodeToOperationConverterTest
                         "DROP MATERIALIZED TABLE: (identifier: 
[`builtin`.`default`.`mtbl1`], IfExists: [true])");
     }
 
+    @Test
+    void testCreateOrAlterMaterializedTable() {
+        final String sql =
+                "CREATE OR ALTER MATERIALIZED TABLE mtbl1 (\n"
+                        + "   CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED"
+                        + ")\n"
+                        + "COMMENT 'materialized table comment'\n"
+                        + "PARTITIONED BY (a, d)\n"
+                        + "WITH (\n"
+                        + "  'connector' = 'filesystem', \n"
+                        + "  'format' = 'json'\n"
+                        + ")\n"
+                        + "FRESHNESS = INTERVAL '30' SECOND\n"
+                        + "REFRESH_MODE = FULL\n"
+                        + "AS SELECT * FROM t1";
+        Operation operation = parse(sql);
+        assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class);
+
+        CreateMaterializedTableOperation op = (CreateMaterializedTableOperation) operation;
+        ResolvedCatalogMaterializedTable materializedTable = op.getCatalogMaterializedTable();
+        assertThat(materializedTable).isInstanceOf(ResolvedCatalogMaterializedTable.class);
+
+        Map<String, String> options = new HashMap<>();
+        options.put("connector", "filesystem");
+        options.put("format", "json");
+        CatalogMaterializedTable expected =
+                CatalogMaterializedTable.newBuilder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("a", DataTypes.BIGINT().notNull())
+                                        .column("b", DataTypes.VARCHAR(Integer.MAX_VALUE))
+                                        .column("c", DataTypes.INT())
+                                        .column("d", DataTypes.VARCHAR(Integer.MAX_VALUE))
+                                        .primaryKeyNamed("ct1", Collections.singletonList("a"))
+                                        .build())
+                        .comment("materialized table comment")
+                        .options(options)
+                        .partitionKeys(Arrays.asList("a", "d"))
+                        .freshness(IntervalFreshness.ofSecond("30"))
+                        .logicalRefreshMode(CatalogMaterializedTable.LogicalRefreshMode.FULL)
+                        .refreshMode(CatalogMaterializedTable.RefreshMode.FULL)
+                        .refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
+                        .originalQuery("SELECT *\nFROM `t1`")
+                        .expandedQuery(
+                                "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`\n"
+                                        + "FROM `builtin`.`default`.`t1` AS `t1`")
+                        .build();
+
+        assertThat(materializedTable.getOrigin()).isEqualTo(expected);
+    }
+
+    @Test
+    void testCreateOrAlterMaterializedTableForExistingTable() throws TableNotExistException {
+        final String sql =
+                "CREATE OR ALTER MATERIALIZED TABLE base_mtbl (\n"
+                        + "   CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED"
+                        + ")\n"
+                        + "COMMENT 'materialized table comment'\n"
+                        + "PARTITIONED BY (a, d)\n"
+                        + "WITH (\n"
+                        + "  'connector' = 'filesystem', \n"
+                        + "  'format' = 'json'\n"
+                        + ")\n"
+                        + "FRESHNESS = INTERVAL '30' SECOND\n"
+                        + "REFRESH_MODE = FULL\n"
+                        + "AS SELECT a, b, c, d, d as e, cast('123' as string) as f FROM t3";
+        Operation operation = parse(sql);
+
+        assertThat(operation).isInstanceOf(AlterMaterializedTableAsQueryOperation.class);
+
+        AlterMaterializedTableAsQueryOperation op =
+                (AlterMaterializedTableAsQueryOperation) operation;
+        assertThat(op.getTableChanges())
+                .isEqualTo(
+                        Arrays.asList(
+                                TableChange.add(
+                                        Column.physical("e", DataTypes.VARCHAR(Integer.MAX_VALUE))),
+                                TableChange.add(
+                                        Column.physical("f", DataTypes.VARCHAR(Integer.MAX_VALUE))),
+                                TableChange.modifyDefinitionQuery(
+                                        "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n"
+                                                + "FROM `builtin`.`default`.`t3` AS `t3`")));
+        assertThat(operation.asSummaryString())
+                .isEqualTo(
+                        "ALTER MATERIALIZED TABLE builtin.default.base_mtbl AS SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n"
+                                + "FROM `builtin`.`default`.`t3` AS `t3`");
+
+        // the new table differs from the old table only in schema & definition query.
+        CatalogMaterializedTable oldTable =
+                (CatalogMaterializedTable)
+                        catalog.getTable(
+                                new ObjectPath(catalogManager.getCurrentDatabase(), "base_mtbl"));
+        CatalogMaterializedTable newTable = op.getCatalogMaterializedTable();
+
+        assertThat(oldTable.getUnresolvedSchema()).isNotEqualTo(newTable.getUnresolvedSchema());
+        assertThat(oldTable.getUnresolvedSchema().getPrimaryKey())
+                .isEqualTo(newTable.getUnresolvedSchema().getPrimaryKey());
+        assertThat(oldTable.getUnresolvedSchema().getWatermarkSpecs())
+                .isEqualTo(newTable.getUnresolvedSchema().getWatermarkSpecs());
+        assertThat(oldTable.getOriginalQuery()).isNotEqualTo(newTable.getOriginalQuery());
+        assertThat(oldTable.getExpandedQuery()).isNotEqualTo(newTable.getExpandedQuery());
+        assertThat(oldTable.getDefinitionFreshness()).isEqualTo(newTable.getDefinitionFreshness());
+        assertThat(oldTable.getRefreshMode()).isEqualTo(newTable.getRefreshMode());
+        assertThat(oldTable.getRefreshStatus()).isEqualTo(newTable.getRefreshStatus());
+        assertThat(oldTable.getSerializedRefreshHandler())
+                .isEqualTo(newTable.getSerializedRefreshHandler());
+
+        List<Schema.UnresolvedColumn> addedColumn =
+                newTable.getUnresolvedSchema().getColumns().stream()
+                        .filter(
+                                column ->
+                                        !oldTable.getUnresolvedSchema()
+                                                .getColumns()
+                                                .contains(column))
+                        .collect(Collectors.toList());
+        // the added columns should be nullable columns.
+        assertThat(addedColumn)
+                .isEqualTo(
+                        Arrays.asList(
+                                new Schema.UnresolvedPhysicalColumn(
+                                        "e", DataTypes.VARCHAR(Integer.MAX_VALUE)),
+                                new Schema.UnresolvedPhysicalColumn(
+                                        "f", DataTypes.VARCHAR(Integer.MAX_VALUE))));
+    }
+
     private static Collection<Arguments> testDataForCreateMaterializedTableFailedCase() {
         final Collection<Arguments> list = new ArrayList<>();
         list.addAll(create());
@@ -823,37 +955,49 @@ class SqlMaterializedTableNodeToOperationConverterTest
     }
 
     private static Collection<Arguments> testDataWithDifferentSchemasSuccessCase() {
+        final Collection<Arguments> list = new ArrayList<>();
+        list.addAll(createOrAlter(CREATE_OPERATION));
+        list.addAll(createOrAlter(CREATE_OR_ALTER_OPERATION));
+        return list;
+    }
+
+    private static List<Arguments> createOrAlter(final String operation) {
         return List.of(
                 Arguments.of(
-                        "CREATE MATERIALIZED TABLE users_shops (shop_id, user_id)"
+                        operation
+                                + "MATERIALIZED TABLE users_shops (shop_id, user_id)"
                                 + " FRESHNESS = INTERVAL '30' SECOND"
                                 + " AS SELECT 1 AS shop_id, 2 AS user_id",
                         ResolvedSchema.of(
                                 Column.physical("shop_id", 
DataTypes.INT().notNull()),
                                 Column.physical("user_id", 
DataTypes.INT().notNull()))),
                 Arguments.of(
-                        "CREATE MATERIALIZED TABLE users_shops"
+                        operation
+                                + "MATERIALIZED TABLE users_shops"
                                 + " FRESHNESS = INTERVAL '30' SECOND"
                                 + " AS SELECT CAST(1 AS DOUBLE) AS shop_id, 
CAST(2 AS STRING) AS user_id",
                         ResolvedSchema.of(
                                 Column.physical("shop_id", 
DataTypes.DOUBLE().notNull()),
                                 Column.physical("user_id", 
DataTypes.STRING().notNull()))),
                 Arguments.of(
-                        "CREATE MATERIALIZED TABLE users_shops (user_id, shop_id)"
+                        operation
+                                + "MATERIALIZED TABLE users_shops (user_id, shop_id)"
                                 + " FRESHNESS = INTERVAL '30' SECOND"
                                 + " AS SELECT 1 AS shop_id, 2 AS user_id",
                         ResolvedSchema.of(
                                 Column.physical("user_id", 
DataTypes.INT().notNull()),
                                 Column.physical("shop_id", 
DataTypes.INT().notNull()))),
                 Arguments.of(
-                        "CREATE MATERIALIZED TABLE users_shops (user_id INT, shop_id BIGINT)"
+                        operation
+                                + "MATERIALIZED TABLE users_shops (user_id INT, shop_id BIGINT)"
                                 + " FRESHNESS = INTERVAL '30' SECOND"
                                 + " AS SELECT 1 AS shop_id, 2 AS user_id",
                         ResolvedSchema.of(
                                 Column.physical("shop_id", DataTypes.BIGINT()),
                                 Column.physical("user_id", DataTypes.INT()))),
                 Arguments.of(
-                        "CREATE MATERIALIZED TABLE users_shops (user_id INT, shop_id BIGINT, PRIMARY KEY(user_id) NOT ENFORCED)"
+                        operation
+                                + "MATERIALIZED TABLE users_shops (user_id INT, shop_id BIGINT, PRIMARY KEY(user_id) NOT ENFORCED)"
                                 + " FRESHNESS = INTERVAL '30' SECOND"
                                 + " AS SELECT 1 AS shop_id, 2 AS user_id",
                         new ResolvedSchema(
@@ -865,7 +1009,8 @@ class SqlMaterializedTableNodeToOperationConverterTest
                                         "PK_user_id", List.of("user_id")),
                                 List.of())),
                 Arguments.of(
-                        "CREATE MATERIALIZED TABLE users_shops (PRIMARY KEY(user_id) NOT ENFORCED)"
+                        operation
+                                + "MATERIALIZED TABLE users_shops (PRIMARY KEY(user_id) NOT ENFORCED)"
                                 + " FRESHNESS = INTERVAL '30' SECOND"
                                 + " AS SELECT 1 AS shop_id, 2 AS user_id",
                         new ResolvedSchema(
diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java
index 5a63cc2a2d8..efb0686afb2 100644
--- a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java
+++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java
@@ -94,7 +94,10 @@ public class TestFileSystemCatalogTest extends TestFileSystemCatalogTestBase {
                             .build(),
                     CREATE_RESOLVED_SCHEMA);
 
-    private static final String DEFINITION_QUERY = "SELECT id, region, county FROM T";
+    private static final String DEFAULT_ORIGINAL_QUERY = "SELECT id, region, county FROM T";
+    private static final String DEFAULT_EXPANDED_QUERY =
+            String.format(
+                    "SELECT id, region, county FROM %s.%s.T", TEST_CATALOG, TEST_DEFAULT_DATABASE);
     private static final IntervalFreshness FRESHNESS = IntervalFreshness.ofMinute("3");
     private static final ResolvedCatalogMaterializedTable EXPECTED_CATALOG_MATERIALIZED_TABLE =
             new ResolvedCatalogMaterializedTable(
@@ -103,7 +106,8 @@ public class TestFileSystemCatalogTest extends TestFileSystemCatalogTestBase {
                             .comment("test materialized table")
                             .partitionKeys(PARTITION_KEYS)
                             .options(EXPECTED_OPTIONS)
-                            .definitionQuery(DEFINITION_QUERY)
+                            .originalQuery(DEFAULT_ORIGINAL_QUERY)
+                            .expandedQuery(DEFAULT_EXPANDED_QUERY)
                             .freshness(FRESHNESS)
                             .logicalRefreshMode(
                                     CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC)
@@ -237,8 +241,8 @@ public class TestFileSystemCatalogTest extends TestFileSystemCatalogTestBase {
         assertThat(actualMaterializedTable.getPartitionKeys()).isEqualTo(PARTITION_KEYS);
         // validate options
         assertThat(actualMaterializedTable.getOptions()).isEqualTo(expectedOptions);
-        // validate definition query
-        assertThat(actualMaterializedTable.getDefinitionQuery()).isEqualTo(DEFINITION_QUERY);
+        // validate expanded query
+        assertThat(actualMaterializedTable.getExpandedQuery()).isEqualTo(DEFAULT_EXPANDED_QUERY);
         // validate freshness
         assertThat(actualMaterializedTable.getDefinitionFreshness()).isEqualTo(FRESHNESS);
         // validate logical refresh mode

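For readers skimming the diff, the tests above pin down the dual behavior of the new statement. The following is a minimal illustrative Flink SQL sketch of that behavior, not part of the commit itself; the table names mirror the test fixtures, and a source table t1 is assumed to already exist in the current catalog.

    -- First execution: mtbl1 does not exist yet, so the statement is planned
    -- as a plain CREATE MATERIALIZED TABLE (a CreateMaterializedTableOperation).
    CREATE OR ALTER MATERIALIZED TABLE mtbl1
    FRESHNESS = INTERVAL '30' SECOND
    REFRESH_MODE = FULL
    AS SELECT * FROM t1;

    -- Second execution with a changed query: mtbl1 now exists, so the statement
    -- is planned as ALTER MATERIALIZED TABLE ... AS (an
    -- AlterMaterializedTableAsQueryOperation); the new column f is appended to
    -- the schema as a nullable column.
    CREATE OR ALTER MATERIALIZED TABLE mtbl1
    FRESHNESS = INTERVAL '30' SECOND
    REFRESH_MODE = FULL
    AS SELECT *, CAST('123' AS STRING) AS f FROM t1;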