This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 538c24c8bf4 [FLINK-31499][table] Move `SqlCreateTable` conversion logic to `SqlCreateTableConverter` and make it more generic
538c24c8bf4 is described below

commit 538c24c8bf490eb8870c4d39c8ac5b204d3613ee
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Mon Sep 29 10:50:18 2025 +0200

    [FLINK-31499][table] Move `SqlCreateTable` conversion logic to `SqlCreateTableConverter` and make it more generic
---
 .../flink/sql/parser/ddl/SqlCreateTable.java       |  16 +-
 .../flink/sql/parser/ddl/SqlCreateTableAs.java     |   3 +-
 .../flink/sql/parser/ddl/SqlCreateTableLike.java   |   3 +-
 .../flink/sql/parser/ddl/SqlReplaceTableAs.java    | 117 ++-----
 .../operations/SqlCreateTableConverter.java        | 359 ---------------------
 .../operations/SqlNodeToOperationConversion.java   |  40 +--
 .../converters/AbstractCreateTableConverter.java   | 131 ++++++++
 .../{ => converters}/MergeTableAsUtil.java         |  20 +-
 .../{ => converters}/MergeTableLikeUtil.java       |   9 +-
 .../{ => converters}/SchemaBuilderUtil.java        |   7 +-
 .../SqlAlterMaterializedTableResumeConverter.java  |  15 +-
 .../converters/SqlAlterModelSetConverter.java      |   2 +-
 .../SqlAlterTableAddPartitionConverter.java        |   3 +-
 .../SqlAlterViewPropertiesConverter.java           |   2 +-
 .../converters/SqlCreateCatalogConverter.java      |  13 +-
 .../SqlCreateMaterializedTableConverter.java       |  25 +-
 .../converters/SqlCreateModelConverter.java        |   1 -
 .../converters/SqlCreateTableAsConverter.java      | 121 +++++++
 .../converters/SqlCreateTableConverter.java        |  86 +++++
 .../converters/SqlCreateTableLikeConverter.java    | 140 ++++++++
 .../converters/SqlCreateViewConverter.java         |   3 +-
 .../operations/converters/SqlNodeConverters.java   |   3 +
 .../converters/SqlReplaceTableAsConverter.java     | 152 +++------
 .../planner/utils/OperationConverterUtils.java     |  10 +-
 .../{ => converters}/MergeTableLikeUtilTest.java   |   2 +-
 25 files changed, 621 insertions(+), 662 deletions(-)

diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
index 50e36baf712..d02f696c1fb 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
@@ -63,10 +63,6 @@ public class SqlCreateTable extends SqlCreate implements 
ExtendedSqlNode {
 
     private final List<SqlTableConstraint> tableConstraints;
 
-    public SqlDistribution getDistribution() {
-        return distribution;
-    }
-
     private final SqlDistribution distribution;
 
     private final SqlNodeList partitionKeyList;
@@ -101,7 +97,8 @@ public class SqlCreateTable extends SqlCreate implements 
ExtendedSqlNode {
                 watermark,
                 comment,
                 isTemporary,
-                ifNotExists);
+                ifNotExists,
+                false);
     }
 
     protected SqlCreateTable(
@@ -116,8 +113,9 @@ public class SqlCreateTable extends SqlCreate implements 
ExtendedSqlNode {
             @Nullable SqlWatermark watermark,
             @Nullable SqlCharStringLiteral comment,
             boolean isTemporary,
-            boolean ifNotExists) {
-        super(operator, pos, false, ifNotExists);
+            boolean ifNotExists,
+            boolean replace) {
+        super(operator, pos, replace, ifNotExists);
         this.tableName = requireNonNull(tableName, "tableName should not be 
null");
         this.columnList = requireNonNull(columnList, "columnList should not be 
null");
         this.tableConstraints =
@@ -156,6 +154,10 @@ public class SqlCreateTable extends SqlCreate implements 
ExtendedSqlNode {
         return columnList;
     }
 
+    public final SqlDistribution getDistribution() {
+        return distribution;
+    }
+
     public SqlNodeList getPropertyList() {
         return propertyList;
     }
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableAs.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableAs.java
index c48fbfdf19e..4ee67b8c115 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableAs.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableAs.java
@@ -98,7 +98,8 @@ public class SqlCreateTableAs extends SqlCreateTable {
                 watermark,
                 comment,
                 isTemporary,
-                ifNotExists);
+                ifNotExists,
+                false);
         this.asQuery =
                 requireNonNull(asQuery, "As clause is required for CREATE 
TABLE AS SELECT DDL");
     }
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableLike.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableLike.java
index 599a93829d4..f7fa4306258 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableLike.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTableLike.java
@@ -100,7 +100,8 @@ public class SqlCreateTableLike extends SqlCreateTable {
                 watermark,
                 comment,
                 isTemporary,
-                ifNotExists);
+                ifNotExists,
+                false);
         this.tableLike =
                 requireNonNull(tableLike, "LIKE clause is required for CREATE 
TABLE LIKE DDL");
     }
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
index 47d7b78b211..03c38d396b0 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
@@ -25,7 +25,6 @@ import 
org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
 import org.apache.flink.sql.parser.error.SqlValidateException;
 
 import org.apache.calcite.sql.SqlCharStringLiteral;
-import org.apache.calcite.sql.SqlCreate;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
@@ -39,9 +38,6 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.List;
-import java.util.Optional;
-
-import static java.util.Objects.requireNonNull;
 
 /**
  * {@link SqlNode} to describe the [CREATE OR] REPLACE TABLE AS (RTAS) syntax. 
The RTAS would create
@@ -73,7 +69,7 @@ import static java.util.Objects.requireNonNull;
  * AS SELECT * FROM base_table;
  * }</pre>
  */
-public class SqlReplaceTableAs extends SqlCreate implements ExtendedSqlNode {
+public class SqlReplaceTableAs extends SqlCreateTable implements 
ExtendedSqlNode {
 
     public static final SqlSpecialOperator REPLACE_OPERATOR =
             new SqlSpecialOperator("REPLACE TABLE AS", SqlKind.OTHER_DDL);
@@ -81,28 +77,6 @@ public class SqlReplaceTableAs extends SqlCreate implements 
ExtendedSqlNode {
     public static final SqlSpecialOperator CREATE_OR_REPLACE_OPERATOR =
             new SqlSpecialOperator("CREATE OR REPLACE TABLE AS", 
SqlKind.OTHER_DDL);
 
-    private final SqlIdentifier tableName;
-
-    private final SqlNodeList columnList;
-
-    private final SqlNodeList propertyList;
-
-    private final List<SqlTableConstraint> tableConstraints;
-
-    public SqlDistribution getDistribution() {
-        return distribution;
-    }
-
-    private final SqlDistribution distribution;
-
-    private final SqlNodeList partitionKeyList;
-
-    private final SqlWatermark watermark;
-
-    private final SqlCharStringLiteral comment;
-
-    private final boolean isTemporary;
-
     private final boolean isCreateOrReplace;
 
     private final SqlNode asQuery;
@@ -124,20 +98,17 @@ public class SqlReplaceTableAs extends SqlCreate 
implements ExtendedSqlNode {
         super(
                 isCreateOrReplace ? CREATE_OR_REPLACE_OPERATOR : 
REPLACE_OPERATOR,
                 pos,
-                true,
-                ifNotExists);
-
-        this.tableName = requireNonNull(tableName, "tableName should not be 
null");
-        this.columnList = requireNonNull(columnList, "columnList should not be 
null");
-        this.tableConstraints =
-                requireNonNull(tableConstraints, "table constraints should not 
be null");
-        this.propertyList = requireNonNull(propertyList, "propertyList should 
not be null");
-        this.distribution = distribution;
-        this.partitionKeyList =
-                requireNonNull(partitionKeyList, "partitionKeyList should not 
be null");
-        this.watermark = watermark;
-        this.comment = comment;
-        this.isTemporary = isTemporary;
+                tableName,
+                columnList,
+                tableConstraints,
+                propertyList,
+                distribution,
+                partitionKeyList,
+                watermark,
+                comment,
+                isTemporary,
+                ifNotExists,
+                true);
 
         this.asQuery = asQuery;
         this.isCreateOrReplace = isCreateOrReplace;
@@ -146,20 +117,21 @@ public class SqlReplaceTableAs extends SqlCreate 
implements ExtendedSqlNode {
     @Override
     public @Nonnull List<SqlNode> getOperandList() {
         return ImmutableNullableList.of(
-                tableName,
-                columnList,
-                new SqlNodeList(tableConstraints, SqlParserPos.ZERO),
-                propertyList,
-                partitionKeyList,
-                watermark,
-                comment,
+                getTableName(),
+                getColumnList(),
+                new SqlNodeList(getTableConstraints(), SqlParserPos.ZERO),
+                getPropertyList(),
+                getPartitionKeyList(),
+                getWatermark().get(),
+                getComment().get(),
                 asQuery);
     }
 
     @Override
     public void validate() throws SqlValidateException {
         if (!isSchemaWithColumnsIdentifiersOnly()) {
-            
SqlConstraintValidator.validateAndChangeColumnNullability(tableConstraints, 
columnList);
+            SqlConstraintValidator.validateAndChangeColumnNullability(
+                    getTableConstraints(), getColumnList());
         }
 
         // The following features are not currently supported by RTAS, but may 
be supported in the
@@ -188,52 +160,17 @@ public class SqlReplaceTableAs extends SqlCreate 
implements ExtendedSqlNode {
         return isCreateOrReplace;
     }
 
-    public SqlIdentifier getTableName() {
-        return tableName;
-    }
-
-    public SqlNodeList getColumnList() {
-        return columnList;
-    }
-
-    public SqlNodeList getPropertyList() {
-        return propertyList;
-    }
-
-    public SqlNodeList getPartitionKeyList() {
-        return partitionKeyList;
-    }
-
-    public List<SqlTableConstraint> getTableConstraints() {
-        return tableConstraints;
-    }
-
-    public Optional<SqlWatermark> getWatermark() {
-        return Optional.ofNullable(watermark);
-    }
-
-    public Optional<SqlCharStringLiteral> getComment() {
-        return Optional.ofNullable(comment);
-    }
-
-    public boolean isIfNotExists() {
-        return ifNotExists;
-    }
-
-    public boolean isTemporary() {
-        return isTemporary;
-    }
-
     public boolean isSchemaWithColumnsIdentifiersOnly() {
         // REPLACE table supports passing only column identifiers in the 
column list. If
         // the first column in the list is an identifier, then we assume the 
rest of the
         // columns are identifiers as well.
+        SqlNodeList columnList = getColumnList();
         return !columnList.isEmpty() && columnList.get(0) instanceof 
SqlIdentifier;
     }
 
     /** Returns the column constraints plus the table constraints. */
     public List<SqlTableConstraint> getFullConstraints() {
-        return SqlConstraintValidator.getFullConstraints(tableConstraints, 
columnList);
+        return 
SqlConstraintValidator.getFullConstraints(getTableConstraints(), 
getColumnList());
     }
 
     @Override
@@ -242,15 +179,17 @@ public class SqlReplaceTableAs extends SqlCreate 
implements ExtendedSqlNode {
             writer.keyword("CREATE OR");
         }
         writer.keyword("REPLACE TABLE");
-        tableName.unparse(writer, leftPrec, rightPrec);
+        getTableName().unparse(writer, leftPrec, rightPrec);
 
+        SqlCharStringLiteral comment = getComment().orElse(null);
         if (comment != null) {
             writer.newlineAndIndent();
             writer.keyword("COMMENT");
             comment.unparse(writer, leftPrec, rightPrec);
         }
 
-        if (this.propertyList.size() > 0) {
+        SqlNodeList propertyList = getPropertyList();
+        if (!propertyList.isEmpty()) {
             writer.keyword("WITH");
             SqlWriter.Frame withFrame = writer.startList("(", ")");
             for (SqlNode property : propertyList) {
@@ -268,6 +207,6 @@ public class SqlReplaceTableAs extends SqlCreate implements 
ExtendedSqlNode {
     }
 
     public String[] fullTableName() {
-        return tableName.names.toArray(new String[0]);
+        return getTableName().names.toArray(new String[0]);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
deleted file mode 100644
index 56129869fc8..00000000000
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
+++ /dev/null
@@ -1,359 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.operations;
-
-import org.apache.flink.sql.parser.ddl.SqlCreateTable;
-import org.apache.flink.sql.parser.ddl.SqlCreateTableAs;
-import org.apache.flink.sql.parser.ddl.SqlCreateTableLike;
-import org.apache.flink.sql.parser.ddl.SqlTableLike;
-import org.apache.flink.sql.parser.ddl.SqlTableOption;
-import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
-import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.catalog.CatalogManager;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.ContextResolvedTable;
-import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.catalog.ResolvedCatalogTable;
-import org.apache.flink.table.catalog.ResolvedSchema;
-import org.apache.flink.table.catalog.TableDistribution;
-import org.apache.flink.table.catalog.UnresolvedIdentifier;
-import org.apache.flink.table.operations.CreateTableASOperation;
-import org.apache.flink.table.operations.Operation;
-import org.apache.flink.table.operations.ddl.CreateTableOperation;
-import org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator;
-import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
-import org.apache.flink.table.planner.calcite.SqlRewriterUtils;
-import org.apache.flink.table.planner.utils.OperationConverterUtils;
-
-import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlNodeList;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-/** Helper class for converting {@link SqlCreateTable} to {@link 
CreateTableOperation}. */
-class SqlCreateTableConverter {
-
-    private final MergeTableLikeUtil mergeTableLikeUtil;
-    private final MergeTableAsUtil mergeTableAsUtil;
-    private final CatalogManager catalogManager;
-    private final FlinkTypeFactory typeFactory;
-    private final SqlRewriterUtils rewriterUtils;
-
-    SqlCreateTableConverter(
-            FlinkCalciteSqlValidator sqlValidator,
-            CatalogManager catalogManager,
-            Function<SqlNode, String> escapeExpression) {
-        this.mergeTableLikeUtil =
-                new MergeTableLikeUtil(
-                        sqlValidator, escapeExpression, 
catalogManager.getDataTypeFactory());
-        this.mergeTableAsUtil =
-                new MergeTableAsUtil(
-                        sqlValidator, escapeExpression, 
catalogManager.getDataTypeFactory());
-        this.catalogManager = catalogManager;
-        this.typeFactory = (FlinkTypeFactory) sqlValidator.getTypeFactory();
-        this.rewriterUtils = new SqlRewriterUtils(sqlValidator);
-    }
-
-    /** Convert the {@link SqlCreateTable} node. */
-    Operation convertCreateTable(SqlCreateTable sqlCreateTable) {
-        ResolvedCatalogTable catalogTable = createCatalogTable(sqlCreateTable);
-
-        UnresolvedIdentifier unresolvedIdentifier =
-                UnresolvedIdentifier.of(sqlCreateTable.fullTableName());
-        ObjectIdentifier identifier = 
catalogManager.qualifyIdentifier(unresolvedIdentifier);
-
-        return new CreateTableOperation(
-                identifier,
-                catalogTable,
-                sqlCreateTable.isIfNotExists(),
-                sqlCreateTable.isTemporary());
-    }
-
-    /** Convert the {@link SqlCreateTableAs} node. */
-    Operation convertCreateTableAS(
-            FlinkPlannerImpl flinkPlanner, SqlCreateTableAs sqlCreateTableAs) {
-        UnresolvedIdentifier unresolvedIdentifier =
-                UnresolvedIdentifier.of(sqlCreateTableAs.fullTableName());
-        ObjectIdentifier identifier = 
catalogManager.qualifyIdentifier(unresolvedIdentifier);
-
-        SqlNode asQuerySqlNode = sqlCreateTableAs.getAsQuery();
-        SqlNode validatedAsQuery = flinkPlanner.validate(asQuerySqlNode);
-
-        PlannerQueryOperation query =
-                (PlannerQueryOperation)
-                        SqlNodeToOperationConversion.convert(
-                                        flinkPlanner, catalogManager, 
validatedAsQuery)
-                                .orElseThrow(
-                                        () ->
-                                                new TableException(
-                                                        "CTAS unsupported node 
type "
-                                                                + 
validatedAsQuery
-                                                                        
.getClass()
-                                                                        
.getSimpleName()));
-        ResolvedCatalogTable tableWithResolvedSchema =
-                createCatalogTable(sqlCreateTableAs, 
query.getResolvedSchema());
-
-        // If needed, rewrite the query to include the new sink fields in the 
select list
-        query =
-                mergeTableAsUtil.maybeRewriteQuery(
-                        catalogManager,
-                        flinkPlanner,
-                        query,
-                        validatedAsQuery,
-                        tableWithResolvedSchema);
-
-        CreateTableOperation createTableOperation =
-                new CreateTableOperation(
-                        identifier,
-                        tableWithResolvedSchema,
-                        sqlCreateTableAs.isIfNotExists(),
-                        sqlCreateTableAs.isTemporary());
-
-        return new CreateTableASOperation(
-                createTableOperation, Collections.emptyMap(), query, false);
-    }
-
-    private ResolvedCatalogTable createCatalogTable(
-            SqlCreateTableAs sqlCreateTableAs, ResolvedSchema querySchema) {
-        Map<String, String> tableOptions =
-                sqlCreateTableAs.getPropertyList().getList().stream()
-                        .collect(
-                                Collectors.toMap(
-                                        p -> ((SqlTableOption) 
p).getKeyString(),
-                                        p -> ((SqlTableOption) 
p).getValueString()));
-
-        String tableComment =
-                
OperationConverterUtils.getTableComment(sqlCreateTableAs.getComment());
-
-        Schema mergedSchema;
-        if (sqlCreateTableAs.isSchemaWithColumnsIdentifiersOnly()) {
-            // If only column identifiers are provided, then these are used to
-            // order the columns in the schema.
-            mergedSchema =
-                    
mergeTableAsUtil.reorderSchema(sqlCreateTableAs.getColumnList(), querySchema);
-        } else {
-            mergedSchema =
-                    mergeTableAsUtil.mergeSchemas(
-                            sqlCreateTableAs.getColumnList(),
-                            sqlCreateTableAs.getWatermark().orElse(null),
-                            sqlCreateTableAs.getFullConstraints(),
-                            querySchema);
-        }
-
-        Optional<TableDistribution> tableDistribution =
-                Optional.ofNullable(sqlCreateTableAs.getDistribution())
-                        
.map(OperationConverterUtils::getDistributionFromSqlDistribution);
-
-        List<String> partitionKeys =
-                
getPartitionKeyColumnNames(sqlCreateTableAs.getPartitionKeyList());
-        verifyPartitioningColumnsExist(mergedSchema, partitionKeys);
-
-        CatalogTable catalogTable =
-                CatalogTable.newBuilder()
-                        .schema(mergedSchema)
-                        .comment(tableComment)
-                        .distribution(tableDistribution.orElse(null))
-                        .options(tableOptions)
-                        .partitionKeys(partitionKeys)
-                        .build();
-
-        return catalogManager.resolveCatalogTable(catalogTable);
-    }
-
-    private ResolvedCatalogTable createCatalogTable(SqlCreateTable 
sqlCreateTable) {
-
-        final Schema sourceTableSchema;
-        final Optional<TableDistribution> sourceTableDistribution;
-        final List<String> sourcePartitionKeys;
-        final List<SqlTableLike.SqlTableLikeOption> likeOptions;
-        final Map<String, String> sourceProperties;
-        if (sqlCreateTable instanceof SqlCreateTableLike) {
-            SqlTableLike sqlTableLike = ((SqlCreateTableLike) 
sqlCreateTable).getTableLike();
-            CatalogTable table = lookupLikeSourceTable(sqlTableLike);
-            sourceTableSchema = table.getUnresolvedSchema();
-            sourceTableDistribution = table.getDistribution();
-            sourcePartitionKeys = table.getPartitionKeys();
-            likeOptions = sqlTableLike.getOptions();
-            sourceProperties = table.getOptions();
-        } else {
-            sourceTableSchema = Schema.newBuilder().build();
-            sourceTableDistribution = Optional.empty();
-            sourcePartitionKeys = Collections.emptyList();
-            likeOptions = Collections.emptyList();
-            sourceProperties = Collections.emptyMap();
-        }
-
-        Map<SqlTableLike.FeatureOption, SqlTableLike.MergingStrategy> 
mergingStrategies =
-                mergeTableLikeUtil.computeMergingStrategies(likeOptions);
-
-        Map<String, String> mergedOptions =
-                mergeOptions(sqlCreateTable, sourceProperties, 
mergingStrategies);
-
-        // It is assumed only a primary key constraint may be defined in the 
table. The
-        // SqlCreateTableAs has validations to ensure this before the object 
is created.
-        Optional<SqlTableConstraint> primaryKey =
-                sqlCreateTable.getFullConstraints().stream()
-                        .filter(SqlTableConstraint::isPrimaryKey)
-                        .findAny();
-
-        Schema mergedSchema =
-                mergeTableLikeUtil.mergeTables(
-                        mergingStrategies,
-                        sourceTableSchema,
-                        sqlCreateTable.getColumnList().getList(),
-                        sqlCreateTable
-                                .getWatermark()
-                                .map(Collections::singletonList)
-                                .orElseGet(Collections::emptyList),
-                        primaryKey.orElse(null));
-
-        Optional<TableDistribution> mergedTableDistribution =
-                mergeDistribution(sourceTableDistribution, sqlCreateTable, 
mergingStrategies);
-
-        List<String> partitionKeys =
-                mergePartitions(
-                        sourcePartitionKeys,
-                        sqlCreateTable.getPartitionKeyList(),
-                        mergingStrategies);
-        verifyPartitioningColumnsExist(mergedSchema, partitionKeys);
-
-        String tableComment = 
OperationConverterUtils.getTableComment(sqlCreateTable.getComment());
-
-        CatalogTable catalogTable =
-                CatalogTable.newBuilder()
-                        .schema(mergedSchema)
-                        .comment(tableComment)
-                        .distribution(mergedTableDistribution.orElse(null))
-                        .options(new HashMap<>(mergedOptions))
-                        .partitionKeys(partitionKeys)
-                        .build();
-
-        return catalogManager.resolveCatalogTable(catalogTable);
-    }
-
-    private CatalogTable lookupLikeSourceTable(SqlTableLike sqlTableLike) {
-        UnresolvedIdentifier unresolvedIdentifier =
-                UnresolvedIdentifier.of(sqlTableLike.getSourceTable().names);
-        ObjectIdentifier identifier = 
catalogManager.qualifyIdentifier(unresolvedIdentifier);
-        ContextResolvedTable lookupResult =
-                catalogManager
-                        .getTable(identifier)
-                        .orElseThrow(
-                                () ->
-                                        new ValidationException(
-                                                String.format(
-                                                        "Source table '%s' of 
the LIKE clause not found in the catalog, at %s",
-                                                        identifier,
-                                                        sqlTableLike
-                                                                
.getSourceTable()
-                                                                
.getParserPosition())));
-        if (!(lookupResult.getResolvedTable() instanceof CatalogTable)) {
-            throw new ValidationException(
-                    String.format(
-                            "Source table '%s' of the LIKE clause can not be a 
VIEW, at %s",
-                            identifier, 
sqlTableLike.getSourceTable().getParserPosition()));
-        }
-        return lookupResult.getResolvedTable();
-    }
-
-    private void verifyPartitioningColumnsExist(Schema mergedSchema, 
List<String> partitionKeys) {
-        Set<String> columnNames =
-                mergedSchema.getColumns().stream()
-                        .map(Schema.UnresolvedColumn::getName)
-                        .collect(Collectors.toCollection(LinkedHashSet::new));
-        for (String partitionKey : partitionKeys) {
-            if (!columnNames.contains(partitionKey)) {
-                throw new ValidationException(
-                        String.format(
-                                "Partition column '%s' not defined in the 
table schema. Available columns: [%s]",
-                                partitionKey,
-                                columnNames.stream()
-                                        .collect(Collectors.joining("', '", 
"'", "'"))));
-            }
-        }
-    }
-
-    private Optional<TableDistribution> mergeDistribution(
-            Optional<TableDistribution> sourceTableDistribution,
-            SqlCreateTable sqlCreateTable,
-            Map<SqlTableLike.FeatureOption, SqlTableLike.MergingStrategy> 
mergingStrategies) {
-
-        Optional<TableDistribution> derivedTabledDistribution = 
Optional.empty();
-        if (sqlCreateTable.getDistribution() != null) {
-            TableDistribution distribution =
-                    OperationConverterUtils.getDistributionFromSqlDistribution(
-                            sqlCreateTable.getDistribution());
-            derivedTabledDistribution = Optional.of(distribution);
-        }
-
-        return mergeTableLikeUtil.mergeDistribution(
-                mergingStrategies.get(SqlTableLike.FeatureOption.DISTRIBUTION),
-                sourceTableDistribution,
-                derivedTabledDistribution);
-    }
-
-    private List<String> getPartitionKeyColumnNames(SqlNodeList partitionKey) {
-        return partitionKey.getList().stream()
-                .map(p -> ((SqlIdentifier) p).getSimple())
-                .collect(Collectors.toList());
-    }
-
-    private List<String> mergePartitions(
-            List<String> sourcePartitionKeys,
-            SqlNodeList derivedPartitionKeys,
-            Map<SqlTableLike.FeatureOption, SqlTableLike.MergingStrategy> 
mergingStrategies) {
-        // set partition key
-        return mergeTableLikeUtil.mergePartitions(
-                mergingStrategies.get(SqlTableLike.FeatureOption.PARTITIONS),
-                sourcePartitionKeys,
-                getPartitionKeyColumnNames(derivedPartitionKeys));
-    }
-
-    private Map<String, String> mergeOptions(
-            SqlCreateTable sqlCreateTable,
-            Map<String, String> sourceProperties,
-            Map<SqlTableLike.FeatureOption, SqlTableLike.MergingStrategy> 
mergingStrategies) {
-        // set with properties
-        Map<String, String> properties = new HashMap<>();
-        sqlCreateTable
-                .getPropertyList()
-                .getList()
-                .forEach(
-                        p ->
-                                properties.put(
-                                        ((SqlTableOption) p).getKeyString(),
-                                        ((SqlTableOption) 
p).getValueString()));
-        return mergeTableLikeUtil.mergeOptions(
-                mergingStrategies.get(SqlTableLike.FeatureOption.OPTIONS),
-                sourceProperties,
-                properties);
-    }
-}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
index a4563a2671c..8208bd2a666 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java
@@ -36,7 +36,6 @@ import org.apache.flink.sql.parser.ddl.SqlAnalyzeTable;
 import org.apache.flink.sql.parser.ddl.SqlCompilePlan;
 import org.apache.flink.sql.parser.ddl.SqlCreateDatabase;
 import org.apache.flink.sql.parser.ddl.SqlCreateFunction;
-import org.apache.flink.sql.parser.ddl.SqlCreateTable;
 import org.apache.flink.sql.parser.ddl.SqlCreateTableAs;
 import org.apache.flink.sql.parser.ddl.SqlDropCatalog;
 import org.apache.flink.sql.parser.ddl.SqlDropDatabase;
@@ -221,7 +220,6 @@ import java.util.stream.Collectors;
 public class SqlNodeToOperationConversion {
     private final FlinkPlannerImpl flinkPlanner;
     private final CatalogManager catalogManager;
-    private final SqlCreateTableConverter createTableConverter;
     private final AlterSchemaConverter alterSchemaConverter;
 
     // ~ Constructors 
-----------------------------------------------------------
@@ -230,11 +228,6 @@ public class SqlNodeToOperationConversion {
             FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager) {
         this.flinkPlanner = flinkPlanner;
         this.catalogManager = catalogManager;
-        this.createTableConverter =
-                new SqlCreateTableConverter(
-                        flinkPlanner.getOrCreateSqlValidator(),
-                        catalogManager,
-                        this::getQuotedSqlString);
         this.alterSchemaConverter =
                 new AlterSchemaConverter(
                         flinkPlanner.getOrCreateSqlValidator(),
@@ -299,14 +292,6 @@ public class SqlNodeToOperationConversion {
                     
converter.convertShowCurrentDatabase((SqlShowCurrentDatabase) validated));
         } else if (validated instanceof SqlUseDatabase) {
             return Optional.of(converter.convertUseDatabase((SqlUseDatabase) 
validated));
-        } else if (validated instanceof SqlCreateTable) {
-            if (validated instanceof SqlCreateTableAs) {
-                return Optional.of(
-                        converter.createTableConverter.convertCreateTableAS(
-                                flinkPlanner, (SqlCreateTableAs) validated));
-            }
-            return Optional.of(
-                    
converter.createTableConverter.convertCreateTable((SqlCreateTable) validated));
         } else if (validated instanceof SqlDropTable) {
             return Optional.of(converter.convertDropTable((SqlDropTable) 
validated));
         } else if (validated instanceof SqlAlterTable) {
@@ -488,7 +473,7 @@ public class SqlNodeToOperationConversion {
                                                             tableIdentifier)));
             Map<String, String> newProps = new 
HashMap<>(catalogPartition.getProperties());
             newProps.putAll(
-                    
OperationConverterUtils.extractProperties(alterTableOptions.getPropertyList()));
+                    
OperationConverterUtils.getProperties(alterTableOptions.getPropertyList()));
             return new AlterPartitionPropertiesOperation(
                     tableIdentifier,
                     partitionSpec,
@@ -496,7 +481,7 @@ public class SqlNodeToOperationConversion {
         } else {
             // it's altering a table
             Map<String, String> changeOptions =
-                    
OperationConverterUtils.extractProperties(alterTableOptions.getPropertyList());
+                    
OperationConverterUtils.getProperties(alterTableOptions.getPropertyList());
             Map<String, String> newOptions = new 
HashMap<>(oldTable.getOptions());
             newOptions.putAll(changeOptions);
             return new AlterTableChangeOperation(
@@ -786,15 +771,8 @@ public class SqlNodeToOperationConversion {
                         .map(comment -> 
comment.getValueAs(NlsString.class).getValue())
                         .orElse(null);
         // set with properties
-        Map<String, String> properties = new HashMap<>();
-        sqlCreateDatabase
-                .getPropertyList()
-                .getList()
-                .forEach(
-                        p ->
-                                properties.put(
-                                        ((SqlTableOption) p).getKeyString(),
-                                        ((SqlTableOption) 
p).getValueString()));
+        final Map<String, String> properties =
+                
OperationConverterUtils.getProperties(sqlCreateDatabase.getPropertyList());
         CatalogDatabase catalogDatabase = new CatalogDatabaseImpl(properties, 
databaseComment);
         return new CreateDatabaseOperation(
                 catalogName, databaseName, catalogDatabase, ignoreIfExists);
@@ -846,14 +824,8 @@ public class SqlNodeToOperationConversion {
             throw new ValidationException(String.format("Catalog %s not 
exists", catalogName));
         }
         // set with properties
-        sqlAlterDatabase
-                .getPropertyList()
-                .getList()
-                .forEach(
-                        p ->
-                                properties.put(
-                                        ((SqlTableOption) p).getKeyString(),
-                                        ((SqlTableOption) 
p).getValueString()));
+        properties.putAll(
+                
OperationConverterUtils.getProperties(sqlAlterDatabase.getPropertyList()));
         CatalogDatabase catalogDatabase =
                 new CatalogDatabaseImpl(properties, 
originCatalogDatabase.getComment());
         return new AlterDatabaseOperation(catalogName, databaseName, 
catalogDatabase);
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/AbstractCreateTableConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/AbstractCreateTableConverter.java
new file mode 100644
index 00000000000..f83062103dd
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/AbstractCreateTableConverter.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.ddl.SqlCreateTable;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableDistribution;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.operations.ddl.CreateTableOperation;
+import org.apache.flink.table.planner.utils.OperationConverterUtils;
+
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Abstract class for converting {@link SqlCreateTable} and its children to 
create table
+ * operations.
+ */
+public abstract class AbstractCreateTableConverter<T extends SqlCreateTable>
+        implements SqlNodeConverter<T> {
+
+    /** Context of create table converters while merging source and derived 
items. */
+    protected interface MergeContext {
+        Schema getMergedSchema(ResolvedSchema schemaToMerge);
+
+        Map<String, String> getMergedTableOptions();
+
+        List<String> getMergedPartitionKeys();
+
+        Optional<TableDistribution> getMergedTableDistribution();
+    }
+
+    protected abstract MergeContext getMergeContext(T sqlCreateTable, 
ConvertContext context);
+
+    protected final Optional<TableDistribution> getDerivedTableDistribution(T 
sqlCreateTable) {
+        return Optional.ofNullable(sqlCreateTable.getDistribution())
+                
.map(OperationConverterUtils::getDistributionFromSqlDistribution);
+    }
+
+    protected final List<String> getDerivedPartitionKeys(T sqlCreateTable) {
+        return 
OperationConverterUtils.getColumnNames(sqlCreateTable.getPartitionKeyList());
+    }
+
+    protected final Map<String, String> getDerivedTableOptions(T 
sqlCreateTable) {
+        return 
OperationConverterUtils.getProperties(sqlCreateTable.getPropertyList());
+    }
+
+    protected final String getComment(T sqlCreateTable) {
+        return OperationConverterUtils.getComment(sqlCreateTable.getComment());
+    }
+
+    protected final ResolvedCatalogTable getResolvedCatalogTable(
+            T sqlCreateTable, ConvertContext context, ResolvedSchema 
schemaToMerge) {
+        final MergeContext mergeContext = getMergeContext(sqlCreateTable, 
context);
+        final List<String> partitionKeys = 
mergeContext.getMergedPartitionKeys();
+        final Schema schema = mergeContext.getMergedSchema(schemaToMerge);
+        verifyPartitioningColumnsExist(schema, partitionKeys);
+
+        final Map<String, String> tableOptions = 
mergeContext.getMergedTableOptions();
+        final TableDistribution distribution =
+                mergeContext.getMergedTableDistribution().orElse(null);
+        final String comment = getComment(sqlCreateTable);
+        final CatalogTable catalogTable =
+                CatalogTable.newBuilder()
+                        .schema(schema)
+                        .comment(comment)
+                        .distribution(distribution)
+                        .options(tableOptions)
+                        .partitionKeys(partitionKeys)
+                        .build();
+        return context.getCatalogManager().resolveCatalogTable(catalogTable);
+    }
+
+    protected final ObjectIdentifier getIdentifier(SqlCreateTable node, 
ConvertContext context) {
+        UnresolvedIdentifier unresolvedIdentifier = 
UnresolvedIdentifier.of(node.fullTableName());
+        return 
context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);
+    }
+
+    protected final CreateTableOperation getCreateTableOperation(
+            ObjectIdentifier identifier,
+            ResolvedCatalogTable tableWithResolvedSchema,
+            T sqlCreateTable) {
+        return new CreateTableOperation(
+                identifier,
+                tableWithResolvedSchema,
+                sqlCreateTable.isIfNotExists(),
+                sqlCreateTable.isTemporary());
+    }
+
+    private void verifyPartitioningColumnsExist(Schema schema, List<String> 
partitionKeys) {
+        final Set<String> columnNames =
+                schema.getColumns().stream()
+                        .map(Schema.UnresolvedColumn::getName)
+                        .collect(Collectors.toCollection(LinkedHashSet::new));
+        for (String partitionKey : partitionKeys) {
+            if (!columnNames.contains(partitionKey)) {
+                throw new ValidationException(
+                        String.format(
+                                "Partition column '%s' not defined in the 
table schema. Available columns: [%s]",
+                                partitionKey,
+                                columnNames.stream()
+                                        .collect(Collectors.joining("', '", 
"'", "'"))));
+            }
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableAsUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/MergeTableAsUtil.java
similarity index 96%
rename from 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableAsUtil.java
rename to 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/MergeTableAsUtil.java
index 3509b62cb1a..cdecc263a6a 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableAsUtil.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/MergeTableAsUtil.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.operations;
+package org.apache.flink.table.planner.operations.converters;
 
 import org.apache.flink.sql.parser.ddl.SqlTableColumn;
 import org.apache.flink.sql.parser.ddl.SqlTableColumn.SqlComputedColumn;
@@ -38,6 +38,9 @@ import 
org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator;
 import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.planner.calcite.SqlRewriterUtils;
+import org.apache.flink.table.planner.operations.PlannerQueryOperation;
+import org.apache.flink.table.planner.operations.SqlNodeToOperationConversion;
+import 
org.apache.flink.table.planner.operations.converters.SqlNodeConverter.ConvertContext;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -79,6 +82,13 @@ public class MergeTableAsUtil {
         this.dataTypeFactory = dataTypeFactory;
     }
 
+    public MergeTableAsUtil(ConvertContext context) {
+        this(
+                context.getSqlValidator(),
+                context::toQuotedSqlString,
+                context.getCatalogManager().getDataTypeFactory());
+    }
+
     /**
      * Rewrites the query operation to include only the fields that may be 
persisted in the sink.
      */
@@ -204,9 +214,7 @@ public class MergeTableAsUtil {
         Optional<SqlTableConstraint> primaryKey =
                 
sqlTableConstraints.stream().filter(SqlTableConstraint::isPrimaryKey).findAny();
 
-        if (primaryKey.isPresent()) {
-            schemaBuilder.setPrimaryKey(primaryKey.get());
-        }
+        primaryKey.ifPresent(schemaBuilder::setPrimaryKey);
 
         return schemaBuilder.build();
     }
@@ -233,9 +241,9 @@ public class MergeTableAsUtil {
      */
     private static class SchemaBuilder extends SchemaBuilderUtil {
         // Mapping required when evaluating compute expressions and watermark 
columns.
-        private Map<String, RelDataType> regularAndMetadataFieldNamesToTypes =
+        private final Map<String, RelDataType> 
regularAndMetadataFieldNamesToTypes =
                 new LinkedHashMap<>();
-        private Map<String, RelDataType> computeFieldNamesToTypes = new 
LinkedHashMap<>();
+        private final Map<String, RelDataType> computeFieldNamesToTypes = new 
LinkedHashMap<>();
 
         FlinkTypeFactory typeFactory;
 
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/MergeTableLikeUtil.java
similarity index 98%
rename from 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java
rename to 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/MergeTableLikeUtil.java
index 3dc43b36cfe..9c2958f4b0a 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/MergeTableLikeUtil.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.operations;
+package org.apache.flink.table.planner.operations.converters;
 
 import org.apache.flink.sql.parser.ddl.SqlCreateTable;
 import org.apache.flink.sql.parser.ddl.SqlTableColumn;
@@ -83,6 +83,13 @@ class MergeTableLikeUtil {
         this.dataTypeFactory = dataTypeFactory;
     }
 
+    MergeTableLikeUtil(SqlNodeConverter.ConvertContext context) {
+        this(
+                context.getSqlValidator(),
+                context::toQuotedSqlString,
+                context.getCatalogManager().getDataTypeFactory());
+    }
+
     /**
      * Calculates merging strategies for all options. It applies options given 
by a user to the
      * {@link #defaultMergingStrategies}. The {@link MergingStrategy} 
specified for {@link
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SchemaBuilderUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SchemaBuilderUtil.java
similarity index 97%
rename from 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SchemaBuilderUtil.java
rename to 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SchemaBuilderUtil.java
index 61e94edcbb9..0af303a404b 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SchemaBuilderUtil.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SchemaBuilderUtil.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.operations;
+package org.apache.flink.table.planner.operations.converters;
 
 import org.apache.flink.sql.parser.ddl.SqlTableColumn;
 import org.apache.flink.sql.parser.ddl.SqlTableColumn.SqlComputedColumn;
@@ -243,10 +243,7 @@ public class SchemaBuilderUtil {
 
     /** Converts a {@link SqlTableConstraint} to an {@link 
UnresolvedPrimaryKey} object. */
     public UnresolvedPrimaryKey toUnresolvedPrimaryKey(SqlTableConstraint 
primaryKey) {
-        List<String> columnNames =
-                primaryKey.getColumns().getList().stream()
-                        .map(n -> ((SqlIdentifier) n).getSimple())
-                        .collect(Collectors.toList());
+        List<String> columnNames = List.of(primaryKey.getColumnNames());
 
         String constraintName =
                 primaryKey
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableResumeConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableResumeConverter.java
index afb7f272e7b..a395dd20929 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableResumeConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableResumeConverter.java
@@ -19,13 +19,12 @@
 package org.apache.flink.table.planner.operations.converters;
 
 import org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableResume;
-import org.apache.flink.sql.parser.ddl.SqlTableOption;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.operations.Operation;
 import 
org.apache.flink.table.operations.materializedtable.AlterMaterializedTableResumeOperation;
+import org.apache.flink.table.planner.utils.OperationConverterUtils;
 
-import java.util.HashMap;
 import java.util.Map;
 
 /** A converter for {@link SqlAlterMaterializedTableResume}. */
@@ -41,15 +40,9 @@ public class SqlAlterMaterializedTableResumeConverter
                 
context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);
 
         // get table options
-        Map<String, String> options = new HashMap<>();
-        sqlAlterMaterializedTableResume
-                .getPropertyList()
-                .getList()
-                .forEach(
-                        p ->
-                                options.put(
-                                        ((SqlTableOption) p).getKeyString(),
-                                        ((SqlTableOption) 
p).getValueString()));
+        final Map<String, String> options =
+                OperationConverterUtils.getProperties(
+                        sqlAlterMaterializedTableResume.getPropertyList());
         return new AlterMaterializedTableResumeOperation(identifier, options);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterModelSetConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterModelSetConverter.java
index 742f9573b57..8c6f465faef 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterModelSetConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterModelSetConverter.java
@@ -44,7 +44,7 @@ public class SqlAlterModelSetConverter extends 
AbstractSqlAlterModelConverter<Sq
                         sqlAlterModelSet.ifModelExists());
 
         Map<String, String> changeModelOptions =
-                
OperationConverterUtils.extractProperties(sqlAlterModelSet.getOptionList());
+                
OperationConverterUtils.getProperties(sqlAlterModelSet.getOptionList());
         if (changeModelOptions.isEmpty()) {
             throw new ValidationException("ALTER MODEL SET does not support 
empty option.");
         }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterTableAddPartitionConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterTableAddPartitionConverter.java
index 2e30060bce1..7a4d8fa9932 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterTableAddPartitionConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterTableAddPartitionConverter.java
@@ -46,8 +46,7 @@ public class SqlAlterTableAddPartitionConverter implements 
SqlNodeConverter<SqlA
         for (int i = 0; i < sqlAddPartitions.getPartSpecs().size(); i++) {
             specs.add(new 
CatalogPartitionSpec(sqlAddPartitions.getPartitionKVs(i)));
             Map<String, String> props =
-                    OperationConverterUtils.extractProperties(
-                            sqlAddPartitions.getPartProps().get(i));
+                    
OperationConverterUtils.getProperties(sqlAddPartitions.getPartProps().get(i));
             partitions.add(new CatalogPartitionImpl(props, null));
         }
         return new AddPartitionsOperation(
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterViewPropertiesConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterViewPropertiesConverter.java
index 8d57143e0a9..6d1f8520038 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterViewPropertiesConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterViewPropertiesConverter.java
@@ -41,7 +41,7 @@ public class SqlAlterViewPropertiesConverter implements 
SqlNodeConverter<SqlAlte
                 context.getCatalogManager()
                         
.qualifyIdentifier(UnresolvedIdentifier.of(alterView.fullViewName()));
         Map<String, String> newOptions = new HashMap<>(oldView.getOptions());
-        
newOptions.putAll(OperationConverterUtils.extractProperties(alterView.getPropertyList()));
+        
newOptions.putAll(OperationConverterUtils.getProperties(alterView.getPropertyList()));
         CatalogView newView =
                 CatalogView.of(
                         oldView.getUnresolvedSchema(),
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateCatalogConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateCatalogConverter.java
index ea65f8f4ed8..d02c56b57a6 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateCatalogConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateCatalogConverter.java
@@ -19,14 +19,13 @@
 package org.apache.flink.table.planner.operations.converters;
 
 import org.apache.flink.sql.parser.ddl.SqlCreateCatalog;
-import org.apache.flink.sql.parser.ddl.SqlTableOption;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
+import org.apache.flink.table.planner.utils.OperationConverterUtils;
 
 import org.apache.calcite.sql.SqlCharStringLiteral;
 import org.apache.calcite.util.NlsString;
 
-import java.util.HashMap;
 import java.util.Map;
 
 /** A converter for {@link SqlCreateCatalog}. */
@@ -35,14 +34,8 @@ public class SqlCreateCatalogConverter implements 
SqlNodeConverter<SqlCreateCata
     @Override
     public Operation convertSqlNode(SqlCreateCatalog node, ConvertContext 
context) {
         // set with properties
-        Map<String, String> properties = new HashMap<>();
-        node.getPropertyList()
-                .getList()
-                .forEach(
-                        p ->
-                                properties.put(
-                                        ((SqlTableOption) p).getKeyString(),
-                                        ((SqlTableOption) 
p).getValueString()));
+        final Map<String, String> properties =
+                OperationConverterUtils.getProperties(node.getPropertyList());
 
         return new CreateCatalogOperation(
                 node.catalogName(),
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
index f474e9fdf36..d836bfdcfdc 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateMaterializedTableConverter.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.planner.operations.converters;
 import org.apache.flink.sql.parser.SqlConstraintValidator;
 import org.apache.flink.sql.parser.ddl.SqlCreateMaterializedTable;
 import org.apache.flink.sql.parser.ddl.SqlRefreshMode;
-import org.apache.flink.sql.parser.ddl.SqlTableOption;
 import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
 import org.apache.flink.sql.parser.error.SqlValidateException;
 import org.apache.flink.table.api.Schema;
@@ -41,11 +40,9 @@ import 
org.apache.flink.table.planner.utils.OperationConverterUtils;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 
-import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlNode;
 
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -72,18 +69,11 @@ public class SqlCreateMaterializedTableConverter
 
         // get comment
         String tableComment =
-                
OperationConverterUtils.getTableComment(sqlCreateMaterializedTable.getComment());
+                
OperationConverterUtils.getComment(sqlCreateMaterializedTable.getComment());
 
         // get options
-        Map<String, String> options = new HashMap<>();
-        sqlCreateMaterializedTable
-                .getPropertyList()
-                .getList()
-                .forEach(
-                        p ->
-                                options.put(
-                                        ((SqlTableOption) p).getKeyString(),
-                                        ((SqlTableOption) 
p).getValueString()));
+        final Map<String, String> tableOptions =
+                
OperationConverterUtils.getProperties(sqlCreateMaterializedTable.getPropertyList());
 
         // get freshness
         IntervalFreshness intervalFreshness =
@@ -138,13 +128,12 @@ public class SqlCreateMaterializedTableConverter
 
         // get and verify partition key
         List<String> partitionKeys =
-                
sqlCreateMaterializedTable.getPartitionKeyList().getList().stream()
-                        .map(p -> ((SqlIdentifier) p).getSimple())
-                        .collect(Collectors.toList());
+                OperationConverterUtils.getColumnNames(
+                        sqlCreateMaterializedTable.getPartitionKeyList());
         verifyPartitioningColumnsExist(
                 resolvedSchema,
                 partitionKeys,
-                options.keySet().stream()
+                tableOptions.keySet().stream()
                         .filter(k -> k.startsWith(PARTITION_FIELDS))
                         .collect(Collectors.toSet()));
 
@@ -166,7 +155,7 @@ public class SqlCreateMaterializedTableConverter
                         .comment(tableComment)
                         .distribution(tableDistribution.orElse(null))
                         .partitionKeys(partitionKeys)
-                        .options(options)
+                        .options(tableOptions)
                         .definitionQuery(definitionQuery)
                         .freshness(intervalFreshness)
                         .logicalRefreshMode(logicalRefreshMode)
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateModelConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateModelConverter.java
index 8a4523b6527..48d56ccbf00 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateModelConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateModelConverter.java
@@ -29,7 +29,6 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.ddl.CreateModelOperation;
-import org.apache.flink.table.planner.operations.SchemaBuilderUtil;
 
 import org.apache.calcite.sql.SqlLiteral;
 import org.apache.calcite.sql.SqlNode;
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateTableAsConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateTableAsConverter.java
new file mode 100644
index 00000000000..7cb06b8a451
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateTableAsConverter.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.ddl.SqlCreateTableAs;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableDistribution;
+import org.apache.flink.table.operations.CreateTableASOperation;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.CreateTableOperation;
+import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.planner.operations.PlannerQueryOperation;
+import org.apache.flink.table.planner.operations.SqlNodeToOperationConversion;
+
+import org.apache.calcite.sql.SqlNode;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/** Helper class for converting {@link SqlCreateTableAs} to {@link 
CreateTableASOperation}. */
+public class SqlCreateTableAsConverter extends 
AbstractCreateTableConverter<SqlCreateTableAs> {
+
+    @Override
+    public Operation convertSqlNode(SqlCreateTableAs sqlCreateTableAs, 
ConvertContext context) {
+        final FlinkPlannerImpl flinkPlanner = context.getFlinkPlanner();
+        final CatalogManager catalogManager = context.getCatalogManager();
+        SqlNode asQuerySqlNode = sqlCreateTableAs.getAsQuery();
+        SqlNode validatedAsQuery = flinkPlanner.validate(asQuerySqlNode);
+
+        PlannerQueryOperation query =
+                (PlannerQueryOperation)
+                        SqlNodeToOperationConversion.convert(
+                                        flinkPlanner, catalogManager, 
validatedAsQuery)
+                                .orElseThrow(
+                                        () ->
+                                                new TableException(
+                                                        "CTAS unsupported node 
type "
+                                                                + 
validatedAsQuery
+                                                                        
.getClass()
+                                                                        
.getSimpleName()));
+        ResolvedCatalogTable tableWithResolvedSchema =
+                getResolvedCatalogTable(sqlCreateTableAs, context, 
query.getResolvedSchema());
+
+        // If needed, rewrite the query to include the new sink fields in the 
select list
+        query =
+                new MergeTableAsUtil(context)
+                        .maybeRewriteQuery(
+                                catalogManager,
+                                flinkPlanner,
+                                query,
+                                validatedAsQuery,
+                                tableWithResolvedSchema);
+
+        ObjectIdentifier identifier = getIdentifier(sqlCreateTableAs, context);
+        CreateTableOperation createTableOperation =
+                getCreateTableOperation(identifier, tableWithResolvedSchema, 
sqlCreateTableAs);
+
+        return new CreateTableASOperation(createTableOperation, Map.of(), 
query, false);
+    }
+
+    @Override
+    protected MergeContext getMergeContext(
+            SqlCreateTableAs sqlCreateTableAs, ConvertContext context) {
+        return new MergeContext() {
+            private final MergeTableAsUtil mergeTableAsUtil = new 
MergeTableAsUtil(context);
+
+            @Override
+            public Schema getMergedSchema(ResolvedSchema schemaToMerge) {
+                if (sqlCreateTableAs.isSchemaWithColumnsIdentifiersOnly()) {
+                    // If only column identifiers are provided, then these are 
used to
+                    // order the columns in the schema.
+                    return mergeTableAsUtil.reorderSchema(
+                            sqlCreateTableAs.getColumnList(), schemaToMerge);
+                } else {
+                    return mergeTableAsUtil.mergeSchemas(
+                            sqlCreateTableAs.getColumnList(),
+                            sqlCreateTableAs.getWatermark().orElse(null),
+                            sqlCreateTableAs.getFullConstraints(),
+                            schemaToMerge);
+                }
+            }
+
+            @Override
+            public Map<String, String> getMergedTableOptions() {
+                return 
SqlCreateTableAsConverter.this.getDerivedTableOptions(sqlCreateTableAs);
+            }
+
+            @Override
+            public List<String> getMergedPartitionKeys() {
+                return 
SqlCreateTableAsConverter.this.getDerivedPartitionKeys(sqlCreateTableAs);
+            }
+
+            @Override
+            public Optional<TableDistribution> getMergedTableDistribution() {
+                return 
SqlCreateTableAsConverter.this.getDerivedTableDistribution(sqlCreateTableAs);
+            }
+        };
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateTableConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateTableConverter.java
new file mode 100644
index 00000000000..1cf4d24ebee
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateTableConverter.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.ddl.SqlCreateTable;
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableDistribution;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.CreateTableOperation;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/** Helper class for converting {@link SqlCreateTable} to {@link 
CreateTableOperation}. */
+public class SqlCreateTableConverter extends 
AbstractCreateTableConverter<SqlCreateTable> {
+
+    @Override
+    public Operation convertSqlNode(SqlCreateTable sqlCreateTable, 
ConvertContext context) {
+        // no schema to merge for CREATE TABLE
+        ResolvedCatalogTable catalogTable = 
getResolvedCatalogTable(sqlCreateTable, context, null);
+
+        ObjectIdentifier identifier = getIdentifier(sqlCreateTable, context);
+        return getCreateTableOperation(identifier, catalogTable, 
sqlCreateTable);
+    }
+
+    @Override
+    protected MergeContext getMergeContext(SqlCreateTable sqlCreateTable, 
ConvertContext context) {
+        return new MergeContext() {
+            private final MergeTableLikeUtil mergeTableLikeUtil = new 
MergeTableLikeUtil(context);
+
+            @Override
+            public Schema getMergedSchema(ResolvedSchema schemaToMerge) {
+                final Optional<SqlTableConstraint> tableConstraint =
+                        sqlCreateTable.getFullConstraints().stream()
+                                .filter(SqlTableConstraint::isPrimaryKey)
+                                .findAny();
+                return mergeTableLikeUtil.mergeTables(
+                        Map.of(),
+                        Schema.newBuilder().build(),
+                        sqlCreateTable.getColumnList().getList(),
+                        sqlCreateTable
+                                .getWatermark()
+                                .map(Collections::singletonList)
+                                .orElseGet(Collections::emptyList),
+                        tableConstraint.orElse(null));
+            }
+
+            @Override
+            public Map<String, String> getMergedTableOptions() {
+                return 
SqlCreateTableConverter.this.getDerivedTableOptions(sqlCreateTable);
+            }
+
+            @Override
+            public List<String> getMergedPartitionKeys() {
+                return 
SqlCreateTableConverter.this.getDerivedPartitionKeys(sqlCreateTable);
+            }
+
+            @Override
+            public Optional<TableDistribution> getMergedTableDistribution() {
+                return 
SqlCreateTableConverter.this.getDerivedTableDistribution(sqlCreateTable);
+            }
+        };
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateTableLikeConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateTableLikeConverter.java
new file mode 100644
index 00000000000..8e949eb0d14
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateTableLikeConverter.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.ddl.SqlCreateTableLike;
+import org.apache.flink.sql.parser.ddl.SqlTableLike;
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableDistribution;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.CreateTableOperation;
+import org.apache.flink.table.planner.utils.OperationConverterUtils;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/** Helper class for converting {@link SqlCreateTableLike} to {@link 
CreateTableOperation}. */
+public class SqlCreateTableLikeConverter extends 
AbstractCreateTableConverter<SqlCreateTableLike> {
+
+    @Override
+    public Operation convertSqlNode(SqlCreateTableLike sqlCreateTableLike, 
ConvertContext context) {
+        ResolvedCatalogTable catalogTable =
+                // no schema definition to merge for CREATE TABLE ... LIKE, 
schema from source table
+                // will be merged
+                getResolvedCatalogTable(sqlCreateTableLike, context, null);
+        final ObjectIdentifier identifier = getIdentifier(sqlCreateTableLike, 
context);
+        return getCreateTableOperation(identifier, catalogTable, 
sqlCreateTableLike);
+    }
+
+    private CatalogTable lookupLikeSourceTable(
+            SqlTableLike sqlTableLike, CatalogManager catalogManager) {
+        UnresolvedIdentifier unresolvedIdentifier =
+                UnresolvedIdentifier.of(sqlTableLike.getSourceTable().names);
+        ObjectIdentifier identifier = 
catalogManager.qualifyIdentifier(unresolvedIdentifier);
+        ContextResolvedTable lookupResult =
+                catalogManager
+                        .getTable(identifier)
+                        .orElseThrow(
+                                () ->
+                                        new ValidationException(
+                                                String.format(
+                                                        "Source table '%s' of 
the LIKE clause not found in the catalog, at %s",
+                                                        identifier,
+                                                        sqlTableLike
+                                                                
.getSourceTable()
+                                                                
.getParserPosition())));
+        if (!(lookupResult.getResolvedTable() instanceof CatalogTable)) {
+            throw new ValidationException(
+                    String.format(
+                            "Source table '%s' of the LIKE clause can not be a 
VIEW, at %s",
+                            identifier, 
sqlTableLike.getSourceTable().getParserPosition()));
+        }
+        return lookupResult.getResolvedTable();
+    }
+
+    @Override
+    protected MergeContext getMergeContext(
+            SqlCreateTableLike sqlCreateTableLike, ConvertContext context) {
+        return new MergeContext() {
+            private final MergeTableLikeUtil mergeTableLikeUtil = new 
MergeTableLikeUtil(context);
+            private final SqlTableLike sqlTableLike = 
sqlCreateTableLike.getTableLike();
+            private final CatalogTable table =
+                    lookupLikeSourceTable(sqlTableLike, 
context.getCatalogManager());
+            private final Map<SqlTableLike.FeatureOption, 
SqlTableLike.MergingStrategy>
+                    mergingStrategies =
+                            
mergeTableLikeUtil.computeMergingStrategies(sqlTableLike.getOptions());
+
+            @Override
+            public Schema getMergedSchema(ResolvedSchema schemaToMerge) {
+                final Optional<SqlTableConstraint> tableConstraint =
+                        sqlCreateTableLike.getFullConstraints().stream()
+                                .filter(SqlTableConstraint::isPrimaryKey)
+                                .findAny();
+                return mergeTableLikeUtil.mergeTables(
+                        mergingStrategies,
+                        table.getUnresolvedSchema(),
+                        sqlCreateTableLike.getColumnList().getList(),
+                        sqlCreateTableLike
+                                .getWatermark()
+                                .map(Collections::singletonList)
+                                .orElseGet(Collections::emptyList),
+                        tableConstraint.orElse(null));
+            }
+
+            @Override
+            public Map<String, String> getMergedTableOptions() {
+                final Map<String, String> derivedTableOptions =
+                        
OperationConverterUtils.getProperties(sqlCreateTableLike.getPropertyList());
+                return mergeTableLikeUtil.mergeOptions(
+                        
mergingStrategies.get(SqlTableLike.FeatureOption.OPTIONS),
+                        table.getOptions(),
+                        derivedTableOptions);
+            }
+
+            @Override
+            public List<String> getMergedPartitionKeys() {
+                return mergeTableLikeUtil.mergePartitions(
+                        
mergingStrategies.get(SqlTableLike.FeatureOption.PARTITIONS),
+                        table.getPartitionKeys(),
+                        
SqlCreateTableLikeConverter.this.getDerivedPartitionKeys(
+                                sqlCreateTableLike));
+            }
+
+            @Override
+            public Optional<TableDistribution> getMergedTableDistribution() {
+                return mergeTableLikeUtil.mergeDistribution(
+                        
mergingStrategies.get(SqlTableLike.FeatureOption.DISTRIBUTION),
+                        table.getDistribution(),
+                        
SqlCreateTableLikeConverter.this.getDerivedTableDistribution(
+                                sqlCreateTableLike));
+            }
+        };
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateViewConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateViewConverter.java
index 4e98455c1b1..88455f91e80 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateViewConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateViewConverter.java
@@ -51,8 +51,7 @@ public class SqlCreateViewConverter implements 
SqlNodeConverter<SqlCreateView> {
                         .map(c -> c.getValueAs(NlsString.class).getValue())
                         .orElse(null);
         Map<String, String> viewOptions =
-                OperationConverterUtils.extractProperties(
-                        sqlCreateView.getProperties().orElse(null));
+                
OperationConverterUtils.getProperties(sqlCreateView.getProperties().orElse(null));
         CatalogView catalogView =
                 SqlNodeConvertUtils.toCatalogView(
                         query, viewFields, viewOptions, viewComment, context);
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
index 76c8f1b1f87..92ac82a8b75 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
@@ -79,6 +79,9 @@ public class SqlNodeConverters {
         register(new SqlShowCatalogsConverter());
         register(new SqlDescribeFunctionConverter());
         register(new SqlDescribeModelConverter());
+        register(new SqlCreateTableAsConverter());
+        register(new SqlCreateTableConverter());
+        register(new SqlCreateTableLikeConverter());
     }
 
     /**
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
index b2b5d8f1039..12bfa23dbc1 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
@@ -19,54 +19,32 @@
 package org.apache.flink.table.planner.operations.converters;
 
 import org.apache.flink.sql.parser.ddl.SqlReplaceTableAs;
-import org.apache.flink.sql.parser.ddl.SqlTableOption;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogManager;
-import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.TableDistribution;
-import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.ReplaceTableAsOperation;
 import org.apache.flink.table.operations.ddl.CreateTableOperation;
 import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
-import org.apache.flink.table.planner.operations.MergeTableAsUtil;
 import org.apache.flink.table.planner.operations.PlannerQueryOperation;
 import org.apache.flink.table.planner.operations.SqlNodeToOperationConversion;
-import org.apache.flink.table.planner.utils.OperationConverterUtils;
 
-import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.SqlNodeList;
-
-import java.util.HashMap;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
 
 /** A converter for {@link SqlReplaceTableAs}. */
-public class SqlReplaceTableAsConverter implements 
SqlNodeConverter<SqlReplaceTableAs> {
+public class SqlReplaceTableAsConverter extends 
AbstractCreateTableConverter<SqlReplaceTableAs> {
 
     @Override
     public Operation convertSqlNode(SqlReplaceTableAs sqlReplaceTableAs, 
ConvertContext context) {
         CatalogManager catalogManager = context.getCatalogManager();
-        UnresolvedIdentifier unresolvedIdentifier =
-                UnresolvedIdentifier.of(sqlReplaceTableAs.fullTableName());
-        ObjectIdentifier identifier = 
catalogManager.qualifyIdentifier(unresolvedIdentifier);
         FlinkPlannerImpl flinkPlanner = context.getFlinkPlanner();
 
-        MergeTableAsUtil mergeTableAsUtil =
-                new MergeTableAsUtil(
-                        context.getSqlValidator(),
-                        sqlNode -> sqlNode.toString(),
-                        catalogManager.getDataTypeFactory());
-
         PlannerQueryOperation query =
                 (PlannerQueryOperation)
                         SqlNodeToOperationConversion.convert(
@@ -84,109 +62,63 @@ public class SqlReplaceTableAsConverter implements 
SqlNodeConverter<SqlReplaceTa
 
         // get table
         ResolvedCatalogTable tableWithResolvedSchema =
-                createCatalogTable(
-                        context, mergeTableAsUtil, sqlReplaceTableAs, 
query.getResolvedSchema());
+                getResolvedCatalogTable(sqlReplaceTableAs, context, 
query.getResolvedSchema());
 
         // If needed, rewrite the query to include the new sink fields in the 
select list
         query =
-                mergeTableAsUtil.maybeRewriteQuery(
-                        context.getCatalogManager(),
-                        flinkPlanner,
-                        query,
-                        sqlReplaceTableAs.getAsQuery(),
-                        tableWithResolvedSchema);
-
+                new MergeTableAsUtil(context)
+                        .maybeRewriteQuery(
+                                context.getCatalogManager(),
+                                flinkPlanner,
+                                query,
+                                sqlReplaceTableAs.getAsQuery(),
+                                tableWithResolvedSchema);
+
+        ObjectIdentifier identifier = getIdentifier(sqlReplaceTableAs, 
context);
         CreateTableOperation createTableOperation =
-                new CreateTableOperation(
-                        identifier,
-                        tableWithResolvedSchema,
-                        sqlReplaceTableAs.isIfNotExists(),
-                        sqlReplaceTableAs.isTemporary());
+                getCreateTableOperation(identifier, tableWithResolvedSchema, 
sqlReplaceTableAs);
 
         return new ReplaceTableAsOperation(
                 createTableOperation, query, 
sqlReplaceTableAs.isCreateOrReplace());
     }
 
-    private ResolvedCatalogTable createCatalogTable(
-            ConvertContext context,
-            MergeTableAsUtil mergeTableAsUtil,
-            SqlReplaceTableAs sqlReplaceTableAs,
-            ResolvedSchema querySchema) {
-        CatalogManager catalogManager = context.getCatalogManager();
-
-        // get table comment
-        String tableComment =
-                
OperationConverterUtils.getTableComment(sqlReplaceTableAs.getComment());
-
-        // get table properties
-        Map<String, String> properties = new HashMap<>();
-        sqlReplaceTableAs
-                .getPropertyList()
-                .getList()
-                .forEach(
-                        p ->
-                                properties.put(
-                                        ((SqlTableOption) p).getKeyString(),
-                                        ((SqlTableOption) 
p).getValueString()));
-
-        Schema mergedSchema;
-        if (sqlReplaceTableAs.isSchemaWithColumnsIdentifiersOnly()) {
-            // If only column identifiers are provided, then these are used to
-            // order the columns in the schema.
-            mergedSchema =
-                    
mergeTableAsUtil.reorderSchema(sqlReplaceTableAs.getColumnList(), querySchema);
-        } else {
-            // merge schemas
-            mergedSchema =
-                    mergeTableAsUtil.mergeSchemas(
+    @Override
+    protected MergeContext getMergeContext(
+            SqlReplaceTableAs sqlReplaceTableAs, ConvertContext context) {
+        return new MergeContext() {
+            private final MergeTableAsUtil mergeTableAsUtil = new 
MergeTableAsUtil(context);
+
+            @Override
+            public Schema getMergedSchema(ResolvedSchema querySchema) {
+                if (sqlReplaceTableAs.isSchemaWithColumnsIdentifiersOnly()) {
+                    // If only column identifiers are provided, then these are 
used to
+                    // order the columns in the schema.
+                    return mergeTableAsUtil.reorderSchema(
+                            sqlReplaceTableAs.getColumnList(), querySchema);
+                } else {
+                    return mergeTableAsUtil.mergeSchemas(
                             sqlReplaceTableAs.getColumnList(),
                             sqlReplaceTableAs.getWatermark().orElse(null),
                             sqlReplaceTableAs.getFullConstraints(),
                             querySchema);
-        }
-
-        // get distribution
-        Optional<TableDistribution> tableDistribution =
-                Optional.ofNullable(sqlReplaceTableAs.getDistribution())
-                        
.map(OperationConverterUtils::getDistributionFromSqlDistribution);
-
-        // get partition key
-        List<String> partitionKeys =
-                
getPartitionKeyColumnNames(sqlReplaceTableAs.getPartitionKeyList());
-        verifyPartitioningColumnsExist(mergedSchema, partitionKeys);
-
-        CatalogTable catalogTable =
-                CatalogTable.newBuilder()
-                        .schema(mergedSchema)
-                        .comment(tableComment)
-                        .distribution(tableDistribution.orElse(null))
-                        .options(properties)
-                        .partitionKeys(partitionKeys)
-                        .build();
+                }
+            }
 
-        return catalogManager.resolveCatalogTable(catalogTable);
-    }
+            @Override
+            public Map<String, String> getMergedTableOptions() {
+                return 
SqlReplaceTableAsConverter.this.getDerivedTableOptions(sqlReplaceTableAs);
+            }
 
-    private List<String> getPartitionKeyColumnNames(SqlNodeList partitionKey) {
-        return partitionKey.getList().stream()
-                .map(p -> ((SqlIdentifier) p).getSimple())
-                .collect(Collectors.toList());
-    }
+            @Override
+            public List<String> getMergedPartitionKeys() {
+                return 
SqlReplaceTableAsConverter.this.getDerivedPartitionKeys(sqlReplaceTableAs);
+            }
 
-    private void verifyPartitioningColumnsExist(Schema mergedSchema, 
List<String> partitionKeys) {
-        Set<String> columnNames =
-                mergedSchema.getColumns().stream()
-                        .map(Schema.UnresolvedColumn::getName)
-                        .collect(Collectors.toCollection(LinkedHashSet::new));
-        for (String partitionKey : partitionKeys) {
-            if (!columnNames.contains(partitionKey)) {
-                throw new ValidationException(
-                        String.format(
-                                "Partition column '%s' not defined in the 
table schema. Available columns: [%s]",
-                                partitionKey,
-                                columnNames.stream()
-                                        .collect(Collectors.joining("', '", 
"'", "'"))));
+            @Override
+            public Optional<TableDistribution> getMergedTableDistribution() {
+                return 
SqlReplaceTableAsConverter.this.getDerivedTableDistribution(
+                        sqlReplaceTableAs);
             }
-        }
+        };
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
index 1b1d224cf85..d475d9f33c6 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
@@ -92,11 +92,11 @@ public class OperationConverterUtils {
                 .orElse(null);
     }
 
-    public static @Nullable String 
getTableComment(Optional<SqlCharStringLiteral> tableComment) {
+    public static @Nullable String getComment(Optional<SqlCharStringLiteral> 
tableComment) {
         return tableComment.map(comment -> 
comment.getValueAs(String.class)).orElse(null);
     }
 
-    public static Map<String, String> extractProperties(SqlNodeList propList) {
+    public static Map<String, String> getProperties(SqlNodeList propList) {
         Map<String, String> properties = new HashMap<>();
         if (propList != null) {
             propList.getList()
@@ -109,6 +109,12 @@ public class OperationConverterUtils {
         return properties;
     }
 
+    public static List<String> getColumnNames(SqlNodeList sqlNodeList) {
+        return sqlNodeList.getList().stream()
+                .map(p -> ((SqlIdentifier) p).getSimple())
+                .collect(Collectors.toList());
+    }
+
     public static TableDistribution getDistributionFromSqlDistribution(
             SqlDistribution distribution) {
         TableDistribution.Kind kind =
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/MergeTableLikeUtilTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/converters/MergeTableLikeUtilTest.java
similarity index 99%
rename from 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/MergeTableLikeUtilTest.java
rename to 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/converters/MergeTableLikeUtilTest.java
index 785b08ca18c..50e14fb2caa 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/MergeTableLikeUtilTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/converters/MergeTableLikeUtilTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.operations;
+package org.apache.flink.table.planner.operations.converters;
 
 import org.apache.flink.sql.parser.ddl.SqlTableColumn.SqlComputedColumn;
 import org.apache.flink.sql.parser.ddl.SqlTableColumn.SqlMetadataColumn;


Reply via email to