This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c6adca4e48d [feature](merge-cloud) Add create cloud table (#30001)
c6adca4e48d is described below
commit c6adca4e48d2a1d38f6a5bd81a471216a29d4e93
Author: yujun <[email protected]>
AuthorDate: Tue Jan 16 17:56:05 2024 +0800
[feature](merge-cloud) Add create cloud table (#30001)
---
.../main/java/org/apache/doris/common/Config.java | 7 +
.../doris/alter/MaterializedViewHandler.java | 3 +-
.../apache/doris/alter/SchemaChangeHandler.java | 3 +-
.../java/org/apache/doris/backup/RestoreJob.java | 3 +-
.../main/java/org/apache/doris/catalog/Column.java | 143 ++++++
.../doris/catalog/DynamicPartitionProperty.java | 45 +-
.../main/java/org/apache/doris/catalog/Env.java | 21 +-
.../java/org/apache/doris/catalog/EnvFactory.java | 34 ++
.../main/java/org/apache/doris/catalog/Index.java | 42 ++
.../java/org/apache/doris/catalog/OlapTable.java | 26 +-
.../org/apache/doris/catalog/TableProperty.java | 10 +
.../main/java/org/apache/doris/catalog/Tablet.java | 32 +-
.../apache/doris/cloud/catalog/CloudTablet.java | 82 ++++
.../cloud/datasource/CloudInternalCatalog.java | 492 +++++++++++++++++++++
.../apache/doris/common/util/PropertyAnalyzer.java | 20 +
.../apache/doris/datasource/InternalCatalog.java | 71 ++-
.../apache/doris/service/FrontendServiceImpl.java | 9 +-
.../org/apache/doris/system/SystemInfoService.java | 3 +
.../org/apache/doris/catalog/EnvFactoryTest.java | 4 +
19 files changed, 996 insertions(+), 54 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 1b1d8906d96..909e1bf2d7a 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2440,6 +2440,10 @@ public class Config extends ConfigBase {
return !cloud_unique_id.isEmpty();
}
+ public static boolean isNotCloudMode() {
+ return cloud_unique_id.isEmpty();
+ }
+
/**
* MetaService endpoint, ip:port, such as meta_service_endpoint =
"192.0.0.10:8866"
*/
@@ -2452,6 +2456,9 @@ public class Config extends ConfigBase {
@ConfField(mutable = true)
public static int meta_service_connection_pool_size = 20;
+ @ConfField(mutable = true)
+ public static int meta_service_rpc_retry_times = 200;
+
// A connection will expire after a random time during [base, 2*base), so
that the FE
// has a chance to connect to a new RS. Set zero to disable it.
@ConfField(mutable = true)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
index 6425230b147..3f7122f3650 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
@@ -31,6 +31,7 @@ import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.EnvFactory;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexState;
@@ -397,7 +398,7 @@ public class MaterializedViewHandler extends AlterHandler {
long baseTabletId = baseTablet.getId();
long mvTabletId = idGeneratorBuffer.getNextId();
- Tablet newTablet = new Tablet(mvTabletId);
+ Tablet newTablet = EnvFactory.createTablet(mvTabletId);
mvIndex.addTablet(newTablet, mvTabletMeta);
addedTablets.add(newTablet);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index 206f028b023..b5895ead653 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -41,6 +41,7 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.EnvFactory;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.Index;
import org.apache.doris.catalog.KeysType;
@@ -1547,7 +1548,7 @@ public class SchemaChangeHandler extends AlterHandler {
long originTabletId = originTablet.getId();
long shadowTabletId = idGeneratorBuffer.getNextId();
- Tablet shadowTablet = new Tablet(shadowTabletId);
+ Tablet shadowTablet =
EnvFactory.createTablet(shadowTabletId);
shadowIndex.addTablet(shadowTablet, shadowTabletMeta);
addedTablets.add(shadowTablet);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index c6c6beb3358..0398199e2e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -28,6 +28,7 @@ import org.apache.doris.catalog.BinlogConfig;
import org.apache.doris.catalog.DataProperty;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.EnvFactory;
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
@@ -1122,7 +1123,7 @@ public class RestoreJob extends AbstractJob {
for (int i = 0; i < remotetabletSize; i++) {
// generate new tablet id
long newTabletId = env.getNextId();
- Tablet newTablet = new Tablet(newTabletId);
+ Tablet newTablet = EnvFactory.createTablet(newTabletId);
// add tablet to index, but not add to TabletInvertedIndex
remoteIdx.addTablet(newTablet, null /* tablet meta */, true /*
is restore */);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
index dc94074cac6..f83eb942ca6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
@@ -32,11 +32,16 @@ import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.SqlUtils;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.proto.OlapFile;
import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TColumnType;
+import org.apache.doris.thrift.TPrimitiveType;
import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
+import com.google.protobuf.ByteString;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -630,6 +635,144 @@ public class Column implements Writable,
GsonPostProcessable {
}
}
+ // CLOUD_CODE_BEGIN
+ public int getFieldLengthByType(TPrimitiveType type, int stringLength)
throws DdlException {
+ switch (type) {
+ case TINYINT:
+ case BOOLEAN:
+ return 1;
+ case SMALLINT:
+ return 2;
+ case INT:
+ return 4;
+ case BIGINT:
+ return 8;
+ case LARGEINT:
+ return 16;
+ case DATE:
+ return 3;
+ case DATEV2:
+ return 4;
+ case DATETIME:
+ return 8;
+ case DATETIMEV2:
+ return 8;
+ case FLOAT:
+ return 4;
+ case DOUBLE:
+ return 8;
+ case QUANTILE_STATE:
+ case OBJECT:
+ return 16;
+ case CHAR:
+ return stringLength;
+ case VARCHAR:
+ case HLL:
+ case AGG_STATE:
+ return stringLength + 2; // sizeof(OLAP_VARCHAR_MAX_LENGTH)
+ case STRING:
+ return stringLength + 4; // sizeof(OLAP_STRING_MAX_LENGTH)
+ case JSONB:
+ return stringLength + 4; // sizeof(OLAP_JSONB_MAX_LENGTH)
+ case ARRAY:
+ return 65535; // OLAP_ARRAY_MAX_LENGTH
+ case DECIMAL32:
+ return 4;
+ case DECIMAL64:
+ return 8;
+ case DECIMAL128I:
+ return 16;
+ case DECIMALV2:
+ return 12; // use 12 bytes in olap engine.
+ case STRUCT:
+ return 65535;
+ case MAP:
+ return 65535;
+ default:
+ LOG.warn("unknown field type. [type= << {} << ]", type);
+ throw new DdlException("unknown field type. type: " + type);
+ }
+ }
+
+ public OlapFile.ColumnPB toPb(Set<String> bfColumns, List<Index> indexes)
throws DdlException {
+ OlapFile.ColumnPB.Builder builder = OlapFile.ColumnPB.newBuilder();
+
+ // when doing schema change, some modified column has a prefix in name.
+ // this prefix is only used in FE, not visible to BE, so we should
remove this prefix.
+ builder.setName(name.startsWith(SchemaChangeHandler.SHADOW_NAME_PREFIX)
+ ?
name.substring(SchemaChangeHandler.SHADOW_NAME_PREFIX.length()) : name);
+
+ builder.setUniqueId(uniqueId);
+ builder.setType(this.getDataType().toThrift().name());
+ builder.setIsKey(this.isKey);
+ if (null != this.aggregationType) {
+ if (type.isAggStateType()) {
+ AggStateType aggState = (AggStateType) type;
+ builder.setAggregation(aggState.getFunctionName());
+ builder.setResultIsNullable(aggState.getResultIsNullable());
+ for (Column column : children) {
+ builder.addChildrenColumns(column.toPb(Sets.newHashSet(),
Lists.newArrayList()));
+ }
+ } else {
+ builder.setAggregation(this.aggregationType.toString());
+ }
+ } else {
+ builder.setAggregation("NONE");
+ }
+ builder.setIsNullable(this.isAllowNull);
+ if (this.defaultValue != null) {
+
builder.setDefaultValue(ByteString.copyFrom(this.defaultValue.getBytes()));
+ }
+ builder.setPrecision(this.getPrecision());
+ builder.setFrac(this.getScale());
+
+ int length = getFieldLengthByType(this.getDataType().toThrift(),
this.getStrLen());
+
+ builder.setLength(length);
+ builder.setIndexLength(length);
+ if (this.getDataType().toThrift() == TPrimitiveType.VARCHAR
+ || this.getDataType().toThrift() == TPrimitiveType.STRING) {
+ builder.setIndexLength(this.getOlapColumnIndexSize());
+ }
+
+ if (bfColumns != null && bfColumns.contains(this.name)) {
+ builder.setIsBfColumn(true);
+ } else {
+ builder.setIsBfColumn(false);
+ }
+ builder.setVisible(visible);
+
+ if (indexes != null) {
+ for (Index index : indexes) {
+ if (index.getIndexType() == IndexDef.IndexType.BITMAP) {
+ List<String> columns = index.getColumns();
+ if (this.name.equalsIgnoreCase(columns.get(0))) {
+ builder.setHasBitmapIndex(true);
+ break;
+ }
+ }
+ }
+ }
+
+ if (this.type.isArrayType()) {
+ Column child = this.getChildren().get(0);
+ builder.addChildrenColumns(child.toPb(Sets.newHashSet(),
Lists.newArrayList()));
+ } else if (this.type.isMapType()) {
+ Column k = this.getChildren().get(0);
+ builder.addChildrenColumns(k.toPb(Sets.newHashSet(),
Lists.newArrayList()));
+ Column v = this.getChildren().get(1);
+ builder.addChildrenColumns(v.toPb(Sets.newHashSet(),
Lists.newArrayList()));
+ } else if (this.type.isStructType()) {
+ List<Column> childrenColumns = this.getChildren();
+ for (Column c : childrenColumns) {
+ builder.addChildrenColumns(c.toPb(Sets.newHashSet(),
Lists.newArrayList()));
+ }
+ }
+
+ OlapFile.ColumnPB col = builder.build();
+ return col;
+ }
+ // CLOUD_CODE_END
public void checkSchemaChangeAllowed(Column other) throws DdlException {
if (Strings.isNullOrEmpty(other.name)) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/DynamicPartitionProperty.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/DynamicPartitionProperty.java
index 54c49c1ee8d..2daeb7bc72c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/DynamicPartitionProperty.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/DynamicPartitionProperty.java
@@ -19,6 +19,7 @@ package org.apache.doris.catalog;
import org.apache.doris.analysis.TimestampArithmeticExpr.TimeUnit;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.DynamicPartitionUtil;
@@ -216,28 +217,34 @@ public class DynamicPartitionProperty {
* use table replication_num as dynamic_partition.replication_num default
value
*/
public String getProperties(ReplicaAllocation tableReplicaAlloc) {
- ReplicaAllocation tmpAlloc = this.replicaAlloc.isNotSet() ?
tableReplicaAlloc : this.replicaAlloc;
- String res = ",\n\"" + ENABLE + "\" = \"" + enable + "\""
- + ",\n\"" + TIME_UNIT + "\" = \"" + timeUnit + "\""
- + ",\n\"" + TIME_ZONE + "\" = \"" + tz.getID() + "\""
- + ",\n\"" + START + "\" = \"" + start + "\""
- + ",\n\"" + END + "\" = \"" + end + "\""
- + ",\n\"" + PREFIX + "\" = \"" + prefix + "\""
- + ",\n\"" + REPLICATION_ALLOCATION + "\" = \"" +
tmpAlloc.toCreateStmt() + "\""
- + ",\n\"" + BUCKETS + "\" = \"" + buckets + "\""
- + ",\n\"" + CREATE_HISTORY_PARTITION + "\" = \"" +
createHistoryPartition + "\""
- + ",\n\"" + HISTORY_PARTITION_NUM + "\" = \"" +
historyPartitionNum + "\""
- + ",\n\"" + HOT_PARTITION_NUM + "\" = \"" + hotPartitionNum +
"\""
- + ",\n\"" + RESERVED_HISTORY_PERIODS + "\" = \"" +
reservedHistoryPeriods + "\""
- + ",\n\"" + STORAGE_POLICY + "\" = \"" + storagePolicy + "\"";
- if (!Strings.isNullOrEmpty(storageMedium)) {
- res += ",\n\"" + STORAGE_MEDIUM + "\" = \"" + storageMedium + "\"";
+ StringBuilder sb = new StringBuilder();
+ sb.append(",\n\"" + ENABLE + "\" = \"" + enable + "\"");
+ sb.append(",\n\"" + TIME_UNIT + "\" = \"" + timeUnit + "\"");
+ sb.append(",\n\"" + TIME_ZONE + "\" = \"" + tz.getID() + "\"");
+ sb.append(",\n\"" + START + "\" = \"" + start + "\"");
+ sb.append(",\n\"" + END + "\" = \"" + end + "\"");
+ sb.append(",\n\"" + PREFIX + "\" = \"" + prefix + "\"");
+ if (Config.isNotCloudMode()) {
+ ReplicaAllocation tmpAlloc = this.replicaAlloc.isNotSet() ?
tableReplicaAlloc : this.replicaAlloc;
+ sb.append(",\n\"" + REPLICATION_ALLOCATION + "\" = \"" +
tmpAlloc.toCreateStmt() + "\"");
+ }
+ sb.append(",\n\"" + BUCKETS + "\" = \"" + buckets + "\"");
+ sb.append(",\n\"" + CREATE_HISTORY_PARTITION + "\" = \"" +
createHistoryPartition + "\"");
+ sb.append(",\n\"" + HISTORY_PARTITION_NUM + "\" = \"" +
historyPartitionNum + "\"");
+ sb.append(",\n\"" + HOT_PARTITION_NUM + "\" = \"" + hotPartitionNum +
"\"");
+ sb.append(",\n\"" + RESERVED_HISTORY_PERIODS + "\" = \"" +
reservedHistoryPeriods + "\"");
+ if (Config.isNotCloudMode()) {
+ sb.append(",\n\"" + STORAGE_POLICY + "\" = \"" + storagePolicy +
"\"");
+ if (!Strings.isNullOrEmpty(storageMedium)) {
+ sb.append(",\n\"" + STORAGE_MEDIUM + "\" = \"" + storageMedium
+ "\"");
+ }
}
if (getTimeUnit().equalsIgnoreCase(TimeUnit.WEEK.toString())) {
- res += ",\n\"" + START_DAY_OF_WEEK + "\" = \"" +
startOfWeek.dayOfWeek + "\"";
+ sb.append(",\n\"" + START_DAY_OF_WEEK + "\" = \"" +
startOfWeek.dayOfWeek + "\"");
} else if (getTimeUnit().equalsIgnoreCase(TimeUnit.MONTH.toString())) {
- res += ",\n\"" + START_DAY_OF_MONTH + "\" = \"" + startOfMonth.day
+ "\"";
+ sb.append(",\n\"" + START_DAY_OF_MONTH + "\" = \"" +
startOfMonth.day + "\"");
}
- return res;
+ return sb.toString();
}
+
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 5b9676a201d..b72544365aa 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -3243,12 +3243,18 @@ public class Env {
// replicationNum
ReplicaAllocation replicaAlloc =
olapTable.getDefaultReplicaAllocation();
-
sb.append("\"").append(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION).append("\"
= \"");
- sb.append(replicaAlloc.toCreateStmt()).append("\"");
- // min load replica num
-
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_MIN_LOAD_REPLICA_NUM).append("\"
= \"");
- sb.append(olapTable.getMinLoadReplicaNum()).append("\"");
+ if (Config.isCloudMode()) {
+
sb.append("\"").append(PropertyAnalyzer.PROPERTIES_FILE_CACHE_TTL_SECONDS).append("\"
= \"");
+ sb.append(olapTable.getTTLSeconds()).append("\"");
+ } else {
+
sb.append("\"").append(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION).append("\"
= \"");
+ sb.append(replicaAlloc.toCreateStmt()).append("\"");
+
+ // min load replica num
+
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_MIN_LOAD_REPLICA_NUM).append("\"
= \"");
+ sb.append(olapTable.getMinLoadReplicaNum()).append("\"");
+ }
// bloom filter
Set<String> bfColumnNames = olapTable.getCopiedBfColumns();
@@ -3296,6 +3302,11 @@ public class Env {
sb.append(olapTable.getDataSortInfo().toSql());
}
+ if (Config.isCloudMode() && olapTable.getTTLSeconds() != 0) {
+
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_FILE_CACHE_TTL_SECONDS).append("\"
= \"");
+ sb.append(olapTable.getTTLSeconds()).append("\"");
+ }
+
// in memory
if (olapTable.isInMemory()) {
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_INMEMORY).append("\" =
\"");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
index a44e9da9c77..c653159d7b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/EnvFactory.java
@@ -20,10 +20,12 @@ package org.apache.doris.catalog;
import org.apache.doris.cloud.catalog.CloudEnv;
import org.apache.doris.cloud.catalog.CloudPartition;
import org.apache.doris.cloud.catalog.CloudReplica;
+import org.apache.doris.cloud.catalog.CloudTablet;
import org.apache.doris.cloud.datasource.CloudInternalCatalog;
import org.apache.doris.common.Config;
import org.apache.doris.datasource.InternalCatalog;
+import java.lang.reflect.Type;
public class EnvFactory {
@@ -43,6 +45,14 @@ public class EnvFactory {
}
}
+ public static Type getPartitionClass() {
+ if (Config.isCloudMode()) {
+ return CloudPartition.class;
+ } else {
+ return Partition.class;
+ }
+ }
+
public static Partition createPartition() {
if (Config.isCloudMode()) {
return new CloudPartition();
@@ -51,6 +61,30 @@ public class EnvFactory {
}
}
+ public static Type getTabletClass() {
+ if (Config.isCloudMode()) {
+ return CloudTablet.class;
+ } else {
+ return Tablet.class;
+ }
+ }
+
+ public static Tablet createTablet() {
+ if (Config.isCloudMode()) {
+ return new CloudTablet();
+ } else {
+ return new Tablet();
+ }
+ }
+
+ public static Tablet createTablet(long tabletId) {
+ if (Config.isCloudMode()) {
+ return new CloudTablet(tabletId);
+ } else {
+ return new Tablet(tabletId);
+ }
+ }
+
public static Replica createReplica() {
if (Config.isCloudMode()) {
return new CloudReplica();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Index.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Index.java
index e2235868a1c..4ec72e0c232 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Index.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Index.java
@@ -24,6 +24,7 @@ import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.proto.OlapFile;
import org.apache.doris.thrift.TIndexType;
import org.apache.doris.thrift.TOlapTableIndex;
@@ -211,6 +212,47 @@ public class Index implements Writable {
return tIndex;
}
+ public OlapFile.TabletIndexPB toPb(List<Column> schemaColumns) {
+ OlapFile.TabletIndexPB.Builder builder =
OlapFile.TabletIndexPB.newBuilder();
+ builder.setIndexId(indexId);
+ builder.setIndexName(indexName);
+ for (String columnName : columns) {
+ for (Column column : schemaColumns) {
+ if (column.getName().equals(columnName)) {
+ builder.addColUniqueId(column.getUniqueId());
+ }
+ }
+ }
+
+ switch (indexType) {
+ case BITMAP:
+ builder.setIndexType(OlapFile.IndexType.BITMAP);
+ break;
+
+ case INVERTED:
+ builder.setIndexType(OlapFile.IndexType.INVERTED);
+ break;
+
+ case NGRAM_BF:
+ builder.setIndexType(OlapFile.IndexType.NGRAM_BF);
+ break;
+
+ case BLOOMFILTER:
+ builder.setIndexType(OlapFile.IndexType.BLOOMFILTER);
+ break;
+
+ default:
+ throw new RuntimeException("indexType " + indexType + " is not
processed in toPb");
+ }
+
+ if (properties != null) {
+ builder.putAllProperties(properties);
+ }
+
+ OlapFile.TabletIndexPB index = builder.build();
+ return index;
+ }
+
public static void checkConflict(Collection<Index> indices, Set<String>
bloomFilters) throws AnalysisException {
indices = indices == null ? Collections.emptyList() : indices;
bloomFilters = bloomFilters == null ? Collections.emptySet() :
bloomFilters;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 3236fc09ec9..5e36886e56a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -609,7 +609,7 @@ public class OlapTable extends Table {
idx.clearTabletsForRestore();
for (int i = 0; i < tabletNum; i++) {
long newTabletId = env.getNextId();
- Tablet newTablet = new Tablet(newTabletId);
+ Tablet newTablet = EnvFactory.createTablet(newTabletId);
idx.addTablet(newTablet, null /* tablet meta */, true /*
is restore */);
// replicas
@@ -668,6 +668,14 @@ public class OlapTable extends Table {
return result;
}
+ public List<Long> getIndexIdList() {
+ List<Long> result = Lists.newArrayList();
+ for (Long indexId : indexIdToMeta.keySet()) {
+ result.add(indexId);
+ }
+ return result;
+ }
+
// schema
public Map<Long, List<Column>> getIndexIdToSchema() {
return getIndexIdToSchema(Util.showHiddenColumns());
@@ -1902,6 +1910,22 @@ public class OlapTable extends Table {
return "";
}
+ public long getTTLSeconds() {
+ if (tableProperty != null) {
+ return tableProperty.getTTLSeconds();
+ }
+ return 0L;
+ }
+
+ public void setTTLSeconds(long ttlSeconds) {
+ if (tableProperty == null) {
+ tableProperty = new TableProperty(new HashMap<>());
+ }
+
tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_FILE_CACHE_TTL_SECONDS,
+
Long.valueOf(ttlSeconds).toString());
+ tableProperty.buildTTLSeconds();
+ }
+
public boolean getEnableLightSchemaChange() {
if (tableProperty != null) {
return tableProperty.getUseSchemaLightChange();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
index 3d5215bc483..5e857acf884 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
@@ -60,6 +60,7 @@ public class TableProperty implements Writable {
private ReplicaAllocation replicaAlloc =
ReplicaAllocation.DEFAULT_ALLOCATION;
private boolean isInMemory = false;
private short minLoadReplicaNum = -1;
+ private long ttlSeconds = 0L;
private String storagePolicy = "";
private Boolean isBeingSynced = null;
@@ -188,6 +189,15 @@ public class TableProperty implements Writable {
return this;
}
+ public TableProperty buildTTLSeconds() {
+ ttlSeconds =
Long.parseLong(properties.getOrDefault(PropertyAnalyzer.PROPERTIES_FILE_CACHE_TTL_SECONDS,
"0"));
+ return this;
+ }
+
+ public long getTTLSeconds() {
+ return ttlSeconds;
+ }
+
public TableProperty buildEnableLightSchemaChange() {
enableLightSchemaChange = Boolean.parseBoolean(
properties.getOrDefault(PropertyAnalyzer.PROPERTIES_ENABLE_LIGHT_SCHEMA_CHANGE,
"false"));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
index e579c6b9a48..0ab869ec803 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
@@ -24,6 +24,7 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
@@ -84,7 +85,7 @@ public class Tablet extends MetaObject implements Writable {
@SerializedName(value = "id")
private long id;
@SerializedName(value = "replicas")
- private List<Replica> replicas;
+ protected List<Replica> replicas;
@SerializedName(value = "checkedVersion")
private long checkedVersion;
@Deprecated
@@ -112,7 +113,7 @@ public class Tablet extends MetaObject implements Writable {
this(tabletId, new ArrayList<>());
}
- public Tablet(long tabletId, List<Replica> replicas) {
+ private Tablet(long tabletId, List<Replica> replicas) {
this.id = tabletId;
this.replicas = replicas;
if (this.replicas == null) {
@@ -168,13 +169,14 @@ public class Tablet extends MetaObject implements
Writable {
}
}
- private boolean deleteRedundantReplica(long backendId, long version) {
+ protected boolean isLatestReplicaAndDeleteOld(Replica newReplica) {
boolean delete = false;
boolean hasBackend = false;
+ long version = newReplica.getVersion();
Iterator<Replica> iterator = replicas.iterator();
while (iterator.hasNext()) {
Replica replica = iterator.next();
- if (replica.getBackendId() == backendId) {
+ if (replica.getBackendId() == newReplica.getBackendId()) {
hasBackend = true;
if (replica.getVersion() <= version) {
iterator.remove();
@@ -187,7 +189,7 @@ public class Tablet extends MetaObject implements Writable {
}
public void addReplica(Replica replica, boolean isRestore) {
- if (deleteRedundantReplica(replica.getBackendId(),
replica.getVersion())) {
+ if (isLatestReplicaAndDeleteOld(replica)) {
replicas.add(replica);
if (!isRestore) {
Env.getCurrentInvertedIndex().addReplica(id, replica);
@@ -212,16 +214,22 @@ public class Tablet extends MetaObject implements
Writable {
}
public List<Long> getNormalReplicaBackendIds() {
- return Lists.newArrayList(getNormalReplicaBackendPathMap().keySet());
+ try {
+ return
Lists.newArrayList(getNormalReplicaBackendPathMap().keySet());
+ } catch (Exception e) {
+ LOG.warn("failed to getNormalReplicaBackendIds", e);
+ return Lists.newArrayList();
+ }
}
// return map of (BE id -> path hash) of normal replicas
// for load plan.
- public Multimap<Long, Long> getNormalReplicaBackendPathMap() {
+ public Multimap<Long, Long> getNormalReplicaBackendPathMap() throws
UserException {
Multimap<Long, Long> map = HashMultimap.create();
SystemInfoService infoService = Env.getCurrentSystemInfo();
for (Replica replica : replicas) {
- if (!infoService.checkBackendAlive(replica.getBackendId())) {
+ long backendId = replica.getBackendId();
+ if (!infoService.checkBackendAlive(backendId)) {
continue;
}
@@ -232,7 +240,7 @@ public class Tablet extends MetaObject implements Writable {
ReplicaState state = replica.getState();
if (state.canLoad()
|| (state == ReplicaState.DECOMMISSION &&
replica.getPostWatermarkTxnId() < 0)) {
- map.put(replica.getBackendId(), replica.getPathHash());
+ map.put(backendId, replica.getPathHash());
}
}
return map;
@@ -383,7 +391,7 @@ public class Tablet extends MetaObject implements Writable {
int replicaCount = in.readInt();
for (int i = 0; i < replicaCount; ++i) {
Replica replica = Replica.read(in);
- if (deleteRedundantReplica(replica.getBackendId(),
replica.getVersion())) {
+ if (isLatestReplicaAndDeleteOld(replica)) {
replicas.add(replica);
}
}
@@ -396,10 +404,10 @@ public class Tablet extends MetaObject implements
Writable {
public static Tablet read(DataInput in) throws IOException {
if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_115) {
String json = Text.readString(in);
- return GsonUtils.GSON.fromJson(json, Tablet.class);
+ return GsonUtils.GSON.fromJson(json, EnvFactory.getTabletClass());
}
- Tablet tablet = new Tablet();
+ Tablet tablet = EnvFactory.createTablet();
tablet.readFields(in);
return tablet;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTablet.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTablet.java
new file mode 100644
index 00000000000..be8a2599dde
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTablet.java
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.catalog;
+
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.common.InternalErrorCode;
+import org.apache.doris.common.UserException;
+import org.apache.doris.system.SystemInfoService;
+
+import com.google.common.collect.Multimap;
+
+import java.util.Iterator;
+
+public class CloudTablet extends Tablet {
+
+ public CloudTablet() {
+ super();
+ }
+
+ public CloudTablet(long tabletId) {
+ super(tabletId);
+ }
+
+ @Override
+ public Replica getReplicaByBackendId(long backendId) {
+ if (!replicas.isEmpty()) {
+ return replicas.get(0);
+ }
+ return null;
+ }
+
+ @Override
+ public Multimap<Long, Long> getNormalReplicaBackendPathMap() throws
UserException {
+ Multimap<Long, Long> pathMap = super.getNormalReplicaBackendPathMap();
+
+ if (pathMap.containsKey(-1L)) {
+ pathMap.removeAll(-1L);
+ if (pathMap.isEmpty()) {
+ throw new UserException(InternalErrorCode.META_NOT_FOUND_ERR,
+ SystemInfoService.NOT_USING_VALID_CLUSTER_MSG);
+ }
+ }
+
+ return pathMap;
+ }
+
+ @Override
+ protected boolean isLatestReplicaAndDeleteOld(Replica newReplica) {
+ boolean delete = false;
+ boolean hasBackend = false;
+ long version = newReplica.getVersion();
+ Iterator<Replica> iterator = replicas.iterator();
+ while (iterator.hasNext()) {
+ hasBackend = true;
+ Replica replica = iterator.next();
+ if (replica.getVersion() <= version) {
+ iterator.remove();
+ delete = true;
+ }
+ }
+
+ return delete || !hasBackend;
+ }
+
+}
+
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
index 5db352f8526..13f04fd36df 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
@@ -17,13 +17,505 @@
package org.apache.doris.cloud.datasource;
+import org.apache.doris.analysis.DataSortInfo;
+import org.apache.doris.catalog.BinlogConfig;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.DistributionInfo;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.EnvFactory;
+import org.apache.doris.catalog.Index;
+import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.MaterializedIndex.IndexState;
+import org.apache.doris.catalog.MaterializedIndexMeta;
+import org.apache.doris.catalog.MetaIdGenerator.IdGeneratorBuffer;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.Replica.ReplicaState;
+import org.apache.doris.catalog.ReplicaAllocation;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.cloud.catalog.CloudPartition;
+import org.apache.doris.cloud.catalog.CloudReplica;
+import org.apache.doris.cloud.proto.Cloud;
+import org.apache.doris.cloud.rpc.MetaServiceProxy;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.proto.OlapCommon;
+import org.apache.doris.proto.OlapFile;
+import org.apache.doris.proto.Types;
+import org.apache.doris.rpc.RpcException;
+import org.apache.doris.thrift.TCompressionType;
+import org.apache.doris.thrift.TSortType;
+import org.apache.doris.thrift.TStorageFormat;
+import org.apache.doris.thrift.TStorageMedium;
+import org.apache.doris.thrift.TTabletType;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import doris.segment_v2.SegmentV2;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
public class CloudInternalCatalog extends InternalCatalog {
+ private static final Logger LOG =
LogManager.getLogger(CloudInternalCatalog.class);
public CloudInternalCatalog() {
super();
}
+ // BEGIN CREATE TABLE
+
+ // TODO(merge-cloud): merge code with InternalCatalog
+ @Override
+ protected Partition createPartitionWithIndices(long dbId, long tableId,
String tableName,
+ long baseIndexId, long partitionId, String partitionName,
Map<Long, MaterializedIndexMeta> indexIdToMeta,
+ DistributionInfo distributionInfo, TStorageMedium storageMedium,
ReplicaAllocation replicaAlloc,
+ Long versionInfo, Set<String> bfColumns, double bfFpp, Set<Long>
tabletIdSet, List<Index> indexes,
+ boolean isInMemory, TStorageFormat storageFormat, TTabletType
tabletType, TCompressionType compressionType,
+ DataSortInfo dataSortInfo, boolean enableUniqueKeyMergeOnWrite,
String storagePolicy,
+ IdGeneratorBuffer idGeneratorBuffer, boolean disableAutoCompaction,
+ boolean enableSingleReplicaCompaction, boolean
skipWriteIndexOnLoad,
+ String compactionPolicy, Long timeSeriesCompactionGoalSizeMbytes,
+ Long timeSeriesCompactionFileCountThreshold, Long
timeSeriesCompactionTimeThresholdSeconds,
+ Long timeSeriesCompactionEmptyRowsetsThreshold,
+ boolean storeRowColumn, BinlogConfig binlogConfig,
+ boolean isStorageMediumSpecified, List<Integer> clusterKeyIndexes,
+ long ttlSeconds) throws DdlException {
+ // create base index first.
+ Preconditions.checkArgument(baseIndexId != -1);
+ MaterializedIndex baseIndex = new MaterializedIndex(baseIndexId,
IndexState.NORMAL);
+
+ LOG.info("begin create cloud partition");
+ // create partition with base index
+ Partition partition = new CloudPartition(partitionId, partitionName,
baseIndex,
+ distributionInfo, dbId, tableId);
+
+ // add to index map
+ Map<Long, MaterializedIndex> indexMap = Maps.newHashMap();
+ indexMap.put(baseIndexId, baseIndex);
+
+ // create rollup index if has
+ for (long indexId : indexIdToMeta.keySet()) {
+ if (indexId == baseIndexId) {
+ continue;
+ }
+
+ MaterializedIndex rollup = new MaterializedIndex(indexId,
IndexState.NORMAL);
+ indexMap.put(indexId, rollup);
+ }
+
+ // version and version hash
+ if (versionInfo != null) {
+ partition.updateVisibleVersion(versionInfo);
+ }
+ long version = partition.getVisibleVersion();
+
+ // short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
+ for (Map.Entry<Long, MaterializedIndex> entry : indexMap.entrySet()) {
+ long indexId = entry.getKey();
+ MaterializedIndex index = entry.getValue();
+ MaterializedIndexMeta indexMeta = indexIdToMeta.get(indexId);
+
+ // create tablets
+ int schemaHash = indexMeta.getSchemaHash();
+ TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId,
indexId, schemaHash, storageMedium);
+ createCloudTablets(index, ReplicaState.NORMAL, distributionInfo,
version, replicaAlloc,
+ tabletMeta, tabletIdSet);
+
+ short shortKeyColumnCount = indexMeta.getShortKeyColumnCount();
+ // TStorageType storageType = indexMeta.getStorageType();
+ List<Column> columns = indexMeta.getSchema();
+ KeysType keysType = indexMeta.getKeysType();
+
+ Cloud.CreateTabletsRequest.Builder requestBuilder =
Cloud.CreateTabletsRequest.newBuilder();
+ for (Tablet tablet : index.getTablets()) {
+ OlapFile.TabletMetaCloudPB.Builder builder =
createTabletMetaBuilder(tableId, indexId,
+ partitionId, tablet, tabletType, schemaHash, keysType,
shortKeyColumnCount,
+ bfColumns, bfFpp, indexes, columns, dataSortInfo,
compressionType,
+ storagePolicy, isInMemory, false, tableName,
ttlSeconds,
+ enableUniqueKeyMergeOnWrite, storeRowColumn,
indexMeta.getSchemaVersion());
+ requestBuilder.addTabletMetas(builder);
+ }
+
+ LOG.info("create tablets, dbId: {}, tableId: {}, tableName: {},
partitionId: {}, partitionName: {}, "
+ + "indexId: {}",
+ dbId, tableId, tableName, partitionId, partitionName,
indexId);
+ sendCreateTabletsRpc(requestBuilder);
+ if (index.getId() != baseIndexId) {
+ // add rollup index to partition
+ partition.createRollupIndex(index);
+ }
+ }
+
+ LOG.info("succeed in creating partition[{}-{}], table : [{}-{}]",
partitionId, partitionName,
+ tableId, tableName);
+
+ return partition;
+ }
+
+ private OlapFile.TabletMetaCloudPB.Builder createTabletMetaBuilder(long
tableId, long indexId,
+ long partitionId, Tablet tablet, TTabletType tabletType, int
schemaHash, KeysType keysType,
+ short shortKeyColumnCount, Set<String> bfColumns, double bfFpp,
List<Index> indexes,
+ List<Column> schemaColumns, DataSortInfo dataSortInfo,
TCompressionType compressionType,
+ String storagePolicy, boolean isInMemory, boolean isShadow,
+ String tableName, long ttlSeconds, boolean
enableUniqueKeyMergeOnWrite,
+ boolean storeRowColumn, int schemaVersion) throws DdlException {
+ OlapFile.TabletMetaCloudPB.Builder builder =
OlapFile.TabletMetaCloudPB.newBuilder();
+ builder.setTableId(tableId);
+ builder.setIndexId(indexId);
+ builder.setPartitionId(partitionId);
+ builder.setTabletId(tablet.getId());
+ builder.setSchemaHash(schemaHash);
+ builder.setTableName(tableName);
+ builder.setCreationTime(System.currentTimeMillis() / 1000);
+ builder.setCumulativeLayerPoint(-1);
+ builder.setTabletState(isShadow ? OlapFile.TabletStatePB.PB_NOTREADY :
OlapFile.TabletStatePB.PB_RUNNING);
+ builder.setIsInMemory(isInMemory);
+ builder.setTtlSeconds(ttlSeconds);
+ builder.setSchemaVersion(schemaVersion);
+
+ UUID uuid = UUID.randomUUID();
+ Types.PUniqueId tabletUid = Types.PUniqueId.newBuilder()
+ .setHi(uuid.getMostSignificantBits())
+ .setLo(uuid.getLeastSignificantBits())
+ .build();
+ builder.setTabletUid(tabletUid);
+
+ builder.setPreferredRowsetType(OlapFile.RowsetTypePB.BETA_ROWSET);
+ builder.setTabletType(tabletType == TTabletType.TABLET_TYPE_DISK
+ ? OlapFile.TabletTypePB.TABLET_TYPE_DISK :
OlapFile.TabletTypePB.TABLET_TYPE_MEMORY);
+
+ builder.setReplicaId(tablet.getReplicas().get(0).getId());
+ builder.setEnableUniqueKeyMergeOnWrite(enableUniqueKeyMergeOnWrite);
+
+ OlapFile.TabletSchemaCloudPB.Builder schemaBuilder =
OlapFile.TabletSchemaCloudPB.newBuilder();
+ schemaBuilder.setSchemaVersion(schemaVersion);
+
+ if (keysType == KeysType.DUP_KEYS) {
+ schemaBuilder.setKeysType(OlapFile.KeysType.DUP_KEYS);
+ } else if (keysType == KeysType.UNIQUE_KEYS) {
+ schemaBuilder.setKeysType(OlapFile.KeysType.UNIQUE_KEYS);
+ } else if (keysType == KeysType.AGG_KEYS) {
+ schemaBuilder.setKeysType(OlapFile.KeysType.AGG_KEYS);
+ } else {
+ throw new DdlException("invalid key types");
+ }
+ schemaBuilder.setNumShortKeyColumns(shortKeyColumnCount);
+ schemaBuilder.setNumRowsPerRowBlock(1024);
+ schemaBuilder.setCompressKind(OlapCommon.CompressKind.COMPRESS_LZ4);
+ schemaBuilder.setBfFpp(bfFpp);
+
+ int deleteSign = -1;
+ int sequenceCol = -1;
+ for (int i = 0; i < schemaColumns.size(); i++) {
+ Column column = schemaColumns.get(i);
+ if (column.isDeleteSignColumn()) {
+ deleteSign = i;
+ }
+ if (column.isSequenceColumn()) {
+ sequenceCol = i;
+ }
+ }
+ schemaBuilder.setDeleteSignIdx(deleteSign);
+ schemaBuilder.setSequenceColIdx(sequenceCol);
+ schemaBuilder.setStoreRowColumn(storeRowColumn);
+
+ if (dataSortInfo.getSortType() == TSortType.LEXICAL) {
+ schemaBuilder.setSortType(OlapFile.SortType.LEXICAL);
+ } else if (dataSortInfo.getSortType() == TSortType.ZORDER) {
+ schemaBuilder.setSortType(OlapFile.SortType.ZORDER);
+ } else {
+ LOG.warn("invalid sort types:{}", dataSortInfo.getSortType());
+ throw new DdlException("invalid sort types");
+ }
+
+ switch (compressionType) {
+ case NO_COMPRESSION:
+
schemaBuilder.setCompressionType(SegmentV2.CompressionTypePB.NO_COMPRESSION);
+ break;
+ case SNAPPY:
+
schemaBuilder.setCompressionType(SegmentV2.CompressionTypePB.SNAPPY);
+ break;
+ case LZ4:
+
schemaBuilder.setCompressionType(SegmentV2.CompressionTypePB.LZ4);
+ break;
+ case LZ4F:
+
schemaBuilder.setCompressionType(SegmentV2.CompressionTypePB.LZ4F);
+ break;
+ case ZLIB:
+
schemaBuilder.setCompressionType(SegmentV2.CompressionTypePB.ZLIB);
+ break;
+ case ZSTD:
+
schemaBuilder.setCompressionType(SegmentV2.CompressionTypePB.ZSTD);
+ break;
+ default:
+
schemaBuilder.setCompressionType(SegmentV2.CompressionTypePB.LZ4F);
+ break;
+ }
+
+ schemaBuilder.setSortColNum(dataSortInfo.getColNum());
+ for (int i = 0; i < schemaColumns.size(); i++) {
+ Column column = schemaColumns.get(i);
+ schemaBuilder.addColumn(column.toPb(bfColumns, indexes));
+ }
+
+ if (indexes != null) {
+ for (int i = 0; i < indexes.size(); i++) {
+ Index index = indexes.get(i);
+ schemaBuilder.addIndex(index.toPb(schemaColumns));
+ }
+ }
+ OlapFile.TabletSchemaCloudPB schema = schemaBuilder.build();
+ builder.setSchema(schema);
+ // rowset
+ OlapFile.RowsetMetaCloudPB.Builder rowsetBuilder =
createInitialRowset(tablet, partitionId,
+ schemaHash, schema);
+ builder.addRsMetas(rowsetBuilder);
+ return builder;
+ }
+
+ private OlapFile.RowsetMetaCloudPB.Builder createInitialRowset(Tablet
tablet, long partitionId,
+ int schemaHash, OlapFile.TabletSchemaCloudPB schema) {
+ OlapFile.RowsetMetaCloudPB.Builder rowsetBuilder =
OlapFile.RowsetMetaCloudPB.newBuilder();
+ rowsetBuilder.setRowsetId(0);
+ rowsetBuilder.setPartitionId(partitionId);
+ rowsetBuilder.setTabletId(tablet.getId());
+ rowsetBuilder.setTabletSchemaHash(schemaHash);
+ rowsetBuilder.setRowsetType(OlapFile.RowsetTypePB.BETA_ROWSET);
+ rowsetBuilder.setRowsetState(OlapFile.RowsetStatePB.VISIBLE);
+ rowsetBuilder.setStartVersion(0);
+ rowsetBuilder.setEndVersion(1);
+ rowsetBuilder.setNumRows(0);
+ rowsetBuilder.setTotalDiskSize(0);
+ rowsetBuilder.setDataDiskSize(0);
+ rowsetBuilder.setIndexDiskSize(0);
+
rowsetBuilder.setSegmentsOverlapPb(OlapFile.SegmentsOverlapPB.NONOVERLAPPING);
+ rowsetBuilder.setNumSegments(0);
+ rowsetBuilder.setEmpty(true);
+
+ UUID uuid = UUID.randomUUID();
+ String rowsetIdV2Str = String.format("%016X", 2L << 56)
+ + String.format("%016X", uuid.getMostSignificantBits())
+ + String.format("%016X", uuid.getLeastSignificantBits());
+ rowsetBuilder.setRowsetIdV2(rowsetIdV2Str);
+
+ rowsetBuilder.setTabletSchema(schema);
+ return rowsetBuilder;
+ }
+
+ private void createCloudTablets(MaterializedIndex index, ReplicaState
replicaState,
+ DistributionInfo distributionInfo, long version, ReplicaAllocation
replicaAlloc,
+ TabletMeta tabletMeta, Set<Long> tabletIdSet) throws DdlException {
+ for (int i = 0; i < distributionInfo.getBucketNum(); ++i) {
+ Tablet tablet =
EnvFactory.createTablet(Env.getCurrentEnv().getNextId());
+
+ // add tablet to inverted index first
+ index.addTablet(tablet, tabletMeta);
+ tabletIdSet.add(tablet.getId());
+
+ long replicaId = Env.getCurrentEnv().getNextId();
+ Replica replica = new CloudReplica(replicaId, null, replicaState,
version,
+ tabletMeta.getOldSchemaHash(), tabletMeta.getDbId(),
tabletMeta.getTableId(),
+ tabletMeta.getPartitionId(), tabletMeta.getIndexId(), i);
+ tablet.addReplica(replica);
+ }
+ }
+
+ @Override
+ protected void beforeCreatePartitions(long dbId, long tableId, List<Long>
partitionIds, List<Long> indexIds)
+ throws DdlException {
+ if (partitionIds == null) {
+ prepareMaterializedIndex(tableId, indexIds);
+ } else {
+ preparePartition(dbId, tableId, partitionIds, indexIds);
+ }
+ }
+
+ @Override
+ protected void afterCreatePartitions(long tableId, List<Long>
partitionIds, List<Long> indexIds)
+ throws DdlException {
+ if (partitionIds == null) {
+ commitMaterializedIndex(tableId, indexIds);
+ } else {
+ commitPartition(tableId, partitionIds, indexIds);
+ }
+ }
+
+ private void preparePartition(long dbId, long tableId, List<Long>
partitionIds, List<Long> indexIds)
+ throws DdlException {
+ Cloud.PartitionRequest.Builder partitionRequestBuilder =
Cloud.PartitionRequest.newBuilder();
+ partitionRequestBuilder.setCloudUniqueId(Config.cloud_unique_id);
+ partitionRequestBuilder.setTableId(tableId);
+ partitionRequestBuilder.addAllPartitionIds(partitionIds);
+ partitionRequestBuilder.addAllIndexIds(indexIds);
+ partitionRequestBuilder.setExpiration(0);
+ if (dbId > 0) {
+ partitionRequestBuilder.setDbId(dbId);
+ }
+ final Cloud.PartitionRequest partitionRequest =
partitionRequestBuilder.build();
+
+ Cloud.PartitionResponse response = null;
+ int tryTimes = 0;
+ while (tryTimes++ < Config.meta_service_rpc_retry_times) {
+ try {
+ response =
MetaServiceProxy.getInstance().preparePartition(partitionRequest);
+ if (response.getStatus().getCode() !=
Cloud.MetaServiceCode.KV_TXN_CONFLICT) {
+ break;
+ }
+ } catch (RpcException e) {
+ LOG.warn("tryTimes:{}, preparePartition RpcException",
tryTimes, e);
+ if (tryTimes + 1 >= Config.meta_service_rpc_retry_times) {
+ throw new DdlException(e.getMessage());
+ }
+ }
+ sleepSeveralMs();
+ }
+
+ if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
+ LOG.warn("preparePartition response: {} ", response);
+ throw new DdlException(response.getStatus().getMsg());
+ }
+ }
+
+ private void commitPartition(long tableId, List<Long> partitionIds,
List<Long> indexIds) throws DdlException {
+ Cloud.PartitionRequest.Builder partitionRequestBuilder =
Cloud.PartitionRequest.newBuilder();
+ partitionRequestBuilder.setCloudUniqueId(Config.cloud_unique_id);
+ partitionRequestBuilder.addAllPartitionIds(partitionIds);
+ partitionRequestBuilder.addAllIndexIds(indexIds);
+ partitionRequestBuilder.setTableId(tableId);
+ final Cloud.PartitionRequest partitionRequest =
partitionRequestBuilder.build();
+
+ Cloud.PartitionResponse response = null;
+ int tryTimes = 0;
+ while (tryTimes++ < Config.meta_service_rpc_retry_times) {
+ try {
+ response =
MetaServiceProxy.getInstance().commitPartition(partitionRequest);
+ if (response.getStatus().getCode() !=
Cloud.MetaServiceCode.KV_TXN_CONFLICT) {
+ break;
+ }
+ } catch (RpcException e) {
+ LOG.warn("tryTimes:{}, commitPartition RpcException",
tryTimes, e);
+ if (tryTimes + 1 >= Config.meta_service_rpc_retry_times) {
+ throw new DdlException(e.getMessage());
+ }
+ }
+ sleepSeveralMs();
+ }
+
+ if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
+ LOG.warn("commitPartition response: {} ", response);
+ throw new DdlException(response.getStatus().getMsg());
+ }
+ }
+
+ private void prepareMaterializedIndex(Long tableId, List<Long> indexIds)
throws DdlException {
+ Cloud.IndexRequest.Builder indexRequestBuilder =
Cloud.IndexRequest.newBuilder();
+ indexRequestBuilder.setCloudUniqueId(Config.cloud_unique_id);
+ indexRequestBuilder.addAllIndexIds(indexIds);
+ indexRequestBuilder.setTableId(tableId);
+ indexRequestBuilder.setExpiration(0);
+ final Cloud.IndexRequest indexRequest = indexRequestBuilder.build();
+
+ Cloud.IndexResponse response = null;
+ int tryTimes = 0;
+ while (tryTimes++ < Config.meta_service_rpc_retry_times) {
+ try {
+ response =
MetaServiceProxy.getInstance().prepareIndex(indexRequest);
+ if (response.getStatus().getCode() !=
Cloud.MetaServiceCode.KV_TXN_CONFLICT) {
+ break;
+ }
+ } catch (RpcException e) {
+ LOG.warn("tryTimes:{}, prepareIndex RpcException", tryTimes,
e);
+ if (tryTimes + 1 >= Config.meta_service_rpc_retry_times) {
+ throw new DdlException(e.getMessage());
+ }
+ }
+ sleepSeveralMs();
+ }
+
+ if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
+ LOG.warn("prepareIndex response: {} ", response);
+ throw new DdlException(response.getStatus().getMsg());
+ }
+ }
+
+ private void commitMaterializedIndex(Long tableId, List<Long> indexIds)
throws DdlException {
+ Cloud.IndexRequest.Builder indexRequestBuilder =
Cloud.IndexRequest.newBuilder();
+ indexRequestBuilder.setCloudUniqueId(Config.cloud_unique_id);
+ indexRequestBuilder.addAllIndexIds(indexIds);
+ indexRequestBuilder.setTableId(tableId);
+ final Cloud.IndexRequest indexRequest = indexRequestBuilder.build();
+
+ Cloud.IndexResponse response = null;
+ int tryTimes = 0;
+ while (tryTimes++ < Config.meta_service_rpc_retry_times) {
+ try {
+ response =
MetaServiceProxy.getInstance().commitIndex(indexRequest);
+ if (response.getStatus().getCode() !=
Cloud.MetaServiceCode.KV_TXN_CONFLICT) {
+ break;
+ }
+ } catch (RpcException e) {
+ LOG.warn("tryTimes:{}, commitIndex RpcException", tryTimes, e);
+ if (tryTimes + 1 >= Config.meta_service_rpc_retry_times) {
+ throw new DdlException(e.getMessage());
+ }
+ }
+ sleepSeveralMs();
+ }
+
+ if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
+ LOG.warn("commitIndex response: {} ", response);
+ throw new DdlException(response.getStatus().getMsg());
+ }
+ }
+
+ private void sendCreateTabletsRpc(Cloud.CreateTabletsRequest.Builder
requestBuilder) throws DdlException {
+ requestBuilder.setCloudUniqueId(Config.cloud_unique_id);
+ Cloud.CreateTabletsRequest createTabletsReq = requestBuilder.build();
+
+ LOG.debug("send create tablets rpc, createTabletsReq: {}",
createTabletsReq);
+ Cloud.CreateTabletsResponse response;
+ try {
+ response =
MetaServiceProxy.getInstance().createTablets(createTabletsReq);
+ } catch (RpcException e) {
+ LOG.warn("failed to send create tablets rpc {}", e.getMessage());
+ throw new RuntimeException(e);
+ }
+ LOG.info("create tablets response: {}", response);
+
+ if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) {
+ throw new DdlException(response.getStatus().getMsg());
+ }
+ }
+
+ // END CREATE TABLE
+
+ @Override
+ protected void checkAvailableCapacity(Database db) throws DdlException {
+ // check cluster capacity
+ Env.getCurrentSystemInfo().checkAvailableCapacity();
+ // check db quota
+ db.checkQuota();
+ }
+
+ private void sleepSeveralMs() {
+ // sleep random millis [20, 200] ms, avoid txn conflict
+ int randomMillis = 20 + (int) (Math.random() * (200 - 20));
+ LOG.debug("randomMillis:{}", randomMillis);
+ try {
+ Thread.sleep(randomMillis);
+ } catch (InterruptedException e) {
+ LOG.info("ignore InterruptedException: ", e);
+ }
+ }
+
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index 8a4051c9af3..f1e457336f2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -101,6 +101,8 @@ public class PropertyAnalyzer {
public static final String PROPERTIES_INMEMORY = "in_memory";
+ public static final String PROPERTIES_FILE_CACHE_TTL_SECONDS =
"file_cache_ttl_seconds";
+
// _auto_bucket can only set in create table stmt rewrite bucket and can
not be changed
public static final String PROPERTIES_AUTO_BUCKET = "_auto_bucket";
public static final String PROPERTIES_ESTIMATE_PARTITION_SIZE =
"estimate_partition_size";
@@ -245,6 +247,7 @@ public class PropertyAnalyzer {
properties.remove(PROPERTIES_STORAGE_COOLDOWN_TIME);
properties.remove(PROPERTIES_STORAGE_POLICY);
properties.remove(PROPERTIES_DATA_BASE_TIME);
+ properties.remove(PROPERTIES_FILE_CACHE_TTL_SECONDS);
Preconditions.checkNotNull(storageMedium);
@@ -449,6 +452,23 @@ public class PropertyAnalyzer {
return version;
}
+ public static long analyzeTTL(Map<String, String> properties) throws
AnalysisException {
+ long ttlSeconds = 0;
+ if (properties != null &&
properties.containsKey(PROPERTIES_FILE_CACHE_TTL_SECONDS)) {
+ String ttlSecondsStr =
properties.get(PROPERTIES_FILE_CACHE_TTL_SECONDS);
+ try {
+ ttlSeconds = Long.parseLong(ttlSecondsStr);
+ if (ttlSeconds < 0) {
+ throw new NumberFormatException();
+ }
+ } catch (NumberFormatException e) {
+ throw new AnalysisException("The value " + ttlSecondsStr + "
formats error or is out of range "
+ + "(0 < integer < Long.MAX_VALUE)");
+ }
+ }
+ return ttlSeconds;
+ }
+
public static int analyzeSchemaVersion(Map<String, String> properties)
throws AnalysisException {
int schemaVersion = 0;
if (properties != null &&
properties.containsKey(PROPERTIES_SCHEMA_VERSION)) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index b5e68de5b2e..4671daba0cb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -65,6 +65,7 @@ import org.apache.doris.catalog.DatabaseProperty;
import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.EnvFactory;
import org.apache.doris.catalog.EsTable;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.HiveTable;
@@ -1068,10 +1069,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
// only internal table should check quota and cluster capacity
if (!stmt.isExternal()) {
- // check cluster capacity
- Env.getCurrentSystemInfo().checkAvailableCapacity();
- // check db quota
- db.checkQuota();
+ checkAvailableCapacity(db);
}
// check if table exists in db
@@ -1532,6 +1530,9 @@ public class InternalCatalog implements
CatalogIf<Database> {
}
try {
long partitionId = idGeneratorBuffer.getNextId();
+ List<Long> partitionIds = Lists.newArrayList(partitionId);
+ List<Long> indexIds =
indexIdToMeta.keySet().stream().collect(Collectors.toList());
+ beforeCreatePartitions(db.getId(), olapTable.getId(),
partitionIds, indexIds);
Partition partition = createPartitionWithIndices(db.getId(),
olapTable.getId(),
olapTable.getName(), olapTable.getBaseIndexId(),
partitionId, partitionName, indexIdToMeta,
distributionInfo, dataProperty.getStorageMedium(),
singlePartitionDesc.getReplicaAlloc(),
@@ -1546,7 +1547,9 @@ public class InternalCatalog implements
CatalogIf<Database> {
olapTable.getTimeSeriesCompactionTimeThresholdSeconds(),
olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold(),
olapTable.storeRowColumn(),
- binlogConfig, dataProperty.isStorageMediumSpecified(),
null);
+ binlogConfig, dataProperty.isStorageMediumSpecified(),
null,
+ olapTable.getTTLSeconds());
+ afterCreatePartitions(olapTable.getId(), partitionIds, indexIds);
// TODO cluster key ids
// check again
@@ -1791,7 +1794,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
}
}
- private Partition createPartitionWithIndices(long dbId, long tableId,
String tableName,
+ protected Partition createPartitionWithIndices(long dbId, long tableId,
String tableName,
long baseIndexId, long partitionId, String partitionName,
Map<Long, MaterializedIndexMeta> indexIdToMeta,
DistributionInfo distributionInfo, TStorageMedium storageMedium,
ReplicaAllocation replicaAlloc,
Long versionInfo, Set<String> bfColumns, double bfFpp, Set<Long>
tabletIdSet, List<Index> indexes,
@@ -1803,7 +1806,8 @@ public class InternalCatalog implements
CatalogIf<Database> {
Long timeSeriesCompactionFileCountThreshold, Long
timeSeriesCompactionTimeThresholdSeconds,
Long timeSeriesCompactionEmptyRowsetsThreshold,
boolean storeRowColumn, BinlogConfig binlogConfig,
- boolean isStorageMediumSpecified, List<Integer> clusterKeyIndexes)
throws DdlException {
+ boolean isStorageMediumSpecified, List<Integer> clusterKeyIndexes,
+ long ttlSeconds) throws DdlException {
// create base index first.
Preconditions.checkArgument(baseIndexId != -1);
MaterializedIndex baseIndex = new MaterializedIndex(baseIndexId,
IndexState.NORMAL);
@@ -1947,6 +1951,21 @@ public class InternalCatalog implements
CatalogIf<Database> {
return partition;
}
+ protected void beforeCreatePartitions(long dbId, long tableId, List<Long>
partitionIds, List<Long> indexIds)
+ throws DdlException {
+ }
+
+ protected void afterCreatePartitions(long tableId, List<Long>
partitionIds, List<Long> indexIds)
+ throws DdlException {
+ }
+
+ protected void checkAvailableCapacity(Database db) throws DdlException {
+ // check cluster capacity
+ Env.getCurrentSystemInfo().checkAvailableCapacity();
+ // check db quota
+ db.checkQuota();
+ }
+
// Create olap table and related base index synchronously.
private void createOlapTable(Database db, CreateTableStmt stmt) throws
UserException {
String tableName = stmt.getTableName();
@@ -2265,6 +2284,9 @@ public class InternalCatalog implements
CatalogIf<Database> {
boolean isMutable = PropertyAnalyzer.analyzeBooleanProp(properties,
PropertyAnalyzer.PROPERTIES_MUTABLE, true);
+ Long ttlSeconds = PropertyAnalyzer.analyzeTTL(properties);
+ olapTable.setTTLSeconds(ttlSeconds);
+
// set storage policy
String storagePolicy =
PropertyAnalyzer.analyzeStoragePolicy(properties);
Env.getCurrentEnv().getPolicyMgr().checkStoragePolicyExist(storagePolicy);
@@ -2481,6 +2503,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
"Database " + db.getFullName() + " create
unpartitioned table " + tableName + " increasing "
+ totalReplicaNum + " of replica exceeds
quota[" + db.getReplicaQuota() + "]");
}
+ beforeCreatePartitions(db.getId(), olapTable.getId(), null,
olapTable.getIndexIdList());
Partition partition = createPartitionWithIndices(db.getId(),
olapTable.getId(),
olapTable.getName(), olapTable.getBaseIndexId(),
partitionId, partitionName,
olapTable.getIndexIdToMeta(),
partitionDistributionInfo,
@@ -2496,7 +2519,9 @@ public class InternalCatalog implements
CatalogIf<Database> {
olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold(),
storeRowColumn, binlogConfigForTask,
partitionInfo.getDataProperty(partitionId).isStorageMediumSpecified(),
- keysDesc.getClusterKeysColumnIds());
+ keysDesc.getClusterKeysColumnIds(),
+ olapTable.getTTLSeconds());
+ afterCreatePartitions(olapTable.getId(), null,
olapTable.getIndexIdList());
olapTable.addPartition(partition);
} else if (partitionInfo.getType() == PartitionType.RANGE
|| partitionInfo.getType() == PartitionType.LIST) {
@@ -2539,6 +2564,8 @@ public class InternalCatalog implements
CatalogIf<Database> {
+ totalReplicaNum + " of replica exceeds
quota[" + db.getReplicaQuota() + "]");
}
+ beforeCreatePartitions(db.getId(), olapTable.getId(), null,
olapTable.getIndexIdList());
+
// this is a 2-level partitioned tables
for (Map.Entry<String, Long> entry :
partitionNameToId.entrySet()) {
DataProperty dataProperty =
partitionInfo.getDataProperty(entry.getValue());
@@ -2558,7 +2585,6 @@ public class InternalCatalog implements
CatalogIf<Database> {
partionStoragePolicy = storagePolicy;
}
Env.getCurrentEnv().getPolicyMgr().checkStoragePolicyExist(partionStoragePolicy);
-
Partition partition =
createPartitionWithIndices(db.getId(),
olapTable.getId(), olapTable.getName(),
olapTable.getBaseIndexId(), entry.getValue(),
entry.getKey(), olapTable.getIndexIdToMeta(),
partitionDistributionInfo,
@@ -2573,11 +2599,14 @@ public class InternalCatalog implements
CatalogIf<Database> {
olapTable.getTimeSeriesCompactionTimeThresholdSeconds(),
olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold(),
storeRowColumn, binlogConfigForTask,
- dataProperty.isStorageMediumSpecified(),
keysDesc.getClusterKeysColumnIds());
+ dataProperty.isStorageMediumSpecified(),
+ keysDesc.getClusterKeysColumnIds(),
+ olapTable.getTTLSeconds());
olapTable.addPartition(partition);
olapTable.getPartitionInfo().getDataProperty(partition.getId())
.setStoragePolicy(partionStoragePolicy);
}
+ afterCreatePartitions(olapTable.getId(), null,
olapTable.getIndexIdList());
} else {
throw new DdlException("Unsupported partition method: " +
partitionInfo.getType().name());
}
@@ -2810,7 +2839,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
for (int i = 0; i < distributionInfo.getBucketNum(); ++i) {
// create a new tablet with random chosen backends
- Tablet tablet = new Tablet(idGeneratorBuffer.getNextId());
+ Tablet tablet =
EnvFactory.createTablet(idGeneratorBuffer.getNextId());
// add tablet to inverted index first
index.addTablet(tablet, tabletMeta);
@@ -2997,6 +3026,19 @@ public class InternalCatalog implements
CatalogIf<Database> {
long bufferSize =
IdGeneratorUtil.getBufferSizeForTruncateTable(copiedTbl,
origPartitions.values());
IdGeneratorBuffer idGeneratorBuffer =
origPartitions.isEmpty() ? null :
Env.getCurrentEnv().getIdGeneratorBuffer(bufferSize);
+
+ Map<Long, Long> oldToNewPartitionId = new HashMap<Long, Long>();
+ List<Long> newPartitionIds = new ArrayList<Long>();
+ for (Map.Entry<String, Long> entry : origPartitions.entrySet()) {
+ long oldPartitionId = entry.getValue();
+ long newPartitionId = idGeneratorBuffer.getNextId();
+ oldToNewPartitionId.put(oldPartitionId, newPartitionId);
+ newPartitionIds.add(newPartitionId);
+ }
+
+ List<Long> indexIds =
copiedTbl.getIndexIdToMeta().keySet().stream().collect(Collectors.toList());
+ beforeCreatePartitions(db.getId(), copiedTbl.getId(),
newPartitionIds, indexIds);
+
for (Map.Entry<String, Long> entry : origPartitions.entrySet()) {
// the new partition must use new id
// If we still use the old partition id, the behavior of
current load jobs on this partition
@@ -3004,7 +3046,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
// By using a new id, load job will be aborted(just like
partition is dropped),
// which is the right behavior.
long oldPartitionId = entry.getValue();
- long newPartitionId = idGeneratorBuffer.getNextId();
+ long newPartitionId = oldToNewPartitionId.get(oldPartitionId);
Partition newPartition =
createPartitionWithIndices(db.getId(), copiedTbl.getId(),
copiedTbl.getName(), copiedTbl.getBaseIndexId(),
newPartitionId, entry.getKey(),
copiedTbl.getIndexIdToMeta(),
partitionsDistributionInfo.get(oldPartitionId),
@@ -3023,9 +3065,12 @@ public class InternalCatalog implements
CatalogIf<Database> {
olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold(),
olapTable.storeRowColumn(), binlogConfig,
copiedTbl.getPartitionInfo().getDataProperty(oldPartitionId).isStorageMediumSpecified(),
- clusterKeyIdxes);
+ clusterKeyIdxes, olapTable.getTTLSeconds());
newPartitions.add(newPartition);
}
+
+ afterCreatePartitions(copiedTbl.getId(), newPartitionIds,
indexIds);
+
} catch (DdlException e) {
// create partition failed, remove all newly created tablets
for (Long tabletId : tabletIdSet) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index c02bce86b89..280cf5ebf77 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -3153,7 +3153,14 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
// we should ensure the replica backend is alive
// otherwise, there will be a 'unknown node id, id=xxx'
error for stream load
// BE id -> path hash
- Multimap<Long, Long> bePathsMap =
tablet.getNormalReplicaBackendPathMap();
+ Multimap<Long, Long> bePathsMap;
+ try {
+ bePathsMap = tablet.getNormalReplicaBackendPathMap();
+ } catch (UserException ex) {
+
errorStatus.setErrorMsgs(Lists.newArrayList(ex.getMessage()));
+ result.setStatus(errorStatus);
+ return result;
+ }
if (bePathsMap.keySet().size() < quorum) {
LOG.warn("auto go quorum exception");
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index 78297aae83a..62511e25e47 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -83,6 +83,9 @@ public class SystemInfoService {
public static final String NO_SCAN_NODE_BACKEND_AVAILABLE_MSG = "There is
no scanNode Backend available.";
+ public static final String NOT_USING_VALID_CLUSTER_MSG = "Not using valid
cloud clusters, "
+ + "please use a cluster before issuing any queries";
+
private volatile ImmutableMap<Long, Backend> idToBackendRef =
ImmutableMap.of();
private volatile ImmutableMap<Long, AtomicLong> idToReportVersionRef =
ImmutableMap.of();
// TODO(gavin): use {clusterId -> List<BackendId>} instead to reduce risk
of inconsistency
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/catalog/EnvFactoryTest.java
b/fe/fe-core/src/test/java/org/apache/doris/catalog/EnvFactoryTest.java
index ddd3b9441a5..1a939457f5e 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/EnvFactoryTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/EnvFactoryTest.java
@@ -20,6 +20,7 @@ package org.apache.doris.catalog;
import org.apache.doris.cloud.catalog.CloudEnv;
import org.apache.doris.cloud.catalog.CloudPartition;
import org.apache.doris.cloud.catalog.CloudReplica;
+import org.apache.doris.cloud.catalog.CloudTablet;
import org.apache.doris.cloud.datasource.CloudInternalCatalog;
import org.apache.doris.common.Config;
import org.apache.doris.datasource.InternalCatalog;
@@ -42,6 +43,8 @@ public class EnvFactoryTest {
Assert.assertFalse(EnvFactory.createInternalCatalog() instanceof
CloudInternalCatalog);
Assert.assertTrue(EnvFactory.createPartition() instanceof Partition);
Assert.assertFalse(EnvFactory.createPartition() instanceof
CloudPartition);
+ Assert.assertTrue(EnvFactory.createTablet() instanceof Tablet);
+ Assert.assertFalse(EnvFactory.createTablet() instanceof CloudTablet);
Assert.assertTrue(EnvFactory.createReplica() instanceof Replica);
Assert.assertFalse(EnvFactory.createReplica() instanceof CloudReplica);
@@ -49,6 +52,7 @@ public class EnvFactoryTest {
Assert.assertTrue(EnvFactory.createEnv(false) instanceof CloudEnv);
Assert.assertTrue(EnvFactory.createInternalCatalog() instanceof
CloudInternalCatalog);
Assert.assertTrue(EnvFactory.createPartition() instanceof
CloudPartition);
+ Assert.assertTrue(EnvFactory.createTablet() instanceof CloudTablet);
Assert.assertTrue(EnvFactory.createReplica() instanceof CloudReplica);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]