This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c6adca4e48d [feature](merge-cloud) Add create cloud table (#30001)
c6adca4e48d is described below

commit c6adca4e48d2a1d38f6a5bd81a471216a29d4e93
Author: yujun <[email protected]>
AuthorDate: Tue Jan 16 17:56:05 2024 +0800

    [feature](merge-cloud) Add create cloud table (#30001)
---
 .../main/java/org/apache/doris/common/Config.java  |   7 +
 .../doris/alter/MaterializedViewHandler.java       |   3 +-
 .../apache/doris/alter/SchemaChangeHandler.java    |   3 +-
 .../java/org/apache/doris/backup/RestoreJob.java   |   3 +-
 .../main/java/org/apache/doris/catalog/Column.java | 143 ++++++
 .../doris/catalog/DynamicPartitionProperty.java    |  45 +-
 .../main/java/org/apache/doris/catalog/Env.java    |  21 +-
 .../java/org/apache/doris/catalog/EnvFactory.java  |  34 ++
 .../main/java/org/apache/doris/catalog/Index.java  |  42 ++
 .../java/org/apache/doris/catalog/OlapTable.java   |  26 +-
 .../org/apache/doris/catalog/TableProperty.java    |  10 +
 .../main/java/org/apache/doris/catalog/Tablet.java |  32 +-
 .../apache/doris/cloud/catalog/CloudTablet.java    |  82 ++++
 .../cloud/datasource/CloudInternalCatalog.java     | 492 +++++++++++++++++++++
 .../apache/doris/common/util/PropertyAnalyzer.java |  20 +
 .../apache/doris/datasource/InternalCatalog.java   |  71 ++-
 .../apache/doris/service/FrontendServiceImpl.java  |   9 +-
 .../org/apache/doris/system/SystemInfoService.java |   3 +
 .../org/apache/doris/catalog/EnvFactoryTest.java   |   4 +
 19 files changed, 996 insertions(+), 54 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 1b1d8906d96..909e1bf2d7a 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2440,6 +2440,10 @@ public class Config extends ConfigBase {
         return !cloud_unique_id.isEmpty();
     }
 
+    public static boolean isNotCloudMode() {
+        return cloud_unique_id.isEmpty();
+    }
+
     /**
      * MetaService endpoint, ip:port, such as meta_service_endpoint = 
"192.0.0.10:8866"
      */
@@ -2452,6 +2456,9 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true)
     public static int meta_service_connection_pool_size = 20;
 
+    @ConfField(mutable = true)
+    public static int meta_service_rpc_retry_times = 200;
+
     // A connection will expire after a random time during [base, 2*base), so 
that the FE
     // has a chance to connect to a new RS. Set zero to disable it.
     @ConfField(mutable = true)
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
index 6425230b147..3f7122f3650 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
@@ -31,6 +31,7 @@ import org.apache.doris.catalog.AggregateType;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.EnvFactory;
 import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.MaterializedIndex;
 import org.apache.doris.catalog.MaterializedIndex.IndexState;
@@ -397,7 +398,7 @@ public class MaterializedViewHandler extends AlterHandler {
                 long baseTabletId = baseTablet.getId();
                 long mvTabletId = idGeneratorBuffer.getNextId();
 
-                Tablet newTablet = new Tablet(mvTabletId);
+                Tablet newTablet = EnvFactory.createTablet(mvTabletId);
                 mvIndex.addTablet(newTablet, mvTabletMeta);
                 addedTablets.add(newTablet);
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index 206f028b023..b5895ead653 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -41,6 +41,7 @@ import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.DistributionInfo;
 import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.EnvFactory;
 import org.apache.doris.catalog.HashDistributionInfo;
 import org.apache.doris.catalog.Index;
 import org.apache.doris.catalog.KeysType;
@@ -1547,7 +1548,7 @@ public class SchemaChangeHandler extends AlterHandler {
                     long originTabletId = originTablet.getId();
                     long shadowTabletId = idGeneratorBuffer.getNextId();
 
-                    Tablet shadowTablet = new Tablet(shadowTabletId);
+                    Tablet shadowTablet = 
EnvFactory.createTablet(shadowTabletId);
                     shadowIndex.addTablet(shadowTablet, shadowTabletMeta);
                     addedTablets.add(shadowTablet);
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index c6c6beb3358..0398199e2e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -28,6 +28,7 @@ import org.apache.doris.catalog.BinlogConfig;
 import org.apache.doris.catalog.DataProperty;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.EnvFactory;
 import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.catalog.MaterializedIndex;
 import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
@@ -1122,7 +1123,7 @@ public class RestoreJob extends AbstractJob {
             for (int i = 0; i < remotetabletSize; i++) {
                 // generate new tablet id
                 long newTabletId = env.getNextId();
-                Tablet newTablet = new Tablet(newTabletId);
+                Tablet newTablet = EnvFactory.createTablet(newTabletId);
                 // add tablet to index, but not add to TabletInvertedIndex
                 remoteIdx.addTablet(newTablet, null /* tablet meta */, true /* 
is restore */);
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
index dc94074cac6..f83eb942ca6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
@@ -32,11 +32,16 @@ import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.SqlUtils;
 import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.proto.OlapFile;
 import org.apache.doris.thrift.TColumn;
 import org.apache.doris.thrift.TColumnType;
+import org.apache.doris.thrift.TPrimitiveType;
 
 import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import com.google.gson.annotations.SerializedName;
+import com.google.protobuf.ByteString;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -630,6 +635,144 @@ public class Column implements Writable, 
GsonPostProcessable {
         }
     }
 
+    // CLOUD_CODE_BEGIN
+    public int getFieldLengthByType(TPrimitiveType type, int stringLength) 
throws DdlException {
+        switch (type) {
+            case TINYINT:
+            case BOOLEAN:
+                return 1;
+            case SMALLINT:
+                return 2;
+            case INT:
+                return 4;
+            case BIGINT:
+                return 8;
+            case LARGEINT:
+                return 16;
+            case DATE:
+                return 3;
+            case DATEV2:
+                return 4;
+            case DATETIME:
+                return 8;
+            case DATETIMEV2:
+                return 8;
+            case FLOAT:
+                return 4;
+            case DOUBLE:
+                return 8;
+            case QUANTILE_STATE:
+            case OBJECT:
+                return 16;
+            case CHAR:
+                return stringLength;
+            case VARCHAR:
+            case HLL:
+            case AGG_STATE:
+                return stringLength + 2; // sizeof(OLAP_VARCHAR_MAX_LENGTH)
+            case STRING:
+                return stringLength + 4; // sizeof(OLAP_STRING_MAX_LENGTH)
+            case JSONB:
+                return stringLength + 4; // sizeof(OLAP_JSONB_MAX_LENGTH)
+            case ARRAY:
+                return 65535; // OLAP_ARRAY_MAX_LENGTH
+            case DECIMAL32:
+                return 4;
+            case DECIMAL64:
+                return 8;
+            case DECIMAL128I:
+                return 16;
+            case DECIMALV2:
+                return 12; // use 12 bytes in olap engine.
+            case STRUCT:
+                return 65535;
+            case MAP:
+                return 65535;
+            default:
+                LOG.warn("unknown field type. [type= << {} << ]", type);
+                throw new DdlException("unknown field type. type: " + type);
+        }
+    }
+
+    public OlapFile.ColumnPB toPb(Set<String> bfColumns, List<Index> indexes) 
throws DdlException {
+        OlapFile.ColumnPB.Builder builder = OlapFile.ColumnPB.newBuilder();
+
+        // when doing schema change, some modified column has a prefix in name.
+        // this prefix is only used in FE, not visible to BE, so we should 
remove this prefix.
+        builder.setName(name.startsWith(SchemaChangeHandler.SHADOW_NAME_PREFIX)
+                ? 
name.substring(SchemaChangeHandler.SHADOW_NAME_PREFIX.length()) : name);
+
+        builder.setUniqueId(uniqueId);
+        builder.setType(this.getDataType().toThrift().name());
+        builder.setIsKey(this.isKey);
+        if (null != this.aggregationType) {
+            if (type.isAggStateType()) {
+                AggStateType aggState = (AggStateType) type;
+                builder.setAggregation(aggState.getFunctionName());
+                builder.setResultIsNullable(aggState.getResultIsNullable());
+                for (Column column : children) {
+                    builder.addChildrenColumns(column.toPb(Sets.newHashSet(), 
Lists.newArrayList()));
+                }
+            } else {
+                builder.setAggregation(this.aggregationType.toString());
+            }
+        } else {
+            builder.setAggregation("NONE");
+        }
+        builder.setIsNullable(this.isAllowNull);
+        if (this.defaultValue != null) {
+            
builder.setDefaultValue(ByteString.copyFrom(this.defaultValue.getBytes()));
+        }
+        builder.setPrecision(this.getPrecision());
+        builder.setFrac(this.getScale());
+
+        int length = getFieldLengthByType(this.getDataType().toThrift(), 
this.getStrLen());
+
+        builder.setLength(length);
+        builder.setIndexLength(length);
+        if (this.getDataType().toThrift() == TPrimitiveType.VARCHAR
+                || this.getDataType().toThrift() == TPrimitiveType.STRING) {
+            builder.setIndexLength(this.getOlapColumnIndexSize());
+        }
+
+        if (bfColumns != null && bfColumns.contains(this.name)) {
+            builder.setIsBfColumn(true);
+        } else {
+            builder.setIsBfColumn(false);
+        }
+        builder.setVisible(visible);
+
+        if (indexes != null) {
+            for (Index index : indexes) {
+                if (index.getIndexType() == IndexDef.IndexType.BITMAP) {
+                    List<String> columns = index.getColumns();
+                    if (this.name.equalsIgnoreCase(columns.get(0))) {
+                        builder.setHasBitmapIndex(true);
+                        break;
+                    }
+                }
+            }
+        }
+
+        if (this.type.isArrayType()) {
+            Column child = this.getChildren().get(0);
+            builder.addChildrenColumns(child.toPb(Sets.newHashSet(), 
Lists.newArrayList()));
+        } else if (this.type.isMapType()) {
+            Column k = this.getChildren().get(0);
+            builder.addChildrenColumns(k.toPb(Sets.newHashSet(), 
Lists.newArrayList()));
+            Column v = this.getChildren().get(1);
+            builder.addChildrenColumns(v.toPb(Sets.newHashSet(), 
Lists.newArrayList()));
+        } else if (this.type.isStructType()) {
+            List<Column> childrenColumns = this.getChildren();
+            for (Column c : childrenColumns) {
+                builder.addChildrenColumns(c.toPb(Sets.newHashSet(), 
Lists.newArrayList()));
+            }
+        }
+
+        OlapFile.ColumnPB col = builder.build();
+        return col;
+    }
+    // CLOUD_CODE_END
 
     public void checkSchemaChangeAllowed(Column other) throws DdlException {
         if (Strings.isNullOrEmpty(other.name)) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/DynamicPartitionProperty.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/DynamicPartitionProperty.java
index 54c49c1ee8d..2daeb7bc72c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/DynamicPartitionProperty.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/DynamicPartitionProperty.java
@@ -19,6 +19,7 @@ package org.apache.doris.catalog;
 
 import org.apache.doris.analysis.TimestampArithmeticExpr.TimeUnit;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.util.DynamicPartitionUtil;
@@ -216,28 +217,34 @@ public class DynamicPartitionProperty {
      * use table replication_num as dynamic_partition.replication_num default 
value
      */
     public String getProperties(ReplicaAllocation tableReplicaAlloc) {
-        ReplicaAllocation tmpAlloc = this.replicaAlloc.isNotSet() ? 
tableReplicaAlloc : this.replicaAlloc;
-        String res = ",\n\"" + ENABLE + "\" = \"" + enable + "\""
-                + ",\n\"" + TIME_UNIT + "\" = \"" + timeUnit + "\""
-                + ",\n\"" + TIME_ZONE + "\" = \"" + tz.getID() + "\""
-                + ",\n\"" + START + "\" = \"" + start + "\""
-                + ",\n\"" + END + "\" = \"" + end + "\""
-                + ",\n\"" + PREFIX + "\" = \"" + prefix + "\""
-                + ",\n\"" + REPLICATION_ALLOCATION + "\" = \"" + 
tmpAlloc.toCreateStmt() + "\""
-                + ",\n\"" + BUCKETS + "\" = \"" + buckets + "\""
-                + ",\n\"" + CREATE_HISTORY_PARTITION + "\" = \"" + 
createHistoryPartition + "\""
-                + ",\n\"" + HISTORY_PARTITION_NUM + "\" = \"" + 
historyPartitionNum + "\""
-                + ",\n\"" + HOT_PARTITION_NUM + "\" = \"" + hotPartitionNum + 
"\""
-                + ",\n\"" + RESERVED_HISTORY_PERIODS + "\" = \"" + 
reservedHistoryPeriods + "\""
-                + ",\n\"" + STORAGE_POLICY + "\" = \"" + storagePolicy + "\"";
-        if (!Strings.isNullOrEmpty(storageMedium)) {
-            res += ",\n\"" + STORAGE_MEDIUM + "\" = \"" + storageMedium + "\"";
+        StringBuilder sb = new StringBuilder();
+        sb.append(",\n\"" + ENABLE + "\" = \"" + enable + "\"");
+        sb.append(",\n\"" + TIME_UNIT + "\" = \"" + timeUnit + "\"");
+        sb.append(",\n\"" + TIME_ZONE + "\" = \"" + tz.getID() + "\"");
+        sb.append(",\n\"" + START + "\" = \"" + start + "\"");
+        sb.append(",\n\"" + END + "\" = \"" + end + "\"");
+        sb.append(",\n\"" + PREFIX + "\" = \"" + prefix + "\"");
+        if (Config.isNotCloudMode()) {
+            ReplicaAllocation tmpAlloc = this.replicaAlloc.isNotSet() ? 
tableReplicaAlloc : this.replicaAlloc;
+            sb.append(",\n\"" + REPLICATION_ALLOCATION + "\" = \"" + 
tmpAlloc.toCreateStmt() + "\"");
+        }
+        sb.append(",\n\"" + BUCKETS + "\" = \"" + buckets + "\"");
+        sb.append(",\n\"" + CREATE_HISTORY_PARTITION + "\" = \"" + 
createHistoryPartition + "\"");
+        sb.append(",\n\"" + HISTORY_PARTITION_NUM + "\" = \"" + 
historyPartitionNum + "\"");
+        sb.append(",\n\"" + HOT_PARTITION_NUM + "\" = \"" + hotPartitionNum + 
"\"");
+        sb.append(",\n\"" + RESERVED_HISTORY_PERIODS + "\" = \"" + 
reservedHistoryPeriods + "\"");
+        if (Config.isNotCloudMode()) {
+            sb.append(",\n\"" + STORAGE_POLICY + "\" = \"" + storagePolicy + 
"\"");
+            if (!Strings.isNullOrEmpty(storageMedium)) {
+                sb.append(",\n\"" + STORAGE_MEDIUM + "\" = \"" + storageMedium 
+ "\"");
+            }
         }
         if (getTimeUnit().equalsIgnoreCase(TimeUnit.WEEK.toString())) {
-            res += ",\n\"" + START_DAY_OF_WEEK + "\" = \"" + 
startOfWeek.dayOfWeek + "\"";
+            sb.append(",\n\"" + START_DAY_OF_WEEK + "\" = \"" + 
startOfWeek.dayOfWeek + "\"");
         } else if (getTimeUnit().equalsIgnoreCase(TimeUnit.MONTH.toString())) {
-            res += ",\n\"" + START_DAY_OF_MONTH + "\" = \"" + startOfMonth.day 
+ "\"";
+            sb.append(",\n\"" + START_DAY_OF_MONTH + "\" = \"" + 
startOfMonth.day + "\"");
         }
-        return res;
+        return sb.toString();
     }
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 5b9676a201d..b72544365aa 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -3243,12 +3243,18 @@ public class Env {
 
             // replicationNum
             ReplicaAllocation replicaAlloc = 
olapTable.getDefaultReplicaAllocation();
-            
sb.append("\"").append(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION).append("\"
 = \"");
-            sb.append(replicaAlloc.toCreateStmt()).append("\"");
 
-            // min load replica num
-            
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_MIN_LOAD_REPLICA_NUM).append("\"
 = \"");
-            sb.append(olapTable.getMinLoadReplicaNum()).append("\"");
+            if (Config.isCloudMode()) {
+                
sb.append("\"").append(PropertyAnalyzer.PROPERTIES_FILE_CACHE_TTL_SECONDS).append("\"
 = \"");
+                sb.append(olapTable.getTTLSeconds()).append("\"");
+            } else {
+                
sb.append("\"").append(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION).append("\"
 = \"");
+                sb.append(replicaAlloc.toCreateStmt()).append("\"");
+
+                // min load replica num
+                
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_MIN_LOAD_REPLICA_NUM).append("\"
 = \"");
+                sb.append(olapTable.getMinLoadReplicaNum()).append("\"");
+            }
 
             // bloom filter
             Set<String> bfColumnNames = olapTable.getCopiedBfColumns();
@@ -3296,6 +3302,11 @@ public class Env {
                 sb.append(olapTable.getDataSortInfo().toSql());
             }
 
+            if (Config.isCloudMode() && olapTable.getTTLSeconds() != 0) {
+                
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_FILE_CACHE_TTL_SECONDS).append("\"
 = \"");
+                sb.append(olapTable.getTTLSeconds()).append("\"");
+            }
+
             // in memory
             if (olapTable.isInMemory()) {
                 
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_INMEMORY).append("\" = 
\"");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
index a44e9da9c77..c653159d7b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
@@ -20,10 +20,12 @@ package org.apache.doris.catalog;
 import org.apache.doris.cloud.catalog.CloudEnv;
 import org.apache.doris.cloud.catalog.CloudPartition;
 import org.apache.doris.cloud.catalog.CloudReplica;
+import org.apache.doris.cloud.catalog.CloudTablet;
 import org.apache.doris.cloud.datasource.CloudInternalCatalog;
 import org.apache.doris.common.Config;
 import org.apache.doris.datasource.InternalCatalog;
 
+import java.lang.reflect.Type;
 
 public class EnvFactory {
 
@@ -43,6 +45,14 @@ public class EnvFactory {
         }
     }
 
+    public static Type getPartitionClass() {
+        if (Config.isCloudMode()) {
+            return CloudPartition.class;
+        } else {
+            return Partition.class;
+        }
+    }
+
     public static Partition createPartition() {
         if (Config.isCloudMode()) {
             return new CloudPartition();
@@ -51,6 +61,30 @@ public class EnvFactory {
         }
     }
 
+    public static Type getTabletClass() {
+        if (Config.isCloudMode()) {
+            return CloudTablet.class;
+        } else {
+            return Tablet.class;
+        }
+    }
+
+    public static Tablet createTablet() {
+        if (Config.isCloudMode()) {
+            return new CloudTablet();
+        } else {
+            return new Tablet();
+        }
+    }
+
+    public static Tablet createTablet(long tabletId) {
+        if (Config.isCloudMode()) {
+            return new CloudTablet(tabletId);
+        } else {
+            return new Tablet(tabletId);
+        }
+    }
+
     public static Replica createReplica() {
         if (Config.isCloudMode()) {
             return new CloudReplica();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Index.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Index.java
index e2235868a1c..4ec72e0c232 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Index.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Index.java
@@ -24,6 +24,7 @@ import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.PrintableMap;
 import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.proto.OlapFile;
 import org.apache.doris.thrift.TIndexType;
 import org.apache.doris.thrift.TOlapTableIndex;
 
@@ -211,6 +212,47 @@ public class Index implements Writable {
         return tIndex;
     }
 
+    public OlapFile.TabletIndexPB toPb(List<Column> schemaColumns) {
+        OlapFile.TabletIndexPB.Builder builder = 
OlapFile.TabletIndexPB.newBuilder();
+        builder.setIndexId(indexId);
+        builder.setIndexName(indexName);
+        for (String columnName : columns) {
+            for (Column column : schemaColumns) {
+                if (column.getName().equals(columnName)) {
+                    builder.addColUniqueId(column.getUniqueId());
+                }
+            }
+        }
+
+        switch (indexType) {
+            case BITMAP:
+                builder.setIndexType(OlapFile.IndexType.BITMAP);
+                break;
+
+            case INVERTED:
+                builder.setIndexType(OlapFile.IndexType.INVERTED);
+                break;
+
+            case NGRAM_BF:
+                builder.setIndexType(OlapFile.IndexType.NGRAM_BF);
+                break;
+
+            case BLOOMFILTER:
+                builder.setIndexType(OlapFile.IndexType.BLOOMFILTER);
+                break;
+
+            default:
+                throw new RuntimeException("indexType " + indexType + " is not 
processed in toPb");
+        }
+
+        if (properties != null) {
+            builder.putAllProperties(properties);
+        }
+
+        OlapFile.TabletIndexPB index = builder.build();
+        return index;
+    }
+
     public static void checkConflict(Collection<Index> indices, Set<String> 
bloomFilters) throws AnalysisException {
         indices = indices == null ? Collections.emptyList() : indices;
         bloomFilters = bloomFilters == null ? Collections.emptySet() : 
bloomFilters;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 3236fc09ec9..5e36886e56a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -609,7 +609,7 @@ public class OlapTable extends Table {
                 idx.clearTabletsForRestore();
                 for (int i = 0; i < tabletNum; i++) {
                     long newTabletId = env.getNextId();
-                    Tablet newTablet = new Tablet(newTabletId);
+                    Tablet newTablet = EnvFactory.createTablet(newTabletId);
                     idx.addTablet(newTablet, null /* tablet meta */, true /* 
is restore */);
 
                     // replicas
@@ -668,6 +668,14 @@ public class OlapTable extends Table {
         return result;
     }
 
+    public List<Long> getIndexIdList() {
+        List<Long> result = Lists.newArrayList();
+        for (Long indexId : indexIdToMeta.keySet()) {
+            result.add(indexId);
+        }
+        return result;
+    }
+
     // schema
     public Map<Long, List<Column>> getIndexIdToSchema() {
         return getIndexIdToSchema(Util.showHiddenColumns());
@@ -1902,6 +1910,22 @@ public class OlapTable extends Table {
         return "";
     }
 
+    public long getTTLSeconds() {
+        if (tableProperty != null) {
+            return tableProperty.getTTLSeconds();
+        }
+        return 0L;
+    }
+
+    public void setTTLSeconds(long ttlSeconds) {
+        if (tableProperty == null) {
+            tableProperty = new TableProperty(new HashMap<>());
+        }
+        
tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_FILE_CACHE_TTL_SECONDS,
+                                            
Long.valueOf(ttlSeconds).toString());
+        tableProperty.buildTTLSeconds();
+    }
+
     public boolean getEnableLightSchemaChange() {
         if (tableProperty != null) {
             return tableProperty.getUseSchemaLightChange();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
index 3d5215bc483..5e857acf884 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
@@ -60,6 +60,7 @@ public class TableProperty implements Writable {
     private ReplicaAllocation replicaAlloc = 
ReplicaAllocation.DEFAULT_ALLOCATION;
     private boolean isInMemory = false;
     private short minLoadReplicaNum = -1;
+    private long ttlSeconds = 0L;
 
     private String storagePolicy = "";
     private Boolean isBeingSynced = null;
@@ -188,6 +189,15 @@ public class TableProperty implements Writable {
         return this;
     }
 
+    public TableProperty buildTTLSeconds() {
+        ttlSeconds = 
Long.parseLong(properties.getOrDefault(PropertyAnalyzer.PROPERTIES_FILE_CACHE_TTL_SECONDS,
 "0"));
+        return this;
+    }
+
+    public long getTTLSeconds() {
+        return ttlSeconds;
+    }
+
     public TableProperty buildEnableLightSchemaChange() {
         enableLightSchemaChange = Boolean.parseBoolean(
                 
properties.getOrDefault(PropertyAnalyzer.PROPERTIES_ENABLE_LIGHT_SCHEMA_CHANGE, 
"false"));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
index e579c6b9a48..0ab869ec803 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
@@ -24,6 +24,7 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.persist.gson.GsonUtils;
@@ -84,7 +85,7 @@ public class Tablet extends MetaObject implements Writable {
     @SerializedName(value = "id")
     private long id;
     @SerializedName(value = "replicas")
-    private List<Replica> replicas;
+    protected List<Replica> replicas;
     @SerializedName(value = "checkedVersion")
     private long checkedVersion;
     @Deprecated
@@ -112,7 +113,7 @@ public class Tablet extends MetaObject implements Writable {
         this(tabletId, new ArrayList<>());
     }
 
-    public Tablet(long tabletId, List<Replica> replicas) {
+    private Tablet(long tabletId, List<Replica> replicas) {
         this.id = tabletId;
         this.replicas = replicas;
         if (this.replicas == null) {
@@ -168,13 +169,14 @@ public class Tablet extends MetaObject implements 
Writable {
         }
     }
 
-    private boolean deleteRedundantReplica(long backendId, long version) {
+    protected boolean isLatestReplicaAndDeleteOld(Replica newReplica) {
         boolean delete = false;
         boolean hasBackend = false;
+        long version = newReplica.getVersion();
         Iterator<Replica> iterator = replicas.iterator();
         while (iterator.hasNext()) {
             Replica replica = iterator.next();
-            if (replica.getBackendId() == backendId) {
+            if (replica.getBackendId() == newReplica.getBackendId()) {
                 hasBackend = true;
                 if (replica.getVersion() <= version) {
                     iterator.remove();
@@ -187,7 +189,7 @@ public class Tablet extends MetaObject implements Writable {
     }
 
     public void addReplica(Replica replica, boolean isRestore) {
-        if (deleteRedundantReplica(replica.getBackendId(), 
replica.getVersion())) {
+        if (isLatestReplicaAndDeleteOld(replica)) {
             replicas.add(replica);
             if (!isRestore) {
                 Env.getCurrentInvertedIndex().addReplica(id, replica);
@@ -212,16 +214,22 @@ public class Tablet extends MetaObject implements 
Writable {
     }
 
     public List<Long> getNormalReplicaBackendIds() {
-        return Lists.newArrayList(getNormalReplicaBackendPathMap().keySet());
+        try {
+            return 
Lists.newArrayList(getNormalReplicaBackendPathMap().keySet());
+        } catch (Exception e) {
+            LOG.warn("failed to getNormalReplicaBackendIds", e);
+            return Lists.newArrayList();
+        }
     }
 
     // return map of (BE id -> path hash) of normal replicas
     // for load plan.
-    public Multimap<Long, Long> getNormalReplicaBackendPathMap() {
+    public Multimap<Long, Long> getNormalReplicaBackendPathMap() throws 
UserException {
         Multimap<Long, Long> map = HashMultimap.create();
         SystemInfoService infoService = Env.getCurrentSystemInfo();
         for (Replica replica : replicas) {
-            if (!infoService.checkBackendAlive(replica.getBackendId())) {
+            long backendId = replica.getBackendId();
+            if (!infoService.checkBackendAlive(backendId)) {
                 continue;
             }
 
@@ -232,7 +240,7 @@ public class Tablet extends MetaObject implements Writable {
             ReplicaState state = replica.getState();
             if (state.canLoad()
                     || (state == ReplicaState.DECOMMISSION && 
replica.getPostWatermarkTxnId() < 0)) {
-                map.put(replica.getBackendId(), replica.getPathHash());
+                map.put(backendId, replica.getPathHash());
             }
         }
         return map;
@@ -383,7 +391,7 @@ public class Tablet extends MetaObject implements Writable {
         int replicaCount = in.readInt();
         for (int i = 0; i < replicaCount; ++i) {
             Replica replica = Replica.read(in);
-            if (deleteRedundantReplica(replica.getBackendId(), 
replica.getVersion())) {
+            if (isLatestReplicaAndDeleteOld(replica)) {
                 replicas.add(replica);
             }
         }
@@ -396,10 +404,10 @@ public class Tablet extends MetaObject implements 
Writable {
     public static Tablet read(DataInput in) throws IOException {
         if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_115) {
             String json = Text.readString(in);
-            return GsonUtils.GSON.fromJson(json, Tablet.class);
+            return GsonUtils.GSON.fromJson(json, EnvFactory.getTabletClass());
         }
 
-        Tablet tablet = new Tablet();
+        Tablet tablet = EnvFactory.createTablet();
         tablet.readFields(in);
         return tablet;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTablet.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTablet.java
new file mode 100644
index 00000000000..be8a2599dde
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTablet.java
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.catalog;
+
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.common.InternalErrorCode;
+import org.apache.doris.common.UserException;
+import org.apache.doris.system.SystemInfoService;
+
+import com.google.common.collect.Multimap;
+
+import java.util.Iterator;
+
+public class CloudTablet extends Tablet {
+
+    public CloudTablet() {
+        super();
+    }
+
+    public CloudTablet(long tabletId) {
+        super(tabletId);
+    }
+
+    @Override
+    public Replica getReplicaByBackendId(long backendId) {
+        if (!replicas.isEmpty()) {
+            return replicas.get(0);
+        }
+        return null;
+    }
+
+    @Override
+    public Multimap<Long, Long> getNormalReplicaBackendPathMap() throws 
UserException {
+        Multimap<Long, Long> pathMap = super.getNormalReplicaBackendPathMap();
+
+        if (pathMap.containsKey(-1L)) {
+            pathMap.removeAll(-1L);
+            if (pathMap.isEmpty()) {
+                throw new UserException(InternalErrorCode.META_NOT_FOUND_ERR,
+                        SystemInfoService.NOT_USING_VALID_CLUSTER_MSG);
+            }
+        }
+
+        return pathMap;
+    }
+
+    @Override
+    protected boolean isLatestReplicaAndDeleteOld(Replica newReplica) {
+        boolean delete = false;
+        boolean hasBackend = false;
+        long version = newReplica.getVersion();
+        Iterator<Replica> iterator = replicas.iterator();
+        while (iterator.hasNext()) {
+            hasBackend = true;
+            Replica replica = iterator.next();
+            if (replica.getVersion() <= version) {
+                iterator.remove();
+                delete = true;
+            }
+        }
+
+        return delete || !hasBackend;
+    }
+
+}
+
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
index 5db352f8526..13f04fd36df 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
@@ -17,13 +17,505 @@
 
 package org.apache.doris.cloud.datasource;
 
+import org.apache.doris.analysis.DataSortInfo;
+import org.apache.doris.catalog.BinlogConfig;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.DistributionInfo;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.EnvFactory;
+import org.apache.doris.catalog.Index;
+import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.MaterializedIndex.IndexState;
+import org.apache.doris.catalog.MaterializedIndexMeta;
+import org.apache.doris.catalog.MetaIdGenerator.IdGeneratorBuffer;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.Replica.ReplicaState;
+import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.cloud.catalog.CloudPartition;
+import org.apache.doris.cloud.catalog.CloudReplica;
+import org.apache.doris.cloud.proto.Cloud;
+import org.apache.doris.cloud.rpc.MetaServiceProxy;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
 import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.proto.OlapCommon;
+import org.apache.doris.proto.OlapFile;
+import org.apache.doris.proto.Types;
+import org.apache.doris.rpc.RpcException;
+import org.apache.doris.thrift.TCompressionType;
+import org.apache.doris.thrift.TSortType;
+import org.apache.doris.thrift.TStorageFormat;
+import org.apache.doris.thrift.TStorageMedium;
+import org.apache.doris.thrift.TTabletType;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import doris.segment_v2.SegmentV2;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
 
 public class CloudInternalCatalog extends InternalCatalog {
+    private static final Logger LOG = 
LogManager.getLogger(CloudInternalCatalog.class);
 
     public CloudInternalCatalog() {
         super();
     }
 
+    // BEGIN CREATE TABLE
+
+    // TODO(merge-cloud): merge code with InternalCatalog
+    @Override
+    protected Partition createPartitionWithIndices(long dbId, long tableId, 
String tableName,
+            long baseIndexId, long partitionId, String partitionName, 
Map<Long, MaterializedIndexMeta> indexIdToMeta,
+            DistributionInfo distributionInfo, TStorageMedium storageMedium, 
ReplicaAllocation replicaAlloc,
+            Long versionInfo, Set<String> bfColumns, double bfFpp, Set<Long> 
tabletIdSet, List<Index> indexes,
+            boolean isInMemory, TStorageFormat storageFormat, TTabletType 
tabletType, TCompressionType compressionType,
+            DataSortInfo dataSortInfo, boolean enableUniqueKeyMergeOnWrite, 
String storagePolicy,
+            IdGeneratorBuffer idGeneratorBuffer, boolean disableAutoCompaction,
+            boolean enableSingleReplicaCompaction, boolean 
skipWriteIndexOnLoad,
+            String compactionPolicy, Long timeSeriesCompactionGoalSizeMbytes,
+            Long timeSeriesCompactionFileCountThreshold, Long 
timeSeriesCompactionTimeThresholdSeconds,
+            Long timeSeriesCompactionEmptyRowsetsThreshold,
+            boolean storeRowColumn, BinlogConfig binlogConfig,
+            boolean isStorageMediumSpecified, List<Integer> clusterKeyIndexes,
+            long ttlSeconds) throws DdlException {
+        // create base index first.
+        Preconditions.checkArgument(baseIndexId != -1);
+        MaterializedIndex baseIndex = new MaterializedIndex(baseIndexId, 
IndexState.NORMAL);
+
+        LOG.info("begin create cloud partition");
+        // create partition with base index
+        Partition partition = new CloudPartition(partitionId, partitionName, 
baseIndex,
+                distributionInfo, dbId, tableId);
+
+        // add to index map
+        Map<Long, MaterializedIndex> indexMap = Maps.newHashMap();
+        indexMap.put(baseIndexId, baseIndex);
+
+        // create rollup index if has
+        for (long indexId : indexIdToMeta.keySet()) {
+            if (indexId == baseIndexId) {
+                continue;
+            }
+
+            MaterializedIndex rollup = new MaterializedIndex(indexId, 
IndexState.NORMAL);
+            indexMap.put(indexId, rollup);
+        }
+
+        // version and version hash
+        if (versionInfo != null) {
+            partition.updateVisibleVersion(versionInfo);
+        }
+        long version = partition.getVisibleVersion();
+
+        // short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
+        for (Map.Entry<Long, MaterializedIndex> entry : indexMap.entrySet()) {
+            long indexId = entry.getKey();
+            MaterializedIndex index = entry.getValue();
+            MaterializedIndexMeta indexMeta = indexIdToMeta.get(indexId);
+
+            // create tablets
+            int schemaHash = indexMeta.getSchemaHash();
+            TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, 
indexId, schemaHash, storageMedium);
+            createCloudTablets(index, ReplicaState.NORMAL, distributionInfo, 
version, replicaAlloc,
+                    tabletMeta, tabletIdSet);
+
+            short shortKeyColumnCount = indexMeta.getShortKeyColumnCount();
+            // TStorageType storageType = indexMeta.getStorageType();
+            List<Column> columns = indexMeta.getSchema();
+            KeysType keysType = indexMeta.getKeysType();
+
+            Cloud.CreateTabletsRequest.Builder requestBuilder = 
Cloud.CreateTabletsRequest.newBuilder();
+            for (Tablet tablet : index.getTablets()) {
+                OlapFile.TabletMetaCloudPB.Builder builder = 
createTabletMetaBuilder(tableId, indexId,
+                        partitionId, tablet, tabletType, schemaHash, keysType, 
shortKeyColumnCount,
+                        bfColumns, bfFpp, indexes, columns, dataSortInfo, 
compressionType,
+                        storagePolicy, isInMemory, false, tableName, 
ttlSeconds,
+                        enableUniqueKeyMergeOnWrite, storeRowColumn, 
indexMeta.getSchemaVersion());
+                requestBuilder.addTabletMetas(builder);
+            }
+
+            LOG.info("create tablets, dbId: {}, tableId: {}, tableName: {}, 
partitionId: {}, partitionName: {}, "
+                    + "indexId: {}",
+                    dbId, tableId, tableName, partitionId, partitionName, 
indexId);
+            sendCreateTabletsRpc(requestBuilder);
+            if (index.getId() != baseIndexId) {
+                // add rollup index to partition
+                partition.createRollupIndex(index);
+            }
+        }
+
+        LOG.info("succeed in creating partition[{}-{}], table : [{}-{}]", 
partitionId, partitionName,
+                tableId, tableName);
+
+        return partition;
+    }
+
+    private OlapFile.TabletMetaCloudPB.Builder createTabletMetaBuilder(long 
tableId, long indexId,
+            long partitionId, Tablet tablet, TTabletType tabletType, int 
schemaHash, KeysType keysType,
+            short shortKeyColumnCount, Set<String> bfColumns, double bfFpp, 
List<Index> indexes,
+            List<Column> schemaColumns, DataSortInfo dataSortInfo, 
TCompressionType compressionType,
+            String storagePolicy, boolean isInMemory, boolean isShadow,
+            String tableName, long ttlSeconds, boolean 
enableUniqueKeyMergeOnWrite,
+            boolean storeRowColumn, int schemaVersion) throws DdlException {
+        OlapFile.TabletMetaCloudPB.Builder builder = 
OlapFile.TabletMetaCloudPB.newBuilder();
+        builder.setTableId(tableId);
+        builder.setIndexId(indexId);
+        builder.setPartitionId(partitionId);
+        builder.setTabletId(tablet.getId());
+        builder.setSchemaHash(schemaHash);
+        builder.setTableName(tableName);
+        builder.setCreationTime(System.currentTimeMillis() / 1000);
+        builder.setCumulativeLayerPoint(-1);
+        builder.setTabletState(isShadow ? OlapFile.TabletStatePB.PB_NOTREADY : 
OlapFile.TabletStatePB.PB_RUNNING);
+        builder.setIsInMemory(isInMemory);
+        builder.setTtlSeconds(ttlSeconds);
+        builder.setSchemaVersion(schemaVersion);
+
+        UUID uuid = UUID.randomUUID();
+        Types.PUniqueId tabletUid = Types.PUniqueId.newBuilder()
+                .setHi(uuid.getMostSignificantBits())
+                .setLo(uuid.getLeastSignificantBits())
+                .build();
+        builder.setTabletUid(tabletUid);
+
+        builder.setPreferredRowsetType(OlapFile.RowsetTypePB.BETA_ROWSET);
+        builder.setTabletType(tabletType == TTabletType.TABLET_TYPE_DISK
+                ? OlapFile.TabletTypePB.TABLET_TYPE_DISK : 
OlapFile.TabletTypePB.TABLET_TYPE_MEMORY);
+
+        builder.setReplicaId(tablet.getReplicas().get(0).getId());
+        builder.setEnableUniqueKeyMergeOnWrite(enableUniqueKeyMergeOnWrite);
+
+        OlapFile.TabletSchemaCloudPB.Builder schemaBuilder = 
OlapFile.TabletSchemaCloudPB.newBuilder();
+        schemaBuilder.setSchemaVersion(schemaVersion);
+
+        if (keysType == KeysType.DUP_KEYS) {
+            schemaBuilder.setKeysType(OlapFile.KeysType.DUP_KEYS);
+        } else if (keysType == KeysType.UNIQUE_KEYS) {
+            schemaBuilder.setKeysType(OlapFile.KeysType.UNIQUE_KEYS);
+        } else if (keysType == KeysType.AGG_KEYS) {
+            schemaBuilder.setKeysType(OlapFile.KeysType.AGG_KEYS);
+        } else {
+            throw new DdlException("invalid key types");
+        }
+        schemaBuilder.setNumShortKeyColumns(shortKeyColumnCount);
+        schemaBuilder.setNumRowsPerRowBlock(1024);
+        schemaBuilder.setCompressKind(OlapCommon.CompressKind.COMPRESS_LZ4);
+        schemaBuilder.setBfFpp(bfFpp);
+
+        int deleteSign = -1;
+        int sequenceCol = -1;
+        for (int i = 0; i < schemaColumns.size(); i++) {
+            Column column = schemaColumns.get(i);
+            if (column.isDeleteSignColumn()) {
+                deleteSign = i;
+            }
+            if (column.isSequenceColumn()) {
+                sequenceCol = i;
+            }
+        }
+        schemaBuilder.setDeleteSignIdx(deleteSign);
+        schemaBuilder.setSequenceColIdx(sequenceCol);
+        schemaBuilder.setStoreRowColumn(storeRowColumn);
+
+        if (dataSortInfo.getSortType() == TSortType.LEXICAL) {
+            schemaBuilder.setSortType(OlapFile.SortType.LEXICAL);
+        } else if (dataSortInfo.getSortType() == TSortType.ZORDER) {
+            schemaBuilder.setSortType(OlapFile.SortType.ZORDER);
+        } else {
+            LOG.warn("invalid sort types:{}", dataSortInfo.getSortType());
+            throw new DdlException("invalid sort types");
+        }
+
+        switch (compressionType) {
+            case NO_COMPRESSION:
+                
schemaBuilder.setCompressionType(SegmentV2.CompressionTypePB.NO_COMPRESSION);
+                break;
+            case SNAPPY:
+                
schemaBuilder.setCompressionType(SegmentV2.CompressionTypePB.SNAPPY);
+                break;
+            case LZ4:
+                
schemaBuilder.setCompressionType(SegmentV2.CompressionTypePB.LZ4);
+                break;
+            case LZ4F:
+                
schemaBuilder.setCompressionType(SegmentV2.CompressionTypePB.LZ4F);
+                break;
+            case ZLIB:
+                
schemaBuilder.setCompressionType(SegmentV2.CompressionTypePB.ZLIB);
+                break;
+            case ZSTD:
+                
schemaBuilder.setCompressionType(SegmentV2.CompressionTypePB.ZSTD);
+                break;
+            default:
+                
schemaBuilder.setCompressionType(SegmentV2.CompressionTypePB.LZ4F);
+                break;
+        }
+
+        schemaBuilder.setSortColNum(dataSortInfo.getColNum());
+        for (int i = 0; i < schemaColumns.size(); i++) {
+            Column column = schemaColumns.get(i);
+            schemaBuilder.addColumn(column.toPb(bfColumns, indexes));
+        }
+
+        if (indexes != null) {
+            for (int i = 0; i < indexes.size(); i++) {
+                Index index = indexes.get(i);
+                schemaBuilder.addIndex(index.toPb(schemaColumns));
+            }
+        }
+        OlapFile.TabletSchemaCloudPB schema = schemaBuilder.build();
+        builder.setSchema(schema);
+        // rowset
+        OlapFile.RowsetMetaCloudPB.Builder rowsetBuilder = 
createInitialRowset(tablet, partitionId,
+                schemaHash, schema);
+        builder.addRsMetas(rowsetBuilder);
+        return builder;
+    }
+
+    private OlapFile.RowsetMetaCloudPB.Builder createInitialRowset(Tablet 
tablet, long partitionId,
+            int schemaHash, OlapFile.TabletSchemaCloudPB schema) {
+        OlapFile.RowsetMetaCloudPB.Builder rowsetBuilder = 
OlapFile.RowsetMetaCloudPB.newBuilder();
+        rowsetBuilder.setRowsetId(0);
+        rowsetBuilder.setPartitionId(partitionId);
+        rowsetBuilder.setTabletId(tablet.getId());
+        rowsetBuilder.setTabletSchemaHash(schemaHash);
+        rowsetBuilder.setRowsetType(OlapFile.RowsetTypePB.BETA_ROWSET);
+        rowsetBuilder.setRowsetState(OlapFile.RowsetStatePB.VISIBLE);
+        rowsetBuilder.setStartVersion(0);
+        rowsetBuilder.setEndVersion(1);
+        rowsetBuilder.setNumRows(0);
+        rowsetBuilder.setTotalDiskSize(0);
+        rowsetBuilder.setDataDiskSize(0);
+        rowsetBuilder.setIndexDiskSize(0);
+        
rowsetBuilder.setSegmentsOverlapPb(OlapFile.SegmentsOverlapPB.NONOVERLAPPING);
+        rowsetBuilder.setNumSegments(0);
+        rowsetBuilder.setEmpty(true);
+
+        UUID uuid = UUID.randomUUID();
+        String rowsetIdV2Str = String.format("%016X", 2L << 56)
+                + String.format("%016X", uuid.getMostSignificantBits())
+                + String.format("%016X", uuid.getLeastSignificantBits());
+        rowsetBuilder.setRowsetIdV2(rowsetIdV2Str);
+
+        rowsetBuilder.setTabletSchema(schema);
+        return rowsetBuilder;
+    }
+
+    private void createCloudTablets(MaterializedIndex index, ReplicaState 
replicaState,
+            DistributionInfo distributionInfo, long version, ReplicaAllocation 
replicaAlloc,
+            TabletMeta tabletMeta, Set<Long> tabletIdSet) throws DdlException {
+        for (int i = 0; i < distributionInfo.getBucketNum(); ++i) {
+            Tablet tablet = 
EnvFactory.createTablet(Env.getCurrentEnv().getNextId());
+
+            // add tablet to inverted index first
+            index.addTablet(tablet, tabletMeta);
+            tabletIdSet.add(tablet.getId());
+
+            long replicaId = Env.getCurrentEnv().getNextId();
+            Replica replica = new CloudReplica(replicaId, null, replicaState, 
version,
+                    tabletMeta.getOldSchemaHash(), tabletMeta.getDbId(), 
tabletMeta.getTableId(),
+                    tabletMeta.getPartitionId(), tabletMeta.getIndexId(), i);
+            tablet.addReplica(replica);
+        }
+    }
+
+    @Override
+    protected void beforeCreatePartitions(long dbId, long tableId, List<Long> 
partitionIds, List<Long> indexIds)
+            throws DdlException {
+        if (partitionIds == null) {
+            prepareMaterializedIndex(tableId, indexIds);
+        } else {
+            preparePartition(dbId, tableId, partitionIds, indexIds);
+        }
+    }
+
+    @Override
+    protected void afterCreatePartitions(long tableId, List<Long> 
partitionIds, List<Long> indexIds)
+            throws DdlException {
+        if (partitionIds == null) {
+            commitMaterializedIndex(tableId, indexIds);
+        } else {
+            commitPartition(tableId, partitionIds, indexIds);
+        }
+    }
+
+    private void preparePartition(long dbId, long tableId, List<Long> 
partitionIds, List<Long> indexIds)
+            throws DdlException {
+        Cloud.PartitionRequest.Builder partitionRequestBuilder = 
Cloud.PartitionRequest.newBuilder();
+        partitionRequestBuilder.setCloudUniqueId(Config.cloud_unique_id);
+        partitionRequestBuilder.setTableId(tableId);
+        partitionRequestBuilder.addAllPartitionIds(partitionIds);
+        partitionRequestBuilder.addAllIndexIds(indexIds);
+        partitionRequestBuilder.setExpiration(0);
+        if (dbId > 0) {
+            partitionRequestBuilder.setDbId(dbId);
+        }
+        final Cloud.PartitionRequest partitionRequest = 
partitionRequestBuilder.build();
+
+        Cloud.PartitionResponse response = null;
+        int tryTimes = 0;
+        while (tryTimes++ < Config.meta_service_rpc_retry_times) {
+            try {
+                response = 
MetaServiceProxy.getInstance().preparePartition(partitionRequest);
+                if (response.getStatus().getCode() != 
Cloud.MetaServiceCode.KV_TXN_CONFLICT) {
+                    break;
+                }
+            } catch (RpcException e) {
+                LOG.warn("tryTimes:{}, preparePartition RpcException", 
tryTimes, e);
+                if (tryTimes + 1 >= Config.meta_service_rpc_retry_times) {
+                    throw new DdlException(e.getMessage());
+                }
+            }
+            sleepSeveralMs();
+        }
+
+        if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
+            LOG.warn("preparePartition response: {} ", response);
+            throw new DdlException(response.getStatus().getMsg());
+        }
+    }
+
+    private void commitPartition(long tableId, List<Long> partitionIds, 
List<Long> indexIds) throws DdlException {
+        Cloud.PartitionRequest.Builder partitionRequestBuilder = 
Cloud.PartitionRequest.newBuilder();
+        partitionRequestBuilder.setCloudUniqueId(Config.cloud_unique_id);
+        partitionRequestBuilder.addAllPartitionIds(partitionIds);
+        partitionRequestBuilder.addAllIndexIds(indexIds);
+        partitionRequestBuilder.setTableId(tableId);
+        final Cloud.PartitionRequest partitionRequest = 
partitionRequestBuilder.build();
+
+        Cloud.PartitionResponse response = null;
+        int tryTimes = 0;
+        while (tryTimes++ < Config.meta_service_rpc_retry_times) {
+            try {
+                response = 
MetaServiceProxy.getInstance().commitPartition(partitionRequest);
+                if (response.getStatus().getCode() != 
Cloud.MetaServiceCode.KV_TXN_CONFLICT) {
+                    break;
+                }
+            } catch (RpcException e) {
+                LOG.warn("tryTimes:{}, commitPartition RpcException", 
tryTimes, e);
+                if (tryTimes + 1 >= Config.meta_service_rpc_retry_times) {
+                    throw new DdlException(e.getMessage());
+                }
+            }
+            sleepSeveralMs();
+        }
+
+        if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
+            LOG.warn("commitPartition response: {} ", response);
+            throw new DdlException(response.getStatus().getMsg());
+        }
+    }
+
+    private void prepareMaterializedIndex(Long tableId, List<Long> indexIds) 
throws DdlException {
+        Cloud.IndexRequest.Builder indexRequestBuilder = 
Cloud.IndexRequest.newBuilder();
+        indexRequestBuilder.setCloudUniqueId(Config.cloud_unique_id);
+        indexRequestBuilder.addAllIndexIds(indexIds);
+        indexRequestBuilder.setTableId(tableId);
+        indexRequestBuilder.setExpiration(0);
+        final Cloud.IndexRequest indexRequest = indexRequestBuilder.build();
+
+        Cloud.IndexResponse response = null;
+        int tryTimes = 0;
+        while (tryTimes++ < Config.meta_service_rpc_retry_times) {
+            try {
+                response = 
MetaServiceProxy.getInstance().prepareIndex(indexRequest);
+                if (response.getStatus().getCode() != 
Cloud.MetaServiceCode.KV_TXN_CONFLICT) {
+                    break;
+                }
+            } catch (RpcException e) {
+                LOG.warn("tryTimes:{}, prepareIndex RpcException", tryTimes, 
e);
+                if (tryTimes + 1 >= Config.meta_service_rpc_retry_times) {
+                    throw new DdlException(e.getMessage());
+                }
+            }
+            sleepSeveralMs();
+        }
+
+        if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
+            LOG.warn("prepareIndex response: {} ", response);
+            throw new DdlException(response.getStatus().getMsg());
+        }
+    }
+
+    private void commitMaterializedIndex(Long tableId, List<Long> indexIds) 
throws DdlException {
+        Cloud.IndexRequest.Builder indexRequestBuilder = 
Cloud.IndexRequest.newBuilder();
+        indexRequestBuilder.setCloudUniqueId(Config.cloud_unique_id);
+        indexRequestBuilder.addAllIndexIds(indexIds);
+        indexRequestBuilder.setTableId(tableId);
+        final Cloud.IndexRequest indexRequest = indexRequestBuilder.build();
+
+        Cloud.IndexResponse response = null;
+        int tryTimes = 0;
+        while (tryTimes++ < Config.meta_service_rpc_retry_times) {
+            try {
+                response = 
MetaServiceProxy.getInstance().commitIndex(indexRequest);
+                if (response.getStatus().getCode() != 
Cloud.MetaServiceCode.KV_TXN_CONFLICT) {
+                    break;
+                }
+            } catch (RpcException e) {
+                LOG.warn("tryTimes:{}, commitIndex RpcException", tryTimes, e);
+                if (tryTimes + 1 >= Config.meta_service_rpc_retry_times) {
+                    throw new DdlException(e.getMessage());
+                }
+            }
+            sleepSeveralMs();
+        }
+
+        if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
+            LOG.warn("commitIndex response: {} ", response);
+            throw new DdlException(response.getStatus().getMsg());
+        }
+    }
+
+    private void sendCreateTabletsRpc(Cloud.CreateTabletsRequest.Builder 
requestBuilder) throws DdlException  {
+        requestBuilder.setCloudUniqueId(Config.cloud_unique_id);
+        Cloud.CreateTabletsRequest createTabletsReq = requestBuilder.build();
+
+        LOG.debug("send create tablets rpc, createTabletsReq: {}", 
createTabletsReq);
+        Cloud.CreateTabletsResponse response;
+        try {
+            response = 
MetaServiceProxy.getInstance().createTablets(createTabletsReq);
+        } catch (RpcException e) {
+            LOG.warn("failed to send create tablets rpc {}", e.getMessage());
+            throw new RuntimeException(e);
+        }
+        LOG.info("create tablets response: {}", response);
+
+        if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
+            throw new DdlException(response.getStatus().getMsg());
+        }
+    }
+
+    // END CREATE TABLE
+
+    @Override
+    protected void checkAvailableCapacity(Database db) throws DdlException {
+        // check cluster capacity
+        Env.getCurrentSystemInfo().checkAvailableCapacity();
+        // check db quota
+        db.checkQuota();
+    }
+
+    private void sleepSeveralMs() {
+        // sleep random millis [20, 200] ms, avoid txn conflict
+        int randomMillis = 20 + (int) (Math.random() * (200 - 20));
+        LOG.debug("randomMillis:{}", randomMillis);
+        try {
+            Thread.sleep(randomMillis);
+        } catch (InterruptedException e) {
+            LOG.info("ignore InterruptedException: ", e);
+        }
+    }
+
 }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index 8a4051c9af3..f1e457336f2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -101,6 +101,8 @@ public class PropertyAnalyzer {
 
     public static final String PROPERTIES_INMEMORY = "in_memory";
 
+    public static final String PROPERTIES_FILE_CACHE_TTL_SECONDS = 
"file_cache_ttl_seconds";
+
     // _auto_bucket can only set in create table stmt rewrite bucket and can 
not be changed
     public static final String PROPERTIES_AUTO_BUCKET = "_auto_bucket";
     public static final String PROPERTIES_ESTIMATE_PARTITION_SIZE = 
"estimate_partition_size";
@@ -245,6 +247,7 @@ public class PropertyAnalyzer {
         properties.remove(PROPERTIES_STORAGE_COOLDOWN_TIME);
         properties.remove(PROPERTIES_STORAGE_POLICY);
         properties.remove(PROPERTIES_DATA_BASE_TIME);
+        properties.remove(PROPERTIES_FILE_CACHE_TTL_SECONDS);
 
         Preconditions.checkNotNull(storageMedium);
 
@@ -449,6 +452,23 @@ public class PropertyAnalyzer {
         return version;
     }
 
+    public static long analyzeTTL(Map<String, String> properties) throws 
AnalysisException {
+        long ttlSeconds = 0;
+        if (properties != null && 
properties.containsKey(PROPERTIES_FILE_CACHE_TTL_SECONDS)) {
+            String ttlSecondsStr = 
properties.get(PROPERTIES_FILE_CACHE_TTL_SECONDS);
+            try {
+                ttlSeconds = Long.parseLong(ttlSecondsStr);
+                if (ttlSeconds < 0) {
+                    throw new NumberFormatException();
+                }
+            } catch (NumberFormatException e) {
+                throw new AnalysisException("The value " + ttlSecondsStr + " 
is not a valid number or is out of range "
+                           + "(0 <= integer <= Long.MAX_VALUE)");
+            }
+        }
+        return ttlSeconds;
+    }
+
     public static int analyzeSchemaVersion(Map<String, String> properties) 
throws AnalysisException {
         int schemaVersion = 0;
         if (properties != null && 
properties.containsKey(PROPERTIES_SCHEMA_VERSION)) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index b5e68de5b2e..4671daba0cb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -65,6 +65,7 @@ import org.apache.doris.catalog.DatabaseProperty;
 import org.apache.doris.catalog.DistributionInfo;
 import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.EnvFactory;
 import org.apache.doris.catalog.EsTable;
 import org.apache.doris.catalog.HashDistributionInfo;
 import org.apache.doris.catalog.HiveTable;
@@ -1068,10 +1069,7 @@ public class InternalCatalog implements 
CatalogIf<Database> {
 
         // only internal table should check quota and cluster capacity
         if (!stmt.isExternal()) {
-            // check cluster capacity
-            Env.getCurrentSystemInfo().checkAvailableCapacity();
-            // check db quota
-            db.checkQuota();
+            checkAvailableCapacity(db);
         }
 
         // check if table exists in db
@@ -1532,6 +1530,9 @@ public class InternalCatalog implements 
CatalogIf<Database> {
         }
         try {
             long partitionId = idGeneratorBuffer.getNextId();
+            List<Long> partitionIds = Lists.newArrayList(partitionId);
+            List<Long> indexIds = 
indexIdToMeta.keySet().stream().collect(Collectors.toList());
+            beforeCreatePartitions(db.getId(), olapTable.getId(), 
partitionIds, indexIds);
             Partition partition = createPartitionWithIndices(db.getId(), 
olapTable.getId(),
                     olapTable.getName(), olapTable.getBaseIndexId(), 
partitionId, partitionName, indexIdToMeta,
                     distributionInfo, dataProperty.getStorageMedium(), 
singlePartitionDesc.getReplicaAlloc(),
@@ -1546,7 +1547,9 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                     olapTable.getTimeSeriesCompactionTimeThresholdSeconds(),
                     olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold(),
                     olapTable.storeRowColumn(),
-                    binlogConfig, dataProperty.isStorageMediumSpecified(), 
null);
+                    binlogConfig, dataProperty.isStorageMediumSpecified(), 
null,
+                    olapTable.getTTLSeconds());
+            afterCreatePartitions(olapTable.getId(), partitionIds, indexIds);
             // TODO cluster key ids
 
             // check again
@@ -1791,7 +1794,7 @@ public class InternalCatalog implements 
CatalogIf<Database> {
         }
     }
 
-    private Partition createPartitionWithIndices(long dbId, long tableId, 
String tableName,
+    protected Partition createPartitionWithIndices(long dbId, long tableId, 
String tableName,
             long baseIndexId, long partitionId, String partitionName, 
Map<Long, MaterializedIndexMeta> indexIdToMeta,
             DistributionInfo distributionInfo, TStorageMedium storageMedium, 
ReplicaAllocation replicaAlloc,
             Long versionInfo, Set<String> bfColumns, double bfFpp, Set<Long> 
tabletIdSet, List<Index> indexes,
@@ -1803,7 +1806,8 @@ public class InternalCatalog implements 
CatalogIf<Database> {
             Long timeSeriesCompactionFileCountThreshold, Long 
timeSeriesCompactionTimeThresholdSeconds,
             Long timeSeriesCompactionEmptyRowsetsThreshold,
             boolean storeRowColumn, BinlogConfig binlogConfig,
-            boolean isStorageMediumSpecified, List<Integer> clusterKeyIndexes) 
throws DdlException {
+            boolean isStorageMediumSpecified, List<Integer> clusterKeyIndexes,
+            long ttlSeconds) throws DdlException {
         // create base index first.
         Preconditions.checkArgument(baseIndexId != -1);
         MaterializedIndex baseIndex = new MaterializedIndex(baseIndexId, 
IndexState.NORMAL);
@@ -1947,6 +1951,21 @@ public class InternalCatalog implements 
CatalogIf<Database> {
         return partition;
     }
 
+    protected void beforeCreatePartitions(long dbId, long tableId, List<Long> 
partitionIds, List<Long> indexIds)
+            throws DdlException {
+    }
+
+    protected void afterCreatePartitions(long tableId, List<Long> 
partitionIds, List<Long> indexIds)
+            throws DdlException {
+    }
+
+    protected void checkAvailableCapacity(Database db) throws DdlException {
+        // check cluster capacity
+        Env.getCurrentSystemInfo().checkAvailableCapacity();
+        // check db quota
+        db.checkQuota();
+    }
+
     // Create olap table and related base index synchronously.
     private void createOlapTable(Database db, CreateTableStmt stmt) throws 
UserException {
         String tableName = stmt.getTableName();
@@ -2265,6 +2284,9 @@ public class InternalCatalog implements 
CatalogIf<Database> {
 
         boolean isMutable = PropertyAnalyzer.analyzeBooleanProp(properties, 
PropertyAnalyzer.PROPERTIES_MUTABLE, true);
 
+        Long ttlSeconds = PropertyAnalyzer.analyzeTTL(properties);
+        olapTable.setTTLSeconds(ttlSeconds);
+
         // set storage policy
         String storagePolicy = 
PropertyAnalyzer.analyzeStoragePolicy(properties);
         
Env.getCurrentEnv().getPolicyMgr().checkStoragePolicyExist(storagePolicy);
@@ -2481,6 +2503,7 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                             "Database " + db.getFullName() + " create 
unpartitioned table " + tableName + " increasing "
                                     + totalReplicaNum + " of replica exceeds 
quota[" + db.getReplicaQuota() + "]");
                 }
+                beforeCreatePartitions(db.getId(), olapTable.getId(), null, 
olapTable.getIndexIdList());
                 Partition partition = createPartitionWithIndices(db.getId(), 
olapTable.getId(),
                         olapTable.getName(), olapTable.getBaseIndexId(), 
partitionId, partitionName,
                         olapTable.getIndexIdToMeta(), 
partitionDistributionInfo,
@@ -2496,7 +2519,9 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                         
olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold(),
                         storeRowColumn, binlogConfigForTask,
                         
partitionInfo.getDataProperty(partitionId).isStorageMediumSpecified(),
-                        keysDesc.getClusterKeysColumnIds());
+                        keysDesc.getClusterKeysColumnIds(),
+                        olapTable.getTTLSeconds());
+                afterCreatePartitions(olapTable.getId(), null, 
olapTable.getIndexIdList());
                 olapTable.addPartition(partition);
             } else if (partitionInfo.getType() == PartitionType.RANGE
                     || partitionInfo.getType() == PartitionType.LIST) {
@@ -2539,6 +2564,8 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                                     + totalReplicaNum + " of replica exceeds 
quota[" + db.getReplicaQuota() + "]");
                 }
 
+                beforeCreatePartitions(db.getId(), olapTable.getId(), null, 
olapTable.getIndexIdList());
+
                 // this is a 2-level partitioned tables
                 for (Map.Entry<String, Long> entry : 
partitionNameToId.entrySet()) {
                     DataProperty dataProperty = 
partitionInfo.getDataProperty(entry.getValue());
@@ -2558,7 +2585,6 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                         partionStoragePolicy = storagePolicy;
                     }
                     
Env.getCurrentEnv().getPolicyMgr().checkStoragePolicyExist(partionStoragePolicy);
-
                     Partition partition = 
createPartitionWithIndices(db.getId(),
                             olapTable.getId(), olapTable.getName(), 
olapTable.getBaseIndexId(), entry.getValue(),
                             entry.getKey(), olapTable.getIndexIdToMeta(), 
partitionDistributionInfo,
@@ -2573,11 +2599,14 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                             
olapTable.getTimeSeriesCompactionTimeThresholdSeconds(),
                             
olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold(),
                             storeRowColumn, binlogConfigForTask,
-                            dataProperty.isStorageMediumSpecified(), 
keysDesc.getClusterKeysColumnIds());
+                            dataProperty.isStorageMediumSpecified(),
+                            keysDesc.getClusterKeysColumnIds(),
+                            olapTable.getTTLSeconds());
                     olapTable.addPartition(partition);
                     
olapTable.getPartitionInfo().getDataProperty(partition.getId())
                             .setStoragePolicy(partionStoragePolicy);
                 }
+                afterCreatePartitions(olapTable.getId(), null, 
olapTable.getIndexIdList());
             } else {
                 throw new DdlException("Unsupported partition method: " + 
partitionInfo.getType().name());
             }
@@ -2810,7 +2839,7 @@ public class InternalCatalog implements 
CatalogIf<Database> {
 
         for (int i = 0; i < distributionInfo.getBucketNum(); ++i) {
             // create a new tablet with random chosen backends
-            Tablet tablet = new Tablet(idGeneratorBuffer.getNextId());
+            Tablet tablet = 
EnvFactory.createTablet(idGeneratorBuffer.getNextId());
 
             // add tablet to inverted index first
             index.addTablet(tablet, tabletMeta);
@@ -2997,6 +3026,19 @@ public class InternalCatalog implements 
CatalogIf<Database> {
             long bufferSize = 
IdGeneratorUtil.getBufferSizeForTruncateTable(copiedTbl, 
origPartitions.values());
             IdGeneratorBuffer idGeneratorBuffer =
                     origPartitions.isEmpty() ? null : 
Env.getCurrentEnv().getIdGeneratorBuffer(bufferSize);
+
+            Map<Long, Long> oldToNewPartitionId = new HashMap<Long, Long>();
+            List<Long> newPartitionIds = new ArrayList<Long>();
+            for (Map.Entry<String, Long> entry : origPartitions.entrySet()) {
+                long oldPartitionId = entry.getValue();
+                long newPartitionId = idGeneratorBuffer.getNextId();
+                oldToNewPartitionId.put(oldPartitionId, newPartitionId);
+                newPartitionIds.add(newPartitionId);
+            }
+
+            List<Long> indexIds = 
copiedTbl.getIndexIdToMeta().keySet().stream().collect(Collectors.toList());
+            beforeCreatePartitions(db.getId(), copiedTbl.getId(), 
newPartitionIds, indexIds);
+
             for (Map.Entry<String, Long> entry : origPartitions.entrySet()) {
                 // the new partition must use new id
                 // If we still use the old partition id, the behavior of 
current load jobs on this partition
@@ -3004,7 +3046,7 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                 // By using a new id, load job will be aborted(just like 
partition is dropped),
                 // which is the right behavior.
                 long oldPartitionId = entry.getValue();
-                long newPartitionId = idGeneratorBuffer.getNextId();
+                long newPartitionId = oldToNewPartitionId.get(oldPartitionId);
                 Partition newPartition = 
createPartitionWithIndices(db.getId(), copiedTbl.getId(),
                         copiedTbl.getName(), copiedTbl.getBaseIndexId(), 
newPartitionId, entry.getKey(),
                         copiedTbl.getIndexIdToMeta(), 
partitionsDistributionInfo.get(oldPartitionId),
@@ -3023,9 +3065,12 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                         
olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold(),
                         olapTable.storeRowColumn(), binlogConfig,
                         
copiedTbl.getPartitionInfo().getDataProperty(oldPartitionId).isStorageMediumSpecified(),
-                        clusterKeyIdxes);
+                        clusterKeyIdxes, olapTable.getTTLSeconds());
                 newPartitions.add(newPartition);
             }
+
+            afterCreatePartitions(copiedTbl.getId(), newPartitionIds, 
indexIds);
+
         } catch (DdlException e) {
             // create partition failed, remove all newly created tablets
             for (Long tabletId : tabletIdSet) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index c02bce86b89..280cf5ebf77 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -3153,7 +3153,14 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
                     // we should ensure the replica backend is alive
                     // otherwise, there will be a 'unknown node id, id=xxx' 
error for stream load
                     // BE id -> path hash
-                    Multimap<Long, Long> bePathsMap = 
tablet.getNormalReplicaBackendPathMap();
+                    Multimap<Long, Long> bePathsMap;
+                    try {
+                        bePathsMap = tablet.getNormalReplicaBackendPathMap();
+                    } catch (UserException ex) {
+                        
errorStatus.setErrorMsgs(Lists.newArrayList(ex.getMessage()));
+                        result.setStatus(errorStatus);
+                        return result;
+                    }
                     if (bePathsMap.keySet().size() < quorum) {
                         LOG.warn("auto go quorum exception");
                     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java 
b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index 78297aae83a..62511e25e47 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -83,6 +83,9 @@ public class SystemInfoService {
 
     public static final String NO_SCAN_NODE_BACKEND_AVAILABLE_MSG = "There is 
no scanNode Backend available.";
 
+    public static final String NOT_USING_VALID_CLUSTER_MSG = "Not using valid 
cloud clusters, "
+            + "please use a cluster before issuing any queries";
+
     private volatile ImmutableMap<Long, Backend> idToBackendRef = 
ImmutableMap.of();
     private volatile ImmutableMap<Long, AtomicLong> idToReportVersionRef = 
ImmutableMap.of();
     // TODO(gavin): use {clusterId -> List<BackendId>} instead to reduce risk 
of inconsistency
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/catalog/EnvFactoryTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/catalog/EnvFactoryTest.java
index ddd3b9441a5..1a939457f5e 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/EnvFactoryTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/EnvFactoryTest.java
@@ -20,6 +20,7 @@ package org.apache.doris.catalog;
 import org.apache.doris.cloud.catalog.CloudEnv;
 import org.apache.doris.cloud.catalog.CloudPartition;
 import org.apache.doris.cloud.catalog.CloudReplica;
+import org.apache.doris.cloud.catalog.CloudTablet;
 import org.apache.doris.cloud.datasource.CloudInternalCatalog;
 import org.apache.doris.common.Config;
 import org.apache.doris.datasource.InternalCatalog;
@@ -42,6 +43,8 @@ public class EnvFactoryTest {
         Assert.assertFalse(EnvFactory.createInternalCatalog() instanceof 
CloudInternalCatalog);
         Assert.assertTrue(EnvFactory.createPartition() instanceof Partition);
         Assert.assertFalse(EnvFactory.createPartition() instanceof 
CloudPartition);
+        Assert.assertTrue(EnvFactory.createTablet() instanceof Tablet);
+        Assert.assertFalse(EnvFactory.createTablet() instanceof CloudTablet);
         Assert.assertTrue(EnvFactory.createReplica() instanceof Replica);
         Assert.assertFalse(EnvFactory.createReplica() instanceof CloudReplica);
 
@@ -49,6 +52,7 @@ public class EnvFactoryTest {
         Assert.assertTrue(EnvFactory.createEnv(false) instanceof CloudEnv);
         Assert.assertTrue(EnvFactory.createInternalCatalog() instanceof 
CloudInternalCatalog);
         Assert.assertTrue(EnvFactory.createPartition() instanceof 
CloudPartition);
+        Assert.assertTrue(EnvFactory.createTablet() instanceof CloudTablet);
         Assert.assertTrue(EnvFactory.createReplica() instanceof CloudReplica);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to