This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new dee89d2c4ad [refactor](Nereids) let create table compatible with legacy planner (#28078)
dee89d2c4ad is described below

commit dee89d2c4ada85648233d8619dc68ea0644b7f2e
Author: starocean999 <[email protected]>
AuthorDate: Wed Dec 13 16:35:40 2023 +0800

    [refactor](Nereids) let create table compatible with legacy planner (#28078)
---
 .../antlr4/org/apache/doris/nereids/DorisParser.g4 |  40 +-
 .../org/apache/doris/analysis/CreateMTMVStmt.java  |   2 +-
 .../org/apache/doris/analysis/CreateTableStmt.java |   2 +
 .../java/org/apache/doris/analysis/KeysDesc.java   |   6 +
 .../main/java/org/apache/doris/catalog/Column.java |  11 +
 .../doris/nereids/parser/LogicalPlanBuilder.java   |  77 +-
 .../parser/spark/SparkSql3LogicalPlanBuilder.java  |   4 +-
 .../plans/commands/info/ColumnDefinition.java      | 146 +++-
 .../trees/plans/commands/info/CreateMTMVInfo.java  |   2 +-
 .../trees/plans/commands/info/CreateTableInfo.java | 794 ++++++++++++++++-----
 .../commands/info/DistributionDescriptor.java      |  10 +-
 .../plans/commands/info/FixedRangePartition.java   |  17 +-
 .../trees/plans/commands/info/InPartition.java     |   5 +
 .../trees/plans/commands/info/IndexDefinition.java | 179 ++++-
 .../jdbc/test_doris_jdbc_catalog.out               |  30 +-
 .../data/insert_p0/insert_with_null.out            |   9 +-
 .../unique/test_unique_table_auto_inc.groovy       |   2 +-
 .../suites/ddl_p0/test_create_table_like.groovy    |   2 +-
 .../external_table_p0/tvf/test_s3_tvf.groovy       |   2 +-
 .../suites/index_p0/test_index_meta.groovy         |  12 +-
 .../load_p0/stream_load/test_csv_split_line.groovy |   2 +-
 .../cast_function/test_cast_map_function.groovy    |   2 +-
 .../session_variable/test_default_limit.groovy     |   4 +-
 .../test_multi_column_partition.groovy             |  30 +-
 regression-test/suites/point_query_p0/load.groovy  |  19 +
 .../cast_function/test_cast_map_function.groovy    |   2 +-
 .../test_alter_table_drop_column.groovy            |   2 +-
 .../test_alter_table_modify_column.groovy          |   4 +-
 .../cluster_key/test_create_table.groovy           |  58 +-
 29 files changed, 1159 insertions(+), 316 deletions(-)

diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
index ed81a277429..e88773c56b7 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
@@ -40,15 +40,17 @@ statement
         AS type=(RESTRICTIVE | PERMISSIVE)
         TO (user=userIdentify | ROLE roleName=identifier)
         USING LEFT_PAREN booleanExpression RIGHT_PAREN                 
#createRowPolicy
-    | CREATE TABLE (IF NOT EXISTS)? name=multipartIdentifier
-        ((ctasCols=identifierList)? | (LEFT_PAREN columnDefs indexDefs? 
RIGHT_PAREN))
+    | CREATE (EXTERNAL)? TABLE (IF NOT EXISTS)? name=multipartIdentifier
+        ((ctasCols=identifierList)? | (LEFT_PAREN columnDefs (COMMA 
indexDefs)? RIGHT_PAREN))
         (ENGINE EQ engine=identifier)?
-        ((AGGREGATE | UNIQUE | DUPLICATE) KEY keys=identifierList)?
+        ((AGGREGATE | UNIQUE | DUPLICATE) KEY keys=identifierList (CLUSTER BY 
clusterKeys=identifierList)?)?
         (COMMENT STRING_LITERAL)?
-        (PARTITION BY (RANGE | LIST) partitionKeys=identifierList LEFT_PAREN 
partitions=partitionsDef RIGHT_PAREN)?
-        (DISTRIBUTED BY (HASH hashKeys=identifierList | RANDOM) (BUCKETS 
INTEGER_VALUE | AUTO)?)?
+        ((autoPartition=AUTO)? PARTITION BY (RANGE | LIST) 
(partitionKeys=identifierList | partitionExpr=functionCallExpression)
+          LEFT_PAREN (partitions=partitionsDef)? RIGHT_PAREN)?
+        (DISTRIBUTED BY (HASH hashKeys=identifierList | RANDOM) (BUCKETS 
(INTEGER_VALUE | autoBucket=AUTO))?)?
         (ROLLUP LEFT_PAREN rollupDefs RIGHT_PAREN)?
-        propertyClause?
+        properties=propertyClause?
+        (BROKER extProperties=propertyClause)?
         (AS query)?                                                    
#createTable
     | explain? INSERT (INTO | OVERWRITE TABLE)
         (tableName=multipartIdentifier | DORIS_INTERNAL_TABLE_ID LEFT_PAREN 
tableId=INTEGER_VALUE RIGHT_PAREN)
@@ -482,7 +484,7 @@ columnDefs
     
 columnDef
     : colName=identifier type=dataType
-        KEY? (aggType=aggTypeDef)? ((NOT NULL) | NULL)?
+        KEY? (aggType=aggTypeDef)? ((NOT NULL) | NULL)? (AUTO_INCREMENT)?
         (DEFAULT (nullValue=NULL | INTEGER_VALUE | stringValue=STRING_LITERAL
             | CURRENT_TIMESTAMP (LEFT_PAREN defaultValuePrecision=number 
RIGHT_PAREN)?))?
         (ON UPDATE CURRENT_TIMESTAMP (LEFT_PAREN onUpdateValuePrecision=number 
RIGHT_PAREN)?)?
@@ -494,7 +496,7 @@ indexDefs
     ;
     
 indexDef
-    : INDEX indexName=identifier cols=identifierList (USING BITMAP)? 
(comment=STRING_LITERAL)?
+    : INDEX indexName=identifier cols=identifierList (USING indexType=(BITMAP 
| INVERTED | NGRAM_BF))? (PROPERTIES LEFT_PAREN properties=propertyItemList 
RIGHT_PAREN)? (COMMENT comment=STRING_LITERAL)?
     ;
     
 partitionsDef
@@ -518,8 +520,8 @@ stepPartitionDef
     ;
 
 inPartitionDef
-    : PARTITION (IF NOT EXISTS)? partitionName=identifier VALUES IN 
((LEFT_PAREN constantSeqs+=constantSeq
-        (COMMA constantSeqs+=constantSeq)* RIGHT_PAREN) | 
constants=constantSeq)
+    : PARTITION (IF NOT EXISTS)? partitionName=identifier (VALUES IN 
((LEFT_PAREN constantSeqs+=constantSeq
+        (COMMA constantSeqs+=constantSeq)* RIGHT_PAREN) | 
constants=constantSeq))?
     ;
     
 constantSeq
@@ -668,13 +670,7 @@ primaryExpression
           RIGHT_PAREN                                                          
               #charFunction
     | CONVERT LEFT_PAREN argument=expression USING charSet=identifierOrText 
RIGHT_PAREN       #convertCharSet
     | CONVERT LEFT_PAREN argument=expression COMMA type=dataType RIGHT_PAREN   
               #convertType
-    | functionIdentifier 
-        LEFT_PAREN (
-            (DISTINCT|ALL)? 
-            arguments+=expression (COMMA arguments+=expression)*
-            (ORDER BY sortItem (COMMA sortItem)*)?
-        )? RIGHT_PAREN
-      (OVER windowSpec)?                                                       
                #functionCall
+    | functionCallExpression                                                   
                #functionCall
     | value=primaryExpression LEFT_BRACKET index=valueExpression RIGHT_BRACKET 
                #elementAt
     | value=primaryExpression LEFT_BRACKET begin=valueExpression
       COLON (end=valueExpression)? RIGHT_BRACKET                               
                #arraySlice
@@ -690,6 +686,16 @@ primaryExpression
     | primaryExpression COLLATE (identifier | STRING_LITERAL | DEFAULT)        
                #collate
     ;
 
+functionCallExpression
+    : functionIdentifier
+              LEFT_PAREN (
+                  (DISTINCT|ALL)?
+                  arguments+=expression (COMMA arguments+=expression)*
+                  (ORDER BY sortItem (COMMA sortItem)*)?
+              )? RIGHT_PAREN
+            (OVER windowSpec)?
+    ;
+
 functionIdentifier 
     : (dbName=identifier DOT)? functionNameIdentifier
     ;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateMTMVStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateMTMVStmt.java
index bb2efdd6281..a79447e9b46 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateMTMVStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateMTMVStmt.java
@@ -37,7 +37,7 @@ public class CreateMTMVStmt extends CreateTableStmt {
             Map<String, String> properties, Map<String, String> mvProperties, 
String querySql, String comment,
             EnvInfo envInfo) {
         super(ifNotExists, false, mvName, columns, new ArrayList<Index>(), 
DEFAULT_ENGINE_NAME, keyDesc, null,
-                distributionDesc, properties, null, comment, null, null);
+                distributionDesc, properties, null, comment, null, null, null);
         this.refreshInfo = refreshInfo;
         this.querySql = querySql;
         this.envInfo = envInfo;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
index a5c3e893405..dc6d504aaf4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
@@ -211,6 +211,7 @@ public class CreateTableStmt extends DdlStmt {
             Map<String, String> extProperties,
             String comment,
             List<AlterClause> rollupAlterClauseList,
+            String clusterName,
             Void unused) {
         this.ifNotExists = ifNotExists;
         this.isExternal = isExternal;
@@ -226,6 +227,7 @@ public class CreateTableStmt extends DdlStmt {
         this.columnDefs = Lists.newArrayList();
         this.comment = Strings.nullToEmpty(comment);
         this.rollupAlterClauseList = (rollupAlterClauseList == null) ? 
Lists.newArrayList() : rollupAlterClauseList;
+        this.setClusterName(clusterName);
     }
 
     public void addColumnDef(ColumnDef columnDef) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/KeysDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/KeysDesc.java
index a3ed083ebdf..654db1b4a5d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/KeysDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/KeysDesc.java
@@ -50,6 +50,12 @@ public class KeysDesc implements Writable {
         this.clusterKeysColumnNames = clusterKeyColumnNames;
     }
 
+    public KeysDesc(KeysType type, List<String> keysColumnNames, List<String> 
clusterKeyColumnNames,
+                    List<Integer> clusterKeysColumnIds) {
+        this(type, keysColumnNames, clusterKeyColumnNames);
+        this.clusterKeysColumnIds = clusterKeysColumnIds;
+    }
+
     public KeysType getKeysType() {
         return type;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
index 591b67d625c..dc94074cac6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
@@ -245,6 +245,17 @@ public class Column implements Writable, GsonPostProcessable {
         }
     }
 
+    public Column(String name, Type type, boolean isKey, AggregateType 
aggregateType,
+            boolean isAllowNull, boolean isAutoInc, String defaultValue, 
String comment,
+            boolean visible, DefaultValueExprDef defaultValueExprDef, int 
colUniqueId,
+            String realDefaultValue, boolean hasOnUpdateDefaultValue,
+            DefaultValueExprDef onUpdateDefaultValueExprDef, int clusterKeyId) 
{
+        this(name, type, isKey, aggregateType, isAllowNull, isAutoInc, 
defaultValue, comment,
+                visible, defaultValueExprDef, colUniqueId, realDefaultValue,
+                hasOnUpdateDefaultValue, onUpdateDefaultValueExprDef);
+        this.clusterKeyId = clusterKeyId;
+    }
+
     public Column(String name, Type type, boolean isKey, AggregateType 
aggregateType, boolean isAllowNull,
             boolean isAutoInc, String defaultValue, String comment, boolean 
visible,
             DefaultValueExprDef defaultValueExprDef, int colUniqueId, String 
realDefaultValue, int clusterKeyId) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index 96f6c4322d1..6b644bc7238 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -194,6 +194,7 @@ import org.apache.doris.nereids.analyzer.UnboundTableSink;
 import org.apache.doris.nereids.analyzer.UnboundVariable;
 import org.apache.doris.nereids.analyzer.UnboundVariable.VariableType;
 import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.exceptions.NotSupportedException;
 import org.apache.doris.nereids.exceptions.ParseException;
 import org.apache.doris.nereids.properties.OrderKey;
 import org.apache.doris.nereids.properties.SelectHint;
@@ -1751,7 +1752,7 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
     }
 
     @Override
-    public Expression visitFunctionCall(DorisParser.FunctionCallContext ctx) {
+    public Expression 
visitFunctionCallExpression(DorisParser.FunctionCallExpressionContext ctx) {
         return ParserUtils.withOrigin(ctx, () -> {
             String functionName = 
ctx.functionIdentifier().functionNameIdentifier().getText();
             boolean isDistinct = ctx.DISTINCT() != null;
@@ -2172,20 +2173,53 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
             keysType = KeysType.UNIQUE_KEYS;
         }
         String engineName = ctx.engine != null ? 
ctx.engine.getText().toLowerCase() : "olap";
-        boolean isHash = ctx.HASH() != null || ctx.RANDOM() == null;
         int bucketNum = FeConstants.default_bucket_num;
-        if (isHash && ctx.INTEGER_VALUE() != null) {
+        if (ctx.INTEGER_VALUE() != null) {
             bucketNum = Integer.parseInt(ctx.INTEGER_VALUE().getText());
         }
-        DistributionDescriptor desc = new DistributionDescriptor(isHash, 
ctx.AUTO() != null,
-                bucketNum, ctx.HASH() != null ? 
visitIdentifierList(ctx.hashKeys) : null);
-        Map<String, String> properties = ctx.propertyClause() != null
+        DistributionDescriptor desc = null;
+        if (ctx.HASH() != null) {
+            desc = new DistributionDescriptor(true, ctx.autoBucket != null, 
bucketNum,
+                    visitIdentifierList(ctx.hashKeys));
+        } else if (ctx.RANDOM() != null) {
+            desc = new DistributionDescriptor(false, ctx.autoBucket != null, 
bucketNum, null);
+        }
+        Map<String, String> properties = ctx.properties != null
                 // NOTICE: we should not generate immutable map here, because 
it will be modified when analyzing.
-                ? Maps.newHashMap(visitPropertyClause(ctx.propertyClause())) : 
Maps.newHashMap();
+                ? Maps.newHashMap(visitPropertyClause(ctx.properties))
+                : Maps.newHashMap();
+        Map<String, String> extProperties = ctx.extProperties != null
+                // NOTICE: we should not generate immutable map here, because 
it will be modified when analyzing.
+                ? Maps.newHashMap(visitPropertyClause(ctx.extProperties))
+                : Maps.newHashMap();
         String partitionType = null;
         if (ctx.PARTITION() != null) {
             partitionType = ctx.RANGE() != null ? "RANGE" : "LIST";
         }
+        boolean isAutoPartition = ctx.autoPartition != null;
+        ImmutableList.Builder<Expression> autoPartitionExpr = new 
ImmutableList.Builder<>();
+        if (isAutoPartition) {
+            if (ctx.RANGE() != null) {
+                // AUTO PARTITION BY RANGE FUNC_CALL_EXPR
+                if (ctx.partitionExpr != null) {
+                    
autoPartitionExpr.add(visitFunctionCallExpression(ctx.partitionExpr));
+                } else {
+                    throw new AnalysisException(
+                            "AUTO PARTITION BY RANGE must provide a function 
expr");
+                }
+            } else {
+                // AUTO PARTITION BY LIST(`partition_col`)
+                if (ctx.partitionKeys != null) {
+                    // only support one column in auto partition
+                    
autoPartitionExpr.addAll(visitIdentifierList(ctx.partitionKeys).stream()
+                            .distinct().map(name -> UnboundSlot.quoted(name))
+                            .collect(Collectors.toList()));
+                } else {
+                    throw new AnalysisException(
+                            "AUTO PARTITION BY List must provide a partition 
column");
+                }
+            }
+        }
 
         if (ctx.columnDefs() != null) {
             if (ctx.AS() != null) {
@@ -2193,24 +2227,30 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
             }
             return new CreateTableCommand(Optional.empty(), new 
CreateTableInfo(
                     ctx.EXISTS() != null,
+                    ctx.EXTERNAL() != null,
                     ctlName,
                     dbName,
                     tableName,
                     visitColumnDefs(ctx.columnDefs()),
-                    ImmutableList.of(),
+                    ctx.indexDefs() != null ? visitIndexDefs(ctx.indexDefs()) 
: ImmutableList.of(),
                     engineName,
                     keysType,
                     ctx.keys != null ? visitIdentifierList(ctx.keys) : 
ImmutableList.of(),
                     "",
+                    isAutoPartition,
+                    autoPartitionExpr.build(),
                     partitionType,
                     ctx.partitionKeys != null ? 
visitIdentifierList(ctx.partitionKeys) : null,
                     ctx.partitions != null ? 
visitPartitionsDef(ctx.partitions) : null,
                     desc,
                     ctx.rollupDefs() != null ? 
visitRollupDefs(ctx.rollupDefs()) : ImmutableList.of(),
-                    properties));
+                    properties,
+                    extProperties,
+                    ctx.clusterKeys != null ? 
visitIdentifierList(ctx.clusterKeys) : ImmutableList.of()));
         } else if (ctx.AS() != null) {
             return new 
CreateTableCommand(Optional.of(visitQuery(ctx.query())), new CreateTableInfo(
                     ctx.EXISTS() != null,
+                    ctx.EXTERNAL() != null,
                     ctlName,
                     dbName,
                     tableName,
@@ -2219,12 +2259,16 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
                     keysType,
                     ctx.keys != null ? visitIdentifierList(ctx.keys) : 
ImmutableList.of(),
                     "",
+                    isAutoPartition,
+                    autoPartitionExpr.build(),
                     partitionType,
                     ctx.partitionKeys != null ? 
visitIdentifierList(ctx.partitionKeys) : null,
                     ctx.partitions != null ? 
visitPartitionsDef(ctx.partitions) : null,
                     desc,
                     ctx.rollupDefs() != null ? 
visitRollupDefs(ctx.rollupDefs()) : ImmutableList.of(),
-                    properties));
+                    properties,
+                    extProperties,
+                    ctx.clusterKeys != null ? 
visitIdentifierList(ctx.clusterKeys) : ImmutableList.of()));
         } else {
             throw new AnalysisException("Should contain at least one column in 
a table");
         }
@@ -2283,7 +2327,8 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
             }
         }
         String comment = ctx.comment != null ? ctx.comment.getText() : "";
-        return new ColumnDefinition(colName, colType, isKey, aggType, 
!isNotNull, defaultValue,
+        boolean isAutoInc = ctx.AUTO_INCREMENT() != null;
+        return new ColumnDefinition(colName, colType, isKey, aggType, 
!isNotNull, isAutoInc, defaultValue,
                 onUpdateDefaultValue, comment);
     }
 
@@ -2296,9 +2341,10 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
     public IndexDefinition visitIndexDef(IndexDefContext ctx) {
         String indexName = ctx.indexName.getText();
         List<String> indexCols = visitIdentifierList(ctx.cols);
-        boolean isUseBitmap = ctx.USING() != null;
-        String comment = ctx.comment.getText();
-        return new IndexDefinition(indexName, indexCols, isUseBitmap, comment);
+        Map<String, String> properties = visitPropertyItemList(ctx.properties);
+        String indexType = ctx.indexType.getText();
+        String comment = ctx.comment != null ? ctx.comment.getText() : "";
+        return new IndexDefinition(indexName, indexCols, indexType, 
properties, comment);
     }
 
     @Override
@@ -2917,6 +2963,9 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
     public DataType visitPrimitiveDataType(PrimitiveDataTypeContext ctx) {
         return ParserUtils.withOrigin(ctx, () -> {
             String dataType = 
ctx.primitiveColType().type.getText().toLowerCase(Locale.ROOT);
+            if (dataType.equalsIgnoreCase("all")) {
+                throw new NotSupportedException("Disable to create table with 
`ALL` type columns");
+            }
             List<String> l = Lists.newArrayList(dataType);
             
ctx.INTEGER_VALUE().stream().map(ParseTree::getText).forEach(l::add);
             return DataType.convertPrimitiveFromStrings(l, 
ctx.primitiveColType().UNSIGNED() != null);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/spark/SparkSql3LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/spark/SparkSql3LogicalPlanBuilder.java
index d01f475871e..49eb2b74cc1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/spark/SparkSql3LogicalPlanBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/spark/SparkSql3LogicalPlanBuilder.java
@@ -55,8 +55,8 @@ public class SparkSql3LogicalPlanBuilder extends LogicalPlanBuilder {
     }
 
     @Override
-    public Expression visitFunctionCall(DorisParser.FunctionCallContext ctx) {
-        Expression expression = super.visitFunctionCall(ctx);
+    public Expression 
visitFunctionCallExpression(DorisParser.FunctionCallExpressionContext ctx) {
+        Expression expression = super.visitFunctionCallExpression(ctx);
         if (!(expression instanceof UnboundFunction)) {
             return expression;
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ColumnDefinition.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ColumnDefinition.java
index 9a3c92dcfb6..ff74bd22560 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ColumnDefinition.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ColumnDefinition.java
@@ -34,6 +34,7 @@ import org.apache.doris.nereids.types.StructType;
 import org.apache.doris.nereids.types.TinyIntType;
 import org.apache.doris.nereids.types.VarcharType;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
 import java.util.List;
@@ -55,15 +56,19 @@ public class ColumnDefinition {
     private final String comment;
     private final boolean isVisible;
     private boolean aggTypeImplicit = false;
+    private boolean isAutoInc = false;
+    private int clusterKeyId = -1;
 
     public ColumnDefinition(String name, DataType type, boolean isKey, 
AggregateType aggType, boolean isNullable,
             Optional<DefaultValue> defaultValue, String comment) {
         this(name, type, isKey, aggType, isNullable, defaultValue, comment, 
true);
     }
 
-    public ColumnDefinition(String name, DataType type, boolean isKey, 
AggregateType aggType, boolean isNullable,
-            Optional<DefaultValue> defaultValue, Optional<DefaultValue> 
onUpdateDefaultValue, String comment) {
-        this(name, type, isKey, aggType, isNullable, defaultValue, 
onUpdateDefaultValue, comment, true);
+    public ColumnDefinition(String name, DataType type, boolean isKey, 
AggregateType aggType,
+            boolean isNullable, boolean isAutoInc, Optional<DefaultValue> 
defaultValue,
+            Optional<DefaultValue> onUpdateDefaultValue, String comment) {
+        this(name, type, isKey, aggType, isNullable, isAutoInc, defaultValue, 
onUpdateDefaultValue,
+                comment, true);
     }
 
     /**
@@ -84,14 +89,15 @@ public class ColumnDefinition {
     /**
      * constructor
      */
-    public ColumnDefinition(String name, DataType type, boolean isKey, 
AggregateType aggType, boolean isNullable,
-            Optional<DefaultValue> defaultValue, Optional<DefaultValue> 
onUpdateDefaultValue, String comment,
-                    boolean isVisible) {
+    private ColumnDefinition(String name, DataType type, boolean isKey, 
AggregateType aggType,
+            boolean isNullable, boolean isAutoInc, Optional<DefaultValue> 
defaultValue,
+            Optional<DefaultValue> onUpdateDefaultValue, String comment, 
boolean isVisible) {
         this.name = name;
         this.type = type;
         this.isKey = isKey;
         this.aggType = aggType;
         this.isNullable = isNullable;
+        this.isAutoInc = isAutoInc;
         this.defaultValue = defaultValue;
         this.onUpdateDefaultValue = onUpdateDefaultValue;
         this.comment = comment;
@@ -118,6 +124,10 @@ public class ColumnDefinition {
         return aggType;
     }
 
+    public void setAggType(AggregateType aggType) {
+        this.aggType = aggType;
+    }
+
     public boolean isNullable() {
         return isNullable;
     }
@@ -126,6 +136,18 @@ public class ColumnDefinition {
         return isKey;
     }
 
+    public void setIsKey(boolean isKey) {
+        this.isKey = isKey;
+    }
+
+    public void setClusterKeyId(int clusterKeyId) {
+        this.clusterKeyId = clusterKeyId;
+    }
+
+    public boolean isAutoInc() {
+        return isAutoInc;
+    }
+
     private DataType updateCharacterTypeLength(DataType dataType) {
         if (dataType instanceof ArrayType) {
             return ArrayType.of(updateCharacterTypeLength(((ArrayType) 
dataType).getItemType()));
@@ -153,7 +175,7 @@ public class ColumnDefinition {
     /**
      * validate column definition and analyze
      */
-    public void validate(Set<String> keysSet, boolean isEnableMergeOnWrite, 
KeysType keysType) {
+    public void validate(boolean isOlap, Set<String> keysSet, boolean 
isEnableMergeOnWrite, KeysType keysType) {
         if (Config.disable_nested_complex_type && isNestedComplexType(type)) {
             throw new AnalysisException("Unsupported data type: " + 
type.toSql());
         }
@@ -183,18 +205,40 @@ public class ColumnDefinition {
                 isNullable = false;
             }
         }
+
+        // check keys type
         if (keysSet.contains(name)) {
             isKey = true;
             if (aggType != null) {
-                throw new AnalysisException(String.format("Key column %s can 
not set aggregation type", name));
+                throw new AnalysisException(
+                        String.format("Key column %s can not set aggregation 
type", name));
             }
-            if (type.isStringType()) {
-                throw new AnalysisException("String Type should not be used in 
key column[" + name + "]");
+            if (isOlap) {
+                if (type.isFloatLikeType()) {
+                    throw new AnalysisException(
+                            "Float or double can not used as a key, use 
decimal instead.");
+                } else if (type.isStringType()) {
+                    throw new AnalysisException(
+                            "String Type should not be used in key column[" + 
name + "]");
+                } else if (type.isArrayType()) {
+                    throw new AnalysisException("Array can only be used in the 
non-key column of"
+                            + " the duplicate table at present.");
+                }
             }
             if (type.isBitmapType() || type.isHllType() || 
type.isQuantileStateType()) {
                 throw new AnalysisException("Key column can not set complex 
type:" + name);
+            } else if (type.isJsonType()) {
+                throw new AnalysisException(
+                        "JsonType type should not be used in key column[" + 
getName() + "].");
+            } else if (type.isMapType()) {
+                throw new AnalysisException("Map can only be used in the 
non-key column of"
+                        + " the duplicate table at present.");
+            } else if (type.isStructType()) {
+                throw new AnalysisException("Struct can only be used in the 
non-key column of"
+                        + " the duplicate table at present.");
             }
-        } else if (aggType == null) {
+        } else if (aggType == null && isOlap) {
+            Preconditions.checkState(keysType != null, "keysType is null");
             if (keysType.equals(KeysType.DUP_KEYS)) {
                 aggType = AggregateType.NONE;
             } else if (keysType.equals(KeysType.UNIQUE_KEYS) && 
isEnableMergeOnWrite) {
@@ -205,16 +249,54 @@ public class ColumnDefinition {
                 throw new AnalysisException("should set aggregation type to 
non-key column when in aggregate key");
             }
         }
-        if (!isKey && keysType.equals(KeysType.UNIQUE_KEYS)) {
-            aggTypeImplicit = true;
+
+        if (isOlap) {
+            if (!isKey && keysType.equals(KeysType.UNIQUE_KEYS)) {
+                aggTypeImplicit = true;
+            }
+
+            // If aggregate type is REPLACE_IF_NOT_NULL, we set it nullable.
+            // If default value is not set, we set it NULL
+            if (aggType == AggregateType.REPLACE_IF_NOT_NULL) {
+                isNullable = true;
+                if (!defaultValue.isPresent()) {
+                    defaultValue = 
Optional.of(DefaultValue.NULL_DEFAULT_VALUE);
+                }
+            }
         }
+
+        // check default value
         if (type.isHllType()) {
+            if (defaultValue.isPresent()) {
+                throw new AnalysisException("Hll type column can not set 
default value");
+            }
             defaultValue = Optional.of(DefaultValue.HLL_EMPTY_DEFAULT_VALUE);
         } else if (type.isBitmapType()) {
+            if (defaultValue.isPresent() && defaultValue.get() != 
DefaultValue.NULL_DEFAULT_VALUE) {
+                throw new AnalysisException("Bitmap type column can not set 
default value");
+            }
             defaultValue = 
Optional.of(DefaultValue.BITMAP_EMPTY_DEFAULT_VALUE);
-        } else if (type.isArrayType() && !defaultValue.isPresent()) {
-            defaultValue = Optional.of(DefaultValue.ARRAY_EMPTY_DEFAULT_VALUE);
+        } else if (type.isArrayType() && defaultValue.isPresent() && isOlap
+                && defaultValue.get() != DefaultValue.NULL_DEFAULT_VALUE && 
!defaultValue.get()
+                        
.getValue().equals(DefaultValue.ARRAY_EMPTY_DEFAULT_VALUE.getValue())) {
+            throw new AnalysisException("Array type column default value only 
support null or "
+                    + DefaultValue.ARRAY_EMPTY_DEFAULT_VALUE);
+        } else if (type.isMapType()) {
+            if (defaultValue.isPresent() && defaultValue.get() != 
DefaultValue.NULL_DEFAULT_VALUE) {
+                throw new AnalysisException("Map type column default value 
just support null");
+            }
+        } else if (type.isStructType()) {
+            if (defaultValue.isPresent() && defaultValue.get() != 
DefaultValue.NULL_DEFAULT_VALUE) {
+                throw new AnalysisException("Struct type column default value 
just support null");
+            }
+        }
+
+        if (!isNullable && defaultValue.isPresent()
+                && defaultValue.get() == DefaultValue.NULL_DEFAULT_VALUE) {
+            throw new AnalysisException(
+                    "Can not set null default value to non nullable column: " 
+ name);
         }
+
         if (defaultValue.isPresent()
                 && defaultValue.get().getValue() != null
                 && type.toCatalogDataType().isScalarType()) {
@@ -253,6 +335,36 @@ public class ColumnDefinition {
                 }
             }
         }
+
+        // from old planner CreateTableStmt's analyze method, after call columnDef.analyze(engineName.equals("olap"));
+        if (isOlap && type.isComplexType()) {
+            if (aggType != null && aggType != AggregateType.NONE
+                    && aggType != AggregateType.REPLACE) {
+                throw new 
AnalysisException(type.toCatalogDataType().getPrimitiveType()
+                        + " column can't support aggregation " + aggType);
+            }
+            if (isKey) {
+                throw new 
AnalysisException(type.toCatalogDataType().getPrimitiveType()
+                        + " can only be used in the non-key column of the 
duplicate table at present.");
+            }
+        }
+
+        if (type.isTimeLikeType()) {
+            throw new AnalysisException("Time type is not supported for olap 
table");
+        }
+
+        if (type.isObjectType()) {
+            if (type.isBitmapType()) {
+                if (keysType == KeysType.DUP_KEYS) {
+                    throw new AnalysisException(
+                            "column:" + name + " must be used in AGG_KEYS or 
UNIQUE_KEYS.");
+                }
+            } else {
+                if (keysType != KeysType.AGG_KEYS) {
+                    throw new AnalysisException("column:" + name + " must be 
used in AGG_KEYS.");
+                }
+            }
+        }
     }
 
     /**
@@ -284,10 +396,10 @@ public class ColumnDefinition {
      */
     public Column translateToCatalogStyle() {
         Column column = new Column(name, type.toCatalogDataType(), isKey, 
aggType, isNullable,
-                false, 
defaultValue.map(DefaultValue::getRawValue).orElse(null), comment, isVisible,
+                isAutoInc, 
defaultValue.map(DefaultValue::getRawValue).orElse(null), comment, isVisible,
                 
defaultValue.map(DefaultValue::getDefaultValueExprDef).orElse(null), 
Column.COLUMN_UNIQUE_ID_INIT_VALUE,
                 defaultValue.map(DefaultValue::getValue).orElse(null), 
onUpdateDefaultValue.isPresent(),
-                
onUpdateDefaultValue.map(DefaultValue::getDefaultValueExprDef).orElse(null));
+                
onUpdateDefaultValue.map(DefaultValue::getDefaultValueExprDef).orElse(null), 
clusterKeyId);
         column.setAggregationTypeImplicit(aggTypeImplicit);
         return column;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java
index 98e610aab9e..03c158f404f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java
@@ -122,7 +122,7 @@ public class CreateMTMVInfo {
         final boolean finalEnableMergeOnWrite = false;
         Set<String> keysSet = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
         keysSet.addAll(keys);
-        columns.forEach(c -> c.validate(keysSet, finalEnableMergeOnWrite, 
KeysType.DUP_KEYS));
+        columns.forEach(c -> c.validate(true, keysSet, 
finalEnableMergeOnWrite, KeysType.DUP_KEYS));
 
         if (distribution == null) {
             throw new AnalysisException("Create MTMV should contain 
distribution desc");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
index 835cb7468e3..ea517593f8d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
@@ -20,10 +20,17 @@ package org.apache.doris.nereids.trees.plans.commands.info;
 import org.apache.doris.analysis.AllPartitionDesc;
 import org.apache.doris.analysis.AlterClause;
 import org.apache.doris.analysis.CreateTableStmt;
+import org.apache.doris.analysis.DistributionDesc;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.FunctionCallExpr;
+import org.apache.doris.analysis.FunctionParams;
+import org.apache.doris.analysis.IndexDef;
 import org.apache.doris.analysis.KeysDesc;
 import org.apache.doris.analysis.ListPartitionDesc;
 import org.apache.doris.analysis.PartitionDesc;
 import org.apache.doris.analysis.RangePartitionDesc;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.StringLiteral;
 import org.apache.doris.analysis.TableName;
 import org.apache.doris.catalog.AggregateType;
 import org.apache.doris.catalog.Column;
@@ -34,12 +41,25 @@ import org.apache.doris.catalog.PartitionType;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.FeNameFormat;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.util.AutoBucketUtils;
+import org.apache.doris.common.util.ParseUtil;
 import org.apache.doris.common.util.PropertyAnalyzer;
+import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.external.elasticsearch.EsUtil;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.nereids.analyzer.UnboundFunction;
+import org.apache.doris.nereids.analyzer.UnboundSlot;
 import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.literal.Literal;
 import org.apache.doris.nereids.types.DataType;
+import org.apache.doris.nereids.util.ExpressionUtils;
 import org.apache.doris.nereids.util.Utils;
 import org.apache.doris.qe.ConnectContext;
 
@@ -50,10 +70,14 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.stream.Collectors;
 
 /**
@@ -72,22 +96,35 @@ public class CreateTableInfo {
     private List<String> keys;
     private final String comment;
     private final String partitionType;
-    private final List<String> partitionColumns;
+    private List<String> partitionColumns;
     private final List<PartitionDefinition> partitions;
-    private final DistributionDescriptor distribution;
+    private DistributionDescriptor distribution;
     private final List<RollupDefinition> rollups;
     private Map<String, String> properties;
+    private Map<String, String> extProperties;
     private boolean isEnableMergeOnWrite = false;
 
+    private final boolean isAutoPartition;
+    private final List<Expression> autoPartitionExprs;
+
+    private boolean isExternal = false;
+    private String clusterName = null;
+    private List<String> clusterKeysColumnNames = null;
+    private List<Integer> clusterKeysColumnIds = null;
+
     /**
      * constructor for create table
      */
-    public CreateTableInfo(boolean ifNotExists, String ctlName, String dbName, 
String tableName,
-            List<ColumnDefinition> columns, List<IndexDefinition> indexes, 
String engineName,
-            KeysType keysType, List<String> keys, String comment,
-            String partitionType, List<String> partitionColumns, 
List<PartitionDefinition> partitions,
-            DistributionDescriptor distribution, List<RollupDefinition> 
rollups, Map<String, String> properties) {
+    public CreateTableInfo(boolean ifNotExists, boolean isExternal, String 
ctlName, String dbName,
+            String tableName, List<ColumnDefinition> columns, 
List<IndexDefinition> indexes,
+            String engineName, KeysType keysType, List<String> keys, String 
comment,
+            boolean isAutoPartition, List<Expression> autoPartitionExprs, 
String partitionType,
+            List<String> partitionColumns, List<PartitionDefinition> 
partitions,
+            DistributionDescriptor distribution, List<RollupDefinition> 
rollups,
+            Map<String, String> properties, Map<String, String> extProperties,
+            List<String> clusterKeyColumnNames) {
         this.ifNotExists = ifNotExists;
+        this.isExternal = isExternal;
         this.ctlName = ctlName;
         this.dbName = dbName;
         this.tableName = tableName;
@@ -98,22 +135,31 @@ public class CreateTableInfo {
         this.keysType = keysType;
         this.keys = Utils.copyRequiredList(keys);
         this.comment = comment;
+        this.isAutoPartition = isAutoPartition;
+        this.autoPartitionExprs = autoPartitionExprs;
         this.partitionType = partitionType;
         this.partitionColumns = partitionColumns;
         this.partitions = partitions;
         this.distribution = distribution;
         this.rollups = Utils.copyRequiredList(rollups);
         this.properties = properties;
+        this.extProperties = extProperties;
+        this.clusterKeysColumnNames = 
Utils.copyRequiredList(clusterKeyColumnNames);
     }
 
     /**
      * constructor for create table as select
      */
-    public CreateTableInfo(boolean ifNotExists, String ctlName, String dbName, 
String tableName, List<String> cols,
-            String engineName, KeysType keysType, List<String> keys, String 
comment,
-            String partitionType, List<String> partitionColumns, 
List<PartitionDefinition> partitions,
-            DistributionDescriptor distribution, List<RollupDefinition> 
rollups, Map<String, String> properties) {
+    public CreateTableInfo(boolean ifNotExists, boolean isExternal, String 
ctlName, String dbName,
+            String tableName, List<String> cols, String engineName, KeysType 
keysType,
+            List<String> keys, String comment, boolean isAutoPartition,
+            List<Expression> autoPartitionExprs, String partitionType,
+            List<String> partitionColumns, List<PartitionDefinition> 
partitions,
+            DistributionDescriptor distribution, List<RollupDefinition> 
rollups,
+            Map<String, String> properties, Map<String, String> extProperties,
+            List<String> clusterKeyColumnNames) {
         this.ifNotExists = ifNotExists;
+        this.isExternal = isExternal;
         this.ctlName = ctlName;
         this.dbName = dbName;
         this.tableName = tableName;
@@ -124,12 +170,16 @@ public class CreateTableInfo {
         this.keysType = keysType;
         this.keys = Utils.copyRequiredList(keys);
         this.comment = comment;
+        this.isAutoPartition = isAutoPartition;
+        this.autoPartitionExprs = autoPartitionExprs;
         this.partitionType = partitionType;
         this.partitionColumns = partitionColumns;
         this.partitions = partitions;
         this.distribution = distribution;
         this.rollups = Utils.copyRequiredList(rollups);
         this.properties = properties;
+        this.extProperties = extProperties;
+        this.clusterKeysColumnNames = 
Utils.copyRequiredList(clusterKeyColumnNames);
     }
 
     public List<String> getCtasColumns() {
@@ -163,22 +213,28 @@ public class CreateTableInfo {
         if (columns.isEmpty()) {
             throw new AnalysisException("table should contain at least one 
column");
         }
-        if (distribution == null) {
-            throw new AnalysisException("Create olap table should contain 
distribution desc");
-        }
-        if (!engineName.equals("olap")) {
-            throw new AnalysisException("currently Nereids support olap engine 
only");
-        }
+
+        checkEngineName();
+
         if (properties == null) {
             properties = Maps.newHashMap();
         }
 
+        if (Strings.isNullOrEmpty(engineName) || 
engineName.equalsIgnoreCase("olap")) {
+            if (distribution == null) {
+                throw new AnalysisException("Create olap table should contain 
distribution desc");
+            }
+            properties = maybeRewriteByAutoBucket(distribution, properties);
+        }
+
         try {
             FeNameFormat.checkTableName(tableName);
         } catch (Exception e) {
             throw new AnalysisException(e.getMessage(), e);
         }
 
+        clusterName = ctx.getClusterName();
+
         // analyze catalog name
         if (Strings.isNullOrEmpty(ctlName)) {
             if (ctx.getCurrentCatalog() != null) {
@@ -188,202 +244,354 @@ public class CreateTableInfo {
             }
         }
 
+        // disallow external catalog
+        try {
+            Util.prohibitExternalCatalog(ctlName, 
this.getClass().getSimpleName());
+        } catch (Exception ex) {
+            throw new AnalysisException(ex.getMessage(), ex.getCause());
+        }
+
         // analyze table name
         if (Strings.isNullOrEmpty(dbName)) {
-            dbName = ClusterNamespace.getFullName(ctx.getClusterName(), 
ctx.getDatabase());
+            dbName = ClusterNamespace.getFullName(clusterName, 
ctx.getDatabase());
         } else {
-            dbName = ClusterNamespace.getFullName(ctx.getClusterName(), 
dbName);
+            dbName = ClusterNamespace.getFullName(clusterName, dbName);
+        }
+
+        if 
(!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), 
dbName,
+                tableName, PrivPredicate.CREATE)) {
+            try {
+                
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR,
+                        "CREATE");
+            } catch (Exception ex) {
+                throw new AnalysisException(ex.getMessage(), ex.getCause());
+            }
         }
 
         Preconditions.checkState(!Strings.isNullOrEmpty(ctlName), "catalog 
name is null or empty");
         Preconditions.checkState(!Strings.isNullOrEmpty(dbName), "database 
name is null or empty");
-        properties = 
PropertyAnalyzer.rewriteReplicaAllocationProperties(ctlName, dbName, 
properties);
 
-        boolean enableDuplicateWithoutKeysByDefault = false;
-        if (properties != null) {
-            try {
-                enableDuplicateWithoutKeysByDefault =
-                        
PropertyAnalyzer.analyzeEnableDuplicateWithoutKeysByDefault(properties);
-            } catch (Exception e) {
-                throw new AnalysisException(e.getMessage(), e.getCause());
+        //check datev1 and decimalv2
+        for (ColumnDefinition columnDef : columns) {
+            if (columnDef.getType().isDateType() && Config.disable_datev1) {
+                throw new AnalysisException(
+                        "Disable to create table with `DATE` type columns, 
please use `DATEV2`.");
+            }
+            if (columnDef.getType().isDecimalV2Type() && 
Config.disable_decimalv2) {
+                throw new AnalysisException("Disable to create table with 
`DECIMAL` type columns,"
+                        + "please use `DECIMALV3`.");
             }
         }
 
-        if (keys.isEmpty()) {
-            boolean hasAggColumn = false;
-            for (ColumnDefinition column : columns) {
-                if (column.getAggType() != null) {
-                    hasAggColumn = true;
-                    break;
+        if (engineName.equalsIgnoreCase("olap")) {
+            properties = 
PropertyAnalyzer.rewriteReplicaAllocationProperties(ctlName, dbName,
+                    properties);
+            boolean enableDuplicateWithoutKeysByDefault = false;
+            if (properties != null) {
+                try {
+                    enableDuplicateWithoutKeysByDefault =
+                            
PropertyAnalyzer.analyzeEnableDuplicateWithoutKeysByDefault(properties);
+                } catch (Exception e) {
+                    throw new AnalysisException(e.getMessage(), e.getCause());
                 }
             }
-            keys = Lists.newArrayList();
-            if (hasAggColumn) {
+            if (keys.isEmpty()) {
+                boolean hasAggColumn = false;
                 for (ColumnDefinition column : columns) {
                     if (column.getAggType() != null) {
+                        hasAggColumn = true;
                         break;
                     }
-                    keys.add(column.getName());
                 }
-                keysType = KeysType.AGG_KEYS;
-            } else {
-                if (!enableDuplicateWithoutKeysByDefault) {
-                    int keyLength = 0;
+                keys = Lists.newArrayList();
+                if (hasAggColumn) {
                     for (ColumnDefinition column : columns) {
-                        DataType type = column.getType();
-                        Type catalogType = 
column.getType().toCatalogDataType();
-                        keyLength += catalogType.getIndexSize();
-                        if (keys.size() >= 
FeConstants.shortkey_max_column_count
-                                || keyLength > 
FeConstants.shortkey_maxsize_bytes) {
-                            if (keys.isEmpty() && type.isStringLikeType()) {
-                                keys.add(column.getName());
-                            }
-                            break;
-                        }
-                        if (type.isFloatLikeType() || type.isStringType() || 
type.isJsonType()
-                                || catalogType.isComplexType()) {
+                        if (column.getAggType() != null) {
                             break;
                         }
                         keys.add(column.getName());
-                        if (type.isVarcharType()) {
-                            break;
+                    }
+                    keysType = KeysType.AGG_KEYS;
+                } else {
+                    if (!enableDuplicateWithoutKeysByDefault) {
+                        int keyLength = 0;
+                        for (ColumnDefinition column : columns) {
+                            DataType type = column.getType();
+                            Type catalogType = 
column.getType().toCatalogDataType();
+                            keyLength += catalogType.getIndexSize();
+                            if (keys.size() >= 
FeConstants.shortkey_max_column_count
+                                    || keyLength > 
FeConstants.shortkey_maxsize_bytes) {
+                                if (keys.isEmpty() && type.isStringLikeType()) 
{
+                                    keys.add(column.getName());
+                                }
+                                break;
+                            }
+                            if (type.isFloatLikeType() || type.isStringType() 
|| type.isJsonType()
+                                    || catalogType.isComplexType()) {
+                                break;
+                            }
+                            keys.add(column.getName());
+                            if (type.isVarcharType()) {
+                                break;
+                            }
                         }
                     }
+                    keysType = KeysType.DUP_KEYS;
+                }
+                // The OLAP table must have at least one short key,
+                // and the float and double should not be short key,
+                // so the float and double could not be the first column in OLAP table.
+                if (keys.isEmpty() && (keysType != KeysType.DUP_KEYS
+                        || !enableDuplicateWithoutKeysByDefault)) {
+                    throw new AnalysisException(
+                            "The olap table first column could not be float, 
double, string"
+                                    + " or array, struct, map, please use 
decimal or varchar instead.");
+                }
+            } else if (enableDuplicateWithoutKeysByDefault) {
+                throw new AnalysisException(
+                        "table property 
'enable_duplicate_without_keys_by_default' only can"
+                                + " set 'true' when create olap table by 
default.");
+            }
+
+            if (properties != null
+                    && 
properties.containsKey(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE)) {
+                if (!keysType.equals(KeysType.UNIQUE_KEYS)) {
+                    throw new 
AnalysisException(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE
+                            + " property only support unique key table");
                 }
-                keysType = KeysType.DUP_KEYS;
             }
-            // The OLAP table must have at least one short key,
-            // and the float and double should not be short key,
-            // so the float and double could not be the first column in OLAP table.
-            if (keys.isEmpty() && (keysType != KeysType.DUP_KEYS || 
!enableDuplicateWithoutKeysByDefault)) {
-                throw new AnalysisException("The olap table first column could 
not be float, double, string"
-                        + " or array, struct, map, please use decimal or 
varchar instead.");
+
+            if (keysType == KeysType.UNIQUE_KEYS) {
+                isEnableMergeOnWrite = false;
+                if (properties != null) {
+                    // properties = PropertyAnalyzer.enableUniqueKeyMergeOnWriteIfNotExists(properties);
+                    // `analyzeXXX` would modify `properties`, which will be used later,
+                    // so we just clone a properties map here.
+                    try {
+                        isEnableMergeOnWrite = 
PropertyAnalyzer.analyzeUniqueKeyMergeOnWrite(
+                                new HashMap<>(properties));
+                    } catch (Exception e) {
+                        throw new AnalysisException(e.getMessage(), 
e.getCause());
+                    }
+                }
             }
-        } else if (enableDuplicateWithoutKeysByDefault) {
-            throw new AnalysisException("table property 
'enable_duplicate_without_keys_by_default' only can"
-                    + " set 'true' when create olap table by default.");
-        }
 
-        if (properties != null && 
properties.containsKey(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE)) {
-            if (!keysType.equals(KeysType.UNIQUE_KEYS)) {
-                throw new 
AnalysisException(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE
-                        + " property only support unique key table");
+            validateKeyColumns();
+            if (!clusterKeysColumnNames.isEmpty() && !isEnableMergeOnWrite) {
+                throw new AnalysisException(
+                        "Cluster keys only support unique keys table which 
enabled "
+                                + 
PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE);
             }
-            try {
-                isEnableMergeOnWrite = 
PropertyAnalyzer.analyzeUniqueKeyMergeOnWrite(Maps.newHashMap(properties));
-            } catch (Exception e) {
-                throw new AnalysisException(e.getMessage(), e.getCause());
+            for (int i = 0; i < keys.size(); ++i) {
+                columns.get(i).setIsKey(true);
             }
-        }
 
-        // add hidden column
-        if (Config.enable_batch_delete_by_default && 
keysType.equals(KeysType.UNIQUE_KEYS)) {
-            if (isEnableMergeOnWrite) {
-                
columns.add(ColumnDefinition.newDeleteSignColumnDefinition(AggregateType.NONE));
-            } else {
-                
columns.add(ColumnDefinition.newDeleteSignColumnDefinition(AggregateType.REPLACE));
+            if (keysType != KeysType.AGG_KEYS) {
+                AggregateType type = AggregateType.REPLACE;
+                if (keysType == KeysType.DUP_KEYS) {
+                    type = AggregateType.NONE;
+                }
+                if (keysType == KeysType.UNIQUE_KEYS && isEnableMergeOnWrite) {
+                    type = AggregateType.NONE;
+                }
+                for (int i = keys.size(); i < columns.size(); ++i) {
+                    columns.get(i).setAggType(type);
+                }
             }
-        }
 
-        // add a hidden column as row store
-        boolean storeRowColumn = false;
-        if (properties != null) {
-            try {
-                storeRowColumn = 
PropertyAnalyzer.analyzeStoreRowColumn(Maps.newHashMap(properties));
-            } catch (Exception e) {
-                throw new AnalysisException(e.getMessage(), e.getCause());
+            // add hidden column
+            if (Config.enable_batch_delete_by_default && 
keysType.equals(KeysType.UNIQUE_KEYS)) {
+                if (isEnableMergeOnWrite) {
+                    
columns.add(ColumnDefinition.newDeleteSignColumnDefinition(AggregateType.NONE));
+                } else {
+                    columns.add(
+                            
ColumnDefinition.newDeleteSignColumnDefinition(AggregateType.REPLACE));
+                }
             }
-        }
-        if (storeRowColumn) {
-            if (keysType.equals(KeysType.AGG_KEYS)) {
-                throw new AnalysisException("Aggregate table can't support row 
column now");
+
+            // add a hidden column as row store
+            boolean storeRowColumn = false;
+            if (properties != null) {
+                try {
+                    storeRowColumn =
+                            
PropertyAnalyzer.analyzeStoreRowColumn(Maps.newHashMap(properties));
+                } catch (Exception e) {
+                    throw new AnalysisException(e.getMessage(), e.getCause());
+                }
             }
-            if (keysType.equals(KeysType.UNIQUE_KEYS)) {
+            if (storeRowColumn) {
+                if (keysType.equals(KeysType.AGG_KEYS)) {
+                    throw new AnalysisException("Aggregate table can't support 
row column now");
+                }
+                if (keysType.equals(KeysType.UNIQUE_KEYS)) {
+                    if (isEnableMergeOnWrite) {
+                        columns.add(
+                                
ColumnDefinition.newRowStoreColumnDefinition(AggregateType.NONE));
+                    } else {
+                        columns.add(ColumnDefinition
+                                
.newRowStoreColumnDefinition(AggregateType.REPLACE));
+                    }
+                } else {
+                    
columns.add(ColumnDefinition.newRowStoreColumnDefinition(null));
+                }
+            }
+            if (Config.enable_hidden_version_column_by_default
+                    && keysType.equals(KeysType.UNIQUE_KEYS)) {
                 if (isEnableMergeOnWrite) {
-                    
columns.add(ColumnDefinition.newRowStoreColumnDefinition(AggregateType.NONE));
+                    
columns.add(ColumnDefinition.newVersionColumnDefinition(AggregateType.NONE));
                 } else {
-                    
columns.add(ColumnDefinition.newRowStoreColumnDefinition(AggregateType.REPLACE));
+                    
columns.add(ColumnDefinition.newVersionColumnDefinition(AggregateType.REPLACE));
                 }
-            } else {
-                
columns.add(ColumnDefinition.newRowStoreColumnDefinition(null));
+            }
+
+            // validate partitions
+            Map<String, ColumnDefinition> columnMap = new 
TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+            columns.forEach(c -> {
+                if (columnMap.put(c.getName(), c) != null) {
+                    try {
+                        
ErrorReport.reportAnalysisException(ErrorCode.ERR_DUP_FIELDNAME,
+                                c.getName());
+                    } catch (Exception e) {
+                        throw new AnalysisException(e.getMessage(), 
e.getCause());
+                    }
+                }
+            });
+
+            if (partitionColumns != null) {
+                partitionColumns.forEach(p -> {
+                    if (!columnMap.containsKey(p)) {
+                        throw new AnalysisException(
+                                String.format("partition key %s is not 
exists", p));
+                    }
+                    validatePartitionColumn(columnMap.get(p), ctx);
+                });
+
+                Set<String> partitionColumnSets = Sets.newHashSet();
+                List<String> duplicatesKeys = partitionColumns.stream()
+                        .filter(c -> 
!partitionColumnSets.add(c)).collect(Collectors.toList());
+                if (!duplicatesKeys.isEmpty()) {
+                    throw new AnalysisException(
+                            "Duplicated partition column " + 
duplicatesKeys.get(0));
+                }
+
+                if (partitions != null) {
+                    if (!checkPartitionsTypes()) {
+                        throw new AnalysisException(
+                                "partitions types is invalid, expected FIXED 
or LESS in range partitions"
+                                        + " and IN in list partitions");
+                    }
+                    Set<String> partitionNames = Sets.newHashSet();
+                    for (PartitionDefinition partition : partitions) {
+                        if (partition instanceof StepPartition) {
+                            continue;
+                        }
+                        String partitionName = partition.getPartitionName();
+                        if (partitionNames.contains(partitionName)) {
+                            throw new AnalysisException(
+                                    "Duplicated named partition: " + 
partitionName);
+                        }
+                        partitionNames.add(partitionName);
+                    }
+                    partitions.forEach(p -> {
+                        p.setPartitionTypes(partitionColumns.stream()
+                                .map(s -> 
columnMap.get(s).getType()).collect(Collectors.toList()));
+                        p.validate(Maps.newHashMap(properties));
+                    });
+                }
+            }
+
+            // validate distribution descriptor
+            distribution.updateCols(columns.get(0).getName());
+            distribution.validate(columnMap, keysType);
+
+            // validate key set.
+            if (!distribution.isHash()) {
+                if (keysType.equals(KeysType.UNIQUE_KEYS)) {
+                    throw new AnalysisException(
+                            "Should not be distributed by random when keys 
type is unique");
+                } else if (keysType.equals(KeysType.AGG_KEYS)) {
+                    for (ColumnDefinition c : columns) {
+                        if (AggregateType.REPLACE.equals(c.getAggType())
+                                || 
AggregateType.REPLACE_IF_NOT_NULL.equals(c.getAggType())) {
+                            throw new AnalysisException(
+                                    "Should not be distributed by random when 
keys type is agg"
+                                            + "and column is in replace, [" + 
c.getName()
+                                            + "] is invalid");
+                        }
+                    }
+                }
+            }
+        } else {
+            // mysql, broker and hive do not need key desc
+            if (keysType != null) {
+                throw new AnalysisException(
+                        "Create " + engineName + " table should not contain 
keys desc");
+            }
+
+            for (ColumnDefinition columnDef : columns) {
+                columnDef.setIsKey(true);
             }
         }
-        if (Config.enable_hidden_version_column_by_default && 
keysType.equals(KeysType.UNIQUE_KEYS)) {
-            if (isEnableMergeOnWrite) {
-                
columns.add(ColumnDefinition.newVersionColumnDefinition(AggregateType.NONE));
-            } else {
-                
columns.add(ColumnDefinition.newVersionColumnDefinition(AggregateType.REPLACE));
+
+        // validate column
+        try {
+            if (!engineName.equals("elasticsearch") && columns.isEmpty()) {
+                
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLE_MUST_HAVE_COLUMNS);
             }
+        } catch (Exception e) {
+            throw new AnalysisException(e.getMessage(), e.getCause());
         }
 
-        // analyze column
         final boolean finalEnableMergeOnWrite = isEnableMergeOnWrite;
         Set<String> keysSet = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
         keysSet.addAll(keys);
-        columns.forEach(c -> c.validate(keysSet, finalEnableMergeOnWrite, 
keysType));
-
-        // analyze partitions
-        Map<String, ColumnDefinition> columnMap = new 
TreeMap<>(String.CASE_INSENSITIVE_ORDER);
-        columns.forEach(c -> columnMap.put(c.getName(), c));
-
-        if (partitions != null) {
-            partitionColumns.forEach(p -> {
-                if (!columnMap.containsKey(p)) {
-                    throw new AnalysisException(String.format("partition key 
%s is not exists", p));
+        columns.forEach(c -> c.validate(engineName.equals("olap"), keysSet, 
finalEnableMergeOnWrite,
+                keysType));
+
+        // validate index
+        if (!indexes.isEmpty()) {
+            Set<String> distinct = new 
TreeSet<>(String.CASE_INSENSITIVE_ORDER);
+            Set<Pair<IndexDef.IndexType, List<String>>> distinctCol = new 
HashSet<>();
+
+            for (IndexDefinition indexDef : indexes) {
+                indexDef.validate();
+                if (!engineName.equalsIgnoreCase("olap")) {
+                    throw new AnalysisException(
+                            "index only support in olap engine at current 
version.");
                 }
-                validatePartitionColumn(columnMap.get(p), ctx);
-            });
-            if (!checkPartitionsTypes()) {
-                throw new AnalysisException("partitions types is invalid, 
expected FIXED or LESS in range partitions"
-                        + " and IN in list partitions");
-            }
-            Set<String> partitionNames = Sets.newHashSet();
-            for (PartitionDefinition partition : partitions) {
-                if (partition instanceof StepPartition) {
-                    continue;
-                }
-                String partitionName = partition.getPartitionName();
-                if (partitionNames.contains(partitionName)) {
-                    throw new AnalysisException("Duplicated named partition: " 
+ partitionName);
-                }
-                partitionNames.add(partitionName);
-            }
-            Set<String> partitionColumnSets = Sets.newHashSet();
-            List<String> duplicatesKeys = partitionColumns.stream()
-                    .filter(c -> !partitionColumnSets.add(c))
-                    .collect(Collectors.toList());
-            if (!duplicatesKeys.isEmpty()) {
-                throw new AnalysisException("Duplicated partition column " + 
duplicatesKeys.get(0));
-            }
-            partitions.forEach(p -> {
-                p.setPartitionTypes(partitionColumns.stream().map(s -> 
columnMap.get(s).getType())
-                        .collect(Collectors.toList()));
-                p.validate(Maps.newHashMap(properties));
-            });
-        }
-
-        // analyze distribution descriptor
-        distribution.updateCols(columns.get(0).getName());
-        distribution.validate(columnMap, keysType);
-
-        // analyze key set.
-        if (!distribution.isHash()) {
-            if (keysType.equals(KeysType.UNIQUE_KEYS)) {
-                throw new AnalysisException("Should not be distributed by 
random when keys type is unique");
-            } else if (keysType.equals(KeysType.AGG_KEYS)) {
-                for (ColumnDefinition c : columns) {
-                    if (AggregateType.REPLACE.equals(c.getAggType())
-                            || 
AggregateType.REPLACE_IF_NOT_NULL.equals(c.getAggType())) {
-                        throw new AnalysisException("Should not be distributed 
by random when keys type is agg"
-                                + "and column is in replace, [" + c.getName() 
+ "] is invalid");
+                for (String indexColName : indexDef.getColumnNames()) {
+                    boolean found = false;
+                    for (ColumnDefinition column : columns) {
+                        if (column.getName().equalsIgnoreCase(indexColName)) {
+                            indexDef.checkColumn(column, keysType, 
isEnableMergeOnWrite);
+                            found = true;
+                            break;
+                        }
+                    }
+                    if (!found) {
+                        throw new AnalysisException(
+                                "Column does not exist in table. invalid 
column: " + indexColName);
                     }
                 }
+                distinct.add(indexDef.getIndexName());
+                distinctCol.add(Pair.of(indexDef.getIndexType(), 
indexDef.getColumnNames().stream()
+                        
.map(String::toUpperCase).collect(Collectors.toList())));
+            }
+            if (distinct.size() != indexes.size()) {
+                throw new AnalysisException("index name must be unique.");
+            }
+            if (distinctCol.size() != indexes.size()) {
+                throw new AnalysisException(
+                        "same index columns have multiple same type index is 
not allowed.");
             }
         }
     }
 
     public void validateCreateTableAsSelect(List<ColumnDefinition> columns, 
ConnectContext ctx) {
         this.columns = Utils.copyRequiredMutableList(columns);
+        // bucket num is hard coded 10 to be consistent with legacy planner
+        this.distribution = new DistributionDescriptor(true, false, 10,
+                Lists.newArrayList(columns.get(0).getName()));
         validate(ctx);
     }
 
@@ -392,18 +600,48 @@ public class CreateTableInfo {
      */
     private boolean checkPartitionsTypes() {
         if (partitionType.equalsIgnoreCase(PartitionType.RANGE.name())) {
-            if (partitions.stream().allMatch(p -> p instanceof StepPartition)) 
{
+            if (partitions.stream().allMatch(
+                    p -> p instanceof StepPartition || p instanceof 
FixedRangePartition)) {
                 return true;
             }
-            return partitions.stream().allMatch(p -> (p instanceof 
LessThanPartition)
-                    || (p instanceof FixedRangePartition));
+            return partitions.stream().allMatch(
+                    p -> (p instanceof LessThanPartition) || (p instanceof 
FixedRangePartition));
         }
         return partitionType.equalsIgnoreCase(PartitionType.LIST.name())
                 && partitions.stream().allMatch(p -> p instanceof InPartition);
     }
 
+    /**
+     * Checks that {@code engineName} is a supported engine and reconciles it with
+     * {@code isExternal}.
+     *
+     * <p>mysql, odbc, broker, elasticsearch, hive and jdbc tables are always treated as
+     * external; any other engine must be "olap" and must not be external. odbc, mysql and
+     * broker are additionally rejected unless Config.enable_odbc_mysql_broker_table is set.
+     *
+     * @throws AnalysisException if the engine name is unsupported, conflicts with
+     *         {@code isExternal}, or names a disabled engine
+     */
+    private void checkEngineName() {
+        boolean odbcMysqlOrBroker = engineName.equals("mysql") || engineName.equals("odbc")
+                || engineName.equals("broker");
+        boolean externalEngine = odbcMysqlOrBroker || engineName.equals("elasticsearch")
+                || engineName.equals("hive") || engineName.equals("jdbc");
+
+        if (externalEngine) {
+            // this is for compatibility: these engines are external even if not declared so
+            isExternal = true;
+        } else if (isExternal) {
+            // report the engine that was actually given instead of a hard-coded "olap"
+            throw new AnalysisException(
+                    "Do not support external table with engine name = " + engineName);
+        } else if (!engineName.equals("olap")) {
+            throw new AnalysisException(
+                    "Do not support table with engine name = " + engineName);
+        }
+
+        if (!Config.enable_odbc_mysql_broker_table && odbcMysqlOrBroker) {
+            // the hint must name the config item that is checked above
+            throw new AnalysisException("odbc, mysql and broker table is no longer supported."
+                    + " For odbc and mysql external table, use jdbc table or jdbc catalog instead."
+                    + " For broker table, use table valued function instead."
+                    + " Or you can temporarily set 'enable_odbc_mysql_broker_table=true'"
+                    + " in fe.conf to reopen this feature.");
+        }
+    }
+
     private void validatePartitionColumn(ColumnDefinition column, 
ConnectContext ctx) {
-        if (!column.isKey() && 
(!column.getAggType().equals(AggregateType.NONE) || isEnableMergeOnWrite)) {
+        if (!column.isKey()
+                && (!column.getAggType().equals(AggregateType.NONE) || 
isEnableMergeOnWrite)) {
             throw new AnalysisException("The partition column could not be 
aggregated column");
         }
         if (column.getType().isFloatLikeType()) {
@@ -425,48 +663,228 @@ public class CreateTableInfo {
         }
     }
 
+    /**
+     * If auto bucket is enabled, rewrite the distribution bucket num and
+     * set properties[PropertyAnalyzer.PROPERTIES_AUTO_BUCKET] = "true".
+     *
+     * @param distributionDesc distribution descriptor to update; may be null
+     * @param properties table properties; may be null. A non-null map is updated in place.
+     * @return {@code properties} itself when auto bucket is not enabled; otherwise the
+     *         updated map (a new map when {@code properties} was null)
+     * @throws AnalysisException if the bucket number cannot be computed
+     */
+    private static Map<String, String> maybeRewriteByAutoBucket(
+            DistributionDescriptor distributionDesc, Map<String, String> properties) {
+        if (distributionDesc == null || !distributionDesc.isAutoBucket()) {
+            return properties;
+        }
+
+        // auto bucket is enabled
+        Map<String, String> newProperties = properties;
+        if (newProperties == null) {
+            newProperties = new HashMap<String, String>();
+        }
+        newProperties.put(PropertyAnalyzer.PROPERTIES_AUTO_BUCKET, "true");
+
+        try {
+            if (!newProperties.containsKey(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE)) {
+                // no size estimate was given: use the default bucket num
+                distributionDesc.updateBucketNum(FeConstants.default_bucket_num);
+            } else {
+                long partitionSize = ParseUtil.analyzeDataVolumn(
+                        newProperties.get(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE));
+                distributionDesc.updateBucketNum(AutoBucketUtils.getBucketsNum(partitionSize,
+                        Config.autobucket_min_buckets));
+            }
+        } catch (Exception e) {
+            // chain the caught exception itself; e.getCause() is usually null and would
+            // drop the original stack trace
+            throw new AnalysisException(e.getMessage(), e);
+        }
+        return newProperties;
+    }
+
+    /**
+     * Validates the key definition ({@code keysType}, {@code keys} and
+     * {@code clusterKeysColumnNames}) against {@code columns}.
+     *
+     * <p>Besides validating, this fills {@code clusterKeysColumnIds} and sets the cluster
+     * key id on every column that is named as a cluster key.
+     *
+     * @throws AnalysisException on the first rule that is violated
+     */
+    private void validateKeyColumns() {
+        if (keysType == null) {
+            throw new AnalysisException("Keys type is null.");
+        }
+
+        // only DUP_KEYS may come with an empty key list
+        if (keys.isEmpty() && keysType != KeysType.DUP_KEYS) {
+            throw new AnalysisException("The number of key columns is 0.");
+        }
+
+        // NOTE: keys.size() == columns.size() passes this check although the message
+        // says "less than"
+        if (keys.size() > columns.size()) {
+            throw new AnalysisException(
+                    "The number of key columns should be less than the number 
of columns.");
+        }
+
+        if (!clusterKeysColumnNames.isEmpty()) {
+            if (keysType != KeysType.UNIQUE_KEYS) {
+                throw new AnalysisException("Cluster keys only support unique 
keys table.");
+            }
+            clusterKeysColumnIds = Lists.newArrayList();
+            for (int i = 0; i < clusterKeysColumnNames.size(); ++i) {
+                String name = clusterKeysColumnNames.get(i);
+                // check if key is duplicate
+                for (int j = 0; j < i; j++) {
+                    if (clusterKeysColumnNames.get(j).equalsIgnoreCase(name)) {
+                        throw new AnalysisException("Duplicate cluster key 
column[" + name + "].");
+                    }
+                }
+                // check if key exists and generate key column ids
+                for (int j = 0; j < columns.size(); j++) {
+                    if (columns.get(j).getName().equalsIgnoreCase(name)) {
+                        
columns.get(j).setClusterKeyId(clusterKeysColumnIds.size());
+                        clusterKeysColumnIds.add(j);
+                        break;
+                    }
+                    // reached the last column without a match
+                    if (j == columns.size() - 1) {
+                        throw new AnalysisException(
+                                "Key cluster column[" + name + "] doesn't 
exist.");
+                    }
+                }
+            }
+
+            // compare keys and cluster keys over their common length (case-insensitive);
+            // the definition is rejected when that whole prefix is identical
+            int minKeySize = keys.size() < clusterKeysColumnNames.size() ? 
keys.size()
+                    : clusterKeysColumnNames.size();
+            boolean sameKey = true;
+            for (int i = 0; i < minKeySize; ++i) {
+                if 
(!keys.get(i).equalsIgnoreCase(clusterKeysColumnNames.get(i))) {
+                    sameKey = false;
+                    break;
+                }
+            }
+            if (sameKey) {
+                throw new AnalysisException("Unique keys and cluster keys 
should be different.");
+            }
+        }
+
+        // keys must name the leading columns in declaration order, and a key column
+        // must not carry an aggregate type
+        for (int i = 0; i < keys.size(); ++i) {
+            String name = columns.get(i).getName();
+            if (!keys.get(i).equalsIgnoreCase(name)) {
+                String keyName = keys.get(i);
+                // tell "unknown column" apart from "known column at the wrong position"
+                if (columns.stream().noneMatch(col -> 
col.getName().equalsIgnoreCase(keyName))) {
+                    throw new AnalysisException("Key column[" + keyName + "] 
doesn't exist.");
+                }
+                throw new AnalysisException("Key columns should be a ordered 
prefix of the schema."
+                        + " KeyColumns[" + i + "] (starts from zero) is " + 
keyName + ", "
+                        + "but corresponding column is " + name + " in the 
previous "
+                        + "columns declaration.");
+            }
+
+            if (columns.get(i).getAggType() != null) {
+                throw new AnalysisException(
+                        "Key column[" + name + "] should not specify aggregate 
type.");
+            }
+        }
+
+        // for olap table
+        // non-key columns: AGG_KEYS requires an aggregate type, other keys types forbid it
+        for (int i = keys.size(); i < columns.size(); ++i) {
+            if (keysType == KeysType.AGG_KEYS) {
+                if (columns.get(i).getAggType() == null) {
+                    throw new AnalysisException(
+                            keysType.name() + " table should specify aggregate 
type for "
+                                    + "non-key column[" + 
columns.get(i).getName() + "]");
+                }
+            } else {
+                if (columns.get(i).getAggType() != null) {
+                    throw new AnalysisException(
+                            keysType.name() + " table should not specify 
aggregate type for "
+                                    + "non-key column[" + 
columns.get(i).getName() + "]");
+                }
+            }
+        }
+    }
+
     /**
      * translate to catalog create table stmt
      */
     public CreateTableStmt translateToLegacyStmt() {
-        List<Column> catalogColumns = columns.stream()
-                .map(ColumnDefinition::translateToCatalogStyle)
-                .collect(Collectors.toList());
-
-        List<Index> catalogIndexes = 
indexes.stream().map(IndexDefinition::translateToCatalogStyle)
-                .collect(Collectors.toList());
+        if (isAutoPartition) {
+            partitionColumns = ExpressionUtils
+                    .collectAll(autoPartitionExprs, 
UnboundSlot.class::isInstance).stream()
+                    .map(slot -> ((UnboundSlot) 
slot).getName()).collect(Collectors.toList());
+        }
         PartitionDesc partitionDesc = null;
-        if (partitions != null) {
-            List<AllPartitionDesc> partitionDescs = partitions.stream()
-                    
.map(PartitionDefinition::translateToCatalogStyle).collect(Collectors.toList());
+        if (partitionColumns != null || isAutoPartition) {
+            List<AllPartitionDesc> partitionDescs =
+                    partitions != null
+                            ? 
partitions.stream().map(PartitionDefinition::translateToCatalogStyle)
+                                    .collect(Collectors.toList())
+                            : null;
             try {
                 if (partitionType.equals(PartitionType.RANGE.name())) {
-                    partitionDesc = new RangePartitionDesc(partitionColumns, 
partitionDescs);
+                    if (isAutoPartition) {
+                        partitionDesc = new RangePartitionDesc(
+                                
convertToLegacyAutoPartitionExprs(autoPartitionExprs),
+                                partitionColumns, partitionDescs);
+                    } else {
+                        partitionDesc = new 
RangePartitionDesc(partitionColumns, partitionDescs);
+                    }
                 } else {
-                    partitionDesc = new ListPartitionDesc(partitionColumns, 
partitionDescs);
+                    if (isAutoPartition) {
+                        partitionDesc = new ListPartitionDesc(
+                                
convertToLegacyAutoPartitionExprs(autoPartitionExprs),
+                                partitionColumns, partitionDescs);
+                    } else {
+                        partitionDesc = new 
ListPartitionDesc(partitionColumns, partitionDescs);
+                    }
                 }
             } catch (Exception e) {
                 throw new AnalysisException(e.getMessage(), e.getCause());
             }
         }
+
         List<AlterClause> addRollups = Lists.newArrayList();
-        if (rollups != null) {
-            addRollups.addAll(rollups.stream()
-                    .map(RollupDefinition::translateToCatalogStyle)
+        if (!rollups.isEmpty()) {
+            
addRollups.addAll(rollups.stream().map(RollupDefinition::translateToCatalogStyle)
                     .collect(Collectors.toList()));
         }
-        return new CreateTableStmt(ifNotExists, false,
+
+        List<Column> catalogColumns = columns.stream()
+                
.map(ColumnDefinition::translateToCatalogStyle).collect(Collectors.toList());
+
+        List<Index> catalogIndexes = 
indexes.stream().map(IndexDefinition::translateToCatalogStyle)
+                .collect(Collectors.toList());
+        DistributionDesc distributionDesc =
+                distribution != null ? distribution.translateToCatalogStyle() 
: null;
+
+        // TODO should move this code to validate function
+        // EsUtil.analyzePartitionAndDistributionDesc only accept 
DistributionDesc and PartitionDesc
+        if (engineName.equals("elasticsearch")) {
+            try {
+                EsUtil.analyzePartitionAndDistributionDesc(partitionDesc, 
distributionDesc);
+            } catch (Exception e) {
+                throw new AnalysisException(e.getMessage(), e.getCause());
+            }
+        } else if (!engineName.equals("olap")) {
+            if (partitionDesc != null || distributionDesc != null) {
+                throw new AnalysisException("Create " + engineName
+                        + " table should not contain partition or distribution 
desc");
+            }
+        }
+
+        return new CreateTableStmt(ifNotExists, isExternal,
                 new 
TableName(Env.getCurrentEnv().getCurrentCatalog().getName(), dbName, tableName),
-                catalogColumns,
-                catalogIndexes,
-                engineName,
-                new KeysDesc(keysType, keys),
-                partitionDesc,
-                distribution.translateToCatalogStyle(),
-                Maps.newHashMap(properties),
-                null,
-                comment,
-                addRollups,
-                null);
+                catalogColumns, catalogIndexes, engineName,
+                new KeysDesc(keysType, keys, clusterKeysColumnNames, 
clusterKeysColumnIds),
+                partitionDesc, distributionDesc, Maps.newHashMap(properties), 
extProperties,
+                comment, addRollups, clusterName, null);
+    }
+
+    /**
+     * Translates auto partition expressions into legacy planner expressions.
+     *
+     * <p>An {@code UnboundSlot} becomes a {@code SlotRef}; an {@code UnboundFunction}
+     * becomes a {@code FunctionCallExpr} whose arguments are translated by
+     * {@code convertToLegacyArguments}.
+     *
+     * @param expressions auto partition expressions to translate, in order
+     * @return the translated expressions, in the same order
+     * @throws AnalysisException if an expression is neither a slot nor a function
+     */
+    private static ArrayList<Expr> convertToLegacyAutoPartitionExprs(List<Expression> expressions) {
+        ArrayList<Expr> legacyExprs = new ArrayList<>();
+        for (Expression expression : expressions) {
+            if (expression instanceof UnboundSlot) {
+                legacyExprs.add(new SlotRef(null, ((UnboundSlot) expression).getName()));
+                continue;
+            }
+            if (!(expression instanceof UnboundFunction)) {
+                throw new AnalysisException(
+                        "unsupported auto partition expr " + expression.toString());
+            }
+            UnboundFunction function = (UnboundFunction) expression;
+            String functionName = function.getName();
+            FunctionParams params = new FunctionParams(convertToLegacyArguments(function.children()));
+            legacyExprs.add(new FunctionCallExpr(functionName, params));
+        }
+        return legacyExprs;
+    }
+
+    /**
+     * Translates function arguments into legacy planner expressions.
+     *
+     * <p>An {@code UnboundSlot} becomes a {@code SlotRef}; a {@code Literal} becomes a
+     * {@code StringLiteral} built from its string value.
+     *
+     * @param children arguments to translate, in order
+     * @return the translated arguments, in the same order
+     * @throws AnalysisException if an argument is neither a slot nor a literal
+     */
+    private static List<Expr> convertToLegacyArguments(List<Expression> children) {
+        List<Expr> legacyArgs = new ArrayList<>();
+        for (Expression child : children) {
+            if (child instanceof UnboundSlot) {
+                legacyArgs.add(new SlotRef(null, ((UnboundSlot) child).getName()));
+            } else if (child instanceof Literal) {
+                legacyArgs.add(new StringLiteral(((Literal) child).getStringValue()));
+            } else {
+                throw new AnalysisException("unsupported argument " + child.toString());
+            }
+        }
+        return legacyArgs;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/DistributionDescriptor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/DistributionDescriptor.java
index 9f4484d0ab6..ba8b587812e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/DistributionDescriptor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/DistributionDescriptor.java
@@ -39,7 +39,7 @@ import java.util.Set;
 public class DistributionDescriptor {
     private final boolean isHash;
     private final boolean isAutoBucket;
-    private final int bucketNum;
+    private int bucketNum;
     private List<String> cols;
 
     public DistributionDescriptor(boolean isHash, boolean isAutoBucket, int 
bucketNum, List<String> cols) {
@@ -53,6 +53,10 @@ public class DistributionDescriptor {
         return isHash;
     }
 
+    /** Returns the {@code isAutoBucket} flag of this descriptor. */
+    public boolean isAutoBucket() {
+        return isAutoBucket;
+    }
+
     public void updateCols(String col) {
         Objects.requireNonNull(col, "col should not be null");
         if (CollectionUtils.isEmpty(cols)) {
@@ -60,6 +64,10 @@ public class DistributionDescriptor {
         }
     }
 
+    /** Replaces the bucket number of this descriptor with {@code bucketNum}. */
+    public void updateBucketNum(int bucketNum) {
+        this.bucketNum = bucketNum;
+    }
+
     /**
      * analyze distribution descriptor
      */
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/FixedRangePartition.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/FixedRangePartition.java
index a1d74d20691..7655c6b3285 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/FixedRangePartition.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/FixedRangePartition.java
@@ -21,10 +21,10 @@ import org.apache.doris.analysis.PartitionKeyDesc;
 import org.apache.doris.analysis.PartitionValue;
 import org.apache.doris.analysis.SinglePartitionDesc;
 import org.apache.doris.nereids.trees.expressions.Expression;
-import org.apache.doris.nereids.types.DataType;
 
 import com.google.common.collect.Maps;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -46,9 +46,18 @@ public class FixedRangePartition extends PartitionDefinition 
{
     @Override
     public void validate(Map<String, String> properties) {
         super.validate(properties);
-        final DataType type = partitionTypes.get(0);
-        lowerBounds = lowerBounds.stream().map(e -> 
e.castTo(type)).collect(Collectors.toList());
-        upperBounds = upperBounds.stream().map(e -> 
e.castTo(type)).collect(Collectors.toList());
+        List<Expression> newLowerBounds = new ArrayList<>();
+        List<Expression> newUpperBounds = new ArrayList<>();
+        for (int i = 0; i < partitionTypes.size(); ++i) {
+            if (i < lowerBounds.size()) {
+                
newLowerBounds.add(lowerBounds.get(i).castTo(partitionTypes.get(i)));
+            }
+            if (i < upperBounds.size()) {
+                
newUpperBounds.add(upperBounds.get(i).castTo(partitionTypes.get(i)));
+            }
+        }
+        lowerBounds = newLowerBounds;
+        upperBounds = newUpperBounds;
     }
 
     public String getPartitionName() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/InPartition.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/InPartition.java
index 84821d99212..30dfcb9efce 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/InPartition.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/InPartition.java
@@ -26,6 +26,7 @@ import org.apache.doris.nereids.trees.expressions.Expression;
 
 import com.google.common.collect.Maps;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -57,6 +58,10 @@ public class InPartition extends PartitionDefinition {
 
     @Override
     public AllPartitionDesc translateToCatalogStyle() {
+        if (values.isEmpty()) {
+            // add a empty list for default value process
+            values.add(new ArrayList<>());
+        }
         List<List<PartitionValue>> catalogValues = values.stream().map(l -> 
l.stream()
                 .map(this::toLegacyPartitionValueStmt)
                 .collect(Collectors.toList())).collect(Collectors.toList());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/IndexDefinition.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/IndexDefinition.java
index 84ce1a6d6cb..dcc26e1248f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/IndexDefinition.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/IndexDefinition.java
@@ -17,12 +17,25 @@
 
 package org.apache.doris.nereids.trees.plans.commands.info;
 
+import org.apache.doris.analysis.IndexDef;
 import org.apache.doris.analysis.IndexDef.IndexType;
+import org.apache.doris.analysis.InvertedIndexUtil;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.Index;
+import org.apache.doris.catalog.KeysType;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.types.ArrayType;
+import org.apache.doris.nereids.types.DataType;
 import org.apache.doris.nereids.util.Utils;
 
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
 
 /**
  * index definition
@@ -30,21 +43,179 @@ import java.util.List;
 public class IndexDefinition {
     private final String name;
     private final List<String> cols;
-    private final boolean isUseBitmap;
     private final String comment;
+    // add the column name of olapTable column into caseSensitivityColumns
+    // instead of the column name which from DorisParser
+    private List<String> caseSensitivityCols = Lists.newArrayList();
+    private IndexType indexType;
+    private Map<String, String> properties = new HashMap<>();
+    private boolean isBuildDeferred = false;
 
-    public IndexDefinition(String name, List<String> cols, boolean isUseBitmap, String comment) {
+    /**
+     * constructor for IndexDefinition
+     */
+    public IndexDefinition(String name, List<String> cols, String indexTypeName,
+            Map<String, String> properties, String comment) {
         this.name = name;
         this.cols = Utils.copyRequiredList(cols);
-        this.isUseBitmap = isUseBitmap;
+        this.indexType = IndexType.BITMAP;
+        if (indexTypeName != null) {
+            switch (indexTypeName) {
+                case "BITMAP": {
+                    this.indexType = IndexType.BITMAP;
+                    break;
+                }
+                case "INVERTED": {
+                    this.indexType = IndexType.INVERTED;
+                    break;
+                }
+                case "NGRAM_BF": {
+                    this.indexType = IndexType.NGRAM_BF;
+                    break;
+                }
+                default:
+                    throw new AnalysisException("unknown index type " + indexTypeName);
+            }
+        }
+
+        if (properties != null) {
+            this.properties.putAll(properties);
+        }
+
+        if (indexType == IndexType.NGRAM_BF) {
+            this.properties.putIfAbsent(IndexDef.NGRAM_SIZE_KEY, IndexDef.DEFAULT_NGRAM_SIZE);
+            this.properties.putIfAbsent(IndexDef.NGRAM_BF_SIZE_KEY, IndexDef.DEFAULT_NGRAM_BF_SIZE);
+        }
+
         this.comment = comment;
     }
 
+    /**
+     * checkColumn
+     */
+    public void checkColumn(ColumnDefinition column, KeysType keysType,
+            boolean enableUniqueKeyMergeOnWrite) throws AnalysisException {
+        if (indexType == IndexType.BITMAP || indexType == IndexType.INVERTED
+                || indexType == IndexType.BLOOMFILTER || indexType == IndexType.NGRAM_BF) {
+            String indexColName = column.getName();
+            caseSensitivityCols.add(indexColName);
+            DataType colType = column.getType();
+            if (indexType == IndexType.INVERTED && colType.isArrayType()) {
+                colType = ((ArrayType) colType).getItemType();
+            }
+            if (!(colType.isDateLikeType() || colType.isDecimalLikeType()
+                    || colType.isIntegralType() || colType.isStringLikeType()
+                    || colType.isBooleanType())) {
+                // TODO add colType.isVariantType() and colType.isAggState()
+                throw new AnalysisException(colType + " is not supported in " + indexType.toString()
+                        + " index. " + "invalid column: " + indexColName);
+            } else if (indexType == IndexType.INVERTED && ((keysType == KeysType.AGG_KEYS
+                    && !column.isKey())
+                    || (keysType == KeysType.UNIQUE_KEYS && !enableUniqueKeyMergeOnWrite))) {
+                throw new AnalysisException(indexType.toString()
+                        + " index only used in columns of DUP_KEYS table"
+                        + " or UNIQUE_KEYS table with merge_on_write enabled"
+                        + " or key columns of AGG_KEYS table. invalid column: " + indexColName);
+            } else if (keysType == KeysType.AGG_KEYS && !column.isKey()
+                    && indexType != IndexType.INVERTED) {
+                throw new AnalysisException(indexType.toString()
+                        + " index only used in columns of DUP_KEYS/UNIQUE_KEYS table or key columns of"
+                        + " AGG_KEYS table. invalid column: " + indexColName);
+            }
+
+            if (indexType == IndexType.INVERTED) {
+                try {
+                    InvertedIndexUtil.checkInvertedIndexParser(indexColName,
+                            colType.toCatalogDataType().getPrimitiveType(), properties);
+                } catch (Exception ex) {
+                    throw new AnalysisException("invalid INVERTED index:" + ex.getMessage(), ex);
+                }
+            } else if (indexType == IndexType.NGRAM_BF) {
+                if (!colType.isStringLikeType()) {
+                    throw new AnalysisException(colType + " is not supported in ngram_bf index. "
+                            + "invalid column: " + indexColName);
+                } else if ((keysType == KeysType.AGG_KEYS && !column.isKey())) {
+                    throw new AnalysisException(
+                            "ngram_bf index only used in columns of DUP_KEYS/UNIQUE_KEYS table or key columns of"
+                                    + " AGG_KEYS table. invalid column: " + indexColName);
+                }
+                if (properties.size() != 2) {
+                    throw new AnalysisException(
+                            "ngram_bf index should have gram_size and bf_size properties");
+                }
+                try {
+                    int ngramSize = Integer.parseInt(properties.get(IndexDef.NGRAM_SIZE_KEY));
+                    int bfSize = Integer.parseInt(properties.get(IndexDef.NGRAM_BF_SIZE_KEY));
+                    if (ngramSize > 256 || ngramSize < 1) {
+                        throw new AnalysisException(
+                                "gram_size should be integer and less than 256");
+                    }
+                    if (bfSize > 65536 || bfSize < 64) {
+                        throw new AnalysisException(
+                                "bf_size should be integer and between 64 and 65536");
+                    }
+                } catch (NumberFormatException e) {
+                    throw new AnalysisException("invalid ngram properties:" + e.getMessage(), e);
+                }
+            }
+        } else {
+            throw new AnalysisException("Unsupported index type: " + indexType);
+        }
+    }
+
+    /**
+     * validate
+     */
     public void validate() {
+        if (isBuildDeferred && indexType == IndexDef.IndexType.INVERTED) {
+            if (Strings.isNullOrEmpty(name)) {
+                throw new AnalysisException("index name cannot be blank.");
+            }
+            if (name.length() > 128) {
+                throw new AnalysisException(
+                        "index name too long, the index name length at most is 128.");
+            }
+            return;
+        }
+
+        if (indexType == IndexDef.IndexType.BITMAP || indexType == IndexDef.IndexType.INVERTED) {
+            if (cols == null || cols.size() != 1) {
+                throw new AnalysisException(
+                        indexType.toString() + " index can only apply to a single column.");
+            }
+            if (Strings.isNullOrEmpty(name)) {
+                throw new AnalysisException("index name cannot be blank.");
+            }
+            if (name.length() > 64) {
+                throw new AnalysisException(
+                        "index name too long, the index name length at most is 64.");
+            }
+            TreeSet<String> distinct = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
+            distinct.addAll(cols);
+            if (cols.size() != distinct.size()) {
+                throw new AnalysisException("columns of index has duplicated.");
+            }
+        }
+    }
+
+    public List<String> getColumnNames() {
+        if (!caseSensitivityCols.isEmpty()) {
+            return ImmutableList.copyOf(caseSensitivityCols);
+        } else {
+            return ImmutableList.copyOf(cols);
+        }
+    }
+
+    public String getIndexName() {
+        return name;
+    }
+
+    public IndexType getIndexType() {
+        return indexType;
     }
 
     public Index translateToCatalogStyle() {
-        return new Index(Env.getCurrentEnv().getNextId(), name, cols, isUseBitmap ? IndexType.BITMAP : null, null,
+        return new Index(Env.getCurrentEnv().getNextId(), name, cols, indexType, properties,
                 comment);
     }
 }
diff --git a/regression-test/data/external_table_p0/jdbc/test_doris_jdbc_catalog.out b/regression-test/data/external_table_p0/jdbc/test_doris_jdbc_catalog.out
index 9de9e5ef01e..dc85e705e53 100644
--- a/regression-test/data/external_table_p0/jdbc/test_doris_jdbc_catalog.out
+++ b/regression-test/data/external_table_p0/jdbc/test_doris_jdbc_catalog.out
@@ -96,21 +96,21 @@ json_col    JSON    Yes     false   \N      NONE
 
 -- !desc_ctas_arr --
 int_col        INT     Yes     true    \N      
-arr_bool_col   ARRAY<BOOLEAN>  Yes     false   []      NONE
-arr_tinyint_col        ARRAY<TINYINT>  Yes     false   []      NONE
-arr_smallint_col       ARRAY<SMALLINT> Yes     false   []      NONE
-arr_int_col    ARRAY<INT>      Yes     false   []      NONE
-arr_bigint_col ARRAY<BIGINT>   Yes     false   []      NONE
-arr_largeint_col       ARRAY<LARGEINT> Yes     false   []      NONE
-arr_float_col  ARRAY<FLOAT>    Yes     false   []      NONE
-arr_double_col ARRAY<DOUBLE>   Yes     false   []      NONE
-arr_decimal1_col       ARRAY<DECIMALV3(10, 5)> Yes     false   []      NONE
-arr_decimal2_col       ARRAY<DECIMALV3(30, 10)>        Yes     false   []      NONE
-arr_date_col   ARRAY<DATEV2>   Yes     false   []      NONE
-arr_datetime_col       ARRAY<DATETIMEV2(3)>    Yes     false   []      NONE
-arr_char_col   ARRAY<CHAR(10)> Yes     false   []      NONE
-arr_varchar_col        ARRAY<VARCHAR(10)>      Yes     false   []      NONE
-arr_string_col ARRAY<TEXT>     Yes     false   []      NONE
+arr_bool_col   ARRAY<BOOLEAN>  Yes     false   \N      NONE
+arr_tinyint_col        ARRAY<TINYINT>  Yes     false   \N      NONE
+arr_smallint_col       ARRAY<SMALLINT> Yes     false   \N      NONE
+arr_int_col    ARRAY<INT>      Yes     false   \N      NONE
+arr_bigint_col ARRAY<BIGINT>   Yes     false   \N      NONE
+arr_largeint_col       ARRAY<LARGEINT> Yes     false   \N      NONE
+arr_float_col  ARRAY<FLOAT>    Yes     false   \N      NONE
+arr_double_col ARRAY<DOUBLE>   Yes     false   \N      NONE
+arr_decimal1_col       ARRAY<DECIMALV3(10, 5)> Yes     false   \N      NONE
+arr_decimal2_col       ARRAY<DECIMALV3(30, 10)>        Yes     false   \N      NONE
+arr_date_col   ARRAY<DATEV2>   Yes     false   \N      NONE
+arr_datetime_col       ARRAY<DATETIMEV2(3)>    Yes     false   \N      NONE
+arr_char_col   ARRAY<CHAR(10)> Yes     false   \N      NONE
+arr_varchar_col        ARRAY<VARCHAR(10)>      Yes     false   \N      NONE
+arr_string_col ARRAY<TEXT>     Yes     false   \N      NONE
 
 -- !query_ctas_base --
 \N     \N      \N      \N      \N      \N      \N      \N      \N      \N      \N      \N      \N      \N      \N
diff --git a/regression-test/data/insert_p0/insert_with_null.out b/regression-test/data/insert_p0/insert_with_null.out
index fa56f23bc36..7ee81a10bc4 100644
--- a/regression-test/data/insert_p0/insert_with_null.out
+++ b/regression-test/data/insert_p0/insert_with_null.out
@@ -5,12 +5,13 @@
 4      null    []
 5      NULL    ["k5, k6"]
 6      \N      ["k7", "k8"]
-7      abc     []
+7      abc     \N
 
 -- !sql --
 6      \N      ["k7", "k8"]
 
 -- !sql --
+7      abc     \N
 
 -- !sql --
 1      "b"     ["k1=v1, k2=v2"]
@@ -30,12 +31,13 @@
 4      null    []
 5      NULL    ["k5, k6"]
 6      \N      ["k7", "k8"]
-7      abc     []
+7      abc     \N
 
 -- !sql --
 6      \N      ["k7", "k8"]
 
 -- !sql --
+7      abc     \N
 
 -- !sql --
 1      "b"     ["k1=v1, k2=v2"]
@@ -43,10 +45,11 @@
 4      null    []
 5      NULL    ["k5, k6"]
 6      \N      ["k7", "k8"]
-7      abc     []
+7      abc     \N
 
 -- !sql --
 6      \N      ["k7", "k8"]
 
 -- !sql --
+7      abc     \N
 
diff --git a/regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc.groovy b/regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc.groovy
index 0e5eac531f8..20315e8af4d 100644
--- a/regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc.groovy
+++ b/regression-test/suites/data_model_p0/unique/test_unique_table_auto_inc.groovy
@@ -304,7 +304,7 @@ suite("test_unique_table_auto_inc") {
     sql """
         CREATE TABLE IF NOT EXISTS `${table8}` (
           `id` BIGINT NOT NULL AUTO_INCREMENT COMMENT "用户 ID",
-          `name` varchar(65533) NOT NULL COMMENT "用户姓名",
+          `name` varchar(65533) NOT NULL COMMENT "用户姓名"
         ) ENGINE=OLAP
         UNIQUE KEY(`id`)
         COMMENT "OLAP"
diff --git a/regression-test/suites/ddl_p0/test_create_table_like.groovy b/regression-test/suites/ddl_p0/test_create_table_like.groovy
index 5ee66fba8b4..a1154c05a43 100644
--- a/regression-test/suites/ddl_p0/test_create_table_like.groovy
+++ b/regression-test/suites/ddl_p0/test_create_table_like.groovy
@@ -30,7 +30,7 @@ suite("test_create_table_like") {
             `timestamp1` decimal(10, 0) null comment "c1",
             `timestamp2` decimal(10, 1) null comment "c2",
             `timestamp3` decimalv3(10, 0) null comment "c3",
-            `timestamp4` decimalv3(10, 1) null comment "c4",
+            `timestamp4` decimalv3(10, 1) null comment "c4"
         )
         DISTRIBUTED BY HASH(`id`) BUCKETS 1
         PROPERTIES ('replication_num' = '1')"""
diff --git a/regression-test/suites/external_table_p0/tvf/test_s3_tvf.groovy b/regression-test/suites/external_table_p0/tvf/test_s3_tvf.groovy
index 5457c120af4..4f09680ba53 100644
--- a/regression-test/suites/external_table_p0/tvf/test_s3_tvf.groovy
+++ b/regression-test/suites/external_table_p0/tvf/test_s3_tvf.groovy
@@ -37,7 +37,7 @@ suite("test_s3_tvf", "p0") {
         CREATE TABLE IF NOT EXISTS ${table_name} (
             `user_id` LARGEINT NOT NULL COMMENT "用户id",
             `name` STRING COMMENT "用户名称",
-            `age` INT COMMENT "用户年龄",
+            `age` INT COMMENT "用户年龄"
             )
             DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
         """
diff --git a/regression-test/suites/index_p0/test_index_meta.groovy b/regression-test/suites/index_p0/test_index_meta.groovy
index abec7ce2cd6..2a6f75870c2 100644
--- a/regression-test/suites/index_p0/test_index_meta.groovy
+++ b/regression-test/suites/index_p0/test_index_meta.groovy
@@ -65,12 +65,12 @@ suite("index_meta", "p0") {
     assertEquals(show_result[0][2], "idx_id")
     assertEquals(show_result[0][4], "id")
     assertEquals(show_result[0][10], "BITMAP")
-    assertEquals(show_result[0][11], "index for id")
+    assertEquals(show_result[0][11], "'index for id'")
     assertEquals(show_result[0][12], "")
     assertEquals(show_result[1][2], "idx_name")
     assertEquals(show_result[1][4], "name")
     assertEquals(show_result[1][10], "INVERTED")
-    assertEquals(show_result[1][11], "index for name")
+    assertEquals(show_result[1][11], "'index for name'")
     assertEquals(show_result[1][12], "(\"parser\" = \"none\")")
 
     // add index on column description
@@ -84,12 +84,12 @@ suite("index_meta", "p0") {
     assertEquals(show_result[0][2], "idx_id")
     assertEquals(show_result[0][4], "id")
     assertEquals(show_result[0][10], "BITMAP")
-    assertEquals(show_result[0][11], "index for id")
+    assertEquals(show_result[0][11], "'index for id'")
     assertEquals(show_result[0][12], "")
     assertEquals(show_result[1][2], "idx_name")
     assertEquals(show_result[1][4], "name")
     assertEquals(show_result[1][10], "INVERTED")
-    assertEquals(show_result[1][11], "index for name")
+    assertEquals(show_result[1][11], "'index for name'")
     assertEquals(show_result[1][12], "(\"parser\" = \"none\")")
     assertEquals(show_result[2][2], "idx_desc")
     assertEquals(show_result[2][4], "description")
@@ -108,7 +108,7 @@ suite("index_meta", "p0") {
     assertEquals(show_result[0][2], "idx_id")
     assertEquals(show_result[0][4], "id")
     assertEquals(show_result[0][10], "BITMAP")
-    assertEquals(show_result[0][11], "index for id")
+    assertEquals(show_result[0][11], "'index for id'")
     assertEquals(show_result[0][12], "")
     assertEquals(show_result[1][2], "idx_desc")
     assertEquals(show_result[1][4], "description")
@@ -127,7 +127,7 @@ suite("index_meta", "p0") {
     assertEquals(show_result[0][2], "idx_id")
     assertEquals(show_result[0][4], "id")
     assertEquals(show_result[0][10], "BITMAP")
-    assertEquals(show_result[0][11], "index for id")
+    assertEquals(show_result[0][11], "'index for id'")
     assertEquals(show_result[0][12], "")
     assertEquals(show_result[1][2], "idx_desc")
     assertEquals(show_result[1][4], "description")
diff --git a/regression-test/suites/load_p0/stream_load/test_csv_split_line.groovy b/regression-test/suites/load_p0/stream_load/test_csv_split_line.groovy
index 47bd8c3bbc9..76f18317fb8 100644
--- a/regression-test/suites/load_p0/stream_load/test_csv_split_line.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_csv_split_line.groovy
@@ -97,7 +97,7 @@ suite("test_csv_split_line", "p0") {
         a int ,
         b varchar(30),
         c int ,
-        d varchar(30),
+        d varchar(30)
     )
     DUPLICATE KEY(`a`)
     DISTRIBUTED BY HASH(`a`) BUCKETS 10
diff --git a/regression-test/suites/nereids_function_p0/cast_function/test_cast_map_function.groovy b/regression-test/suites/nereids_function_p0/cast_function/test_cast_map_function.groovy
index e3a4ccdaecc..90893acd563 100644
--- a/regression-test/suites/nereids_function_p0/cast_function/test_cast_map_function.groovy
+++ b/regression-test/suites/nereids_function_p0/cast_function/test_cast_map_function.groovy
@@ -24,7 +24,7 @@ suite("test_cast_map_function", "query") {
     sql """
             CREATE TABLE IF NOT EXISTS ${tableName} (
               `k1` int(11) NULL COMMENT "",
-              `k2` Map<char(7), int(11)> NOT NULL COMMENT "",
+              `k2` Map<char(7), int(11)> NOT NULL COMMENT ""
             ) ENGINE=OLAP
             DUPLICATE KEY(`k1`)
             DISTRIBUTED BY HASH(`k1`) BUCKETS 1
diff --git a/regression-test/suites/nereids_p0/session_variable/test_default_limit.groovy b/regression-test/suites/nereids_p0/session_variable/test_default_limit.groovy
index 525dbbf1128..2854d87b8e3 100644
--- a/regression-test/suites/nereids_p0/session_variable/test_default_limit.groovy
+++ b/regression-test/suites/nereids_p0/session_variable/test_default_limit.groovy
@@ -26,7 +26,7 @@ suite('test_default_limit') {
         create table baseall (
             k0 int,
             k1 int,
-            k2 int,
+            k2 int
         )
         distributed by hash(k0) buckets 16
         properties(
@@ -38,7 +38,7 @@ suite('test_default_limit') {
         create table bigtable (
             k0 int,
             k1 int,
-            k2 int,
+            k2 int
         )
         distributed by hash(k0) buckets 16
         properties(
diff --git a/regression-test/suites/partition_p0/multi_partition/test_multi_column_partition.groovy b/regression-test/suites/partition_p0/multi_partition/test_multi_column_partition.groovy
index f86c3757841..6a03e72624d 100644
--- a/regression-test/suites/partition_p0/multi_partition/test_multi_column_partition.groovy
+++ b/regression-test/suites/partition_p0/multi_partition/test_multi_column_partition.groovy
@@ -392,20 +392,6 @@ suite("test_multi_partition_key", "p0") {
             """,
             false
     )
-    test {
-        sql """
-            CREATE TABLE IF NOT EXISTS test_multi_col_ddd (
-                k1 TINYINT NOT NULL, 
-                k2 SMALLINT NOT NULL) 
-            PARTITION BY RANGE(k1,k2) ( 
-                  PARTITION partition_a VALUES [("-127","-127"), ("10", MAXVALUE)), 
-                  PARTITION partition_b VALUES [("10","100"), ("40","0")), 
-                  PARTITION partition_c VALUES [("126","126"), ("127")) )
-            DISTRIBUTED BY HASH(k1,k2) BUCKETS 1
-            PROPERTIES("replication_allocation" = "tag.location.default: 1")
-        """
-        exception "Not support MAXVALUE in multi partition range values"
-    }
     // add partition with range
     sql "ALTER TABLE test_multi_column_fixed_range_1 ADD PARTITION partition_add VALUES LESS THAN ('50','1000') "
     ret = sql "SHOW PARTITIONS FROM test_multi_column_fixed_range_1 WHERE PartitionName='partition_add'"
@@ -464,4 +450,20 @@ suite("test_multi_partition_key", "p0") {
     try_sql "drop table if exists test_multi_column_fixed_range_1"
     try_sql "drop table if exists test_multi_partition_key_2"
 
+    sql """set enable_nereids_planner=false"""
+    test {
+        sql """
+            CREATE TABLE IF NOT EXISTS test_multi_col_ddd (
+                k1 TINYINT NOT NULL, 
+                k2 SMALLINT NOT NULL) 
+            PARTITION BY RANGE(k1,k2) ( 
+                  PARTITION partition_a VALUES [("-127","-127"), ("10", MAXVALUE)), 
+                  PARTITION partition_b VALUES [("10","100"), ("40","0")), 
+                  PARTITION partition_c VALUES [("126","126"), ("127")) )
+            DISTRIBUTED BY HASH(k1,k2) BUCKETS 1
+            PROPERTIES("replication_allocation" = "tag.location.default: 1")
+        """
+        exception "Not support MAXVALUE in multi partition range values"
+    }
+
 }
diff --git a/regression-test/suites/point_query_p0/load.groovy b/regression-test/suites/point_query_p0/load.groovy
index ead83cdf57a..d5cf8074540 100644
--- a/regression-test/suites/point_query_p0/load.groovy
+++ b/regression-test/suites/point_query_p0/load.groovy
@@ -100,4 +100,23 @@ suite("test_point_query_load", "p0") {
           }
      }
     sql "INSERT INTO ${testTable} SELECT * from ${testTable}"
+
+    sql """set enable_nereids_planner=true;"""
+    explain {
+        sql("""SELECT
+                    t0.`c_int` as column_key,
+                    COUNT(1) as `count`
+                FROM
+                    (
+                        SELECT
+                            `c_int`
+                        FROM
+                            `tbl_scalar_types_dup`
+                        limit
+                            200000
+                    ) as `t0`
+                GROUP BY
+                    `t0`.`c_int`""")
+        notContains "(mv_${testTable})"
+    }
 }
diff --git a/regression-test/suites/query_p0/sql_functions/cast_function/test_cast_map_function.groovy b/regression-test/suites/query_p0/sql_functions/cast_function/test_cast_map_function.groovy
index 021f8096b04..d412e0d8f37 100644
--- a/regression-test/suites/query_p0/sql_functions/cast_function/test_cast_map_function.groovy
+++ b/regression-test/suites/query_p0/sql_functions/cast_function/test_cast_map_function.groovy
@@ -24,7 +24,7 @@ suite("test_cast_map_function", "query") {
     sql """
             CREATE TABLE IF NOT EXISTS ${tableName} (
               `k1` int(11) NULL COMMENT "",
-              `k2` Map<char(7), int(11)> NOT NULL COMMENT "",
+              `k2` Map<char(7), int(11)> NOT NULL COMMENT ""
             ) ENGINE=OLAP
             DUPLICATE KEY(`k1`)
             DISTRIBUTED BY HASH(`k1`) BUCKETS 1
diff --git a/regression-test/suites/schema_change_p0/test_alter_table_drop_column.groovy b/regression-test/suites/schema_change_p0/test_alter_table_drop_column.groovy
index 433cbadcddf..024806c5b93 100644
--- a/regression-test/suites/schema_change_p0/test_alter_table_drop_column.groovy
+++ b/regression-test/suites/schema_change_p0/test_alter_table_drop_column.groovy
@@ -157,7 +157,7 @@ suite("test_alter_table_drop_column") {
             `siteid` INT DEFAULT '10',
             `citycode` SMALLINT,
             `username` VARCHAR(32) DEFAULT 'test',
-            `pv` BIGINT SUM DEFAULT '0'
+            `pv` BIGINT DEFAULT '0'
         )
         DUPLICATE KEY(`siteid`, `citycode`, `username`)
         DISTRIBUTED BY HASH(siteid) BUCKETS 1
diff --git a/regression-test/suites/schema_change_p0/test_alter_table_modify_column.groovy b/regression-test/suites/schema_change_p0/test_alter_table_modify_column.groovy
index 8df8e911229..405b28fefe3 100644
--- a/regression-test/suites/schema_change_p0/test_alter_table_modify_column.groovy
+++ b/regression-test/suites/schema_change_p0/test_alter_table_modify_column.groovy
@@ -139,9 +139,9 @@ suite("test_alter_table_modify_column") {
             `citycode` SMALLINT DEFAULT '10',
             `siteid` INT DEFAULT '10',
             `username` VARCHAR(32) DEFAULT 'test',
-            `pv` BIGINT SUM DEFAULT '0'
+            `pv` BIGINT DEFAULT '0'
         )
-        DUPLICATE KEY(`siteid`, `citycode`, `username`)
+        DUPLICATE KEY(`citycode`, `siteid`, `username`)
         DISTRIBUTED BY HASH(siteid) BUCKETS 1
         PROPERTIES (
             "replication_num" = "1"
diff --git a/regression-test/suites/unique_with_mow_p0/cluster_key/test_create_table.groovy b/regression-test/suites/unique_with_mow_p0/cluster_key/test_create_table.groovy
index 91c2bc6ba6e..9c515efeb72 100644
--- a/regression-test/suites/unique_with_mow_p0/cluster_key/test_create_table.groovy
+++ b/regression-test/suites/unique_with_mow_p0/cluster_key/test_create_table.groovy
@@ -22,24 +22,6 @@ suite("test_create_table") {
         try_sql("DROP TABLE IF EXISTS ${tableName}")
     }
 
-    // duplicate table with cluster keys
-    test {
-        sql """
-            CREATE TABLE `$tableName` (
-                    `c_custkey` int(11) NOT NULL COMMENT "",
-                    `c_name` varchar(26) NOT NULL COMMENT "",
-                    `c_address` varchar(41) NOT NULL COMMENT "",
-                    `c_city` varchar(11) NOT NULL COMMENT ""
-            )
-            DUPLICATE KEY (`c_custkey`)
-            CLUSTER BY (`c_name`, `c_address`)
-            DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 1
-            PROPERTIES (
-                    "replication_num" = "1"
-             );
-        """
-        exception "Syntax error"
-    }
 
     // mor unique table with cluster keys
     test {
@@ -199,4 +181,44 @@ suite("test_create_table") {
             "enable_unique_key_merge_on_write" = "true"
         );
     """
+
+    sql """set enable_nereids_planner=false;"""
+    // duplicate table with cluster keys
+    test {
+        sql """
+            CREATE TABLE `$tableName` (
+                    `c_custkey` int(11) NOT NULL COMMENT "",
+                    `c_name` varchar(26) NOT NULL COMMENT "",
+                    `c_address` varchar(41) NOT NULL COMMENT "",
+                    `c_city` varchar(11) NOT NULL COMMENT ""
+            )
+            DUPLICATE KEY (`c_custkey`)
+            CLUSTER BY (`c_name`, `c_address`)
+            DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 1
+            PROPERTIES (
+                    "replication_num" = "1"
+             );
+        """
+        exception "Syntax error"
+    }
+
+    sql """set enable_nereids_planner=true;"""
+    // duplicate table with cluster keys
+    test {
+        sql """
+            CREATE TABLE `$tableName` (
+                    `c_custkey` int(11) NOT NULL COMMENT "",
+                    `c_name` varchar(26) NOT NULL COMMENT "",
+                    `c_address` varchar(41) NOT NULL COMMENT "",
+                    `c_city` varchar(11) NOT NULL COMMENT ""
+            )
+            DUPLICATE KEY (`c_custkey`)
+            CLUSTER BY (`c_name`, `c_address`)
+            DISTRIBUTED BY HASH(`c_custkey`) BUCKETS 1
+            PROPERTIES (
+                    "replication_num" = "1"
+             );
+        """
+        exception "Cluster keys only support unique keys table"
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to