This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f8e29badfe6531154373db2104707f0296e60880
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Sun Nov 9 10:38:16 2025 +0100

    [FLINK-38674][table] Support schema definition for Materialized tables in the `CREATE` operation
    
    This closes #27222.
---
 .../dev/table/materialized-table/statements.md     |  48 +++-
 .../dev/table/materialized-table/statements.md     |  46 +++-
 .../src/main/codegen/includes/parserImpls.ftl      |  20 +-
 .../sql/parser/ddl/SqlCreateMaterializedTable.java |  69 +++--
 .../MaterializedTableStatementParserTest.java      | 143 ++++++++++-
 .../AbstractCreateMaterializedTableConverter.java  | 221 ++++++++++++++++
 .../SqlCreateMaterializedTableConverter.java       | 267 ++++++--------------
 .../table/AbstractCreateTableConverter.java        |   2 +-
 .../converters/table/MergeTableAsUtil.java         |   2 +-
 .../planner/utils/MaterializedTableUtils.java      |  28 ++-
 .../operations/SqlCTASNodeToOperationTest.java     |  26 +-
 .../operations/SqlDdlToOperationConverterTest.java |   8 +-
 .../operations/SqlDmlToOperationConverterTest.java |  50 ++--
 ...erializedTableNodeToOperationConverterTest.java | 277 +++++++++++++++++----
 .../operations/SqlModelOperationConverterTest.java |  10 +-
 .../operations/SqlNodeToCallOperationTest.java     |   4 +-
 .../SqlNodeToOperationConversionTestBase.java      |   6 +-
 .../SqlRTASNodeToOperationConverterTest.java       |  30 +--
 .../SqlShowToOperationConverterTest.java           |   6 +-
 19 files changed, 895 insertions(+), 368 deletions(-)

diff --git a/docs/content.zh/docs/dev/table/materialized-table/statements.md 
b/docs/content.zh/docs/dev/table/materialized-table/statements.md
index a27bce3093f..d65491c9084 100644
--- a/docs/content.zh/docs/dev/table/materialized-table/statements.md
+++ b/docs/content.zh/docs/dev/table/materialized-table/statements.md
@@ -36,7 +36,11 @@ Flink SQL 目前支持以下物化表操作:
 ```
 CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name
 
-[ ([ <table_constraint> ]) ]
+[(
+    { <physical_column_definition> | <metadata_column_definition> | 
<computed_column_definition> }[ , ...n]
+    [ <watermark_definition> ]
+    [ <table_constraint> ][ , ...n]
+)]
 
 [COMMENT table_comment]
 
@@ -44,14 +48,29 @@ CREATE MATERIALIZED TABLE 
[catalog_name.][db_name.]table_name
 
 [WITH (key1=val1, key2=val2, ...)]
 
-FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY }
+[FRESHNESS = INTERVAL '<num>' { SECOND[S] | MINUTE[S] | HOUR[S] | DAY[S] }]
 
 [REFRESH_MODE = { CONTINUOUS | FULL }]
 
 AS <select_statement>
 
+<physical_column_definition>:
+  column_name column_type [ <column_constraint> ] [COMMENT column_comment]
+  
+<column_constraint>:
+  [CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED
+
 <table_constraint>:
   [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
+
+<metadata_column_definition>:
+  column_name column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]
+
+<computed_column_definition>:
+  column_name AS computed_column_expression [COMMENT column_comment]
+
+<watermark_definition>:
+  WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
 ```
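+For illustration, a statement using the extended schema clause could look like the following sketch (the `orders` source table and its columns are hypothetical):
+```sql
+CREATE MATERIALIZED TABLE my_materialized_table (
+    order_time TIMESTAMP(3),
+    order_id STRING,
+    PRIMARY KEY (order_id) NOT ENFORCED,
+    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
+)
+FRESHNESS = INTERVAL '3' MINUTE
+AS SELECT order_time, order_id FROM orders
+```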
 
 ## PRIMARY KEY
@@ -270,9 +289,32 @@ CREATE MATERIALIZED TABLE my_materialized_table_full
     GROUP BY
         p.ds, p.product_id, p.product_name
 ```
+And the same materialized table with explicitly specified columns:
+
+```sql
+CREATE MATERIALIZED TABLE my_materialized_table_full (
+    ds, product_id, product_name, avg_sale_price, total_quantity)
+    ...
+```
+The order of the columns does not need to match the order in the query;
+Flink will reorder them if required, so the following is also valid:
+```sql
+CREATE MATERIALIZED TABLE my_materialized_table_full (
+    product_id, product_name, ds, avg_sale_price, total_quantity)
+    ...
+```
+Another way to specify the columns explicitly is to provide both the column name and its data type:
+```sql
+CREATE MATERIALIZED TABLE my_materialized_table_full (
+    ds STRING, product_id STRING, product_name STRING, avg_sale_price DOUBLE, 
total_quantity BIGINT)
+    ...
+```
+If the declared column types do not match the types produced by the query, implicit casts are applied.
+If an implicit cast is not supported for a given combination of types, a validation error is thrown.
+Note that columns can be reordered in this form as well.
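+For example, declaring `total_quantity` as `DOUBLE` while the query produces a `BIGINT` (as in the example above) relies on such an implicit cast:
+```sql
+CREATE MATERIALIZED TABLE my_materialized_table_full (
+    ds STRING, product_id STRING, product_name STRING, avg_sale_price DOUBLE, total_quantity DOUBLE)
+    ...
+```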
 
 ## 限制
-- 不支持显式指定列名
+- Does not support explicitly specifying physical columns that are not used in the query
 - 不支持在 select 查询语句中引用临时表、临时视图或临时函数
 
 # ALTER MATERIALIZED TABLE
diff --git a/docs/content/docs/dev/table/materialized-table/statements.md 
b/docs/content/docs/dev/table/materialized-table/statements.md
index 07784bb3ede..6e9c0b153b8 100644
--- a/docs/content/docs/dev/table/materialized-table/statements.md
+++ b/docs/content/docs/dev/table/materialized-table/statements.md
@@ -36,7 +36,11 @@ Flink SQL supports the following Materialized Table 
statements for now:
 ```
 CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name
 
-[ ([ <table_constraint> ]) ]
+[(
+    { <physical_column_definition> | <metadata_column_definition> | 
<computed_column_definition> }[ , ...n]
+    [ <watermark_definition> ]
+    [ <table_constraint> ][ , ...n]
+)]
 
 [COMMENT table_comment]
 
@@ -50,8 +54,23 @@ CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name
 
 AS <select_statement>
 
+<physical_column_definition>:
+  column_name column_type [ <column_constraint> ] [COMMENT column_comment]
+  
+<column_constraint>:
+  [CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED
+
 <table_constraint>:
   [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
+
+<metadata_column_definition>:
+  column_name column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]
+
+<computed_column_definition>:
+  column_name AS computed_column_expression [COMMENT column_comment]
+
+<watermark_definition>:
+  WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
 ```
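+For illustration, a statement using the extended schema clause could look like the following sketch (the `orders` source table and its columns are hypothetical):
+```sql
+CREATE MATERIALIZED TABLE my_materialized_table (
+    order_time TIMESTAMP(3),
+    order_id STRING,
+    PRIMARY KEY (order_id) NOT ENFORCED,
+    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
+)
+FRESHNESS = INTERVAL '3' MINUTE
+AS SELECT order_time, order_id FROM orders
+```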
 
 ## PRIMARY KEY
@@ -268,10 +287,33 @@ CREATE MATERIALIZED TABLE my_materialized_table_full
     GROUP BY
         p.ds, p.product_id, p.product_name
 ```
+And the same materialized table with explicitly specified columns:
+
+```sql
+CREATE MATERIALIZED TABLE my_materialized_table_full (
+    ds, product_id, product_name, avg_sale_price, total_quantity)
+    ...
+```
+The order of the columns does not need to match the order in the query;
+Flink will reorder them if required, so the following is also valid:
+```sql
+CREATE MATERIALIZED TABLE my_materialized_table_full (
+    product_id, product_name, ds, avg_sale_price, total_quantity)
+    ...
+```
+Another way to specify the columns explicitly is to provide both the column name and its data type:
+```sql
+CREATE MATERIALIZED TABLE my_materialized_table_full (
+    ds STRING, product_id STRING, product_name STRING, avg_sale_price DOUBLE, 
total_quantity BIGINT)
+    ...
+```
+If the declared column types do not match the types produced by the query, implicit casts are applied.
+If an implicit cast is not supported for a given combination of types, a validation error is thrown.
+Note that columns can be reordered in this form as well.
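+For example, declaring `total_quantity` as `DOUBLE` while the query produces a `BIGINT` (as in the example above) relies on such an implicit cast:
+```sql
+CREATE MATERIALIZED TABLE my_materialized_table_full (
+    ds STRING, product_id STRING, product_name STRING, avg_sale_price DOUBLE, total_quantity DOUBLE)
+    ...
+```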
 
 ## Limitations
 
-- Does not support explicitly specifying columns
+- Does not support explicitly specifying physical columns that are not used in the query
 - Does not support referring to temporary tables, temporary views, or 
temporary functions in the select query
 
 # ALTER MATERIALIZED TABLE
diff --git 
a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl 
b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 764d0685b97..f7818dc81d8 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -1859,13 +1859,17 @@ SqlCreate SqlCreateMaterializedTable(Span s, boolean 
replace, boolean isTemporar
     final SqlParserPos startPos = s.pos();
     SqlIdentifier tableName;
     SqlCharStringLiteral comment = null;
-    SqlTableConstraint constraint = null;
+    List<SqlTableConstraint> constraints = new ArrayList<SqlTableConstraint>();
+    SqlWatermark watermark = null;
+    SqlNodeList columnList = SqlNodeList.EMPTY;
     SqlDistribution distribution = null;
     SqlNodeList partitionColumns = SqlNodeList.EMPTY;
     SqlNodeList propertyList = SqlNodeList.EMPTY;
     SqlNode freshness = null;
     SqlLiteral refreshMode = null;
     SqlNode asQuery = null;
+    SqlParserPos pos = startPos;
+    boolean isColumnsIdentifiersOnly = false;
 }
 {
     <MATERIALIZED>
@@ -1884,8 +1888,14 @@ SqlCreate SqlCreateMaterializedTable(Span s, boolean 
replace, boolean isTemporar
     <TABLE>
     tableName = CompoundIdentifier()
     [
-        <LPAREN>
-        constraint = TableConstraint()
+        <LPAREN> { pos = getPos(); TableCreationContext ctx = new 
TableCreationContext();}
+            TableColumnsOrIdentifiers(pos, ctx) {
+                pos = pos.plus(getPos());
+                isColumnsIdentifiersOnly = ctx.isColumnsIdentifiersOnly();
+                columnList = new SqlNodeList(ctx.columnList, pos);
+                constraints = ctx.constraints;
+                watermark = ctx.watermark;
+            }
         <RPAREN>
     ]
     [
@@ -1939,7 +1949,9 @@ SqlCreate SqlCreateMaterializedTable(Span s, boolean 
replace, boolean isTemporar
         return new SqlCreateMaterializedTable(
             startPos.plus(getPos()),
             tableName,
-            constraint,
+            columnList,
+            constraints,
+            watermark,
             comment,
             distribution,
             partitionColumns,
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java
index 8c7fe2d5089..687226a6220 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java
@@ -18,8 +18,11 @@
 
 package org.apache.flink.sql.parser.ddl;
 
+import org.apache.flink.sql.parser.ExtendedSqlNode;
+import org.apache.flink.sql.parser.SqlConstraintValidator;
 import org.apache.flink.sql.parser.SqlUnparseUtils;
 import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.sql.parser.error.SqlValidateException;
 
 import org.apache.calcite.sql.SqlCharStringLiteral;
 import org.apache.calcite.sql.SqlCreate;
@@ -37,21 +40,22 @@ import org.apache.calcite.util.ImmutableNullableList;
 
 import javax.annotation.Nullable;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 
 import static java.util.Objects.requireNonNull;
 
 /** CREATE MATERIALIZED TABLE DDL sql call. */
-public class SqlCreateMaterializedTable extends SqlCreate {
+public class SqlCreateMaterializedTable extends SqlCreate implements 
ExtendedSqlNode {
 
     public static final SqlSpecialOperator OPERATOR =
             new SqlSpecialOperator("CREATE MATERIALIZED TABLE", 
SqlKind.CREATE_TABLE);
 
     private final SqlIdentifier tableName;
 
-    private final @Nullable SqlTableConstraint tableConstraint;
+    private final SqlNodeList columnList;
+
+    private final List<SqlTableConstraint> tableConstraints;
 
     private final @Nullable SqlCharStringLiteral comment;
 
@@ -59,6 +63,8 @@ public class SqlCreateMaterializedTable extends SqlCreate {
 
     private final SqlNodeList partitionKeyList;
 
+    private final SqlWatermark watermark;
+
     private final SqlNodeList propertyList;
 
     private final @Nullable SqlIntervalLiteral freshness;
@@ -70,7 +76,9 @@ public class SqlCreateMaterializedTable extends SqlCreate {
     public SqlCreateMaterializedTable(
             SqlParserPos pos,
             SqlIdentifier tableName,
-            @Nullable SqlTableConstraint tableConstraint,
+            SqlNodeList columnList,
+            List<SqlTableConstraint> tableConstraints,
+            SqlWatermark watermark,
             @Nullable SqlCharStringLiteral comment,
             @Nullable SqlDistribution distribution,
             SqlNodeList partitionKeyList,
@@ -80,7 +88,9 @@ public class SqlCreateMaterializedTable extends SqlCreate {
             SqlNode asQuery) {
         super(OPERATOR, pos, false, false);
         this.tableName = requireNonNull(tableName, "tableName should not be 
null");
-        this.tableConstraint = tableConstraint;
+        this.columnList = columnList;
+        this.tableConstraints = tableConstraints;
+        this.watermark = watermark;
         this.comment = comment;
         this.distribution = distribution;
         this.partitionKeyList =
@@ -100,8 +110,10 @@ public class SqlCreateMaterializedTable extends SqlCreate {
     public List<SqlNode> getOperandList() {
         return ImmutableNullableList.of(
                 tableName,
+                columnList,
+                new SqlNodeList(tableConstraints, SqlParserPos.ZERO),
+                watermark,
                 comment,
-                tableConstraint,
                 partitionKeyList,
                 propertyList,
                 freshness,
@@ -116,12 +128,20 @@ public class SqlCreateMaterializedTable extends SqlCreate 
{
         return tableName.names.toArray(new String[0]);
     }
 
-    public Optional<SqlCharStringLiteral> getComment() {
-        return Optional.ofNullable(comment);
+    public SqlNodeList getColumnList() {
+        return columnList;
+    }
+
+    public List<SqlTableConstraint> getTableConstraints() {
+        return tableConstraints;
+    }
+
+    public Optional<SqlWatermark> getWatermark() {
+        return Optional.ofNullable(watermark);
     }
 
-    public Optional<SqlTableConstraint> getTableConstraint() {
-        return Optional.ofNullable(tableConstraint);
+    public Optional<SqlCharStringLiteral> getComment() {
+        return Optional.ofNullable(comment);
     }
 
     public @Nullable SqlDistribution getDistribution() {
@@ -150,20 +170,33 @@ public class SqlCreateMaterializedTable extends SqlCreate 
{
         return asQuery;
     }
 
+    /** Returns the column constraints plus the table constraints. */
+    public List<SqlTableConstraint> getFullConstraints() {
+        return SqlConstraintValidator.getFullConstraints(tableConstraints, 
columnList);
+    }
+
+    @Override
+    public void validate() throws SqlValidateException {
+        if (!isSchemaWithColumnsIdentifiersOnly()) {
+            
SqlConstraintValidator.validateAndChangeColumnNullability(tableConstraints, 
columnList);
+        }
+    }
+
+    public boolean isSchemaWithColumnsIdentifiersOnly() {
+        // CREATE MATERIALIZED TABLE supports passing only column identifiers 
in the column list. If
+        // the first column in the list is an identifier, then we assume the 
rest of the
+        // columns are identifiers as well.
+        return !getColumnList().isEmpty() && getColumnList().get(0) instanceof 
SqlIdentifier;
+    }
+
     @Override
     public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
         writer.keyword("CREATE MATERIALIZED TABLE");
         tableName.unparse(writer, leftPrec, rightPrec);
 
-        if (tableConstraint != null) {
-            writer.newlineAndIndent();
+        if (!columnList.isEmpty() || !tableConstraints.isEmpty() || watermark 
!= null) {
             SqlUnparseUtils.unparseTableSchema(
-                    writer,
-                    leftPrec,
-                    rightPrec,
-                    SqlNodeList.EMPTY,
-                    Collections.singletonList(tableConstraint),
-                    null);
+                    writer, leftPrec, rightPrec, columnList, tableConstraints, 
watermark);
         }
 
         if (comment != null) {
diff --git 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java
 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java
index d089ecc8079..29f5a52f049 100644
--- 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java
+++ 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java
@@ -39,13 +39,23 @@ class MaterializedTableStatementParserTest {
 
     @ParameterizedTest(name = "{index}: {0}")
     @MethodSource("inputForCreateMaterializedTable")
-    void testCreateMaterializedTable(Map.Entry<String, String> sqlToExpected) {
+    void testCreateMaterializedTable(
+            final String testName, final Map.Entry<String, String> 
sqlToExpected) {
         final String sql = sqlToExpected.getKey();
         final String expected = sqlToExpected.getValue();
 
         sql(sql).ok(expected);
     }
 
+    @Test
+    void testCreateMaterializedTableWithWrongSchema() {
+        final String sql =
+                "CREATE MATERIALIZED TABLE tbl1\n"
+                        + "(a, b ^STRING^)\n"
+                        + "AS SELECT a, b, h, t m FROM source";
+        sql(sql).fails("(?s).*Encountered \"STRING\" at line 2, column 7.*");
+    }
+
     @Test
     void testCreateMaterializedTableWithUnsupportedFreshnessInterval() {
         final String sql =
@@ -373,17 +383,23 @@ class MaterializedTableStatementParserTest {
 
     private static Stream<Arguments> inputForCreateMaterializedTable() {
         return Stream.of(
-                Arguments.of(fullExample()),
-                Arguments.of(withoutTableConstraint()),
-                Arguments.of(withPrimaryKey()),
-                Arguments.of(withoutFreshness()));
+                Arguments.of("Full example", fullExample()),
+                Arguments.of("With columns", withColumns()),
+                Arguments.of("With columns and watermarks", 
withColumnsAndWatermark()),
+                Arguments.of("Without table constraint", 
withoutTableConstraint()),
+                Arguments.of("With primary key", withPrimaryKey()),
+                Arguments.of("Without freshness", withoutFreshness()),
+                Arguments.of("With column identifiers only", 
withColumnsIdentifiersOnly()));
     }
 
     private static Map.Entry<String, String> fullExample() {
         return new AbstractMap.SimpleEntry<>(
                 "CREATE MATERIALIZED TABLE tbl1\n"
                         + "(\n"
-                        + "   PRIMARY KEY (a, b)\n"
+                        + "  ts timestamp(3),\n"
+                        + "  id varchar,\n"
+                        + "  watermark FOR ts AS ts - interval '3' second,\n"
+                        + "  PRIMARY KEY (id)\n"
                         + ")\n"
                         + "COMMENT 'table comment'\n"
                         + "DISTRIBUTED BY HASH (a) INTO 4 BUCKETS\n"
@@ -394,9 +410,11 @@ class MaterializedTableStatementParserTest {
                         + ")\n"
                         + "FRESHNESS = INTERVAL '3' MINUTES\n"
                         + "AS SELECT a, b, h, t m FROM source",
-                "CREATE MATERIALIZED TABLE `TBL1`\n"
-                        + "(\n"
-                        + "  PRIMARY KEY (`A`, `B`)\n"
+                "CREATE MATERIALIZED TABLE `TBL1` (\n"
+                        + "  `TS` TIMESTAMP(3),\n"
+                        + "  `ID` VARCHAR,\n"
+                        + "  PRIMARY KEY (`ID`),\n"
+                        + "  WATERMARK FOR `TS` AS (`TS` - INTERVAL '3' 
SECOND)\n"
                         + ")\n"
                         + "COMMENT 'table comment'\n"
                         + "DISTRIBUTED BY HASH(`A`) INTO 4 BUCKETS\n"
@@ -421,8 +439,7 @@ class MaterializedTableStatementParserTest {
                         + "FRESHNESS = INTERVAL '3' MINUTE\n"
                         + "REFRESH_MODE = FULL\n"
                         + "AS SELECT a, b, h, t m FROM source",
-                "CREATE MATERIALIZED TABLE `TBL1`\n"
-                        + "(\n"
+                "CREATE MATERIALIZED TABLE `TBL1` (\n"
                         + "  PRIMARY KEY (`A`, `B`)\n"
                         + ")\n"
                         + "COMMENT 'table comment'\n"
@@ -460,8 +477,7 @@ class MaterializedTableStatementParserTest {
                         + "  'kafka.topic' = 'log.test'\n"
                         + ")\n"
                         + "AS SELECT a, b, h, t m FROM source",
-                "CREATE MATERIALIZED TABLE `TBL1`\n"
-                        + "(\n"
+                "CREATE MATERIALIZED TABLE `TBL1` (\n"
                         + "  PRIMARY KEY (`A`, `B`)\n"
                         + ")\n"
                         + "WITH (\n"
@@ -472,4 +488,105 @@ class MaterializedTableStatementParserTest {
                         + "SELECT `A`, `B`, `H`, `T` AS `M`\n"
                         + "FROM `SOURCE`");
     }
+
+    private static Map.Entry<String, String> withColumns() {
+        return new AbstractMap.SimpleEntry<>(
+                "CREATE MATERIALIZED TABLE tbl1\n"
+                        + "(\n"
+                        + "  a INT, b STRING, h INT, m INT\n"
+                        + ")\n"
+                        + "COMMENT 'table comment'\n"
+                        + "DISTRIBUTED BY HASH (a) INTO 4 BUCKETS\n"
+                        + "PARTITIONED BY (a, h)\n"
+                        + "WITH (\n"
+                        + "  'group.id' = 'latest', \n"
+                        + "  'kafka.topic' = 'log.test'\n"
+                        + ")\n"
+                        + "FRESHNESS = INTERVAL '3' MINUTE\n"
+                        + "AS SELECT a, b, h, t m FROM source",
+                "CREATE MATERIALIZED TABLE `TBL1` (\n"
+                        + "  `A` INTEGER,\n"
+                        + "  `B` STRING,\n"
+                        + "  `H` INTEGER,\n"
+                        + "  `M` INTEGER\n"
+                        + ")\n"
+                        + "COMMENT 'table comment'\n"
+                        + "DISTRIBUTED BY HASH(`A`) INTO 4 BUCKETS\n"
+                        + "PARTITIONED BY (`A`, `H`)\n"
+                        + "WITH (\n"
+                        + "  'group.id' = 'latest',\n"
+                        + "  'kafka.topic' = 'log.test'\n"
+                        + ")\n"
+                        + "FRESHNESS = INTERVAL '3' MINUTE\n"
+                        + "AS\n"
+                        + "SELECT `A`, `B`, `H`, `T` AS `M`\n"
+                        + "FROM `SOURCE`");
+    }
+
+    private static Map.Entry<String, String> withColumnsAndWatermark() {
+        return new AbstractMap.SimpleEntry<>(
+                "CREATE MATERIALIZED TABLE tbl1\n"
+                        + "(\n"
+                        + "  ts timestamp(3),\n"
+                        + "  id varchar,\n"
+                        + "  watermark FOR ts AS ts - interval '3' second\n"
+                        + ")\n"
+                        + "COMMENT 'table comment'\n"
+                        + "DISTRIBUTED BY HASH (a) INTO 4 BUCKETS\n"
+                        + "PARTITIONED BY (a, h)\n"
+                        + "WITH (\n"
+                        + "  'group.id' = 'latest', \n"
+                        + "  'kafka.topic' = 'log.test'\n"
+                        + ")\n"
+                        + "FRESHNESS = INTERVAL '3' MINUTE\n"
+                        + "AS SELECT a, b, h, t m FROM source",
+                "CREATE MATERIALIZED TABLE `TBL1` (\n"
+                        + "  `TS` TIMESTAMP(3),\n"
+                        + "  `ID` VARCHAR,\n"
+                        + "  WATERMARK FOR `TS` AS (`TS` - INTERVAL '3' 
SECOND)\n"
+                        + ")\n"
+                        + "COMMENT 'table comment'\n"
+                        + "DISTRIBUTED BY HASH(`A`) INTO 4 BUCKETS\n"
+                        + "PARTITIONED BY (`A`, `H`)\n"
+                        + "WITH (\n"
+                        + "  'group.id' = 'latest',\n"
+                        + "  'kafka.topic' = 'log.test'\n"
+                        + ")\n"
+                        + "FRESHNESS = INTERVAL '3' MINUTE\n"
+                        + "AS\n"
+                        + "SELECT `A`, `B`, `H`, `T` AS `M`\n"
+                        + "FROM `SOURCE`");
+    }
+
+    private static Map.Entry<String, String> withColumnsIdentifiersOnly() {
+        return new AbstractMap.SimpleEntry<>(
+                "CREATE MATERIALIZED TABLE tbl1\n"
+                        + "(a, b, h, m)\n"
+                        + "COMMENT 'table comment'\n"
+                        + "DISTRIBUTED BY HASH (a) INTO 4 BUCKETS\n"
+                        + "PARTITIONED BY (a, h)\n"
+                        + "WITH (\n"
+                        + "  'group.id' = 'latest', \n"
+                        + "  'kafka.topic' = 'log.test'\n"
+                        + ")\n"
+                        + "FRESHNESS = INTERVAL '3' MINUTE\n"
+                        + "AS SELECT a, b, h, t m FROM source",
+                "CREATE MATERIALIZED TABLE `TBL1` (\n"
+                        + "  `A`,\n"
+                        + "  `B`,\n"
+                        + "  `H`,\n"
+                        + "  `M`\n"
+                        + ")\n"
+                        + "COMMENT 'table comment'\n"
+                        + "DISTRIBUTED BY HASH(`A`) INTO 4 BUCKETS\n"
+                        + "PARTITIONED BY (`A`, `H`)\n"
+                        + "WITH (\n"
+                        + "  'group.id' = 'latest',\n"
+                        + "  'kafka.topic' = 'log.test'\n"
+                        + ")\n"
+                        + "FRESHNESS = INTERVAL '3' MINUTE\n"
+                        + "AS\n"
+                        + "SELECT `A`, `B`, `H`, `T` AS `M`\n"
+                        + "FROM `SOURCE`");
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/AbstractCreateMaterializedTableConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/AbstractCreateMaterializedTableConverter.java
new file mode 100644
index 00000000000..fb61e48b7ef
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/AbstractCreateMaterializedTableConverter.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.ddl.SqlCreateMaterializedTable;
+import org.apache.flink.sql.parser.ddl.SqlRefreshMode;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import 
org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode;
+import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
+import org.apache.flink.table.catalog.IntervalFreshness;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableDistribution;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.planner.operations.PlannerQueryOperation;
+import org.apache.flink.table.planner.utils.MaterializedTableUtils;
+import org.apache.flink.table.planner.utils.OperationConverterUtils;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+
+import org.apache.calcite.sql.SqlNode;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.api.config.MaterializedTableConfigOptions.DATE_FORMATTER;
+import static 
org.apache.flink.table.api.config.MaterializedTableConfigOptions.PARTITION_FIELDS;
+
+/**
+ * Abstract class for converting {@link SqlCreateMaterializedTable} and its children to create
+ * materialized table operations.
+ */
+public abstract class AbstractCreateMaterializedTableConverter<T extends 
SqlCreateMaterializedTable>
+        implements SqlNodeConverter<T> {
+    /** Context of create table converters while merging source and derived 
items. */
+    protected interface MergeContext {
+        Schema getMergedSchema();
+
+        Map<String, String> getMergedTableOptions();
+
+        List<String> getMergedPartitionKeys();
+
+        Optional<TableDistribution> getMergedTableDistribution();
+
+        String getMergedDefinitionQuery();
+
+        ResolvedSchema getMergedQuerySchema();
+    }
+
+    protected abstract MergeContext getMergeContext(
+            T sqlCreateMaterializedTable, ConvertContext context);
+
+    protected final Optional<TableDistribution> getDerivedTableDistribution(
+            T sqlCreateMaterializedTable) {
+        return 
Optional.ofNullable(sqlCreateMaterializedTable.getDistribution())
+                
.map(OperationConverterUtils::getDistributionFromSqlDistribution);
+    }
+
+    protected final List<String> getDerivedPartitionKeys(T 
sqlCreateMaterializedTable) {
+        return OperationConverterUtils.getColumnNames(
+                sqlCreateMaterializedTable.getPartitionKeyList());
+    }
+
+    protected final Map<String, String> getDerivedTableOptions(T 
sqlCreateMaterializedTable) {
+        return 
OperationConverterUtils.getProperties(sqlCreateMaterializedTable.getPropertyList());
+    }
+
+    protected final IntervalFreshness getDerivedFreshness(T 
sqlCreateMaterializedTable) {
+        return Optional.ofNullable(sqlCreateMaterializedTable.getFreshness())
+                .map(MaterializedTableUtils::getMaterializedTableFreshness)
+                .orElse(null);
+    }
+
+    protected final ResolvedSchema getQueryResolvedSchema(
+            T sqlCreateMaterializedTable, ConvertContext context) {
+        SqlNode selectQuery = sqlCreateMaterializedTable.getAsQuery();
+        SqlNode validateQuery = 
context.getSqlValidator().validate(selectQuery);
+
+        PlannerQueryOperation queryOperation =
+                new PlannerQueryOperation(
+                        context.toRelRoot(validateQuery).project(),
+                        () -> context.toQuotedSqlString(validateQuery));
+        return queryOperation.getResolvedSchema();
+    }
+
+    protected final LogicalRefreshMode getDerivedLogicalRefreshMode(T 
sqlCreateMaterializedTable) {
+        SqlRefreshMode sqlRefreshMode =
+                
Optional.ofNullable(sqlCreateMaterializedTable.getRefreshMode())
+                        .map(mode -> mode.getValueAs(SqlRefreshMode.class))
+                        .orElse(null);
+
+        return MaterializedTableUtils.deriveLogicalRefreshMode(sqlRefreshMode);
+    }
+
+    protected final RefreshMode getDerivedRefreshMode(LogicalRefreshMode 
logicalRefreshMode) {
+        return 
MaterializedTableUtils.fromLogicalRefreshModeToRefreshMode(logicalRefreshMode);
+    }
+
+    protected final String getDerivedDefinitionQuery(
+            T sqlCreateMaterializedTable, ConvertContext context) {
+        SqlNode selectQuery = sqlCreateMaterializedTable.getAsQuery();
+        SqlNode validatedQuery = 
context.getSqlValidator().validate(selectQuery);
+        return context.toQuotedSqlString(validatedQuery);
+    }
+
+    protected final String getComment(T sqlCreateMaterializedTable) {
+        return 
OperationConverterUtils.getComment(sqlCreateMaterializedTable.getComment());
+    }
+
+    protected final ResolvedCatalogMaterializedTable 
getResolvedCatalogMaterializedTable(
+            T sqlCreateMaterializedTable, ConvertContext context) {
+        final MergeContext mergeContext = 
getMergeContext(sqlCreateMaterializedTable, context);
+        final List<String> partitionKeys = 
mergeContext.getMergedPartitionKeys();
+        final Schema schema = mergeContext.getMergedSchema();
+        final ResolvedSchema querySchema = mergeContext.getMergedQuerySchema();
+        final Map<String, String> tableOptions = 
mergeContext.getMergedTableOptions();
+        verifyPartitioningColumnsExist(querySchema, partitionKeys, 
tableOptions);
+
+        final TableDistribution distribution =
+                mergeContext.getMergedTableDistribution().orElse(null);
+        final String comment = getComment(sqlCreateMaterializedTable);
+        final String definitionQuery = mergeContext.getMergedDefinitionQuery();
+        final IntervalFreshness intervalFreshness = 
getDerivedFreshness(sqlCreateMaterializedTable);
+        final LogicalRefreshMode logicalRefreshMode =
+                getDerivedLogicalRefreshMode(sqlCreateMaterializedTable);
+        final RefreshMode refreshMode = 
getDerivedRefreshMode(logicalRefreshMode);
+        return context.getCatalogManager()
+                .resolveCatalogMaterializedTable(
+                        CatalogMaterializedTable.newBuilder()
+                                .schema(schema)
+                                .comment(comment)
+                                .distribution(distribution)
+                                .partitionKeys(partitionKeys)
+                                .options(tableOptions)
+                                .definitionQuery(definitionQuery)
+                                .freshness(intervalFreshness)
+                                .logicalRefreshMode(logicalRefreshMode)
+                                .refreshMode(refreshMode)
+                                
.refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
+                                .build());
+    }
+
+    protected final ObjectIdentifier getIdentifier(
+            SqlCreateMaterializedTable node, ConvertContext context) {
+        UnresolvedIdentifier unresolvedIdentifier = 
UnresolvedIdentifier.of(node.fullTableName());
+        return 
context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);
+    }
+
+    private void verifyPartitioningColumnsExist(
+            ResolvedSchema schema, List<String> partitionKeys, Map<String, 
String> tableOptions) {
+        final Set<String> partitionFieldOptions =
+                tableOptions.keySet().stream()
+                        .filter(k -> k.startsWith(PARTITION_FIELDS))
+                        .collect(Collectors.toSet());
+
+        for (String partitionKey : partitionKeys) {
+            if (schema.getColumn(partitionKey).isEmpty()) {
+                throw new ValidationException(
+                        String.format(
+                                "Partition column '%s' not defined in the 
query schema. Available columns: [%s].",
+                                partitionKey,
+                                schema.getColumnNames().stream()
+                                        .collect(Collectors.joining("', '", 
"'", "'"))));
+            }
+        }
+
+        // verify that the partition key used by the materialized table option
+        // partition.fields.#.date-formatter exists
+        for (String partitionOption : partitionFieldOptions) {
+            String partitionKey =
+                    partitionOption.substring(
+                            PARTITION_FIELDS.length() + 1,
+                            partitionOption.length() - 
(DATE_FORMATTER.length() + 1));
+            // the partition key used in option partition.fields.#.date-formatter must exist
+            if (!partitionKeys.contains(partitionKey)) {
+                throw new ValidationException(
+                        String.format(
+                                "Column '%s' referenced by materialized table 
option '%s' isn't a partition column. Available partition columns: [%s].",
+                                partitionKey,
+                                partitionOption,
+                                partitionKeys.stream()
+                                        .collect(Collectors.joining("', '", 
"'", "'"))));
+            }
+
+            // the partition key used in option partition.fields.#.date-formatter must be a character string type
+            LogicalType partitionKeyType =
+                    
schema.getColumn(partitionKey).get().getDataType().getLogicalType();
+            if (!partitionKeyType
+                    .getTypeRoot()
+                    .getFamilies()
+                    .contains(LogicalTypeFamily.CHARACTER_STRING)) {
+                throw new ValidationException(
+                        String.format(
+                                "Materialized table option '%s' only supports 
referring to char, varchar and string type partition column. Column %s type is 
%s.",
+                                partitionOption, partitionKey, 
partitionKeyType.asSummaryString()));
+            }
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
index 2107b209b27..3beb5596a90 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
@@ -18,234 +18,109 @@
 
 package org.apache.flink.table.planner.operations.converters;
 
-import org.apache.flink.sql.parser.SqlConstraintValidator;
 import org.apache.flink.sql.parser.ddl.SqlCreateMaterializedTable;
-import org.apache.flink.sql.parser.ddl.SqlRefreshMode;
-import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
-import org.apache.flink.sql.parser.error.SqlValidateException;
+import org.apache.flink.sql.parser.ddl.SqlTableColumn.SqlRegularColumn;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.catalog.CatalogMaterializedTable;
-import 
org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode;
-import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
-import org.apache.flink.table.catalog.Column;
-import org.apache.flink.table.catalog.IntervalFreshness;
-import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.TableDistribution;
-import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.operations.Operation;
 import 
org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
-import org.apache.flink.table.planner.operations.PlannerQueryOperation;
-import org.apache.flink.table.planner.utils.MaterializedTableUtils;
-import org.apache.flink.table.planner.utils.OperationConverterUtils;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import 
org.apache.flink.table.planner.operations.converters.table.MergeTableAsUtil;
 
 import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
 
-import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.stream.Collectors;
 
-import static 
org.apache.flink.table.api.config.MaterializedTableConfigOptions.DATE_FORMATTER;
-import static 
org.apache.flink.table.api.config.MaterializedTableConfigOptions.PARTITION_FIELDS;
-import static 
org.apache.flink.table.catalog.IntervalFreshness.validateFreshnessForCron;
-
-/** A converter for {@link SqlCreateMaterializedTable}. */
+/**
+ * A converter for {@link SqlCreateMaterializedTable} to {@link 
CreateMaterializedTableOperation}.
+ */
 public class SqlCreateMaterializedTableConverter
-        implements SqlNodeConverter<SqlCreateMaterializedTable> {
+        extends 
AbstractCreateMaterializedTableConverter<SqlCreateMaterializedTable> {
 
     @Override
     public Operation convertSqlNode(
             SqlCreateMaterializedTable sqlCreateMaterializedTable, 
ConvertContext context) {
-        UnresolvedIdentifier unresolvedIdentifier =
-                
UnresolvedIdentifier.of(sqlCreateMaterializedTable.fullTableName());
-        ObjectIdentifier identifier =
-                
context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);
-
-        // get comment
-        String tableComment =
-                
OperationConverterUtils.getComment(sqlCreateMaterializedTable.getComment());
-
-        // get options
-        final Map<String, String> tableOptions =
-                
OperationConverterUtils.getProperties(sqlCreateMaterializedTable.getPropertyList());
-
-        // get freshness
-        IntervalFreshness intervalFreshness =
-                Optional.ofNullable(sqlCreateMaterializedTable.getFreshness())
-                        
.map(MaterializedTableUtils::getMaterializedTableFreshness)
-                        .orElse(null);
-
-        // Get the logical refresh mode from SQL
-        SqlRefreshMode sqlRefreshMode =
-                
Optional.ofNullable(sqlCreateMaterializedTable.getRefreshMode())
-                        .map(mode -> mode.getValueAs(SqlRefreshMode.class))
-                        .orElse(null);
-
-        final LogicalRefreshMode logicalRefreshMode =
-                
MaterializedTableUtils.deriveLogicalRefreshMode(sqlRefreshMode);
-
-        // get the physical refresh mode from SQL
-        final RefreshMode refreshMode =
-                sqlRefreshMode == null
-                        ? null
-                        : 
MaterializedTableUtils.fromSqltoRefreshMode(sqlRefreshMode);
-
-        if (CatalogMaterializedTable.RefreshMode.FULL == refreshMode && 
intervalFreshness != null) {
-            validateFreshnessForCron(intervalFreshness);
-        }
-
-        // get query schema and definition query
-        SqlNode selectQuery = sqlCreateMaterializedTable.getAsQuery();
-        SqlNode validatedQuery = 
context.getSqlValidator().validate(selectQuery);
-
-        String definitionQuery = context.toQuotedSqlString(validatedQuery);
-
-        PlannerQueryOperation queryOperation =
-                new PlannerQueryOperation(
-                        context.toRelRoot(validatedQuery).project(), () -> 
definitionQuery);
-
-        // get schema
-        ResolvedSchema resolvedSchema = queryOperation.getResolvedSchema();
-        Schema.Builder builder = 
Schema.newBuilder().fromResolvedSchema(resolvedSchema);
-
-        // get and verify partition key
-        List<String> partitionKeys =
-                OperationConverterUtils.getColumnNames(
-                        sqlCreateMaterializedTable.getPartitionKeyList());
-        verifyPartitioningColumnsExist(
-                resolvedSchema,
-                partitionKeys,
-                tableOptions.keySet().stream()
-                        .filter(k -> k.startsWith(PARTITION_FIELDS))
-                        .collect(Collectors.toSet()));
-
-        // verify and build primary key
-        sqlCreateMaterializedTable
-                .getTableConstraint()
-                .ifPresent(
-                        sqlTableConstraint ->
-                                verifyAndBuildPrimaryKey(
-                                        builder, resolvedSchema, 
sqlTableConstraint));
-
-        Optional<TableDistribution> tableDistribution =
-                
Optional.ofNullable(sqlCreateMaterializedTable.getDistribution())
-                        
.map(OperationConverterUtils::getDistributionFromSqlDistribution);
-
-        CatalogMaterializedTable materializedTable =
-                CatalogMaterializedTable.newBuilder()
-                        .schema(builder.build())
-                        .comment(tableComment)
-                        .distribution(tableDistribution.orElse(null))
-                        .partitionKeys(partitionKeys)
-                        .options(tableOptions)
-                        .definitionQuery(definitionQuery)
-                        .freshness(intervalFreshness)
-                        .logicalRefreshMode(logicalRefreshMode)
-                        .refreshMode(refreshMode)
-                        
.refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
-                        .build();
-
         return new CreateMaterializedTableOperation(
-                identifier,
-                
context.getCatalogManager().resolveCatalogMaterializedTable(materializedTable));
+                getIdentifier(sqlCreateMaterializedTable, context),
+                
getResolvedCatalogMaterializedTable(sqlCreateMaterializedTable, context));
     }
 
-    private static void verifyPartitioningColumnsExist(
-            ResolvedSchema resolvedSchema,
-            List<String> partitionKeys,
-            Set<String> partitionFieldOptions) {
-        // verify partition key whether exists
-        for (String partitionKey : partitionKeys) {
-            if (!resolvedSchema.getColumn(partitionKey).isPresent()) {
-                throw new ValidationException(
-                        String.format(
-                                "Partition column '%s' not defined in the 
query schema. Available columns: [%s].",
-                                partitionKey,
-                                resolvedSchema.getColumnNames().stream()
-                                        .collect(Collectors.joining("', '", 
"'", "'"))));
+    @Override
+    protected MergeContext getMergeContext(
+            SqlCreateMaterializedTable sqlCreateMaterializedTable, 
ConvertContext context) {
+        return new MergeContext() {
+            private final MergeTableAsUtil mergeTableAsUtil = new 
MergeTableAsUtil(context);
+            private final String definitionQuery =
+                    
SqlCreateMaterializedTableConverter.this.getDerivedDefinitionQuery(
+                            sqlCreateMaterializedTable, context);
+            private final ResolvedSchema querySchema =
+                    
SqlCreateMaterializedTableConverter.this.getQueryResolvedSchema(
+                            sqlCreateMaterializedTable, context);
+
+            @Override
+            public Schema getMergedSchema() {
+                final Set<String> querySchemaColumnNames =
+                        new HashSet<>(querySchema.getColumnNames());
+                final SqlNodeList sqlNodeList = 
sqlCreateMaterializedTable.getColumnList();
+                for (SqlNode column : sqlNodeList) {
+                    if (!(column instanceof SqlRegularColumn)) {
+                        continue;
+                    }
+
+                    SqlRegularColumn physicalColumn = (SqlRegularColumn) 
column;
+                    if 
(!querySchemaColumnNames.contains(physicalColumn.getName().getSimple())) {
+                        throw new ValidationException(
+                                String.format(
+                                        "Invalid as physical column '%s' is 
defined in the DDL, but is not used in a query column.",
+                                        physicalColumn.getName().getSimple()));
+                    }
+                }
+                if 
(sqlCreateMaterializedTable.isSchemaWithColumnsIdentifiersOnly()) {
+                    // If only column identifiers are provided, then these are 
used to
+                    // order the columns in the schema.
+                    return mergeTableAsUtil.reorderSchema(sqlNodeList, 
querySchema);
+                } else {
+                    return mergeTableAsUtil.mergeSchemas(
+                            sqlNodeList,
+                            
sqlCreateMaterializedTable.getWatermark().orElse(null),
+                            sqlCreateMaterializedTable.getFullConstraints(),
+                            querySchema);
+                }
             }
-        }
 
-        // verify partition key used by materialized table partition option
-        // partition.fields.#.date-formatter whether exist
-        for (String partitionOption : partitionFieldOptions) {
-            String partitionKey =
-                    partitionOption.substring(
-                            PARTITION_FIELDS.length() + 1,
-                            partitionOption.length() - 
(DATE_FORMATTER.length() + 1));
-            // partition key used in option partition.fields.#.date-formatter 
must be existed
-            if (!partitionKeys.contains(partitionKey)) {
-                throw new ValidationException(
-                        String.format(
-                                "Column '%s' referenced by materialized table 
option '%s' isn't a partition column. Available partition columns: [%s].",
-                                partitionKey,
-                                partitionOption,
-                                partitionKeys.stream()
-                                        .collect(Collectors.joining("', '", 
"'", "'"))));
+            @Override
+            public Map<String, String> getMergedTableOptions() {
+                return 
SqlCreateMaterializedTableConverter.this.getDerivedTableOptions(
+                        sqlCreateMaterializedTable);
             }
 
-            // partition key used in option partition.fields.#.date-formatter 
must be string type
-            LogicalType partitionKeyType =
-                    
resolvedSchema.getColumn(partitionKey).get().getDataType().getLogicalType();
-            if (!partitionKeyType
-                    .getTypeRoot()
-                    .getFamilies()
-                    .contains(LogicalTypeFamily.CHARACTER_STRING)) {
-                throw new ValidationException(
-                        String.format(
-                                "Materialized table option '%s' only supports 
referring to char, varchar and string type partition column. Column %s type is 
%s.",
-                                partitionOption, partitionKey, 
partitionKeyType.asSummaryString()));
+            @Override
+            public List<String> getMergedPartitionKeys() {
+                return 
SqlCreateMaterializedTableConverter.this.getDerivedPartitionKeys(
+                        sqlCreateMaterializedTable);
             }
-        }
-    }
 
-    private static void verifyAndBuildPrimaryKey(
-            Schema.Builder schemaBuilder,
-            ResolvedSchema resolvedSchema,
-            SqlTableConstraint sqlTableConstraint) {
-        // check constraint type
-        try {
-            SqlConstraintValidator.validate(sqlTableConstraint);
-        } catch (SqlValidateException e) {
-            throw new ValidationException(
-                    String.format("Primary key validation failed: %s.", 
e.getMessage()), e);
-        }
-
-        List<String> primaryKeyColumns = 
Arrays.asList(sqlTableConstraint.getColumnNames());
-        for (String columnName : primaryKeyColumns) {
-            Optional<Column> columnOptional = 
resolvedSchema.getColumn(columnName);
-            if (!columnOptional.isPresent()) {
-                throw new ValidationException(
-                        String.format(
-                                "Primary key column '%s' not defined in the 
query schema. Available columns: [%s].",
-                                columnName,
-                                resolvedSchema.getColumnNames().stream()
-                                        .collect(Collectors.joining("', '", 
"'", "'"))));
+            @Override
+            public Optional<TableDistribution> getMergedTableDistribution() {
+                return 
SqlCreateMaterializedTableConverter.this.getDerivedTableDistribution(
+                        sqlCreateMaterializedTable);
             }
 
-            if 
(columnOptional.get().getDataType().getLogicalType().isNullable()) {
-                throw new ValidationException(
-                        String.format(
-                                "Could not create a PRIMARY KEY with nullable 
column '%s'.\n"
-                                        + "A PRIMARY KEY column must be 
declared on non-nullable physical columns.",
-                                columnName));
+            @Override
+            public String getMergedDefinitionQuery() {
+                return definitionQuery;
             }
-        }
 
-        // build primary key
-        String constraintName =
-                sqlTableConstraint
-                        .getConstraintName()
-                        .orElseGet(
-                                () ->
-                                        primaryKeyColumns.stream()
-                                                
.collect(Collectors.joining("_", "PK_", "")));
-        schemaBuilder.primaryKeyNamed(constraintName, primaryKeyColumns);
+            @Override
+            public ResolvedSchema getMergedQuerySchema() {
+                return querySchema;
+            }
+        };
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/AbstractCreateTableConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/AbstractCreateTableConverter.java
index 3a3de75c9f2..c22c52b9457 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/AbstractCreateTableConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/AbstractCreateTableConverter.java
@@ -45,7 +45,7 @@ import java.util.stream.Collectors;
 public abstract class AbstractCreateTableConverter<T extends SqlCreateTable>
         implements SqlNodeConverter<T> {
 
-    /** Context of create table converters while mering source and derived 
items. */
+    /** Context of create table converters while merging source and derived 
items. */
     protected interface MergeContext {
         Schema getMergedSchema(ResolvedSchema schemaToMerge);
 
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/MergeTableAsUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/MergeTableAsUtil.java
index a96c607693c..1af8044cc25 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/MergeTableAsUtil.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/table/MergeTableAsUtil.java
@@ -351,7 +351,7 @@ public class MergeTableAsUtil {
                 String name = ((SqlIdentifier) identifier).getSimple();
                 if (!sourceSchemaCols.containsKey(name)) {
                     throw new ValidationException(
-                            String.format("Column '%s' not found in the source 
schema. ", name));
+                            String.format("Column '%s' not found in the source 
schema.", name));
                 }
 
                 sinkSchemaCols.put(name, sourceSchemaCols.get(name));
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
index ba765e40a69..2f32aac16f6 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.planner.utils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.sql.parser.ddl.SqlRefreshMode;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import 
org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode;
 import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
 import org.apache.flink.table.catalog.IntervalFreshness;
 
@@ -61,24 +61,23 @@ public class MaterializedTableUtils {
         }
     }
 
-    public static CatalogMaterializedTable.LogicalRefreshMode 
deriveLogicalRefreshMode(
-            SqlRefreshMode sqlRefreshMode) {
+    public static LogicalRefreshMode deriveLogicalRefreshMode(SqlRefreshMode 
sqlRefreshMode) {
         if (sqlRefreshMode == null) {
-            return CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC;
+            return LogicalRefreshMode.AUTOMATIC;
         }
 
         switch (sqlRefreshMode) {
             case FULL:
-                return CatalogMaterializedTable.LogicalRefreshMode.FULL;
+                return LogicalRefreshMode.FULL;
             case CONTINUOUS:
-                return CatalogMaterializedTable.LogicalRefreshMode.CONTINUOUS;
+                return LogicalRefreshMode.CONTINUOUS;
             default:
                 throw new ValidationException(
                         String.format("Unsupported logical refresh mode: %s.", 
sqlRefreshMode));
         }
     }
 
-    public static RefreshMode fromSqltoRefreshMode(SqlRefreshMode 
sqlRefreshMode) {
+    public static RefreshMode fromSqlToRefreshMode(SqlRefreshMode 
sqlRefreshMode) {
         switch (sqlRefreshMode) {
             case FULL:
                 return RefreshMode.FULL;
@@ -88,4 +87,19 @@ public class MaterializedTableUtils {
                 throw new IllegalArgumentException("Unknown refresh mode: " + 
sqlRefreshMode);
         }
     }
+
+    public static RefreshMode fromLogicalRefreshModeToRefreshMode(
+            LogicalRefreshMode logicalRefreshMode) {
+        switch (logicalRefreshMode) {
+            case AUTOMATIC:
+                return null;
+            case FULL:
+                return RefreshMode.FULL;
+            case CONTINUOUS:
+                return RefreshMode.CONTINUOUS;
+            default:
+                throw new IllegalArgumentException(
+                        "Unknown logical refresh mode: " + logicalRefreshMode);
+        }
+    }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlCTASNodeToOperationTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlCTASNodeToOperationTest.java
index b4aa3ee7837..3add4e37f46 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlCTASNodeToOperationTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlCTASNodeToOperationTest.java
@@ -48,13 +48,13 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test base for testing convert CREATE TABLE AS statement to operation. */
-public class SqlCTASNodeToOperationTest extends 
SqlNodeToOperationConversionTestBase {
+class SqlCTASNodeToOperationTest extends SqlNodeToOperationConversionTestBase {
 
     private static final Map<String, String> TABLE_OPTIONS =
             Map.of("connector", 
TestSimpleDynamicTableSourceFactory.IDENTIFIER());
 
     @Test
-    public void testCreateTableAsWithNotFoundColumnIdentifiers() {
+    void testCreateTableAsWithNotFoundColumnIdentifiers() {
         CatalogTable catalogTable =
                 CatalogTable.newBuilder()
                         .schema(
@@ -76,7 +76,7 @@ public class SqlCTASNodeToOperationTest extends 
SqlNodeToOperationConversionTest
     }
 
     @Test
-    public void testCreateTableAsWithMismatchIdentifiersLength() {
+    void testCreateTableAsWithMismatchIdentifiersLength() {
         CatalogTable catalogTable =
                 CatalogTable.newBuilder()
                         .schema(
@@ -100,7 +100,7 @@ public class SqlCTASNodeToOperationTest extends 
SqlNodeToOperationConversionTest
     }
 
     @Test
-    public void testCreateTableAsWithColumns() {
+    void testCreateTableAsWithColumns() {
         CatalogTable catalogTable =
                 CatalogTable.newBuilder()
                         .schema(
@@ -142,7 +142,7 @@ public class SqlCTASNodeToOperationTest extends 
SqlNodeToOperationConversionTest
     }
 
     @Test
-    public void testCreateTableAsWithColumnsOverridden() {
+    void testCreateTableAsWithColumnsOverridden() {
         CatalogTable catalogTable =
                 CatalogTable.newBuilder()
                         .schema(
@@ -181,7 +181,7 @@ public class SqlCTASNodeToOperationTest extends 
SqlNodeToOperationConversionTest
     }
 
     @Test
-    public void 
testCreateTableAsWithOverriddenVirtualMetadataColumnsNotAllowed() {
+    void testCreateTableAsWithOverriddenVirtualMetadataColumnsNotAllowed() {
         CatalogTable catalogTable =
                 CatalogTable.newBuilder()
                         .schema(
@@ -207,7 +207,7 @@ public class SqlCTASNodeToOperationTest extends 
SqlNodeToOperationConversionTest
     }
 
     @Test
-    public void testCreateTableAsWithOverriddenComputedColumnsNotAllowed() {
+    void testCreateTableAsWithOverriddenComputedColumnsNotAllowed() {
         CatalogTable catalogTable =
                 CatalogTable.newBuilder()
                         .schema(
@@ -231,7 +231,7 @@ public class SqlCTASNodeToOperationTest extends 
SqlNodeToOperationConversionTest
     }
 
     @Test
-    public void testCreateTableAsWithPrimaryAndPartitionKey() {
+    void testCreateTableAsWithPrimaryAndPartitionKey() {
         CatalogTable catalogTable =
                 CatalogTable.newBuilder()
                         .schema(
@@ -266,7 +266,7 @@ public class SqlCTASNodeToOperationTest extends 
SqlNodeToOperationConversionTest
     }
 
     @Test
-    public void testCreateTableAsWithWatermark() {
+    void testCreateTableAsWithWatermark() {
         CatalogTable catalogTable =
                 CatalogTable.newBuilder()
                         .schema(
@@ -301,7 +301,7 @@ public class SqlCTASNodeToOperationTest extends 
SqlNodeToOperationConversionTest
     }
 
     @Test
-    public void testCreateTableAsWithNotNullColumnsAreNotAllowed() {
+    void testCreateTableAsWithNotNullColumnsAreNotAllowed() {
         CatalogTable catalogTable =
                 CatalogTable.newBuilder()
                         .schema(
@@ -323,7 +323,7 @@ public class SqlCTASNodeToOperationTest extends 
SqlNodeToOperationConversionTest
     }
 
     @Test
-    public void testCreateTableAsWithIncompatibleImplicitCastTypes() {
+    void testCreateTableAsWithIncompatibleImplicitCastTypes() {
         CatalogTable catalogTable =
                 CatalogTable.newBuilder()
                         .schema(
@@ -348,7 +348,7 @@ public class SqlCTASNodeToOperationTest extends 
SqlNodeToOperationConversionTest
     }
 
     @Test
-    public void testMergingCreateTableAsWithDistribution() {
+    void testMergingCreateTableAsWithDistribution() {
         CatalogTable catalogTable =
                 CatalogTable.newBuilder()
                         .schema(
@@ -388,7 +388,7 @@ public class SqlCTASNodeToOperationTest extends 
SqlNodeToOperationConversionTest
     }
 
     @Test
-    public void testMergingCreateTableAsWitEmptyDistribution() {
+    void testMergingCreateTableAsWitEmptyDistribution() {
         CatalogTable catalogTable =
                 CatalogTable.newBuilder()
                         .schema(
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
index 6a465210929..1ddd3a15860 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
@@ -1406,7 +1406,7 @@ class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversionTestBas
         checkAlterNonExistTable("alter table %s nonexistent drop watermark");
     }
 
-    @ParameterizedTest(name = "[{index}] {0}")
+    @ParameterizedTest(name = "{index}: {0}")
     @MethodSource("provideCreateMaterializedTableTestCases")
     void createMaterializedTableWithVariousOptions(
             String testName,
@@ -1429,7 +1429,7 @@ class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversionTestBas
         return Stream.of(
                 Arguments.of(
                         "with refresh mode continuous",
-                        "CREATE MATERIALIZED TABLE users_shops ("
+                        "CREATE MATERIALIZED TABLE users_shops (shop_id int, 
user_id int,"
                                 + " PRIMARY KEY (user_id) not enforced)"
                                 + " WITH(\n"
                                 + "   'format' = 'debezium-json'\n"
@@ -1439,14 +1439,14 @@ class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversionTestBas
                                 + " AS SELECT 1 as shop_id, 2 as user_id ",
                         "CREATE MATERIALIZED TABLE: (materializedTable: "
                                 + 
"[ResolvedCatalogMaterializedTable{origin=DefaultCatalogMaterializedTable{schema=(\n"
-                                + "  `shop_id` INT NOT NULL,\n"
+                                + "  `shop_id` INT,\n"
                                 + "  `user_id` INT NOT NULL,\n"
                                 + "  CONSTRAINT `PK_user_id` PRIMARY KEY 
(`user_id`) NOT ENFORCED\n"
                                 + "), comment='null', distribution=null, 
partitionKeys=[], "
                                 + "options={format=debezium-json}, 
snapshot=null, definitionQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', "
                                 + "freshness=INTERVAL '30' SECOND, 
logicalRefreshMode=CONTINUOUS, refreshMode=CONTINUOUS, "
                                 + "refreshStatus=INITIALIZING, 
refreshHandlerDescription='null', serializedRefreshHandler=null}, 
resolvedSchema=(\n"
-                                + "  `shop_id` INT NOT NULL,\n"
+                                + "  `shop_id` INT,\n"
                                 + "  `user_id` INT NOT NULL,\n"
                                 + "  CONSTRAINT `PK_user_id` PRIMARY KEY 
(`user_id`) NOT ENFORCED\n"
                                 + ")}], identifier: 
[`builtin`.`default`.`users_shops`])",
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDmlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDmlToOperationConverterTest.java
index afe84800fd3..89d2ee48822 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDmlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDmlToOperationConverterTest.java
@@ -53,34 +53,34 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.assertj.core.api.InstanceOfAssertFactories.type;
 
 /** Test cases for the DML statements for {@link 
SqlNodeToOperationConversion}. */
-public class SqlDmlToOperationConverterTest extends 
SqlNodeToOperationConversionTestBase {
+class SqlDmlToOperationConverterTest extends 
SqlNodeToOperationConversionTestBase {
 
     @Test
-    public void testExplainWithSelect() {
+    void testExplainWithSelect() {
         final String sql = "explain select * from t1";
         checkExplainSql(sql);
     }
 
     @Test
-    public void testExplainWithInsert() {
+    void testExplainWithInsert() {
         final String sql = "explain insert into t2 select * from t1";
         checkExplainSql(sql);
     }
 
     @Test
-    public void testExplainWithUnion() {
+    void testExplainWithUnion() {
         final String sql = "explain select * from t1 union select * from t2";
         checkExplainSql(sql);
     }
 
     @Test
-    public void testExplainWithExplainDetails() {
+    void testExplainWithExplainDetails() {
         String sql = "explain changelog_mode, estimated_cost, 
json_execution_plan select * from t1";
         checkExplainSql(sql);
     }
 
     @Test
-    public void testSqlInsertWithStaticPartition() {
+    void testSqlInsertWithStaticPartition() {
         final String sql = "insert into t1 partition(a=1) select b, c, d from 
t2";
         FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
         final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
@@ -93,7 +93,7 @@ public class SqlDmlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testSqlInsertWithDynamicTableOptions() {
+    void testSqlInsertWithDynamicTableOptions() {
         final String sql =
                 "insert into t1 /*+ OPTIONS('k1'='v1', 'k2'='v2') */\n"
                         + "select a, b, c, d from t2";
@@ -109,7 +109,7 @@ public class SqlDmlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testDynamicTableWithInvalidOptions() {
+    void testDynamicTableWithInvalidOptions() {
         final String sql = "select * from t1 /*+ OPTIONS('opt1', 'opt2') */";
         FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
         final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
@@ -120,7 +120,7 @@ public class SqlDmlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testBeginStatementSet() {
+    void testBeginStatementSet() {
         final String sql = "BEGIN STATEMENT SET";
         Operation operation = parse(sql);
         assertThat(operation).isInstanceOf(BeginStatementSetOperation.class);
@@ -131,7 +131,7 @@ public class SqlDmlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testEnd() {
+    void testEnd() {
         final String sql = "END";
         Operation operation = parse(sql);
         assertThat(operation).isInstanceOf(EndStatementSetOperation.class);
@@ -142,7 +142,7 @@ public class SqlDmlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testSqlRichExplainWithSelect() {
+    void testSqlRichExplainWithSelect() {
         final String sql = "explain plan for select a, b, c, d from t2";
         FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
         final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
@@ -151,7 +151,7 @@ public class SqlDmlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testSqlRichExplainWithInsert() {
+    void testSqlRichExplainWithInsert() {
         final String sql = "explain plan for insert into t1 select a, b, c, d 
from t2";
         FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
         final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
@@ -160,7 +160,7 @@ public class SqlDmlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testSqlRichExplainWithStatementSet() {
+    void testSqlRichExplainWithStatementSet() {
         final String sql =
                 "explain plan for statement set begin "
                         + "insert into t1 select a, b, c, d from t2 where a > 
1;"
@@ -173,7 +173,7 @@ public class SqlDmlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testExplainDetailsWithSelect() {
+    void testExplainDetailsWithSelect() {
         final String sql =
                 "explain estimated_cost, changelog_mode, plan_advice select a, 
b, c, d from t2";
         FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
@@ -182,7 +182,7 @@ public class SqlDmlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testExplainDetailsWithInsert() {
+    void testExplainDetailsWithInsert() {
         final String sql =
                 "explain estimated_cost, changelog_mode, plan_advice insert 
into t1 select a, b, c, d from t2";
         FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
@@ -191,7 +191,7 @@ public class SqlDmlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testExplainDetailsWithStatementSet() {
+    void testExplainDetailsWithStatementSet() {
         final String sql =
                 "explain estimated_cost, changelog_mode, plan_advice statement 
set begin "
                         + "insert into t1 select a, b, c, d from t2 where a > 
1;"
@@ -215,7 +215,7 @@ public class SqlDmlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testSqlExecuteWithStatementSet() {
+    void testSqlExecuteWithStatementSet() {
         final String sql =
                 "execute statement set begin "
                         + "insert into t1 select a, b, c, d from t2 where a > 
1;"
@@ -228,7 +228,7 @@ public class SqlDmlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testSqlRichExplainWithExecuteStatementSet() {
+    void testSqlRichExplainWithExecuteStatementSet() {
         final String sql =
                 "EXPLAIN EXECUTE STATEMENT SET BEGIN "
                         + "INSERT INTO t1 SELECT a, b, c, d FROM t2 WHERE a > 
1;"
@@ -241,7 +241,7 @@ public class SqlDmlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testSqlExecuteWithInsert() {
+    void testSqlExecuteWithInsert() {
         final String sql = "execute insert into t1 select a, b, c, d from t2 
where a > 1";
         FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
         final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
@@ -250,7 +250,7 @@ public class SqlDmlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testSqlRichExplainWithExecuteInsert() {
+    void testSqlRichExplainWithExecuteInsert() {
         final String sql = "EXPLAIN EXECUTE INSERT INTO t1 SELECT a, b, c, d 
FROM t2";
         FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
         final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
@@ -259,7 +259,7 @@ public class SqlDmlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testSqlExecuteWithSelect() {
+    void testSqlExecuteWithSelect() {
         final String sql = "execute select a, b, c, d from t2 where a > 1";
         FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
         final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
@@ -268,7 +268,7 @@ public class SqlDmlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testSqlRichExplainWithExecuteSelect() {
+    void testSqlRichExplainWithExecuteSelect() {
         final String sql = "EXPLAIN EXECUTE SELECT a, b, c, d FROM t2";
         FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
         final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
@@ -277,7 +277,7 @@ public class SqlDmlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testDelete() throws Exception {
+    void testDelete() throws Exception {
         Map<String, String> options = new HashMap<>();
         options.put("connector", TestUpdateDeleteTableFactory.IDENTIFIER);
         CatalogTable catalogTable =
@@ -311,7 +311,7 @@ public class SqlDmlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testUpdate() throws Exception {
+    void testUpdate() throws Exception {
         Map<String, String> options = new HashMap<>();
         options.put("connector", TestUpdateDeleteTableFactory.IDENTIFIER);
         CatalogTable catalogTable =
@@ -340,7 +340,7 @@ public class SqlDmlToOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testTruncateTable() {
+    void testTruncateTable() {
         String sql = "TRUNCATE TABLE t1";
         FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
         final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
index 67a828d7fe0..460d8a91c91 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.operations;
 
+import org.apache.flink.sql.parser.error.SqlValidateException;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.FunctionDescriptor;
 import org.apache.flink.table.api.Schema;
@@ -34,6 +35,7 @@ import 
org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.TableChange;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.catalog.WatermarkSpec;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
@@ -50,8 +52,13 @@ import 
org.apache.flink.shaded.guava33.com.google.common.collect.ImmutableMap;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -62,11 +69,11 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for the materialized table statements for {@link 
SqlNodeToOperationConversion}. */
-public class SqlMaterializedTableNodeToOperationConverterTest
+class SqlMaterializedTableNodeToOperationConverterTest
         extends SqlNodeToOperationConversionTestBase {
 
     @BeforeEach
-    public void before() throws TableAlreadyExistException, 
DatabaseNotExistException {
+    void before() throws TableAlreadyExistException, DatabaseNotExistException 
{
         super.before();
         final ObjectPath path3 = new 
ObjectPath(catalogManager.getCurrentDatabase(), "t3");
         final Schema tableSchema =
@@ -284,7 +291,7 @@ public class 
SqlMaterializedTableNodeToOperationConverterTest
                         + ")\n"
                         + "FRESHNESS = INTERVAL '30' SECOND\n"
                         + "REFRESH_MODE = FULL\n"
-                        + "AS SELECT a, f1, f2 FROM t1,LATERAL 
TABLE(myFunc(b)) as T(f1, f2)";
+                        + "AS SELECT a, f1, f2 FROM t1, LATERAL 
TABLE(myFunc(b)) as T(f1, f2)";
         Operation operation = parse(sql);
         
assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class);
 
@@ -397,9 +404,8 @@ public class 
SqlMaterializedTableNodeToOperationConverterTest
                         + "AS SELECT * FROM t1";
 
         assertThatThrownBy(() -> parse(sql))
-                .isInstanceOf(ValidationException.class)
-                .hasMessageContaining(
-                        "Primary key validation failed: UNIQUE constraint is 
not supported yet.");
+                .isInstanceOf(SqlValidateException.class)
+                .hasMessageContaining("UNIQUE constraint is not supported 
yet");
 
         // test primary key not defined in source table
         final String sql2 =
@@ -412,7 +418,7 @@ public class 
SqlMaterializedTableNodeToOperationConverterTest
         assertThatThrownBy(() -> parse(sql2))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
-                        "Primary key column 'e' not defined in the query 
schema. Available columns: ['a', 'b', 'c', 'd'].");
+                        "Primary key column 'e' is not defined in the schema 
at line 2, column 31");
 
         // test primary key with nullable source column
         final String sql3 =
@@ -424,7 +430,7 @@ public class 
SqlMaterializedTableNodeToOperationConverterTest
 
         assertThatThrownBy(() -> parse(sql3))
                 .isInstanceOf(ValidationException.class)
-                .hasMessageContaining("Could not create a PRIMARY KEY with 
nullable column 'd'.");
+                .hasMessageContaining("Invalid primary key 'ct1'. Column 'd' 
is nullable.");
     }
 
     @Test
@@ -438,7 +444,7 @@ public class 
SqlMaterializedTableNodeToOperationConverterTest
         assertThatThrownBy(() -> parse(sql))
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
-                        "Partition column 'e' not defined in the query schema. 
Available columns: ['a', 'b', 'c', 'd'].");
+                        "Partition column 'e' not defined in the query schema. 
Available columns: ['a', 'b', 'c', 'd']");
 
         final String sql2 =
                 "CREATE MATERIALIZED TABLE mtbl1\n"
@@ -516,6 +522,65 @@ public class 
SqlMaterializedTableNodeToOperationConverterTest
                         "Materialized table freshness only support SECOND, 
MINUTE, HOUR, DAY as the time unit.");
     }
 
+    @ParameterizedTest
+    @MethodSource("testDataForCreateMaterializedTableFailedCase")
+    void createMaterializedTableFailedCase(String sql, String 
expectedErrorMsg) {
+        assertThatThrownBy(() -> parse(sql))
+                .isInstanceOf(ValidationException.class)
+                .hasMessage(expectedErrorMsg);
+    }
+
+    @ParameterizedTest
+    @MethodSource("testDataWithDifferentSchemasSuccessCase")
+    void createMaterializedTableSuccessCase(String sql, ResolvedSchema 
expected) {
+        CreateMaterializedTableOperation operation = 
(CreateMaterializedTableOperation) parse(sql);
+        
assertThat(operation.getCatalogMaterializedTable().getResolvedSchema()).isEqualTo(expected);
+    }
+
+    @Test
+    void createMaterializedTableWithWatermark() {
+        final String sql =
+                "CREATE MATERIALIZED TABLE users_shops (watermark for ts as ts 
- interval '2' second)"
+                        + " FRESHNESS = INTERVAL '30' SECOND"
+                        + " AS SELECT 1 as shop_id, 2 as user_id, 
cast(current_timestamp as timestamp(3)) ts";
+        CreateMaterializedTableOperation operation = 
(CreateMaterializedTableOperation) parse(sql);
+        ResolvedSchema schema = 
operation.getCatalogMaterializedTable().getResolvedSchema();
+        assertThat(schema.getColumns())
+                .containsExactly(
+                        Column.physical("shop_id", DataTypes.INT().notNull()),
+                        Column.physical("user_id", DataTypes.INT().notNull()),
+                        Column.physical("ts", 
DataTypes.TIMESTAMP(3).notNull()));
+
+        assertThat(schema.getWatermarkSpecs()).hasSize(1);
+        WatermarkSpec watermarkSpec = schema.getWatermarkSpecs().get(0);
+        assertThat(watermarkSpec.getWatermarkExpression())
+                .hasToString("`ts` - INTERVAL '2' SECOND");
+        assertThat(schema.getPrimaryKey()).isEmpty();
+        assertThat(schema.getIndexes()).isEmpty();
+    }
+
+    @Test
+    void createMaterializedTableWithWatermarkUsingColumnFromCreatePart() {
+        final String sql =
+                "CREATE MATERIALIZED TABLE users_shops (ts TIMESTAMP(3), 
watermark for ts as ts - interval '2' second)"
+                        + " FRESHNESS = INTERVAL '30' SECOND"
+                        + " AS SELECT current_timestamp() as ts, 1 as shop_id, 
2 as user_id";
+        CreateMaterializedTableOperation operation = 
(CreateMaterializedTableOperation) parse(sql);
+        ResolvedSchema schema = 
operation.getCatalogMaterializedTable().getResolvedSchema();
+        assertThat(schema.getColumns())
+                .containsExactly(
+                        Column.physical("ts", DataTypes.TIMESTAMP(3)),
+                        Column.physical("shop_id", DataTypes.INT().notNull()),
+                        Column.physical("user_id", DataTypes.INT().notNull()));
+
+        assertThat(schema.getWatermarkSpecs()).hasSize(1);
+        WatermarkSpec watermarkSpec = schema.getWatermarkSpecs().get(0);
+        assertThat(watermarkSpec.getWatermarkExpression())
+                .hasToString("`ts` - INTERVAL '2' SECOND");
+        assertThat(schema.getPrimaryKey()).isEmpty();
+        assertThat(schema.getIndexes()).isEmpty();
+    }
+
     @Test
     void testAlterMaterializedTableRefreshOperationWithPartitionSpec() {
         final String sql =
@@ -531,7 +596,7 @@ public class 
SqlMaterializedTableNodeToOperationConverterTest
     }
 
     @Test
-    public void 
testAlterMaterializedTableRefreshOperationWithoutPartitionSpec() {
+    void testAlterMaterializedTableRefreshOperationWithoutPartitionSpec() {
         final String sql = "ALTER MATERIALIZED TABLE mtbl1 REFRESH";
 
         Operation operation = parse(sql);
@@ -644,49 +709,6 @@ public class 
SqlMaterializedTableNodeToOperationConverterTest
                                                 + "FROM 
`builtin`.`default`.`t3` AS `t3`")));
     }
 
-    @Test
-    void testAlterMaterializedTableAsQueryWithUnsupportedColumnChange() {
-        // 1. delete existing column
-        String sql1 = "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b FROM 
t3";
-        assertThatThrownBy(() -> parse(sql1))
-                .isInstanceOf(ValidationException.class)
-                .hasMessageContaining(
-                        "Failed to modify query because drop column is 
unsupported. When modifying a query, you can only append new columns at the end 
of original schema. The original schema has 4 columns, but the newly derived 
schema from the query has 2 columns.");
-        // 2. swap column position
-        String sql2 = "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, d, c 
FROM t3";
-        assertThatThrownBy(() -> parse(sql2))
-                .isInstanceOf(ValidationException.class)
-                .hasMessageContaining(
-                        "When modifying the query of a materialized table, 
currently only support appending columns at the end of original schema, 
dropping, renaming, and reordering columns are not supported.\n"
-                                + "Column mismatch at position 2: Original 
column is [`c` INT], but new column is [`d` STRING].");
-        // 3. change existing column type
-        String sql3 =
-                "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, c, cast(d 
as int) as d FROM t3";
-        assertThatThrownBy(() -> parse(sql3))
-                .isInstanceOf(ValidationException.class)
-                .hasMessageContaining(
-                        "When modifying the query of a materialized table, 
currently only support appending columns at the end of original schema, 
dropping, renaming, and reordering columns are not supported.\n"
-                                + "Column mismatch at position 3: Original 
column is [`d` STRING], but new column is [`d` INT].");
-        // 4. change existing column nullability
-        String sql4 =
-                "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, c, 
cast('d' as string) as d FROM t3";
-        assertThatThrownBy(() -> parse(sql4))
-                .isInstanceOf(ValidationException.class)
-                .hasMessageContaining(
-                        "When modifying the query of a materialized table, 
currently only support appending columns at the end of original schema, 
dropping, renaming, and reordering columns are not supported.\n"
-                                + "Column mismatch at position 3: Original 
column is [`d` STRING], but new column is [`d` STRING NOT NULL].");
-    }
-
-    @Test
-    void testAlterAlterMaterializedTableAsQueryWithCatalogTable() {
-        // t1 is a CatalogTable not a Materialized Table
-        final String sql = "ALTER MATERIALIZED TABLE t1 AS SELECT * FROM t1";
-        assertThatThrownBy(() -> parse(sql))
-                .isInstanceOf(ValidationException.class)
-                .hasMessageContaining(
-                        "Only materialized tables support modifying the 
definition query.");
-    }
-
     @Test
     void testDropMaterializedTable() {
         final String sql = "DROP MATERIALIZED TABLE mtbl1";
@@ -706,4 +728,153 @@ public class 
SqlMaterializedTableNodeToOperationConverterTest
                 .isEqualTo(
                         "DROP MATERIALIZED TABLE: (identifier: 
[`builtin`.`default`.`mtbl1`], IfExists: [true])");
     }
+
+    private static Collection<Arguments> 
testDataForCreateMaterializedTableFailedCase() {
+        final Collection<Arguments> list = new ArrayList<>();
+        list.addAll(create());
+        list.addAll(alter());
+        return list;
+    }
+
+    private static Collection<Arguments> alter() {
+        return List.of(
+                Arguments.of(
+                        "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b 
FROM t3",
+                        "Failed to modify query because drop column is 
unsupported. When modifying "
+                                + "a query, you can only append new columns at 
the end of original "
+                                + "schema. The original schema has 4 columns, 
but the newly derived "
+                                + "schema from the query has 2 columns."),
+                Arguments.of(
+                        "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, d, 
c FROM t3",
+                        "When modifying the query of a materialized table, 
currently only support "
+                                + "appending columns at the end of original 
schema, dropping, "
+                                + "renaming, and reordering columns are not 
supported.\n"
+                                + "Column mismatch at position 2: Original 
column is [`c` INT], "
+                                + "but new column is [`d` STRING]."),
+                Arguments.of(
+                        "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, c, 
CAST(d AS INT) AS d FROM t3",
+                        "When modifying the query of a materialized table, 
currently only support "
+                                + "appending columns at the end of original 
schema, dropping, "
+                                + "renaming, and reordering columns are not 
supported.\n"
+                                + "Column mismatch at position 3: Original 
column is [`d` STRING], "
+                                + "but new column is [`d` INT]."),
+                Arguments.of(
+                        "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, c, 
CAST('d' AS STRING) AS d FROM t3",
+                        "When modifying the query of a materialized table, 
currently only support "
+                                + "appending columns at the end of original 
schema, dropping, "
+                                + "renaming, and reordering columns are not 
supported.\n"
+                                + "Column mismatch at position 3: Original 
column is [`d` STRING], "
+                                + "but new column is [`d` STRING NOT NULL]."),
+                Arguments.of(
+                        "ALTER MATERIALIZED TABLE t1 AS SELECT * FROM t1",
+                        "Only materialized tables support modifying the 
definition query."));
+    }
+
+    private static Collection<Arguments> create() {
+        return List.of(
+                Arguments.of(
+                        "CREATE MATERIALIZED TABLE users_shops (shop_id)"
+                                + " FRESHNESS = INTERVAL '30' SECOND"
+                                + " AS SELECT 1 AS shop_id, 2 AS user_id",
+                        "The number of columns in the column list must match 
the number of columns in the source schema."),
+                Arguments.of(
+                        "CREATE MATERIALIZED TABLE users_shops (id, name, 
address)"
+                                + " FRESHNESS = INTERVAL '30' SECOND"
+                                + " AS SELECT 1 AS shop_id, 2 AS user_id",
+                        "The number of columns in the column list must match 
the number of columns in the source schema."),
+                Arguments.of(
+                        "CREATE MATERIALIZED TABLE users_shops (id, name)"
+                                + " FRESHNESS = INTERVAL '30' SECOND"
+                                + " AS SELECT 1 AS shop_id, 2 AS user_id",
+                        "Column 'id' not found in the source schema."),
+                Arguments.of(
+                        "CREATE MATERIALIZED TABLE users_shops (shop_id 
STRING, user_id STRING)"
+                                + " FRESHNESS = INTERVAL '30' SECOND"
+                                + " AS SELECT 1 AS shop_id, 2 AS user_id",
+                        "Incompatible types for sink column 'shop_id' at 
position 0. The source column has type 'INT NOT NULL', "
+                                + "while the target column has type 
'STRING'."),
+                Arguments.of(
+                        "CREATE MATERIALIZED TABLE users_shops (shop_id INT, 
WATERMARK FOR ts AS `ts` - INTERVAL '5' SECOND)"
+                                + " FRESHNESS = INTERVAL '30' SECOND"
+                                + " AS SELECT 1 AS shop_id, 2 AS user_id",
+                        "The rowtime attribute field 'ts' is not defined in 
the table schema, at line 1, column 67\n"
+                                + "Available fields: ['shop_id', 'user_id']"),
+                Arguments.of(
+                        "CREATE MATERIALIZED TABLE users_shops (shop_id INT, 
user_id INT, PRIMARY KEY(id) NOT ENFORCED)"
+                                + " FRESHNESS = INTERVAL '30' SECOND"
+                                + " AS SELECT 1 AS shop_id, 2 AS user_id",
+                        "Primary key column 'id' is not defined in the schema 
at line 1, column 78"),
+                Arguments.of(
+                        "CREATE MATERIALIZED TABLE users_shops (WATERMARK FOR 
ts AS ts - INTERVAL '2' SECOND)"
+                                + " FRESHNESS = INTERVAL '30' SECOND"
+                                + " AS SELECT 1 AS shop_id, 2 AS user_id",
+                        "The rowtime attribute field 'ts' is not defined in 
the table schema, at line 1, column 54\n"
+                                + "Available fields: ['shop_id', 'user_id']"),
+                Arguments.of(
+                        "CREATE MATERIALIZED TABLE users_shops (a INT, b INT)"
+                                + " FRESHNESS = INTERVAL '30' SECOND"
+                                + " AS SELECT 1 AS shop_id, 2 AS user_id",
+                        "Invalid as physical column 'a' is defined in the DDL, 
but is not used in a query column."),
+                Arguments.of(
+                        "CREATE MATERIALIZED TABLE users_shops (shop_id INT, b 
INT)"
+                                + " FRESHNESS = INTERVAL '30' SECOND"
+                                + " AS SELECT 1 AS shop_id, 2 AS user_id",
+                        "Invalid as physical column 'b' is defined in the DDL, 
but is not used in a query column."));
+    }
+
+    private static Collection<Arguments> 
testDataWithDifferentSchemasSuccessCase() {
+        return List.of(
+                Arguments.of(
+                        "CREATE MATERIALIZED TABLE users_shops (shop_id, 
user_id)"
+                                + " FRESHNESS = INTERVAL '30' SECOND"
+                                + " AS SELECT 1 AS shop_id, 2 AS user_id",
+                        ResolvedSchema.of(
+                                Column.physical("shop_id", 
DataTypes.INT().notNull()),
+                                Column.physical("user_id", 
DataTypes.INT().notNull()))),
+                Arguments.of(
+                        "CREATE MATERIALIZED TABLE users_shops"
+                                + " FRESHNESS = INTERVAL '30' SECOND"
+                                + " AS SELECT CAST(1 AS DOUBLE) AS shop_id, 
CAST(2 AS STRING) AS user_id",
+                        ResolvedSchema.of(
+                                Column.physical("shop_id", 
DataTypes.DOUBLE().notNull()),
+                                Column.physical("user_id", 
DataTypes.STRING().notNull()))),
+                Arguments.of(
+                        "CREATE MATERIALIZED TABLE users_shops (user_id, 
shop_id)"
+                                + " FRESHNESS = INTERVAL '30' SECOND"
+                                + " AS SELECT 1 AS shop_id, 2 AS user_id",
+                        ResolvedSchema.of(
+                                Column.physical("user_id", 
DataTypes.INT().notNull()),
+                                Column.physical("shop_id", 
DataTypes.INT().notNull()))),
+                Arguments.of(
+                        "CREATE MATERIALIZED TABLE users_shops (user_id INT, 
shop_id BIGINT)"
+                                + " FRESHNESS = INTERVAL '30' SECOND"
+                                + " AS SELECT 1 AS shop_id, 2 AS user_id",
+                        ResolvedSchema.of(
+                                Column.physical("shop_id", DataTypes.BIGINT()),
+                                Column.physical("user_id", DataTypes.INT()))),
+                Arguments.of(
+                        "CREATE MATERIALIZED TABLE users_shops (user_id INT, 
shop_id BIGINT, PRIMARY KEY(user_id) NOT ENFORCED)"
+                                + " FRESHNESS = INTERVAL '30' SECOND"
+                                + " AS SELECT 1 AS shop_id, 2 AS user_id",
+                        new ResolvedSchema(
+                                List.of(
+                                        Column.physical("shop_id", 
DataTypes.BIGINT()),
+                                        Column.physical("user_id", 
DataTypes.INT().notNull())),
+                                List.of(),
+                                
org.apache.flink.table.catalog.UniqueConstraint.primaryKey(
+                                        "PK_user_id", List.of("user_id")),
+                                List.of())),
+                Arguments.of(
+                        "CREATE MATERIALIZED TABLE users_shops (PRIMARY 
KEY(user_id) NOT ENFORCED)"
+                                + " FRESHNESS = INTERVAL '30' SECOND"
+                                + " AS SELECT 1 AS shop_id, 2 AS user_id",
+                        new ResolvedSchema(
+                                List.of(
+                                        Column.physical("shop_id", 
DataTypes.INT().notNull()),
+                                        Column.physical("user_id", 
DataTypes.INT().notNull())),
+                                List.of(),
+                                
org.apache.flink.table.catalog.UniqueConstraint.primaryKey(
+                                        "PK_user_id", List.of("user_id")),
+                                List.of())));
+    }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlModelOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlModelOperationConverterTest.java
index 53f45f58d0c..87bc54a5af1 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlModelOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlModelOperationConverterTest.java
@@ -58,9 +58,9 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 /** Test for testing convert model statement to operation. */
-public class SqlModelOperationConverterTest extends 
SqlNodeToOperationConversionTestBase {
+class SqlModelOperationConverterTest extends 
SqlNodeToOperationConversionTestBase {
     @Test
-    public void testCreateModel() {
+    void testCreateModel() {
         final String sql =
                 "CREATE MODEL model1 \n"
                         + "INPUT(a bigint comment 'column a', b varchar, c 
int, d varchar)\n"
@@ -102,7 +102,7 @@ public class SqlModelOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testDropModel() throws Exception {
+    void testDropModel() throws Exception {
         Catalog catalog = new GenericInMemoryCatalog("default", "default");
         if (!catalogManager.getCatalog("cat1").isPresent()) {
             catalogManager.registerCatalog("cat1", catalog);
@@ -135,7 +135,7 @@ public class SqlModelOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testDescribeModel() {
+    void testDescribeModel() {
         Operation operation = parse("DESCRIBE MODEL m1");
         assertThat(operation).isInstanceOf(DescribeModelOperation.class);
         DescribeModelOperation describeModelOperation = 
(DescribeModelOperation) operation;
@@ -202,7 +202,7 @@ public class SqlModelOperationConverterTest extends 
SqlNodeToOperationConversion
     }
 
     @Test
-    public void testAlterModel() throws Exception {
+    void testAlterModel() throws Exception {
         Catalog catalog = new GenericInMemoryCatalog("default", "default");
         if (!catalogManager.getCatalog("cat1").isPresent()) {
             catalogManager.registerCatalog("cat1", catalog);
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToCallOperationTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToCallOperationTest.java
index a15ac7fc71b..c47d2431080 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToCallOperationTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToCallOperationTest.java
@@ -42,10 +42,10 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test cases for the call statements for {@link 
SqlNodeToOperationConversion}. */
-public class SqlNodeToCallOperationTest extends 
SqlNodeToOperationConversionTestBase {
+class SqlNodeToCallOperationTest extends SqlNodeToOperationConversionTestBase {
 
     @BeforeEach
-    public void before() {
+    void before() {
         CatalogWithBuiltInProcedure procedureCatalog =
                 new CatalogWithBuiltInProcedure("procedure_catalog");
         catalogManager.registerCatalog("p1", procedureCatalog);
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java
index ef251378a5a..9ed19411a7a 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversionTestBase.java
@@ -59,7 +59,7 @@ import java.util.function.Supplier;
 import static org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema;
 
 /** Test base for testing convert sql statement to operation. */
-public class SqlNodeToOperationConversionTestBase {
+class SqlNodeToOperationConversionTestBase {
     private final boolean isStreamingMode = false;
     private final TableConfig tableConfig = TableConfig.getDefault();
     protected final Catalog catalog = new 
GenericInMemoryCatalog("MockCatalog", "default");
@@ -96,7 +96,7 @@ public class SqlNodeToOperationConversionTestBase {
                     plannerContext.getRexFactory());
 
     @BeforeEach
-    public void before() throws TableAlreadyExistException, 
DatabaseNotExistException {
+    void before() throws TableAlreadyExistException, DatabaseNotExistException 
{
         catalogManager.initSchemaResolver(
                 isStreamingMode,
                 ExpressionResolverMocks.basicResolver(catalogManager, 
functionCatalog, parser),
@@ -122,7 +122,7 @@ public class SqlNodeToOperationConversionTestBase {
     }
 
     @AfterEach
-    public void after() throws TableNotExistException {
+    void after() throws TableNotExistException {
         final ObjectPath path1 = new 
ObjectPath(catalogManager.getCurrentDatabase(), "t1");
         final ObjectPath path2 = new 
ObjectPath(catalogManager.getCurrentDatabase(), "t2");
         catalog.dropTable(path1, true);
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
index 40114de579f..dba843b5eb7 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
@@ -46,10 +46,10 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test base for testing convert [CREATE OR] REPLACE TABLE AS statement to 
operation. */
-public class SqlRTASNodeToOperationConverterTest extends 
SqlNodeToOperationConversionTestBase {
+class SqlRTASNodeToOperationConverterTest extends 
SqlNodeToOperationConversionTestBase {
 
     @Test
-    public void testReplaceTableAs() {
+    void testReplaceTableAs() {
         String tableName = "replace_table";
         String tableComment = "test table comment 表描述";
         String sql =
@@ -62,7 +62,7 @@ public class SqlRTASNodeToOperationConverterTest extends 
SqlNodeToOperationConve
     }
 
     @Test
-    public void testReplaceTableAsWithOrderingColumns() {
+    void testReplaceTableAsWithOrderingColumns() {
         String tableName = "replace_table";
         String sql =
                 "REPLACE TABLE "
@@ -78,7 +78,7 @@ public class SqlRTASNodeToOperationConverterTest extends 
SqlNodeToOperationConve
     }
 
     @Test
-    public void testReplaceTableAsWithNotFoundColumnIdentifiers() {
+    void testReplaceTableAsWithNotFoundColumnIdentifiers() {
         String tableName = "replace_table";
         String sql =
                 "REPLACE TABLE "
@@ -91,7 +91,7 @@ public class SqlRTASNodeToOperationConverterTest extends 
SqlNodeToOperationConve
     }
 
     @Test
-    public void testReplaceTableAsWithMismatchIdentifiersLength() {
+    void testReplaceTableAsWithMismatchIdentifiersLength() {
         String tableName = "replace_table";
         String sql =
                 "REPLACE TABLE "
@@ -106,7 +106,7 @@ public class SqlRTASNodeToOperationConverterTest extends 
SqlNodeToOperationConve
     }
 
     @Test
-    public void testCreateOrReplaceTableAs() {
+    void testCreateOrReplaceTableAs() {
         String tableName = "create_or_replace_table";
         String sql =
                 "CREATE OR REPLACE TABLE "
@@ -116,7 +116,7 @@ public class SqlRTASNodeToOperationConverterTest extends 
SqlNodeToOperationConve
     }
 
     @Test
-    public void testCreateOrReplaceTableAsWithColumns() {
+    void testCreateOrReplaceTableAsWithColumns() {
         String tableName = "create_or_replace_table";
         String sql =
                 "CREATE OR REPLACE TABLE "
@@ -136,7 +136,7 @@ public class SqlRTASNodeToOperationConverterTest extends 
SqlNodeToOperationConve
     }
 
     @Test
-    public void testCreateOrReplaceTableAsWithColumnsOverridden() {
+    void testCreateOrReplaceTableAsWithColumnsOverridden() {
         String tableName = "create_or_replace_table";
         String sql =
                 "CREATE OR REPLACE TABLE "
@@ -157,7 +157,7 @@ public class SqlRTASNodeToOperationConverterTest extends 
SqlNodeToOperationConve
     }
 
     @Test
-    public void testCreateOrReplaceTableAsWithNotNullColumnsAreNotAllowed() {
+    void testCreateOrReplaceTableAsWithNotNullColumnsAreNotAllowed() {
         String tableName = "create_or_replace_table";
         String sql =
                 "CREATE OR REPLACE TABLE "
@@ -171,7 +171,7 @@ public class SqlRTASNodeToOperationConverterTest extends 
SqlNodeToOperationConve
     }
 
     @Test
-    public void 
testCreateOrReplaceTableAsWithOverriddenVirtualMetadataColumnsNotAllowed() {
+    void 
testCreateOrReplaceTableAsWithOverriddenVirtualMetadataColumnsNotAllowed() {
         String tableName = "create_or_replace_table";
         String sql =
                 "CREATE OR REPLACE TABLE "
@@ -188,7 +188,7 @@ public class SqlRTASNodeToOperationConverterTest extends 
SqlNodeToOperationConve
     }
 
     @Test
-    public void 
testCreateOrReplaceTableAsWithOverriddenComputedColumnsNotAllowed() {
+    void testCreateOrReplaceTableAsWithOverriddenComputedColumnsNotAllowed() {
         String tableName = "create_or_replace_table";
         String sql =
                 "CREATE OR REPLACE TABLE "
@@ -204,7 +204,7 @@ public class SqlRTASNodeToOperationConverterTest extends 
SqlNodeToOperationConve
     }
 
     @Test
-    public void testCreateOrReplaceTableAsWithIncompatibleImplicitCastTypes() {
+    void testCreateOrReplaceTableAsWithIncompatibleImplicitCastTypes() {
         String tableName = "create_or_replace_table";
         String sql =
                 "CREATE OR REPLACE TABLE "
@@ -221,7 +221,7 @@ public class SqlRTASNodeToOperationConverterTest extends 
SqlNodeToOperationConve
     }
 
     @Test
-    public void testCreateOrReplaceTableAsWithDistribution() {
+    void testCreateOrReplaceTableAsWithDistribution() {
         String tableName = "create_or_replace_table";
         String sql =
                 "CREATE OR REPLACE TABLE "
@@ -240,7 +240,7 @@ public class SqlRTASNodeToOperationConverterTest extends 
SqlNodeToOperationConve
     }
 
     @Test
-    public void testCreateOrReplaceTableAsWithPrimaryKey() {
+    void testCreateOrReplaceTableAsWithPrimaryKey() {
         String tableName = "create_or_replace_table";
         String sql =
                 "CREATE OR REPLACE TABLE "
@@ -260,7 +260,7 @@ public class SqlRTASNodeToOperationConverterTest extends 
SqlNodeToOperationConve
     }
 
     @Test
-    public void testCreateOrReplaceTableAsWithWatermark() {
+    void testCreateOrReplaceTableAsWithWatermark() {
         String tableName = "create_or_replace_table";
         String sql =
                 "CREATE OR REPLACE TABLE "
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlShowToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlShowToOperationConverterTest.java
index 60606aa9771..baab04a471e 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlShowToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlShowToOperationConverterTest.java
@@ -27,17 +27,17 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
-public class SqlShowToOperationConverterTest extends 
SqlNodeToOperationConversionTestBase {
+class SqlShowToOperationConverterTest extends 
SqlNodeToOperationConversionTestBase {
 
     @BeforeEach
-    public void before() throws TableAlreadyExistException, 
DatabaseNotExistException {
+    void before() throws TableAlreadyExistException, DatabaseNotExistException 
{
         // Do nothing
         // No need to create schema, tables and etc. since the test executes 
for unset catalog and
         // database
     }
 
     @AfterEach
-    public void after() throws TableNotExistException {
+    void after() throws TableNotExistException {
         // Do nothing
     }
 
