This is an automated email from the ASF dual-hosted git repository.

starocean999 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 423fffda724 [Chore](nereids) remove CreateTableStmt and CreateMTMVStmt 
(#55522)
423fffda724 is described below

commit 423fffda72483abe2cd9694004bc3277c12c748a
Author: csding <[email protected]>
AuthorDate: Tue Sep 16 15:40:33 2025 +0800

    [Chore](nereids) remove CreateTableStmt and CreateMTMVStmt (#55522)
---
 .../main/java/org/apache/doris/catalog/Env.java    |   28 -
 .../org/apache/doris/catalog/OlapTableFactory.java |   28 -
 .../apache/doris/common/util/IdGeneratorUtil.java  |   20 -
 .../org/apache/doris/datasource/CatalogIf.java     |    7 -
 .../apache/doris/datasource/ExternalCatalog.java   |   26 -
 .../apache/doris/datasource/InternalCatalog.java   | 1254 +-------------------
 .../doris/datasource/hive/HiveMetadataOps.java     |  117 --
 .../iceberg/IcebergDLFExternalCatalog.java         |    6 -
 .../datasource/iceberg/IcebergMetadataOps.java     |   58 -
 .../datasource/operations/ExternalMetadataOps.java |   17 -
 .../main/java/org/apache/doris/qe/DdlExecutor.java |   54 -
 .../doris/datasource/hive/HiveMetadataOpsTest.java |  170 ---
 12 files changed, 55 insertions(+), 1730 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 5df939f2e50..963ee148b19 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -28,7 +28,6 @@ import org.apache.doris.analysis.AddPartitionClause;
 import org.apache.doris.analysis.AddPartitionLikeClause;
 import org.apache.doris.analysis.AlterMultiPartitionClause;
 import org.apache.doris.analysis.ColumnRenameClause;
-import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.analysis.DistributionDesc;
 import org.apache.doris.analysis.DropPartitionClause;
 import org.apache.doris.analysis.Expr;
@@ -3472,33 +3471,6 @@ public class Env {
         return catalogIf.createTable(createTableInfo);
     }
 
-    /**
-     * Following is the step to create an olap table:
-     * 1. create columns
-     * 2. create partition info
-     * 3. create distribution info
-     * 4. set table id and base index id
-     * 5. set bloom filter columns
-     * 6. set and build TableProperty includes:
-     * 6.1. dynamicProperty
-     * 6.2. replicationNum
-     * 6.3. inMemory
-     * 6.4. storageFormat
-     * 6.5. compressionType
-     * 7. set index meta
-     * 8. check colocation properties
-     * 9. create tablet in BE
-     * 10. add this table to FE's meta
-     * 11. add this table to ColocateGroup if necessary
-     * @return if CreateTableStmt.isIfNotExists is true, return true if table 
already exists
-     * otherwise return false
-     */
-    public boolean createTable(CreateTableStmt stmt) throws UserException {
-        CatalogIf<?> catalogIf = 
catalogMgr.getCatalogOrException(stmt.getCatalogName(),
-                catalog -> new DdlException(("Unknown catalog " + catalog)));
-        return catalogIf.createTable(stmt);
-    }
-
     /**
      * Adds a partition to a table
      *
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTableFactory.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTableFactory.java
index fa8adad96ea..d68178b53aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTableFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTableFactory.java
@@ -17,9 +17,6 @@
 
 package org.apache.doris.catalog;
 
-import org.apache.doris.analysis.CreateMTMVStmt;
-import org.apache.doris.analysis.CreateTableStmt;
-import org.apache.doris.analysis.DdlStmt;
 import org.apache.doris.catalog.TableIf.TableType;
 import org.apache.doris.mtmv.MTMVPartitionInfo;
 import org.apache.doris.mtmv.MTMVRefreshInfo;
@@ -73,16 +70,6 @@ public class OlapTableFactory {
         }
     }
 
-    public static TableType getTableType(DdlStmt stmt) {
-        if (stmt instanceof CreateMTMVStmt) {
-            return TableType.MATERIALIZED_VIEW;
-        } else if (stmt instanceof CreateTableStmt) {
-            return TableType.OLAP;
-        } else {
-            throw new IllegalArgumentException("Invalid DDL statement: " + 
stmt.toSql());
-        }
-    }
-
     public OlapTableFactory init(TableType type, boolean isTemporary) {
         if (type == TableType.OLAP) {
             params = new OlapTableParams(isTemporary);
@@ -204,19 +191,4 @@ public class OlapTableFactory {
                 .withMvRelation(createMTMVInfo.getRelation());
         }
     }
-
-    public OlapTableFactory withExtraParams(DdlStmt stmt) {
-        boolean isMaterializedView = stmt instanceof CreateMTMVStmt;
-        if (!isMaterializedView) {
-            CreateTableStmt createOlapTableStmt = (CreateTableStmt) stmt;
-            return withIndexes(new 
TableIndexes(createOlapTableStmt.getIndexes()));
-        } else {
-            CreateMTMVStmt createMTMVStmt = (CreateMTMVStmt) stmt;
-            return withRefreshInfo(createMTMVStmt.getRefreshInfo())
-                    .withQuerySql(createMTMVStmt.getQuerySql())
-                    .withMvProperties(createMTMVStmt.getMvProperties())
-                    .withMvPartitionInfo(createMTMVStmt.getMvPartitionInfo())
-                    .withMvRelation(createMTMVStmt.getRelation());
-        }
-    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/IdGeneratorUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/IdGeneratorUtil.java
index c3eaa4e57c6..2aa7086b0ea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/IdGeneratorUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/IdGeneratorUtil.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.common.util;
 
-import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.analysis.SinglePartitionDesc;
 import org.apache.doris.catalog.MaterializedIndex;
 import org.apache.doris.catalog.OlapTable;
@@ -52,25 +51,6 @@ public class IdGeneratorUtil {
         return bufferSize;
     }
 
-    public static long getBufferSizeForCreateTable(CreateTableStmt stmt, 
ReplicaAllocation replicaAlloc)
-            throws DdlException {
-        long bufferSize = 1;
-        long partitionNum = stmt.getPartitionDesc() == null ? 1 :
-                stmt.getPartitionDesc().getSinglePartitionDescs().size();
-        long indexNum = stmt.getRollupAlterClauseList().size() + 1;
-        long bucketNum = 
stmt.getDistributionDesc().toDistributionInfo(stmt.getColumns()).getBucketNum();
-        bufferSize = bufferSize + partitionNum + indexNum;
-        if (stmt.getPartitionDesc() == null) {
-            bufferSize = bufferSize + (replicaAlloc.getTotalReplicaNum() + 1) 
* indexNum * bucketNum;
-        } else {
-            for (SinglePartitionDesc partitionDesc : 
stmt.getPartitionDesc().getSinglePartitionDescs()) {
-                long replicaNum = 
partitionDesc.getReplicaAlloc().getTotalReplicaNum();
-                bufferSize = bufferSize + (replicaNum + 1) * indexNum * 
bucketNum;
-            }
-        }
-        return bufferSize;
-    }
-
     public static long getBufferSizeForTruncateTable(OlapTable table, 
Collection<Long> partitionIds) {
         long bufferSize = 0;
         for (Long partitionId : partitionIds) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java
index 2ce8f3c22b6..eef6b96e23d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java
@@ -18,7 +18,6 @@
 package org.apache.doris.datasource;
 
 import org.apache.doris.analysis.ColumnPosition;
-import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.analysis.PartitionNames;
 import org.apache.doris.analysis.TableName;
 import org.apache.doris.catalog.Column;
@@ -198,12 +197,6 @@ public interface CatalogIf<T extends DatabaseIf> {
      */
     boolean createTable(CreateTableInfo createTableInfo) throws UserException;
 
-    /**
-     * @return if org.apache.doris.analysis.CreateTableStmt.ifNotExists is 
true, return true if table exists,
-     * return false otherwise
-     */
-    boolean createTable(CreateTableStmt stmt) throws UserException;
-
     void dropTable(String dbName, String tableName, boolean isView, boolean 
isMtmv, boolean ifExists,
             boolean mustTemporary, boolean force) throws DdlException;
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index d04ac136212..239f7e3afe1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -18,7 +18,6 @@
 package org.apache.doris.datasource;
 
 import org.apache.doris.analysis.ColumnPosition;
-import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.analysis.PartitionNames;
 import org.apache.doris.analysis.TableName;
 import org.apache.doris.catalog.Column;
@@ -1121,31 +1120,6 @@ public abstract class ExternalCatalog
         }
     }
 
-    @Override
-    public boolean createTable(CreateTableStmt stmt) throws UserException {
-        makeSureInitialized();
-        if (metadataOps == null) {
-            throw new DdlException("Create table is not supported for catalog: 
" + getName());
-        }
-        try {
-            boolean res = metadataOps.createTable(stmt);
-            if (!res) {
-                // res == false means the table does not exist before, and we 
create it.
-                // we should get the table stored in Doris, and use local name 
in edit log.
-                org.apache.doris.persist.CreateTableInfo info = new 
org.apache.doris.persist.CreateTableInfo(
-                        getName(),
-                        stmt.getDbName(),
-                        stmt.getTableName());
-                Env.getCurrentEnv().getEditLog().logCreateTable(info);
-                LOG.info("finished to create table {}.{}.{}", getName(), 
stmt.getDbName(), stmt.getTableName());
-            }
-            return res;
-        } catch (Exception e) {
-            LOG.warn("Failed to create a table.", e);
-            throw e;
-        }
-    }
-
     public void replayCreateTable(String dbName, String tblName) {
         if (metadataOps != null) {
             metadataOps.afterCreateTable(dbName, tblName);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index dc633922942..d623f07a43e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -23,7 +23,6 @@ import org.apache.doris.analysis.AddPartitionLikeClause;
 import org.apache.doris.analysis.AddRollupClause;
 import org.apache.doris.analysis.AlterClause;
 import org.apache.doris.analysis.AlterMultiPartitionClause;
-import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.analysis.DataSortInfo;
 import org.apache.doris.analysis.DistributionDesc;
 import org.apache.doris.analysis.DropPartitionClause;
@@ -123,7 +122,6 @@ import org.apache.doris.common.util.PropertyAnalyzer;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.es.EsRepository;
-import org.apache.doris.encryption.EncryptionKey;
 import org.apache.doris.event.DropPartitionEvent;
 import org.apache.doris.mtmv.MTMVUtil;
 import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -1265,93 +1263,6 @@ public class InternalCatalog implements 
CatalogIf<Database> {
         return false;
     }
 
-    /**
-     * Following is the step to create an olap table:
-     * 1. create columns
-     * 2. create partition info
-     * 3. create distribution info
-     * 4. set table id and base index id
-     * 5. set bloom filter columns
-     * 6. set and build TableProperty includes:
-     * 6.1. dynamicProperty
-     * 6.2. replicationNum
-     * 6.3. inMemory
-     * 6.4. storageFormat
-     * 6.5. compressionType
-     * 7. set index meta
-     * 8. check colocation properties
-     * 9. create tablet in BE
-     * 10. add this table to FE's meta
-     * 11. add this table to ColocateGroup if necessary
-     */
-    public boolean createTable(CreateTableStmt stmt) throws UserException {
-        String engineName = stmt.getEngineName();
-        String dbName = stmt.getDbName();
-        String tableName = stmt.getTableName();
-        String tableShowName = tableName;
-        if (stmt.isTemp()) {
-            tableName = Util.generateTempTableInnerName(tableName);
-        }
-
-        // check if db exists
-        Database db = getDbOrDdlException(dbName);
-        // InfoSchemaDb and MysqlDb can not create table manually
-        if (db instanceof MysqlCompatibleDatabase) {
-            ErrorReport.reportDdlException(ErrorCode.ERR_CANT_CREATE_TABLE, 
tableShowName,
-                    ErrorCode.ERR_CANT_CREATE_TABLE.getCode(), "not supported 
create table in this database");
-        }
-
-        // only internal table should check quota and cluster capacity
-        if (!stmt.isExternal()) {
-            checkAvailableCapacity(db);
-        }
-
-        // check if table exists in db
-        boolean isTableExist = db.isTableExist(tableName);
-        if (isTableExist) {
-            if (stmt.isSetIfNotExists()) {
-                LOG.info("create table[{}] which already exists", 
tableShowName);
-                return true;
-            } else {
-                
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableShowName);
-            }
-        }
-        if 
(db.getTable(RestoreJob.tableAliasWithAtomicRestore(tableName)).isPresent()) {
-            ErrorReport.reportDdlException(
-                    "table[{}] is in atomic restore, please cancel the restore 
operation firstly",
-                    ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
-        }
-
-        if (engineName.equals("olap")) {
-            return createOlapTable(db, stmt);
-        }
-        if (engineName.equals("odbc")) {
-            return createOdbcTable(db, stmt);
-        }
-        if (engineName.equals("mysql")) {
-            return createMysqlTable(db, stmt);
-        }
-        if (engineName.equals("broker")) {
-            return createBrokerTable(db, stmt);
-        }
-        if (engineName.equalsIgnoreCase("elasticsearch") || 
engineName.equalsIgnoreCase("es")) {
-            return createEsTable(db, stmt);
-        }
-        if (engineName.equalsIgnoreCase("hive")) {
-            // should use hive catalog to create external hive table
-            throw new UserException("Cannot create hive table in internal 
catalog, should switch to hive catalog.");
-        }
-        if (engineName.equalsIgnoreCase("jdbc")) {
-            return createJdbcTable(db, stmt);
-
-        } else {
-            
ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_STORAGE_ENGINE, 
engineName);
-        }
-
-        Preconditions.checkState(false);
-        return false;
-    }
-
     public void replayCreateTable(String dbName, long dbId, Table table) 
throws MetaNotFoundException {
         if (dbId != -1L) {
             Database db = getDbOrMetaException(dbId);
@@ -2323,17 +2234,6 @@ public class InternalCatalog implements 
CatalogIf<Database> {
         throw new AnalysisException("Cannot find column `" + name + "` in 
table's columns");
     }
 
-    private Type getChildTypeByName(String name, CreateTableStmt stmt)
-            throws AnalysisException {
-        List<Column> columns = stmt.getColumns();
-        for (Column col : columns) {
-            if (col.nameEquals(name, false)) {
-                return col.getType();
-            }
-        }
-        throw new AnalysisException("Cannot find column `" + name + "` in 
table's columns");
-    }
-
     private boolean findAllowNullforSlotRef(List<Column> baseSchema, SlotRef 
slot) throws AnalysisException {
         for (Column col : baseSchema) {
             if (col.nameEquals(slot.getColumnName(), true)) {
@@ -2436,47 +2336,6 @@ public class InternalCatalog implements 
CatalogIf<Database> {
         }
     }
 
-    private void checkLegalityofPartitionExprs(CreateTableStmt stmt, 
PartitionDesc partitionDesc)
-            throws AnalysisException {
-        for (Expr expr : partitionDesc.getPartitionExprs()) {
-            if (expr instanceof FunctionCallExpr) { // test them
-                if (!partitionDesc.isAutoCreatePartitions() || 
partitionDesc.getType() != PartitionType.RANGE) {
-                    throw new AnalysisException("only Auto Range Partition 
support FunctionCallExpr");
-                }
-
-                FunctionCallExpr func = (FunctionCallExpr) expr;
-                ArrayList<Expr> children = func.getChildren();
-                Type[] childTypes = new Type[children.size()];
-                for (int i = 0; i < children.size(); i++) {
-                    if (children.get(i) instanceof LiteralExpr) {
-                        childTypes[i] = children.get(i).getType();
-                    } else if (children.get(i) instanceof SlotRef) {
-                        childTypes[i] = 
getChildTypeByName(children.get(i).getExprName(), stmt);
-                    } else {
-                        throw new AnalysisException(String.format(
-                                "partition expr %s has unrecognized parameter 
in slot %d", func.getExprName(), i));
-                    }
-                }
-                Function fn = null;
-                try {
-                    fn = 
func.getBuiltinFunction(func.getFnName().getFunction(), childTypes,
-                            Function.CompareMode.IS_INDISTINGUISHABLE); // 
only for test
-                } catch (Exception e) {
-                    throw new AnalysisException("partition expr " + 
func.getExprName() + " is illegal!");
-                }
-                if (fn == null) {
-                    throw new AnalysisException("partition expr " + 
func.getExprName() + " is illegal!");
-                }
-            } else if (expr instanceof SlotRef) {
-                if (partitionDesc.isAutoCreatePartitions() && 
partitionDesc.getType() == PartitionType.RANGE) {
-                    throw new AnalysisException("Auto Range Partition need 
FunctionCallExpr");
-                }
-            } else {
-                throw new AnalysisException("partition expr " + 
expr.getExprName() + " is illegal!");
-            }
-        }
-    }
-
     // Create olap table and related base index synchronously.
     private boolean createOlapTable(Database db, CreateTableInfo 
createTableInfo) throws UserException {
         String tableName = createTableInfo.getTableName();
@@ -3386,1073 +3245,83 @@ public class InternalCatalog implements 
CatalogIf<Database> {
         return tableHasExist;
     }
 
-    // Create olap table and related base index synchronously.
-    private boolean createOlapTable(Database db, CreateTableStmt stmt) throws 
UserException {
-        String tableName = stmt.getTableName();
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("begin create olap table: {}", tableName);
-        }
-        String tableShowName = tableName;
-        if (stmt.isTemp()) {
-            tableName = Util.generateTempTableInnerName(tableName);
-        }
-
-        boolean tableHasExist = false;
-        BinlogConfig dbBinlogConfig;
-        db.readLock();
-        try {
-            dbBinlogConfig = new BinlogConfig(db.getBinlogConfig());
-        } finally {
-            db.readUnlock();
-        }
-        BinlogConfig createTableBinlogConfig = new 
BinlogConfig(dbBinlogConfig);
-        createTableBinlogConfig.mergeFromProperties(stmt.getProperties());
-        if (dbBinlogConfig.isEnable() && !createTableBinlogConfig.isEnable() 
&& !stmt.isTemp()) {
-            throw new DdlException("Cannot create table with binlog disabled 
when database binlog enable");
-        }
-        if (stmt.isTemp() && createTableBinlogConfig.isEnable()) {
-            throw new DdlException("Cannot create temporary table with binlog 
enable");
-        }
-        stmt.getProperties().putAll(createTableBinlogConfig.toProperties());
-
-        // get keys type
-        KeysDesc keysDesc = stmt.getKeysDesc();
-        Preconditions.checkNotNull(keysDesc);
-        KeysType keysType = keysDesc.getKeysType();
-        int keysColumnSize = keysDesc.keysColumnSize();
-        boolean isKeysRequired = !(keysType == KeysType.DUP_KEYS && 
keysColumnSize == 0);
+    private boolean createMysqlTable(Database db, CreateTableInfo 
createTableInfo) throws DdlException {
+        String tableName = createTableInfo.getTableName();
+        List<Column> columns = createTableInfo.getColumns();
+        long tableId = Env.getCurrentEnv().getNextId();
+        MysqlTable mysqlTable = new MysqlTable(tableId, tableName, columns, 
createTableInfo.getProperties());
+        mysqlTable.setComment(createTableInfo.getComment());
+        Pair<Boolean, Boolean> result = db.createTableWithLock(mysqlTable, 
false, createTableInfo.isIfNotExists());
+        return checkCreateTableResult(tableName, tableId, result);
+    }
 
-        // create columns
-        List<Column> baseSchema = stmt.getColumns();
-        validateColumns(baseSchema, isKeysRequired);
-        checkAutoIncColumns(baseSchema, keysType);
+    private boolean createOdbcTable(Database db, CreateTableInfo 
createTableInfo) throws DdlException {
+        String tableName = createTableInfo.getTableName();
+        List<Column> columns = createTableInfo.getColumns();
 
-        // analyze replica allocation
-        ReplicaAllocation replicaAlloc = 
PropertyAnalyzer.analyzeReplicaAllocation(stmt.getProperties(), "");
-        if (replicaAlloc.isNotSet()) {
-            replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION;
-        }
+        long tableId = Env.getCurrentEnv().getNextId();
+        OdbcTable odbcTable = new OdbcTable(tableId, tableName, columns, 
createTableInfo.getProperties());
+        odbcTable.setComment(createTableInfo.getComment());
+        Pair<Boolean, Boolean> result = db.createTableWithLock(odbcTable, 
false, createTableInfo.isIfNotExists());
+        return checkCreateTableResult(tableName, tableId, result);
+    }
 
-        long bufferSize = IdGeneratorUtil.getBufferSizeForCreateTable(stmt, 
replicaAlloc);
-        IdGeneratorBuffer idGeneratorBuffer = 
Env.getCurrentEnv().getIdGeneratorBuffer(bufferSize);
+    private boolean createEsTable(Database db, CreateTableInfo 
createTableInfo) throws DdlException, AnalysisException {
+        String tableName = createTableInfo.getTableName();
 
-        // create partition info
-        PartitionDesc partitionDesc = stmt.getPartitionDesc();
+        // validate props to get column from es.
+        EsTable esTable = new EsTable(tableName, 
createTableInfo.getProperties());
 
-        ConnectContext ctx = ConnectContext.get();
-        Env env = Env.getCurrentEnv();
+        // create columns
+        List<Column> baseSchema = createTableInfo.getColumns();
 
-        // check legality of partiton exprs.
-        if (ctx != null && env != null && partitionDesc != null && 
partitionDesc.getPartitionExprs() != null) {
-            checkLegalityofPartitionExprs(stmt, partitionDesc);
+        if (baseSchema.isEmpty()) {
+            baseSchema = esTable.genColumnsFromEs();
         }
+        validateColumns(baseSchema, true);
+        esTable.setNewFullSchema(baseSchema);
 
-        PartitionInfo partitionInfo = null;
+        // create partition info
+        PartitionDesc partitionDesc = createTableInfo.getPartitionDesc();
+        PartitionInfo partitionInfo;
         Map<String, Long> partitionNameToId = Maps.newHashMap();
         if (partitionDesc != null) {
-            for (SinglePartitionDesc desc : 
partitionDesc.getSinglePartitionDescs()) {
-                // check legality of nullity of partition items.
-                checkPartitionNullity(baseSchema, partitionDesc, desc);
-
-                long partitionId = idGeneratorBuffer.getNextId();
-                partitionNameToId.put(desc.getPartitionName(), partitionId);
-            }
             partitionInfo = partitionDesc.toPartitionInfo(baseSchema, 
partitionNameToId, false);
         } else {
-            if 
(DynamicPartitionUtil.checkDynamicPartitionPropertiesExist(stmt.getProperties()))
 {
-                throw new DdlException("Only support dynamic partition 
properties on range partition table");
-            }
-            long partitionId = idGeneratorBuffer.getNextId();
+            long partitionId = Env.getCurrentEnv().getNextId();
             // use table name as single partition name
-            partitionNameToId.put(Util.getTempTableDisplayName(tableName), 
partitionId);
+            partitionNameToId.put(tableName, partitionId);
             partitionInfo = new SinglePartitionInfo();
         }
+        esTable.setPartitionInfo(partitionInfo);
 
-        // create distribution info
-        DistributionDesc distributionDesc = stmt.getDistributionDesc();
-        Preconditions.checkNotNull(distributionDesc);
-        DistributionInfo defaultDistributionInfo = 
distributionDesc.toDistributionInfo(baseSchema);
+        long tableId = Env.getCurrentEnv().getNextId();
+        esTable.setId(tableId);
+        esTable.setComment(createTableInfo.getComment());
+        esTable.syncTableMetaData();
+        Pair<Boolean, Boolean> result = db.createTableWithLock(esTable, false, 
createTableInfo.isIfNotExists());
+        return checkCreateTableResult(tableName, tableId, result);
+    }
 
-        if (defaultDistributionInfo instanceof HashDistributionInfo
-                    && ((HashDistributionInfo) 
defaultDistributionInfo).getDistributionColumns()
-                        .stream().anyMatch(column -> 
column.getType().isVariantType())) {
-            throw new DdlException("Hash distribution info should not contain 
variant columns");
-        }
+    private boolean createBrokerTable(Database db, CreateTableInfo 
createTableInfo) throws DdlException {
+        String tableName = createTableInfo.getTableName();
 
-        // calc short key column count
-        short shortKeyColumnCount = Env.calcShortKeyColumnCount(baseSchema, 
stmt.getProperties(), isKeysRequired);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("create table[{}] short key column count: {}", 
tableName, shortKeyColumnCount);
-        }
+        List<Column> columns = createTableInfo.getColumns();
 
-        // create table
-        long tableId = idGeneratorBuffer.getNextId();
-        TableType tableType = OlapTableFactory.getTableType(stmt);
-        OlapTable olapTable = (OlapTable) new OlapTableFactory()
-                .init(tableType, stmt.isTemp())
-                .withTableId(tableId)
-                .withTableName(tableName)
-                .withSchema(baseSchema)
-                .withKeysType(keysType)
-                .withPartitionInfo(partitionInfo)
-                .withDistributionInfo(defaultDistributionInfo)
-                .withExtraParams(stmt)
-                .build();
-        olapTable.setComment(stmt.getComment());
+        long tableId = Env.getCurrentEnv().getNextId();
+        BrokerTable brokerTable = new BrokerTable(tableId, tableName, columns, 
createTableInfo.getProperties());
+        brokerTable.setComment(createTableInfo.getComment());
+        brokerTable.setBrokerProperties(createTableInfo.getExtProperties());
+        Pair<Boolean, Boolean> result = db.createTableWithLock(brokerTable, 
false, createTableInfo.isIfNotExists());
+        return checkCreateTableResult(tableName, tableId, result);
 
-        // set base index id
-        long baseIndexId = idGeneratorBuffer.getNextId();
-        olapTable.setBaseIndexId(baseIndexId);
+    }
 
-        // set base index info to table
-        // this should be done before create partition.
-        Map<String, String> properties = stmt.getProperties();
+    private boolean createJdbcTable(Database db, CreateTableInfo 
createTableInfo) throws DdlException {
+        String tableName = createTableInfo.getTableName();
+        List<Column> columns = createTableInfo.getColumns();
 
-        if (stmt.isTemp()) {
-            properties.put("binlog.enable", "false");
-        }
-
-        short minLoadReplicaNum = -1;
-        try {
-            minLoadReplicaNum = 
PropertyAnalyzer.analyzeMinLoadReplicaNum(properties);
-        } catch (AnalysisException e) {
-            throw new DdlException(e.getMessage());
-        }
-        if (minLoadReplicaNum > replicaAlloc.getTotalReplicaNum()) {
-            throw new DdlException("Failed to check min load replica num [" + 
minLoadReplicaNum + "]  <= "
-                    + "default replica num [" + 
replicaAlloc.getTotalReplicaNum() + "]");
-        }
-        olapTable.setMinLoadReplicaNum(minLoadReplicaNum);
-
-        // get use light schema change
-        Boolean enableLightSchemaChange;
-        try {
-            enableLightSchemaChange = 
PropertyAnalyzer.analyzeUseLightSchemaChange(properties);
-        } catch (AnalysisException e) {
-            throw new DdlException(e.getMessage());
-        }
-        // use light schema change optimization
-        olapTable.setEnableLightSchemaChange(enableLightSchemaChange);
-
-        // check if light schema change is disabled, variant type rely on 
light schema change
-        if (!enableLightSchemaChange) {
-            for (Column column : baseSchema) {
-                if (column.getType().isVariantType()) {
-                    throw new DdlException("Variant type rely on light schema 
change, "
-                            + " please use light_schema_change = true.");
-                }
-            }
-        }
-
-        boolean disableAutoCompaction = false;
-        try {
-            disableAutoCompaction = 
PropertyAnalyzer.analyzeDisableAutoCompaction(properties);
-        } catch (AnalysisException e) {
-            throw new DdlException(e.getMessage());
-        }
-        // use light schema change optimization
-        olapTable.setDisableAutoCompaction(disableAutoCompaction);
-
-        // set compaction policy
-        String compactionPolicy = 
PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY;
-        try {
-            compactionPolicy = 
PropertyAnalyzer.analyzeCompactionPolicy(properties, olapTable.getKeysType());
-        } catch (AnalysisException e) {
-            throw new DdlException(e.getMessage());
-        }
-        olapTable.setCompactionPolicy(compactionPolicy);
-
-        if 
(!compactionPolicy.equals(PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY)
-                && 
(properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES)
-                || 
properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD)
-                || properties
-                        
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS)
-                || properties
-                        
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD)
-                || properties
-                        
.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD)))
 {
-            throw new DdlException("only time series compaction policy support 
for time series config");
-        }
-
-        // set time series compaction goal size
-        long timeSeriesCompactionGoalSizeMbytes
-                                    = 
PropertyAnalyzer.TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES_DEFAULT_VALUE;
-        try {
-            timeSeriesCompactionGoalSizeMbytes = PropertyAnalyzer
-                                        
.analyzeTimeSeriesCompactionGoalSizeMbytes(properties);
-        } catch (AnalysisException e) {
-            throw new DdlException(e.getMessage());
-        }
-        
olapTable.setTimeSeriesCompactionGoalSizeMbytes(timeSeriesCompactionGoalSizeMbytes);
-
-        // set time series compaction file count threshold
-        long timeSeriesCompactionFileCountThreshold
-                                    = 
PropertyAnalyzer.TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD_DEFAULT_VALUE;
-        try {
-            timeSeriesCompactionFileCountThreshold = PropertyAnalyzer
-                                    
.analyzeTimeSeriesCompactionFileCountThreshold(properties);
-        } catch (AnalysisException e) {
-            throw new DdlException(e.getMessage());
-        }
-        
olapTable.setTimeSeriesCompactionFileCountThreshold(timeSeriesCompactionFileCountThreshold);
-
-        // set time series compaction time threshold
-        long timeSeriesCompactionTimeThresholdSeconds
-                                    = 
PropertyAnalyzer.TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS_DEFAULT_VALUE;
-        try {
-            timeSeriesCompactionTimeThresholdSeconds = PropertyAnalyzer
-                                    
.analyzeTimeSeriesCompactionTimeThresholdSeconds(properties);
-        } catch (AnalysisException e) {
-            throw new DdlException(e.getMessage());
-        }
-        
olapTable.setTimeSeriesCompactionTimeThresholdSeconds(timeSeriesCompactionTimeThresholdSeconds);
-
-        // set time series compaction empty rowsets threshold
-        long timeSeriesCompactionEmptyRowsetsThreshold
-                                    = 
PropertyAnalyzer.TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD_DEFAULT_VALUE;
-        try {
-            timeSeriesCompactionEmptyRowsetsThreshold = PropertyAnalyzer
-                                    
.analyzeTimeSeriesCompactionEmptyRowsetsThreshold(properties);
-        } catch (AnalysisException e) {
-            throw new DdlException(e.getMessage());
-        }
-        
olapTable.setTimeSeriesCompactionEmptyRowsetsThreshold(timeSeriesCompactionEmptyRowsetsThreshold);
-
-        // set time series compaction level threshold
-        long timeSeriesCompactionLevelThreshold
-                                     = 
PropertyAnalyzer.TIME_SERIES_COMPACTION_LEVEL_THRESHOLD_DEFAULT_VALUE;
-        try {
-            timeSeriesCompactionLevelThreshold = PropertyAnalyzer
-                                    
.analyzeTimeSeriesCompactionLevelThreshold(properties);
-        } catch (AnalysisException e) {
-            throw new DdlException(e.getMessage());
-        }
-        
olapTable.setTimeSeriesCompactionLevelThreshold(timeSeriesCompactionLevelThreshold);
-
-        boolean variantEnableFlattenNested  = false;
-        try {
-            variantEnableFlattenNested = 
PropertyAnalyzer.analyzeVariantFlattenNested(properties);
-            // only if session variable: enable_variant_flatten_nested = true 
and
-            // table property: variant_enable_flatten_nested = true
-            // we can enable variant flatten nested otherwise throw error
-            if (ctx != null && 
ctx.getSessionVariable().getEnableVariantFlattenNested()
-                    && variantEnableFlattenNested) {
-                
olapTable.setVariantEnableFlattenNested(variantEnableFlattenNested);
-            } else if (variantEnableFlattenNested) {
-                throw new DdlException("If you want to enable variant flatten 
nested, "
-                        + "please set session variable: 
enable_variant_flatten_nested = true");
-            } else {
-                // keep table property: variant_enable_flatten_nested = false
-                olapTable.setVariantEnableFlattenNested(false);
-            }
-        } catch (AnalysisException e) {
-            throw new DdlException(e.getMessage());
-        }
-
-        // get storage format
-        TStorageFormat storageFormat = TStorageFormat.V2; // default is 
segment v2
-        try {
-            storageFormat = PropertyAnalyzer.analyzeStorageFormat(properties);
-        } catch (AnalysisException e) {
-            throw new DdlException(e.getMessage());
-        }
-        olapTable.setStorageFormat(storageFormat);
-
-        TInvertedIndexFileStorageFormat invertedIndexFileStorageFormat;
-        try {
-            invertedIndexFileStorageFormat = 
PropertyAnalyzer.analyzeInvertedIndexFileStorageFormat(properties);
-        } catch (AnalysisException e) {
-            throw new DdlException(e.getMessage());
-        }
-        
olapTable.setInvertedIndexFileStorageFormat(invertedIndexFileStorageFormat);
-
-        // get compression type
-        TCompressionType compressionType = TCompressionType.LZ4;
-        try {
-            compressionType = 
PropertyAnalyzer.analyzeCompressionType(properties);
-        } catch (AnalysisException e) {
-            throw new DdlException(e.getMessage());
-        }
-        olapTable.setCompressionType(compressionType);
-
-        // get row_store_page_size
-        long rowStorePageSize = 
PropertyAnalyzer.ROW_STORE_PAGE_SIZE_DEFAULT_VALUE;
-        try {
-            rowStorePageSize = 
PropertyAnalyzer.analyzeRowStorePageSize(properties);
-        } catch (AnalysisException e) {
-            throw new DdlException(e.getMessage());
-        }
-
-        olapTable.setRowStorePageSize(rowStorePageSize);
-
-        long storagePageSize = 
PropertyAnalyzer.STORAGE_PAGE_SIZE_DEFAULT_VALUE;
-        try {
-            storagePageSize = 
PropertyAnalyzer.analyzeStoragePageSize(properties);
-        } catch (AnalysisException e) {
-            throw new DdlException(e.getMessage());
-        }
-        olapTable.setStoragePageSize(storagePageSize);
-
-        long storageDictPageSize = 
PropertyAnalyzer.STORAGE_DICT_PAGE_SIZE_DEFAULT_VALUE;
-        try {
-            storageDictPageSize = 
PropertyAnalyzer.analyzeStorageDictPageSize(properties);
-        } catch (AnalysisException e) {
-            throw new DdlException(e.getMessage());
-        }
-        olapTable.setStorageDictPageSize(storageDictPageSize);
-
-        // check data sort properties
-        int keyColumnSize = 
CollectionUtils.isEmpty(keysDesc.getClusterKeysColumnNames()) ? 
keysDesc.keysColumnSize() :
-                keysDesc.getClusterKeysColumnNames().size();
-        DataSortInfo dataSortInfo = 
PropertyAnalyzer.analyzeDataSortInfo(properties, keysType,
-                keyColumnSize, storageFormat);
-        olapTable.setDataSortInfo(dataSortInfo);
-
-        boolean enableUniqueKeyMergeOnWrite = false;
-        if (keysType == KeysType.UNIQUE_KEYS) {
-            try {
-                enableUniqueKeyMergeOnWrite = 
PropertyAnalyzer.analyzeUniqueKeyMergeOnWrite(properties);
-            } catch (AnalysisException e) {
-                throw new DdlException(e.getMessage());
-            }
-            if (enableUniqueKeyMergeOnWrite && !enableLightSchemaChange && 
!CollectionUtils.isEmpty(
-                    keysDesc.getClusterKeysColumnNames())) {
-                throw new DdlException(
-                        "Unique merge-on-write tables with cluster keys 
require light schema change to be enabled.");
-            }
-        }
-        olapTable.setEnableUniqueKeyMergeOnWrite(enableUniqueKeyMergeOnWrite);
-
-        if (keysType == KeysType.UNIQUE_KEYS && enableUniqueKeyMergeOnWrite) {
-            try {
-                // don't store this property, check and remove it from 
`properties`
-                PropertyAnalyzer.analyzeUniqueKeySkipBitmapColumn(properties);
-            } catch (AnalysisException e) {
-                throw new DdlException(e.getMessage());
-            }
-        }
-
-        boolean enableDeleteOnDeletePredicate = false;
-        try {
-            enableDeleteOnDeletePredicate = 
PropertyAnalyzer.analyzeEnableDeleteOnDeletePredicate(properties,
-                    enableUniqueKeyMergeOnWrite);
-        } catch (AnalysisException e) {
-            throw new DdlException(e.getMessage());
-        }
-        if (enableDeleteOnDeletePredicate && !enableUniqueKeyMergeOnWrite) {
-            throw new 
DdlException(PropertyAnalyzer.PROPERTIES_ENABLE_MOW_LIGHT_DELETE
-                    + " property is only supported for unique merge-on-write 
table");
-        }
-        olapTable.setEnableMowLightDelete(enableDeleteOnDeletePredicate);
-
-        boolean enableSingleReplicaCompaction = false;
-        try {
-            enableSingleReplicaCompaction = 
PropertyAnalyzer.analyzeEnableSingleReplicaCompaction(properties);
-        } catch (AnalysisException e) {
-            throw new DdlException(e.getMessage());
-        }
-        if (enableUniqueKeyMergeOnWrite && enableSingleReplicaCompaction) {
-            throw new 
DdlException(PropertyAnalyzer.PROPERTIES_ENABLE_SINGLE_REPLICA_COMPACTION
-                    + " property is not supported for merge-on-write table");
-        }
-        
olapTable.setEnableSingleReplicaCompaction(enableSingleReplicaCompaction);
-
-        if (Config.isCloudMode() && ((CloudEnv) env).getEnableStorageVault()) {
-            // <storageVaultName, storageVaultId>
-            Pair<String, String> storageVaultInfoPair = 
PropertyAnalyzer.analyzeStorageVault(properties, db);
-
-            // Check if user has storage vault usage privilege
-            if (ConnectContext.get() != null && !env.getAccessManager()
-                    .checkStorageVaultPriv(ctx.getCurrentUserIdentity(),
-                            storageVaultInfoPair.first, PrivPredicate.USAGE)) {
-                throw new DdlException("USAGE denied to user '" + 
ConnectContext.get().getQualifiedUser()
-                        + "'@'" + ConnectContext.get().getRemoteIP()
-                        + "' for storage vault '" + storageVaultInfoPair.first 
+ "'");
-            }
-            
Preconditions.checkArgument(StringUtils.isNumeric(storageVaultInfoPair.second),
-                    "Invalid storage vault id :%s", 
storageVaultInfoPair.second);
-            olapTable.setStorageVaultId(storageVaultInfoPair.second);
-        }
-
-        // check `update on current_timestamp`
-        if (!enableUniqueKeyMergeOnWrite) {
-            for (Column column : baseSchema) {
-                if (column.hasOnUpdateDefaultValue()) {
-                    throw new DdlException("'ON UPDATE CURRENT_TIMESTAMP' is 
only supportted"
-                            + " in unique table with merge-on-write enabled.");
-                }
-            }
-        }
-
-        // analyze bloom filter columns
-        Set<String> bfColumns = null;
-        double bfFpp = 0;
-        try {
-            bfColumns = PropertyAnalyzer.analyzeBloomFilterColumns(properties, 
baseSchema, keysType);
-            if (bfColumns != null && bfColumns.isEmpty()) {
-                bfColumns = null;
-            }
-
-            bfFpp = PropertyAnalyzer.analyzeBloomFilterFpp(properties);
-            if (bfColumns != null && bfFpp == 0) {
-                bfFpp = FeConstants.default_bloom_filter_fpp;
-            } else if (bfColumns == null) {
-                bfFpp = 0;
-            }
-
-            olapTable.setBloomFilterInfo(bfColumns, bfFpp);
-        } catch (AnalysisException e) {
-            throw new DdlException(e.getMessage());
-        }
-
-        Index.checkConflict(stmt.getIndexes(), bfColumns);
-
-        olapTable.setReplicationAllocation(replicaAlloc);
-
-        // set auto bucket
-        boolean isAutoBucket = PropertyAnalyzer.analyzeBooleanProp(properties, 
PropertyAnalyzer.PROPERTIES_AUTO_BUCKET,
-                false);
-        olapTable.setIsAutoBucket(isAutoBucket);
-
-        // set estimate partition size
-        if (isAutoBucket) {
-            String estimatePartitionSize = 
PropertyAnalyzer.analyzeEstimatePartitionSize(properties);
-            olapTable.setEstimatePartitionSize(estimatePartitionSize);
-        }
-
-        // set in memory
-        boolean isInMemory = PropertyAnalyzer.analyzeBooleanProp(properties, 
PropertyAnalyzer.PROPERTIES_INMEMORY,
-                false);
-        if (isInMemory) {
-            throw new AnalysisException("Not support set 'in_memory'='true' 
now!");
-        }
-        olapTable.setIsInMemory(false);
-
-        boolean isBeingSynced = 
PropertyAnalyzer.analyzeIsBeingSynced(properties, false);
-        olapTable.setIsBeingSynced(isBeingSynced);
-        if (isBeingSynced) {
-            // erase colocate table, storage policy
-            olapTable.ignoreInvalidPropertiesWhenSynced(properties);
-            // remark auto bucket
-            if (isAutoBucket) {
-                olapTable.markAutoBucket();
-            }
-        }
-
-        // analyze row store columns
-        try {
-            boolean storeRowColumn = false;
-            storeRowColumn = 
PropertyAnalyzer.analyzeStoreRowColumn(properties);
-            if (storeRowColumn && !enableLightSchemaChange) {
-                throw new DdlException(
-                        "Row store column rely on light schema change, enable 
light schema change first");
-            }
-            olapTable.setStoreRowColumn(storeRowColumn);
-            List<String> rowStoreColumns;
-            try {
-                rowStoreColumns = 
PropertyAnalyzer.analyzeRowStoreColumns(properties,
-                            
baseSchema.stream().map(Column::getName).collect(Collectors.toList()));
-                if (rowStoreColumns != null && rowStoreColumns.isEmpty()) {
-                    rowStoreColumns = null;
-                }
-                olapTable.setRowStoreColumns(rowStoreColumns);
-            } catch (AnalysisException e) {
-                throw new DdlException(e.getMessage());
-            }
-        } catch (AnalysisException e) {
-            throw new DdlException(e.getMessage());
-        }
-
-        // set skip inverted index on load
-        boolean skipWriteIndexOnLoad = 
PropertyAnalyzer.analyzeSkipWriteIndexOnLoad(properties);
-        olapTable.setSkipWriteIndexOnLoad(skipWriteIndexOnLoad);
-
-        boolean isMutable = PropertyAnalyzer.analyzeBooleanProp(properties, 
PropertyAnalyzer.PROPERTIES_MUTABLE, true);
-
-        Long ttlSeconds = PropertyAnalyzer.analyzeTTL(properties);
-        olapTable.setTTLSeconds(ttlSeconds);
-
-        // set storage policy
-        String storagePolicy = 
PropertyAnalyzer.analyzeStoragePolicy(properties);
-        
Env.getCurrentEnv().getPolicyMgr().checkStoragePolicyExist(storagePolicy);
-        if (olapTable.getEnableUniqueKeyMergeOnWrite()
-                && !Strings.isNullOrEmpty(storagePolicy)) {
-            throw new AnalysisException(
-                    "Can not create UNIQUE KEY table that enables 
Merge-On-write"
-                     + " with storage policy(" + storagePolicy + ")");
-        }
-        // Consider one situation: if the table has no storage policy but some 
partitions
-        // have their own storage policy then it might be erased by the 
following function.
-        // So we only set the storage policy if the table's policy is not null 
or empty
-        if (!Strings.isNullOrEmpty(storagePolicy)) {
-            olapTable.setStoragePolicy(storagePolicy);
-        }
-
-        TTabletType tabletType;
-        try {
-            tabletType = PropertyAnalyzer.analyzeTabletType(properties);
-        } catch (AnalysisException e) {
-            throw new DdlException(e.getMessage());
-        }
-
-        // set binlog config
-        try {
-            Map<String, String> binlogConfigMap = 
PropertyAnalyzer.analyzeBinlogConfig(properties);
-            if (binlogConfigMap != null) {
-                BinlogConfig binlogConfig = new BinlogConfig();
-                binlogConfig.mergeFromProperties(binlogConfigMap);
-                olapTable.setBinlogConfig(binlogConfig);
-            }
-        } catch (AnalysisException e) {
-            throw new DdlException(e.getMessage());
-        }
-        BinlogConfig binlogConfigForTask = new 
BinlogConfig(olapTable.getBinlogConfig());
-
-        if (partitionInfo.getType() == PartitionType.UNPARTITIONED) {
-            // if this is an unpartitioned table, we should analyze data 
property and replication num here.
-            // if this is a partitioned table, there properties are already 
analyzed
-            // in RangePartitionDesc analyze phase.
-
-            // use table name as this single partition name
-            long partitionId = -1;
-            partitionId = 
partitionNameToId.get(Util.getTempTableDisplayName(tableName));
-            DataProperty dataProperty = null;
-            try {
-                dataProperty = 
PropertyAnalyzer.analyzeDataProperty(stmt.getProperties(),
-                        new DataProperty(DataProperty.DEFAULT_STORAGE_MEDIUM));
-                olapTable.setStorageMedium(dataProperty.getStorageMedium());
-            } catch (AnalysisException e) {
-                throw new DdlException(e.getMessage());
-            }
-            Preconditions.checkNotNull(dataProperty);
-            partitionInfo.setDataProperty(partitionId, dataProperty);
-            partitionInfo.setReplicaAllocation(partitionId, replicaAlloc);
-            partitionInfo.setIsInMemory(partitionId, isInMemory);
-            partitionInfo.setTabletType(partitionId, tabletType);
-            partitionInfo.setIsMutable(partitionId, isMutable);
-            if (isBeingSynced) {
-                partitionInfo.refreshTableStoragePolicy("");
-            }
-        }
-        // check colocation properties
-        try {
-            String colocateGroup = 
PropertyAnalyzer.analyzeColocate(properties);
-            if (colocateGroup != null) {
-                if (defaultDistributionInfo.getType() == 
DistributionInfoType.RANDOM) {
-                    throw new AnalysisException("Random distribution for 
colocate table is unsupported");
-                }
-                if (isAutoBucket) {
-                    throw new AnalysisException("Auto buckets for colocate 
table is unsupported");
-                }
-                String fullGroupName = GroupId.getFullGroupName(db.getId(), 
colocateGroup);
-                ColocateGroupSchema groupSchema = 
Env.getCurrentColocateIndex().getGroupSchema(fullGroupName);
-                if (groupSchema != null) {
-                    // group already exist, check if this table can be added 
to this group
-                    groupSchema.checkColocateSchema(olapTable);
-                    groupSchema.checkDynamicPartition(properties, 
olapTable.getDefaultDistributionInfo());
-                }
-                // add table to this group, if group does not exist, create a 
new one
-                Env.getCurrentColocateIndex()
-                        .addTableToGroup(db.getId(), olapTable, fullGroupName, 
null /* generate group id inside */);
-                olapTable.setColocateGroup(colocateGroup);
-            }
-        } catch (AnalysisException e) {
-            throw new DdlException(e.getMessage());
-        }
-
-        // get base index storage type. default is COLUMN
-        TStorageType baseIndexStorageType = null;
-        try {
-            baseIndexStorageType = 
PropertyAnalyzer.analyzeStorageType(properties);
-        } catch (AnalysisException e) {
-            throw new DdlException(e.getMessage());
-        }
-        Preconditions.checkNotNull(baseIndexStorageType);
-        // set base index meta
-        int schemaVersion = 0;
-        try {
-            schemaVersion = PropertyAnalyzer.analyzeSchemaVersion(properties);
-        } catch (AnalysisException e) {
-            throw new DdlException(e.getMessage());
-        }
-        int schemaHash = Util.generateSchemaHash();
-        olapTable.setIndexMeta(baseIndexId, tableName, baseSchema, 
schemaVersion, schemaHash, shortKeyColumnCount,
-                baseIndexStorageType, keysType, olapTable.getIndexes());
-
-        for (AlterClause alterClause : stmt.getRollupAlterClauseList()) {
-            if (olapTable.isDuplicateWithoutKey()) {
-                throw new DdlException("Duplicate table without keys do not 
support add rollup!");
-            }
-            AddRollupClause addRollupClause = (AddRollupClause) alterClause;
-
-            Long baseRollupIndex = olapTable.getIndexIdByName(tableName);
-
-            // get storage type for rollup index
-            TStorageType rollupIndexStorageType = null;
-            try {
-                rollupIndexStorageType = 
PropertyAnalyzer.analyzeStorageType(addRollupClause.getProperties());
-            } catch (AnalysisException e) {
-                throw new DdlException(e.getMessage());
-            }
-            Preconditions.checkNotNull(rollupIndexStorageType);
-            // set rollup index meta to olap table
-            List<Column> rollupColumns = 
Env.getCurrentEnv().getMaterializedViewHandler()
-                    .checkAndPrepareMaterializedView(addRollupClause, 
olapTable, baseRollupIndex, false);
-            short rollupShortKeyColumnCount = 
Env.calcShortKeyColumnCount(rollupColumns, alterClause.getProperties(),
-                    true/*isKeysRequired*/);
-            int rollupSchemaHash = Util.generateSchemaHash();
-            long rollupIndexId = idGeneratorBuffer.getNextId();
-            olapTable.setIndexMeta(rollupIndexId, 
addRollupClause.getRollupName(), rollupColumns, schemaVersion,
-                    rollupSchemaHash, rollupShortKeyColumnCount, 
rollupIndexStorageType, keysType);
-        }
-
-        // analyse sequence map column
-        String sequenceMapCol = null;
-        try {
-            sequenceMapCol = 
PropertyAnalyzer.analyzeSequenceMapCol(properties, olapTable.getKeysType());
-            if (sequenceMapCol != null) {
-                Column col = olapTable.getColumn(sequenceMapCol);
-                if (col == null) {
-                    throw new DdlException("The specified sequence column[" + 
sequenceMapCol + "] not exists");
-                }
-                if (!col.getType().isFixedPointType() && 
!col.getType().isDateType()) {
-                    throw new DdlException("Sequence type only support integer 
types and date types");
-                }
-                olapTable.setSequenceMapCol(col.getName());
-                olapTable.setSequenceInfo(col.getType(), col);
-            }
-        } catch (Exception e) {
-            throw new DdlException(e.getMessage());
-        }
-
-        // analyse sequence type
-        Type sequenceColType = null;
-        try {
-            sequenceColType = PropertyAnalyzer.analyzeSequenceType(properties, 
olapTable.getKeysType());
-            if (sequenceMapCol != null && sequenceColType != null) {
-                throw new DdlException("The sequence_col and sequence_type 
cannot be set at the same time");
-            }
-            if (sequenceColType != null) {
-                olapTable.setSequenceInfo(sequenceColType, null);
-            }
-        } catch (Exception e) {
-            throw new DdlException(e.getMessage());
-        }
-
-        try {
-            int groupCommitIntervalMs = 
PropertyAnalyzer.analyzeGroupCommitIntervalMs(properties);
-            olapTable.setGroupCommitIntervalMs(groupCommitIntervalMs);
-        } catch (Exception e) {
-            throw new DdlException(e.getMessage());
-        }
-
-        try {
-            int groupCommitDataBytes = 
PropertyAnalyzer.analyzeGroupCommitDataBytes(properties);
-            olapTable.setGroupCommitDataBytes(groupCommitDataBytes);
-        } catch (Exception e) {
-            throw new DdlException(e.getMessage());
-        }
-
-        try {
-            TEncryptionAlgorithm tdeAlgorithm = 
PropertyAnalyzer.analyzeTDEAlgorithm(properties);
-            if (tdeAlgorithm != TEncryptionAlgorithm.PLAINTEXT) {
-                List<EncryptionKey> masterKeys = 
Env.getCurrentEnv().getKeyManager().getAllMasterKeys();
-                if (masterKeys == null || masterKeys.isEmpty()) {
-                    throw new DdlException("The TDE master key does not exist, 
so encrypted table cannot be created. "
-                        + "Please check whether the root key is correctly 
set");
-                }
-
-                for (EncryptionKey masterKey : masterKeys) {
-                    if (masterKey.algorithm.toThrift() == tdeAlgorithm && 
!masterKey.isDecrypted()) {
-                        throw new DdlException("The master key has not been 
decrypted. Please check whether"
-                            + " the root key is functioning properly or 
configured correctly.");
-                    }
-                }
-            }
-            olapTable.setEncryptionAlgorithm(tdeAlgorithm);
-        } catch (Exception e) {
-            throw new DdlException("Failed to set TDE algorithm: " + 
e.getMessage(), e);
-        }
-
-        olapTable.initSchemaColumnUniqueId();
-        olapTable.initAutoIncrementGenerator(db.getId());
-        olapTable.rebuildFullSchema();
-
-        // analyze version info
-        Long versionInfo = null;
-        try {
-            versionInfo = PropertyAnalyzer.analyzeVersionInfo(properties);
-        } catch (AnalysisException e) {
-            throw new DdlException(e.getMessage());
-        }
-        Preconditions.checkNotNull(versionInfo);
-
-        // a set to record every new tablet created when create table
-        // if failed in any step, use this set to do clear things
-        Set<Long> tabletIdSet = new HashSet<>();
-        // create partition
-        boolean hadLogEditCreateTable = false;
-        try {
-            if (partitionInfo.getType() == PartitionType.UNPARTITIONED) {
-                if (properties != null && !properties.isEmpty()) {
-                    // here, all properties should be checked
-                    throw new DdlException("Unknown properties: " + 
properties);
-                }
-                // this is a 1-level partitioned table
-                // use table name as partition name
-                DistributionInfo partitionDistributionInfo = 
distributionDesc.toDistributionInfo(baseSchema);
-                String partitionName = tableName;
-                if (stmt.isTemp()) {
-                    partitionName = Util.getTempTableDisplayName(tableName);
-                }
-                long partitionId = partitionNameToId.get(partitionName);
-
-                // check replica quota if this operation done
-                long indexNum = olapTable.getIndexIdToMeta().size();
-                long bucketNum = partitionDistributionInfo.getBucketNum();
-                long replicaNum = 
partitionInfo.getReplicaAllocation(partitionId).getTotalReplicaNum();
-                long totalReplicaNum = indexNum * bucketNum * replicaNum;
-                if (Config.isNotCloudMode() && totalReplicaNum >= 
db.getReplicaQuotaLeftWithLock()) {
-                    throw new DdlException(
-                            "Database " + db.getFullName() + " create 
unpartitioned table " + tableShowName
-                                    + " increasing " + totalReplicaNum + " of 
replica exceeds quota["
-                                    + db.getReplicaQuota() + "]");
-                }
-                beforeCreatePartitions(db.getId(), olapTable.getId(), null, 
olapTable.getIndexIdList(), true);
-                Partition partition = createPartitionWithIndices(db.getId(), 
olapTable,
-                        partitionId, partitionName,
-                        olapTable.getIndexIdToMeta(), 
partitionDistributionInfo,
-                        partitionInfo.getDataProperty(partitionId),
-                        partitionInfo.getReplicaAllocation(partitionId), 
versionInfo, bfColumns, tabletIdSet,
-                        isInMemory, tabletType,
-                        storagePolicy,
-                        idGeneratorBuffer,
-                        binlogConfigForTask,
-                        
partitionInfo.getDataProperty(partitionId).isStorageMediumSpecified());
-                afterCreatePartitions(db.getId(), olapTable.getId(), null,
-                        olapTable.getIndexIdList(), true);
-                olapTable.addPartition(partition);
-            } else if (partitionInfo.getType() == PartitionType.RANGE
-                    || partitionInfo.getType() == PartitionType.LIST) {
-                try {
-                    DataProperty dataProperty = 
PropertyAnalyzer.analyzeDataProperty(stmt.getProperties(),
-                            new 
DataProperty(DataProperty.DEFAULT_STORAGE_MEDIUM));
-                    Map<String, String> propertiesCheck = new 
HashMap<>(properties);
-                    propertiesCheck.entrySet().removeIf(entry -> 
entry.getKey().contains("dynamic_partition"));
-                    if (propertiesCheck != null && !propertiesCheck.isEmpty()) 
{
-                        // here, all properties should be checked
-                        throw new DdlException("Unknown properties: " + 
propertiesCheck);
-                    }
-                    // just for remove entries in stmt.getProperties(),
-                    // and then check if there still has unknown properties
-                    
olapTable.setStorageMedium(dataProperty.getStorageMedium());
-                    if (partitionInfo.getType() == PartitionType.RANGE) {
-                        
DynamicPartitionUtil.checkDynamicPartitionPropertyKeysValid(properties);
-                        
DynamicPartitionUtil.checkAndSetDynamicPartitionProperty(olapTable, properties, 
db);
-                    } else if (partitionInfo.getType() == PartitionType.LIST) {
-                        if 
(DynamicPartitionUtil.checkDynamicPartitionPropertiesExist(properties)) {
-                            throw new DdlException(
-                                    "Only support dynamic partition properties 
on range partition table");
-                        }
-                    }
-                    // check the interval same between dynamic & auto range 
partition
-                    DynamicPartitionProperty dynamicProperty = 
olapTable.getTableProperty()
-                            .getDynamicPartitionProperty();
-                    if (dynamicProperty.isExist() && 
dynamicProperty.getEnable()
-                            && partitionDesc.isAutoCreatePartitions()) {
-                        String dynamicUnit = dynamicProperty.getTimeUnit();
-                        ArrayList<Expr> autoExprs = 
partitionDesc.getPartitionExprs();
-                        // check same interval. fail will leading to 
AnalysisException
-                        
DynamicPartitionUtil.partitionIntervalCompatible(dynamicUnit, autoExprs);
-                    }
-                } catch (AnalysisException e) {
-                    throw new DdlException(e.getMessage());
-                }
-
-                // check replica quota if this operation done
-                long totalReplicaNum = 0;
-                for (Map.Entry<String, Long> entry : 
partitionNameToId.entrySet()) {
-                    long indexNum = olapTable.getIndexIdToMeta().size();
-                    long bucketNum = defaultDistributionInfo.getBucketNum();
-                    long replicaNum = 
partitionInfo.getReplicaAllocation(entry.getValue()).getTotalReplicaNum();
-                    totalReplicaNum += indexNum * bucketNum * replicaNum;
-                }
-                if (Config.isNotCloudMode() && totalReplicaNum >= 
db.getReplicaQuotaLeftWithLock()) {
-                    throw new DdlException(
-                            "Database " + db.getFullName() + " create table " 
+ tableShowName + " increasing "
-                                    + totalReplicaNum + " of replica exceeds 
quota[" + db.getReplicaQuota() + "]");
-                }
-
-                beforeCreatePartitions(db.getId(), olapTable.getId(), null, 
olapTable.getIndexIdList(), true);
-
-                // this is a 2-level partitioned tables
-                for (Map.Entry<String, Long> entry : 
partitionNameToId.entrySet()) {
-                    DataProperty dataProperty = 
partitionInfo.getDataProperty(entry.getValue());
-                    DistributionInfo partitionDistributionInfo = 
distributionDesc.toDistributionInfo(baseSchema);
-                    // use partition storage policy if it exist.
-                    String partionStoragePolicy = 
partitionInfo.getStoragePolicy(entry.getValue());
-                    if (olapTable.getEnableUniqueKeyMergeOnWrite()
-                            && !Strings.isNullOrEmpty(partionStoragePolicy)) {
-                        throw new AnalysisException(
-                                "Can not create UNIQUE KEY table that enables 
Merge-On-write"
-                                        + " with storage policy(" + 
partionStoragePolicy + ")");
-                    }
-                    // The table's storage policy has higher priority than 
partition's policy,
-                    // so we'll directly use table's policy when it's set. 
Otherwise we use the
-                    // partition's policy
-                    if (!storagePolicy.isEmpty()) {
-                        partionStoragePolicy = storagePolicy;
-                    }
-                    
Env.getCurrentEnv().getPolicyMgr().checkStoragePolicyExist(partionStoragePolicy);
-                    Partition partition = 
createPartitionWithIndices(db.getId(),
-                            olapTable, entry.getValue(),
-                            entry.getKey(), olapTable.getIndexIdToMeta(), 
partitionDistributionInfo,
-                            dataProperty, 
partitionInfo.getReplicaAllocation(entry.getValue()),
-                            versionInfo, bfColumns, tabletIdSet, isInMemory,
-                            partitionInfo.getTabletType(entry.getValue()),
-                            partionStoragePolicy, idGeneratorBuffer,
-                            binlogConfigForTask,
-                            dataProperty.isStorageMediumSpecified());
-                    olapTable.addPartition(partition);
-                    
olapTable.getPartitionInfo().getDataProperty(partition.getId())
-                            .setStoragePolicy(partionStoragePolicy);
-                }
-                afterCreatePartitions(db.getId(), olapTable.getId(), null,
-                        olapTable.getIndexIdList(), true);
-            } else {
-                throw new DdlException("Unsupported partition method: " + 
partitionInfo.getType().name());
-            }
-            Pair<Boolean, Boolean> result;
-            db.writeLockOrDdlException();
-            try {
-                // db name not changed
-                if 
(!db.getName().equals(ClusterNamespace.getNameFromFullName(stmt.getDbName()))) {
-                    throw new DdlException("Database name renamed, please 
check the database name");
-                }
-                // register table, write create table edit log
-                result = db.createTableWithoutLock(olapTable, false, 
stmt.isSetIfNotExists());
-            } finally {
-                db.writeUnlock();
-            }
-            if (!result.first) {
-                
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableShowName);
-            }
-
-            if (result.second) {
-                if (Env.getCurrentColocateIndex().isColocateTable(tableId)) {
-                    // if this is a colocate table, its table id is already 
added to colocate group
-                    // so we should remove the tableId here
-                    Env.getCurrentColocateIndex().removeTable(tableId);
-                }
-                for (Long tabletId : tabletIdSet) {
-                    Env.getCurrentInvertedIndex().deleteTablet(tabletId);
-                }
-                LOG.info("duplicate create table[{};{}], skip next steps", 
tableName, tableId);
-            } else {
-                // if table not exists, then db.createTableWithLock will write 
an editlog.
-                hadLogEditCreateTable = true;
-
-                // we have added these index to memory, only need to persist 
here
-                if (Env.getCurrentColocateIndex().isColocateTable(tableId)) {
-                    GroupId groupId = 
Env.getCurrentColocateIndex().getGroup(tableId);
-                    Map<Tag, List<List<Long>>> backendsPerBucketSeq = 
Env.getCurrentColocateIndex()
-                            .getBackendsPerBucketSeq(groupId);
-                    ColocatePersistInfo info = 
ColocatePersistInfo.createForAddTable(groupId, tableId,
-                            backendsPerBucketSeq);
-                    Env.getCurrentEnv().getEditLog().logColocateAddTable(info);
-                }
-                LOG.info("successfully create table[{};{}]", tableName, 
tableId);
-                Env.getCurrentEnv().getDynamicPartitionScheduler()
-                        .executeDynamicPartitionFirstTime(db.getId(), 
olapTable.getId());
-                // register or remove table from DynamicPartition after table 
created
-                
DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(db.getId(), 
olapTable, false);
-                Env.getCurrentEnv().getDynamicPartitionScheduler()
-                        .createOrUpdateRuntimeInfo(tableId, 
DynamicPartitionScheduler.LAST_UPDATE_TIME,
-                                TimeUtils.getCurrentFormatTime());
-            }
-
-            if (DebugPointUtil.isEnable("FE.createOlapTable.exception")) {
-                LOG.info("debug point FE.createOlapTable.exception, throw e");
-                throw new DdlException("debug point 
FE.createOlapTable.exception");
-            }
-        } catch (DdlException e) {
-            LOG.warn("create table failed {} - {}", tabletIdSet, 
e.getMessage());
-            for (Long tabletId : tabletIdSet) {
-                Env.getCurrentInvertedIndex().deleteTablet(tabletId);
-            }
-            // edit log write DropTableInfo will result in deleting colocate 
group,
-            // but follow fe may need wait 30s (recycle bin mgr run every 30s).
-            if (Env.getCurrentColocateIndex().isColocateTable(tableId)) {
-                Env.getCurrentColocateIndex().removeTable(tableId);
-            }
-            try {
-                dropTable(db, tableId, true, false, 0L);
-                if (hadLogEditCreateTable) {
-                    DropInfo info = new DropInfo(db.getId(), tableId, 
olapTable.getName(), false, true, 0L);
-                    Env.getCurrentEnv().getEditLog().logDropTable(info);
-                }
-            } catch (Exception ex) {
-                LOG.warn("drop table", ex);
-            }
-
-            throw e;
-        }
-        if (olapTable instanceof MTMV) {
-            Env.getCurrentEnv().getMtmvService().postCreateMTMV((MTMV) 
olapTable);
-        }
-        return tableHasExist;
-    }
-
-    private boolean createMysqlTable(Database db, CreateTableInfo 
createTableInfo) throws DdlException {
-        String tableName = createTableInfo.getTableName();
-        List<Column> columns = createTableInfo.getColumns();
-        long tableId = Env.getCurrentEnv().getNextId();
-        MysqlTable mysqlTable = new MysqlTable(tableId, tableName, columns, 
createTableInfo.getProperties());
-        mysqlTable.setComment(createTableInfo.getComment());
-        Pair<Boolean, Boolean> result = db.createTableWithLock(mysqlTable, 
false, createTableInfo.isIfNotExists());
-        return checkCreateTableResult(tableName, tableId, result);
-    }
-
-    private boolean createMysqlTable(Database db, CreateTableStmt stmt) throws 
DdlException {
-        String tableName = stmt.getTableName();
-
-        List<Column> columns = stmt.getColumns();
-
-        long tableId = Env.getCurrentEnv().getNextId();
-        MysqlTable mysqlTable = new MysqlTable(tableId, tableName, columns, 
stmt.getProperties());
-        mysqlTable.setComment(stmt.getComment());
-        Pair<Boolean, Boolean> result = db.createTableWithLock(mysqlTable, 
false, stmt.isSetIfNotExists());
-        return checkCreateTableResult(tableName, tableId, result);
-    }
-
-    private boolean createOdbcTable(Database db, CreateTableInfo 
createTableInfo) throws DdlException {
-        String tableName = createTableInfo.getTableName();
-        List<Column> columns = createTableInfo.getColumns();
-
-        long tableId = Env.getCurrentEnv().getNextId();
-        OdbcTable odbcTable = new OdbcTable(tableId, tableName, columns, 
createTableInfo.getProperties());
-        odbcTable.setComment(createTableInfo.getComment());
-        Pair<Boolean, Boolean> result = db.createTableWithLock(odbcTable, 
false, createTableInfo.isIfNotExists());
-        return checkCreateTableResult(tableName, tableId, result);
-    }
-
-    private boolean createOdbcTable(Database db, CreateTableStmt stmt) throws 
DdlException {
-        String tableName = stmt.getTableName();
-        List<Column> columns = stmt.getColumns();
-
-        long tableId = Env.getCurrentEnv().getNextId();
-        OdbcTable odbcTable = new OdbcTable(tableId, tableName, columns, 
stmt.getProperties());
-        odbcTable.setComment(stmt.getComment());
-        Pair<Boolean, Boolean> result = db.createTableWithLock(odbcTable, 
false, stmt.isSetIfNotExists());
-        return checkCreateTableResult(tableName, tableId, result);
-    }
-
-    private boolean createEsTable(Database db, CreateTableInfo 
createTableInfo) throws DdlException, AnalysisException {
-        String tableName = createTableInfo.getTableName();
-
-        // validate props to get column from es.
-        EsTable esTable = new EsTable(tableName, 
createTableInfo.getProperties());
-
-        // create columns
-        List<Column> baseSchema = createTableInfo.getColumns();
-
-        if (baseSchema.isEmpty()) {
-            baseSchema = esTable.genColumnsFromEs();
-        }
-        validateColumns(baseSchema, true);
-        esTable.setNewFullSchema(baseSchema);
-
-        // create partition info
-        PartitionDesc partitionDesc = createTableInfo.getPartitionDesc();
-        PartitionInfo partitionInfo;
-        Map<String, Long> partitionNameToId = Maps.newHashMap();
-        if (partitionDesc != null) {
-            partitionInfo = partitionDesc.toPartitionInfo(baseSchema, 
partitionNameToId, false);
-        } else {
-            long partitionId = Env.getCurrentEnv().getNextId();
-            // use table name as single partition name
-            partitionNameToId.put(tableName, partitionId);
-            partitionInfo = new SinglePartitionInfo();
-        }
-        esTable.setPartitionInfo(partitionInfo);
-
-        long tableId = Env.getCurrentEnv().getNextId();
-        esTable.setId(tableId);
-        esTable.setComment(createTableInfo.getComment());
-        esTable.syncTableMetaData();
-        Pair<Boolean, Boolean> result = db.createTableWithLock(esTable, false, 
createTableInfo.isIfNotExists());
-        return checkCreateTableResult(tableName, tableId, result);
-    }
-
-    private boolean createEsTable(Database db, CreateTableStmt stmt) throws 
DdlException, AnalysisException {
-        String tableName = stmt.getTableName();
-
-        // validate props to get column from es.
-        EsTable esTable = new EsTable(tableName, stmt.getProperties());
-
-        // create columns
-        List<Column> baseSchema = stmt.getColumns();
-
-        if (baseSchema.isEmpty()) {
-            baseSchema = esTable.genColumnsFromEs();
-        }
-        validateColumns(baseSchema, true);
-        esTable.setNewFullSchema(baseSchema);
-
-        // create partition info
-        PartitionDesc partitionDesc = stmt.getPartitionDesc();
-        PartitionInfo partitionInfo;
-        Map<String, Long> partitionNameToId = Maps.newHashMap();
-        if (partitionDesc != null) {
-            partitionInfo = partitionDesc.toPartitionInfo(baseSchema, 
partitionNameToId, false);
-        } else {
-            long partitionId = Env.getCurrentEnv().getNextId();
-            // use table name as single partition name
-            partitionNameToId.put(tableName, partitionId);
-            partitionInfo = new SinglePartitionInfo();
-        }
-        esTable.setPartitionInfo(partitionInfo);
-
-        long tableId = Env.getCurrentEnv().getNextId();
-        esTable.setId(tableId);
-        esTable.setComment(stmt.getComment());
-        esTable.syncTableMetaData();
-        Pair<Boolean, Boolean> result = db.createTableWithLock(esTable, false, 
stmt.isSetIfNotExists());
-        return checkCreateTableResult(tableName, tableId, result);
-    }
-
-    private boolean createBrokerTable(Database db, CreateTableInfo 
createTableInfo) throws DdlException {
-        String tableName = createTableInfo.getTableName();
-
-        List<Column> columns = createTableInfo.getColumns();
-
-        long tableId = Env.getCurrentEnv().getNextId();
-        BrokerTable brokerTable = new BrokerTable(tableId, tableName, columns, 
createTableInfo.getProperties());
-        brokerTable.setComment(createTableInfo.getComment());
-        brokerTable.setBrokerProperties(createTableInfo.getExtProperties());
-        Pair<Boolean, Boolean> result = db.createTableWithLock(brokerTable, 
false, createTableInfo.isIfNotExists());
-        return checkCreateTableResult(tableName, tableId, result);
-
-    }
-
-    private boolean createBrokerTable(Database db, CreateTableStmt stmt) 
throws DdlException {
-        String tableName = stmt.getTableName();
-
-        List<Column> columns = stmt.getColumns();
-
-        long tableId = Env.getCurrentEnv().getNextId();
-        BrokerTable brokerTable = new BrokerTable(tableId, tableName, columns, 
stmt.getProperties());
-        brokerTable.setComment(stmt.getComment());
-        brokerTable.setBrokerProperties(stmt.getExtProperties());
-        Pair<Boolean, Boolean> result = db.createTableWithLock(brokerTable, 
false, stmt.isSetIfNotExists());
-        return checkCreateTableResult(tableName, tableId, result);
-    }
-
-    private boolean createJdbcTable(Database db, CreateTableInfo 
createTableInfo) throws DdlException {
-        String tableName = createTableInfo.getTableName();
-        List<Column> columns = createTableInfo.getColumns();
-
-        long tableId = Env.getCurrentEnv().getNextId();
+        long tableId = Env.getCurrentEnv().getNextId();
 
         JdbcTable jdbcTable = new JdbcTable(tableId, tableName, columns, 
createTableInfo.getProperties());
         jdbcTable.setComment(createTableInfo.getComment());
@@ -4461,19 +3330,6 @@ public class InternalCatalog implements 
CatalogIf<Database> {
         return checkCreateTableResult(tableName, tableId, result);
     }
 
-    private boolean createJdbcTable(Database db, CreateTableStmt stmt) throws 
DdlException {
-        String tableName = stmt.getTableName();
-        List<Column> columns = stmt.getColumns();
-
-        long tableId = Env.getCurrentEnv().getNextId();
-
-        JdbcTable jdbcTable = new JdbcTable(tableId, tableName, columns, 
stmt.getProperties());
-        jdbcTable.setComment(stmt.getComment());
-        // check table if exists
-        Pair<Boolean, Boolean> result = db.createTableWithLock(jdbcTable, 
false, stmt.isSetIfNotExists());
-        return checkCreateTableResult(tableName, tableId, result);
-    }
-
     private boolean checkCreateTableResult(String tableName, long tableId, 
Pair<Boolean, Boolean> result)
                 throws DdlException {
         if (Boolean.FALSE.equals(result.first)) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
index 654e4deac0d..b1ede3926be 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.datasource.hive;
 
-import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.analysis.DistributionDesc;
 import org.apache.doris.analysis.HashDistributionDesc;
 import org.apache.doris.analysis.PartitionDesc;
@@ -286,122 +285,6 @@ public class HiveMetadataOps implements 
ExternalMetadataOps {
         }
     }
 
-    @Override
-    public boolean createTableImpl(CreateTableStmt stmt) throws UserException {
-        String dbName = stmt.getDbName();
-        String tblName = stmt.getTableName();
-        ExternalDatabase<?> db = catalog.getDbNullable(dbName);
-        if (db == null) {
-            throw new UserException("Failed to get database: '" + dbName + "' 
in catalog: " + catalog.getName());
-        }
-        if (tableExist(db.getRemoteName(), tblName)) {
-            if (stmt.isSetIfNotExists()) {
-                LOG.info("create table[{}] which already exists", tblName);
-                return true;
-            } else {
-                
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tblName);
-            }
-        }
-        try {
-            Map<String, String> props = stmt.getProperties();
-            // set default owner
-            if (!props.containsKey("owner")) {
-                if (ConnectContext.get() != null) {
-                    props.put("owner", 
ConnectContext.get().getCurrentUserIdentity().getUser());
-                }
-            }
-
-            if (props.containsKey("transactional") && 
props.get("transactional").equalsIgnoreCase("true")) {
-                throw new UserException("Not support create hive transactional 
table.");
-                /*
-                    CREATE TABLE trans6(
-                      `col1` int,
-                      `col2` int
-                    )  ENGINE=hive
-                    PROPERTIES (
-                      'file_format'='orc',
-                      'compression'='zlib',
-                      'bucketing_version'='2',
-                      'transactional'='true',
-                      'transactional_properties'='default'
-                    );
-                    In hive, this table only can insert not update(not report 
error,but not actually updated).
-                 */
-            }
-
-            String fileFormat = props.getOrDefault(FILE_FORMAT_KEY, 
Config.hive_default_file_format);
-            Map<String, String> ddlProps = new HashMap<>();
-            for (Map.Entry<String, String> entry : props.entrySet()) {
-                String key = entry.getKey().toLowerCase();
-                if (DORIS_HIVE_KEYS.contains(entry.getKey().toLowerCase())) {
-                    ddlProps.put("doris." + key, entry.getValue());
-                } else {
-                    ddlProps.put(key, entry.getValue());
-                }
-            }
-            List<String> partitionColNames = new ArrayList<>();
-            if (stmt.getPartitionDesc() != null) {
-                PartitionDesc partitionDesc = stmt.getPartitionDesc();
-                if (partitionDesc.getType() == PartitionType.RANGE) {
-                    throw new UserException("Only support 'LIST' partition 
type in hive catalog.");
-                }
-                partitionColNames.addAll(partitionDesc.getPartitionColNames());
-                if (!partitionDesc.getSinglePartitionDescs().isEmpty()) {
-                    throw new UserException("Partition values expressions is 
not supported in hive catalog.");
-                }
-
-            }
-            Map<String, String> properties = catalog.getProperties();
-            if (properties.containsKey(HMSBaseProperties.HIVE_METASTORE_TYPE)
-                    && 
properties.get(HMSBaseProperties.HIVE_METASTORE_TYPE).equals(HMSBaseProperties.DLF_TYPE))
 {
-                for (Column column : stmt.getColumns()) {
-                    if (column.hasDefaultValue()) {
-                        throw new UserException("Default values are not 
supported with `DLF` catalog.");
-                    }
-                }
-            }
-            String comment = stmt.getComment();
-            Optional<String> location = 
Optional.ofNullable(props.getOrDefault(LOCATION_URI_KEY, null));
-            HiveTableMetadata hiveTableMeta;
-            DistributionDesc bucketInfo = stmt.getDistributionDesc();
-            if (bucketInfo != null) {
-                if (Config.enable_create_hive_bucket_table) {
-                    if (bucketInfo instanceof HashDistributionDesc) {
-                        hiveTableMeta = 
HiveTableMetadata.of(db.getRemoteName(),
-                                tblName,
-                                location,
-                                stmt.getColumns(),
-                                partitionColNames,
-                                bucketInfo.getDistributionColumnNames(),
-                                bucketInfo.getBuckets(),
-                                ddlProps,
-                                fileFormat,
-                                comment);
-                    } else {
-                        throw new UserException("External hive table only 
supports hash bucketing");
-                    }
-                } else {
-                    throw new UserException("Create hive bucket table need"
-                            + " set enable_create_hive_bucket_table to true");
-                }
-            } else {
-                hiveTableMeta = HiveTableMetadata.of(db.getRemoteName(),
-                        tblName,
-                        location,
-                        stmt.getColumns(),
-                        partitionColNames,
-                        ddlProps,
-                        fileFormat,
-                        comment);
-            }
-            client.createTable(hiveTableMeta, stmt.isSetIfNotExists());
-            return false;
-        } catch (Exception e) {
-            throw new UserException(e.getMessage(), e);
-        }
-    }
-
-    @Override
     public void afterCreateTable(String dbName, String tblName) {
         Optional<ExternalDatabase<?>> db = catalog.getDbForReplay(dbName);
         if (db.isPresent()) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergDLFExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergDLFExternalCatalog.java
index c64e766aacd..8dcdfbd5dd9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergDLFExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergDLFExternalCatalog.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.datasource.iceberg;
 
-import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.analysis.PartitionNames;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
@@ -52,11 +51,6 @@ public class IcebergDLFExternalCatalog extends 
IcebergExternalCatalog {
         throw new NotSupportedException("iceberg catalog with dlf type not 
supports 'create table'");
     }
 
-    @Override
-    public boolean createTable(CreateTableStmt stmt) throws UserException {
-        throw new NotSupportedException("iceberg catalog with dlf type not 
supports 'create table'");
-    }
-
     @Override
     public void dropTable(String dbName, String tableName, boolean isView, 
boolean isMtmv, boolean ifExists,
             boolean mustTemporary, boolean force) throws DdlException {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
index ed4f2b5c04b..f57392a4f4c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
@@ -18,7 +18,6 @@
 package org.apache.doris.datasource.iceberg;
 
 import org.apache.doris.analysis.ColumnPosition;
-import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.StructField;
@@ -314,63 +313,6 @@ public class IcebergMetadataOps implements 
ExternalMetadataOps {
         return false;
     }
 
-    @Override
-    public boolean createTableImpl(CreateTableStmt stmt) throws UserException {
-        try {
-            return executionAuthenticator.execute(() -> 
performCreateTable(stmt));
-        } catch (Exception e) {
-            throw new DdlException(
-                "Failed to create table: " + stmt.getTableName() + ", error 
message is:" + e.getMessage(), e);
-        }
-    }
-
-    public boolean performCreateTable(CreateTableStmt stmt) throws 
UserException {
-        String dbName = stmt.getDbName();
-        ExternalDatabase<?> db = dorisCatalog.getDbNullable(dbName);
-        if (db == null) {
-            throw new UserException("Failed to get database: '" + dbName + "' 
in catalog: " + dorisCatalog.getName());
-        }
-        String tableName = stmt.getTableName();
-        // 1. first, check if table exist in remote
-        if (tableExist(db.getRemoteName(), tableName)) {
-            if (stmt.isSetIfNotExists()) {
-                LOG.info("create table[{}] which already exists", tableName);
-                return true;
-            } else {
-                
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
-            }
-        }
-        // 2. second, check fi table exist in local.
-        // This is because case sensibility issue, eg:
-        // 1. lower_case_table_name = 1
-        // 2. create table tbl1;
-        // 3. create table TBL1;  TBL1 does not exist in remote because the 
remote system is case-sensitive.
-        //    but because lower_case_table_name = 1, the table can not be 
created in Doris because it is conflict with
-        //    tbl1
-        ExternalTable dorisTable = db.getTableNullable(tableName);
-        if (dorisTable != null) {
-            if (stmt.isSetIfNotExists()) {
-                LOG.info("create table[{}] which already exists", tableName);
-                return true;
-            } else {
-                
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
-            }
-        }
-        List<Column> columns = stmt.getColumns();
-        List<StructField> collect = columns.stream()
-                .map(col -> new StructField(col.getName(), col.getType(), 
col.getComment(), col.isAllowNull()))
-                .collect(Collectors.toList());
-        StructType structType = new StructType(new ArrayList<>(collect));
-        Type visit =
-                DorisTypeVisitor.visit(structType, new 
DorisTypeToIcebergType(structType));
-        Schema schema = new 
Schema(visit.asNestedType().asStructType().fields());
-        Map<String, String> properties = stmt.getProperties();
-        properties.put(ExternalCatalog.DORIS_VERSION, 
ExternalCatalog.DORIS_VERSION_VALUE);
-        PartitionSpec partitionSpec = 
IcebergUtils.solveIcebergPartitionSpec(stmt.getPartitionDesc(), schema);
-        catalog.createTable(getTableIdentifier(dbName, tableName), schema, 
partitionSpec, properties);
-        return false;
-    }
-
     @Override
     public void afterCreateTable(String dbName, String tblName) {
         Optional<ExternalDatabase<?>> db = dorisCatalog.getDbForReplay(dbName);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java
index d7ddf219f2a..35fcea59048 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java
@@ -18,7 +18,6 @@
 package org.apache.doris.datasource.operations;
 
 import org.apache.doris.analysis.ColumnPosition;
-import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
@@ -100,24 +99,8 @@ public interface ExternalMetadataOps {
         return res;
     }
 
-    /**
-     *
-     * @param stmt
-     * @return return false means table does not exist and is created this time
-     * @throws UserException
-     */
-    default boolean createTable(CreateTableStmt stmt) throws UserException {
-        boolean res = createTableImpl(stmt);
-        if (!res) {
-            afterCreateTable(stmt.getDbName(), stmt.getTableName());
-        }
-        return res;
-    }
-
     boolean createTableImpl(CreateTableInfo createTableInfo) throws 
UserException;
 
-    boolean createTableImpl(CreateTableStmt stmt) throws UserException;
-
     default void afterCreateTable(String dbName, String tblName) {
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
deleted file mode 100644
index e23165cd113..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ /dev/null
@@ -1,54 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.qe;
-
-import org.apache.doris.analysis.CreateTableStmt;
-import org.apache.doris.analysis.DdlStmt;
-import org.apache.doris.catalog.Env;
-import org.apache.doris.common.Config;
-import org.apache.doris.common.DdlException;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-/**
- * Use for execute ddl.
- **/
-public class DdlExecutor {
-    private static final Logger LOG = LogManager.getLogger(DdlExecutor.class);
-
-    /**
-     * Execute ddl.
-     **/
-    public static void execute(Env env, DdlStmt ddlStmt) throws Exception {
-        checkDdlStmtSupported(ddlStmt);
-        if (ddlStmt instanceof CreateTableStmt) {
-            env.createTable((CreateTableStmt) ddlStmt);
-        } else {
-            LOG.warn("Unkown statement " + ddlStmt.getClass());
-            throw new DdlException("Unknown statement.");
-        }
-    }
-
-    private static void checkDdlStmtSupported(DdlStmt ddlStmt) throws 
DdlException {
-        // check stmt has been supported in cloud mode
-        if (Config.isNotCloudMode()) {
-            return;
-        }
-    }
-}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveMetadataOpsTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveMetadataOpsTest.java
deleted file mode 100644
index ab7b076a866..00000000000
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HiveMetadataOpsTest.java
+++ /dev/null
@@ -1,170 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.datasource.hive;
-
-import org.apache.doris.analysis.CreateTableStmt;
-import org.apache.doris.analysis.DistributionDesc;
-import org.apache.doris.analysis.HashDistributionDesc;
-import org.apache.doris.analysis.KeysDesc;
-import org.apache.doris.analysis.PartitionDesc;
-import org.apache.doris.analysis.TableName;
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.KeysType;
-import org.apache.doris.catalog.Type;
-import org.apache.doris.common.DdlException;
-import org.apache.doris.common.UserException;
-import org.apache.doris.datasource.DatabaseMetadata;
-import org.apache.doris.datasource.ExternalDatabase;
-import org.apache.doris.datasource.ExternalTable;
-import org.apache.doris.datasource.TableMetadata;
-
-import mockit.Mock;
-import mockit.MockUp;
-import mockit.Mocked;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-/**
- * just overlay all metadata operations here.
- * @see HiveDDLAndDMLPlanTest Use it if you need to verify correctness.
- */
-public class HiveMetadataOpsTest {
-
-    private HiveMetadataOps metadataOps;
-
-    @Mocked
-    private HMSCachedClient mockedClient;
-    @Mocked
-    private HMSExternalCatalog mockedCatalog;
-
-    @BeforeEach
-    public void init() {
-        metadataOps = new HiveMetadataOps(mockedCatalog, mockedClient);
-        new MockUp<HMSExternalCatalog>(HMSExternalCatalog.class) {
-            @Mock
-            public ExternalDatabase<? extends ExternalTable> 
getDbNullable(String dbName) {
-                return new HMSExternalDatabase(mockedCatalog, 0L, "mockedDb", 
"mockedDb");
-            }
-
-            @Mock
-            public void onRefresh(boolean invalidCache) {
-                // mocked
-            }
-        };
-        new MockUp<HMSCachedClient>(HMSCachedClient.class) {
-            @Mock
-            public void createDatabase(DatabaseMetadata catalogDatabase) {
-                // mocked
-            }
-
-            @Mock
-            public void dropDatabase(String dbName) {
-                // mocked
-            }
-
-            @Mock
-            public void dropTable(String dbName, String tableName) {
-                // mocked
-            }
-
-            @Mock
-            public void createTable(TableMetadata catalogTable, boolean 
ignoreIfExists) {
-                // mocked
-            }
-        };
-    }
-
-    private void createDb(String dbName, Map<String, String> props) throws 
DdlException {
-        metadataOps.createDb(dbName, true, props);
-    }
-
-    private void dropDb(String dbName, boolean forceDrop) throws DdlException {
-        metadataOps.dropDb(dbName, true, forceDrop);
-    }
-
-    private void createTable(TableName tableName,
-                             List<Column> cols,
-                             List<String> parts,
-                             List<String> buckets,
-                             Map<String, String> props)
-            throws UserException {
-        PartitionDesc partitionDesc = new PartitionDesc(parts, null);
-        DistributionDesc distributionDesc = null;
-        if (!buckets.isEmpty()) {
-            distributionDesc = new HashDistributionDesc(10, buckets);
-        }
-        List<String> colsName = 
cols.stream().map(Column::getName).collect(Collectors.toList());
-        CreateTableStmt stmt = new CreateTableStmt(true, false, false,
-                tableName,
-                cols, null,
-                "hive",
-                new KeysDesc(KeysType.AGG_KEYS, colsName),
-                partitionDesc,
-                distributionDesc,
-                props,
-                props,
-                "comment",
-                null, null);
-        metadataOps.createTable(stmt);
-    }
-
-    private void dropTable(TableName tableName, boolean forceDrop) throws 
DdlException {
-        HMSExternalDatabase externalDatabase = new HMSExternalDatabase(
-                mockedCatalog, 0, tableName.getDb(), tableName.getDb());
-        HMSExternalTable externalTable = new HMSExternalTable(
-                1, tableName.getTbl(), tableName.getTbl(), mockedCatalog, 
externalDatabase);
-        metadataOps.dropTable(externalTable, true);
-    }
-
-    @Test
-    public void testCreateAndDropAll() throws UserException {
-        new MockUp<HMSExternalDatabase>(HMSExternalDatabase.class) {
-            // create table if getTableNullable return null
-            @Mock
-            HMSExternalTable getTableNullable(String tableName) {
-                return null;
-            }
-        };
-        Map<String, String> dbProps = new HashMap<>();
-        dbProps.put(HiveMetadataOps.LOCATION_URI_KEY, "file://loc/db");
-        createDb("mockedDb", dbProps);
-        Map<String, String> tblProps = new HashMap<>();
-        tblProps.put(HiveMetadataOps.FILE_FORMAT_KEY, "orc");
-        tblProps.put(HiveMetadataOps.LOCATION_URI_KEY, "file://loc/tbl");
-        tblProps.put("fs.defaultFS", "hdfs://ha");
-        TableName tableName = new TableName("mockedCtl", "mockedDb", 
"mockedTbl");
-        List<Column> cols = new ArrayList<>();
-        cols.add(new Column("id", Type.BIGINT));
-        cols.add(new Column("pt", Type.STRING));
-        cols.add(new Column("rate", Type.DOUBLE));
-        cols.add(new Column("time", Type.DATETIME));
-        List<String> parts = new ArrayList<>();
-        parts.add("pt");
-        List<String> bucks = new ArrayList<>();
-        // bucks.add("id");
-        createTable(tableName, cols, parts, bucks, tblProps);
-        dropTable(tableName, true);
-        dropDb("mockedDb", true);
-    }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to