This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new d41dd79d6c3 branch-3.1: [refactor](cloud) separate cloud restore
create tablet rpc into multiple batches (#55691)
d41dd79d6c3 is described below
commit d41dd79d6c35403dcd36e2c1c1c788eebadb9e4a
Author: xy720 <[email protected]>
AuthorDate: Fri Sep 5 18:23:57 2025 +0800
branch-3.1: [refactor](cloud) separate cloud restore create tablet rpc into
multiple batches (#55691)
pick #54933
---
cloud/src/recycler/recycler.cpp | 3 +-
.../main/java/org/apache/doris/common/Config.java | 6 ++
.../java/org/apache/doris/catalog/OlapTable.java | 4 +-
.../apache/doris/cloud/backup/CloudRestoreJob.java | 117 ++++++++++++---------
4 files changed, 77 insertions(+), 53 deletions(-)
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 8e19c9ccd19..8faa59e7ce7 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -996,7 +996,8 @@ int64_t calculate_restore_job_expired_time(
const std::string& instance_id_, const RestoreJobCloudPB& restore_job,
int64_t* earlest_ts /* restore job earliest expiration ts */) {
if (config::force_immediate_recycle || restore_job.state() ==
RestoreJobCloudPB::DROPPED ||
- restore_job.state() == RestoreJobCloudPB::COMPLETED) {
+ restore_job.state() == RestoreJobCloudPB::COMPLETED ||
+ restore_job.state() == RestoreJobCloudPB::RECYCLING) {
// final state, recycle immediately
return 0L;
}
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 1e1785bb5f7..3f6577e3669 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1658,6 +1658,12 @@ public class Config extends ConfigBase {
"Whether to enable cloud restore job."}, varType =
VariableAnnotation.EXPERIMENTAL)
public static boolean enable_cloud_restore_job = false;
+ @ConfField(mutable = true, masterOnly = true, description = {
+ "存算分离恢复过程中,一次 create tablets rpc 创建的 tablet 数量上限,默认值为256个",
+ "During the cloud restore job, the maximum number of tablets created
by one "
+ + "create tablets RPC, 256 by default."})
+ public static int cloud_restore_create_tablet_batch_size = 256;
+
/**
* Control the default max num of the instance for a user.
*/
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 50bfcead7b5..da1bfb0fadf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -697,7 +697,7 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf, GsonPostProc
public void resetVersionForRestore() {
for (Partition partition : idToPartition.values()) {
- partition.setNextVersion(partition.getVisibleVersion() + 1);
+ partition.setNextVersion(partition.getCachedVisibleVersion() + 1);
}
}
@@ -793,7 +793,7 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf, GsonPostProc
if (Config.isCloudMode()) {
long newReplicaId = Env.getCurrentEnv().getNextId();
Replica replica = new CloudReplica(newReplicaId, null,
ReplicaState.NORMAL,
- visibleVersion, schemaHash, db.getId(), id,
partition.getId(), idx.getId(), i);
+ visibleVersion, schemaHash, db.getId(), id,
entry.getKey(), idx.getId(), i);
newTablet.addReplica(replica, true /* is restore */);
continue;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/backup/CloudRestoreJob.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/backup/CloudRestoreJob.java
index 997a44a031f..4f0a7bce1b9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/backup/CloudRestoreJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/backup/CloudRestoreJob.java
@@ -47,6 +47,7 @@ import org.apache.doris.cloud.datasource.CloudInternalCatalog;
import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.cloud.qe.ComputeGroupException;
import org.apache.doris.cloud.system.CloudSystemInfoService;
+import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
@@ -85,7 +86,7 @@ public class CloudRestoreJob extends RestoreJob {
private String cloudClusterId = null;
- private Map<OlapTable, Cloud.CreateTabletsRequest.Builder> tabletsPerTable
= new HashMap<>();
+ private Map<Pair<OlapTable, Partition>,
List<Cloud.CreateTabletsRequest.Builder>> tabletRequests = new HashMap<>();
public enum MetaSeriviceOperation {
PREPARE,
@@ -190,14 +191,18 @@ public class CloudRestoreJob extends RestoreJob {
handleMetaObject(MetaSeriviceOperation.PREPARE);
// send create tablets requests
boolean needSetStorageVault = ((CloudEnv)
Env.getCurrentEnv()).getEnableStorageVault();
- for (Map.Entry<OlapTable, Cloud.CreateTabletsRequest.Builder>
entry : tabletsPerTable.entrySet()) {
- OlapTable table = entry.getKey();
- Cloud.CreateTabletsRequest.Builder requestBuilder =
entry.getValue();
- Cloud.CreateTabletsResponse resp =
sendCreateTabletsRequests(requestBuilder, table,
- needSetStorageVault);
- if (resp.hasStorageVaultId()) {
- storageVaultId = resp.getStorageVaultId();
- needSetStorageVault = false;
+ for (Map.Entry<Pair<OlapTable, Partition>,
List<Cloud.CreateTabletsRequest.Builder>> entry
+ : tabletRequests.entrySet()) {
+ Pair<OlapTable, Partition> tableToPartition = entry.getKey();
+ OlapTable table = tableToPartition.first;
+ List<Cloud.CreateTabletsRequest.Builder> requestBuilders =
entry.getValue();
+ for (Cloud.CreateTabletsRequest.Builder requestBuilder :
requestBuilders) {
+ Cloud.CreateTabletsResponse resp =
sendCreateTabletsRequests(requestBuilder, table,
+ needSetStorageVault);
+ if (resp.hasStorageVaultId()) {
+ storageVaultId = resp.getStorageVaultId();
+ needSetStorageVault = false;
+ }
}
}
// set storage vault for new restoring table
@@ -214,7 +219,7 @@ public class CloudRestoreJob extends RestoreJob {
} catch (Exception e) {
status = new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
} finally {
- tabletsPerTable.clear();
+ tabletRequests.clear();
}
}
@@ -352,51 +357,60 @@ public class CloudRestoreJob extends RestoreJob {
public void createReplicas(Database db, OlapTable localTbl, Partition
restorePart,
Map<Long, TabletRef> tabletBases) {
List<String> rowStoreColumns =
localTbl.getTableProperty().getCopiedRowStoreColumns();
- Cloud.CreateTabletsRequest.Builder requestBuilder =
tabletsPerTable.computeIfAbsent(localTbl,
- r -> Cloud.CreateTabletsRequest.newBuilder());
+ List<Cloud.CreateTabletsRequest.Builder> requestBuilders =
tabletRequests.computeIfAbsent(
+ Pair.of(localTbl, restorePart), r -> Lists.newArrayList());
for (MaterializedIndex restoredIdx :
restorePart.getMaterializedIndices(MaterializedIndex.IndexExtState
.VISIBLE)) {
MaterializedIndexMeta indexMeta =
localTbl.getIndexMetaByIndexId(restoredIdx.getId());
List<Index> indexes = restoredIdx.getId() ==
localTbl.getBaseIndexId()
? localTbl.getCopiedIndexes() : null;
- for (Tablet restoreTablet : restoredIdx.getTablets()) {
- try {
- requestBuilder.addTabletMetas(((CloudInternalCatalog)
Env.getCurrentInternalCatalog())
- .createTabletMetaBuilder(localTbl.getId(),
restoredIdx.getId(),
- restorePart.getId(), restoreTablet,
-
localTbl.getPartitionInfo().getTabletType(restorePart.getId()),
- indexMeta.getSchemaHash(),
indexMeta.getKeysType(),
- indexMeta.getShortKeyColumnCount(),
localTbl.getCopiedBfColumns(),
- localTbl.getBfFpp(), indexes,
indexMeta.getSchema(), localTbl.getDataSortInfo(),
- localTbl.getCompressionType(),
localTbl.getStoragePolicy(),
- localTbl.isInMemory(), false,
localTbl.getName(), localTbl.getTTLSeconds(),
- localTbl.getEnableUniqueKeyMergeOnWrite(),
localTbl.storeRowColumn(),
- localTbl.getBaseSchemaVersion(),
localTbl.getCompactionPolicy(),
-
localTbl.getTimeSeriesCompactionGoalSizeMbytes(),
-
localTbl.getTimeSeriesCompactionFileCountThreshold(),
-
localTbl.getTimeSeriesCompactionTimeThresholdSeconds(),
-
localTbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
-
localTbl.getTimeSeriesCompactionLevelThreshold(),
localTbl.disableAutoCompaction(),
-
localTbl.getRowStoreColumnsUniqueIds(rowStoreColumns),
- localTbl.getEnableMowLightDelete(),
- localTbl.getInvertedIndexFileStorageFormat(),
- localTbl.rowStorePageSize(),
- localTbl.variantEnableFlattenNested(),
- localTbl.storagePageSize(),
localTbl.getTDEAlgorithmPB(),
- localTbl.storageDictPageSize(), false));
- // In cloud mode all storage medium will be saved to HDD.
- TabletMeta tabletMeta = new TabletMeta(db.getId(),
localTbl.getId(), restorePart.getId(),
- restoredIdx.getId(), indexMeta.getSchemaHash(),
TStorageMedium.HDD);
-
Env.getCurrentInvertedIndex().addTablet(restoreTablet.getId(), tabletMeta);
-
Env.getCurrentInvertedIndex().addReplica(restoreTablet.getId(),
- restoreTablet.getReplicaByBackendId(-1));
- } catch (Exception e) {
- String errMsg = String.format("create tablet meta builder
failed, errMsg:%s, local table:%d, "
- + "restore partition=%d, restore index=%d, restore
tablet=%d", e.getMessage(),
- localTbl.getId(), restorePart.getId(),
restoredIdx.getId(), restoreTablet.getId());
- status = new Status(Status.ErrCode.COMMON_ERROR, errMsg);
+ int maxCreateTabletBatchSize =
Config.cloud_restore_create_tablet_batch_size;
+ List<Tablet> restoreTablets = restoredIdx.getTablets();
+ for (int i = 0; i < restoreTablets.size(); i +=
maxCreateTabletBatchSize) {
+ int end = Math.min(i + maxCreateTabletBatchSize,
restoreTablets.size());
+ List<Tablet> subRestoreTablets = restoreTablets.subList(i,
end);
+ Cloud.CreateTabletsRequest.Builder requestBuilder =
Cloud.CreateTabletsRequest.newBuilder();
+ for (Tablet restoreTablet : subRestoreTablets) {
+ try {
+ requestBuilder.addTabletMetas(((CloudInternalCatalog)
Env.getCurrentInternalCatalog())
+ .createTabletMetaBuilder(localTbl.getId(),
restoredIdx.getId(),
+ restorePart.getId(), restoreTablet,
+
localTbl.getPartitionInfo().getTabletType(restorePart.getId()),
+ indexMeta.getSchemaHash(),
indexMeta.getKeysType(),
+ indexMeta.getShortKeyColumnCount(),
localTbl.getCopiedBfColumns(),
+ localTbl.getBfFpp(), indexes,
indexMeta.getSchema(), localTbl.getDataSortInfo(),
+ localTbl.getCompressionType(),
localTbl.getStoragePolicy(),
+ localTbl.isInMemory(), false,
localTbl.getName(), localTbl.getTTLSeconds(),
+ localTbl.getEnableUniqueKeyMergeOnWrite(),
localTbl.storeRowColumn(),
+ localTbl.getBaseSchemaVersion(),
localTbl.getCompactionPolicy(),
+
localTbl.getTimeSeriesCompactionGoalSizeMbytes(),
+
localTbl.getTimeSeriesCompactionFileCountThreshold(),
+
localTbl.getTimeSeriesCompactionTimeThresholdSeconds(),
+
localTbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
+
localTbl.getTimeSeriesCompactionLevelThreshold(),
localTbl.disableAutoCompaction(),
+
localTbl.getRowStoreColumnsUniqueIds(rowStoreColumns),
+ localTbl.getEnableMowLightDelete(),
+
localTbl.getInvertedIndexFileStorageFormat(),
+ localTbl.rowStorePageSize(),
+ localTbl.variantEnableFlattenNested(),
+ localTbl.storagePageSize(),
localTbl.getTDEAlgorithmPB(),
+ localTbl.storageDictPageSize(), false));
+ // In cloud mode all storage medium will be saved to
HDD.
+ TabletMeta tabletMeta = new TabletMeta(db.getId(),
localTbl.getId(), restorePart.getId(),
+ restoredIdx.getId(),
indexMeta.getSchemaHash(), TStorageMedium.HDD);
+
Env.getCurrentInvertedIndex().addTablet(restoreTablet.getId(), tabletMeta);
+
Env.getCurrentInvertedIndex().addReplica(restoreTablet.getId(),
+ restoreTablet.getReplicaByBackendId(-1));
+ } catch (Exception e) {
+ String errMsg = String.format("create tablet meta
builder failed, errMsg:%s, local table:%d, "
+ + "restore partition=%d, restore index=%d,
restore tablet=%d", e.getMessage(),
+ localTbl.getId(), restorePart.getId(),
restoredIdx.getId(), restoreTablet.getId());
+ status = new Status(Status.ErrCode.COMMON_ERROR,
errMsg);
+ return;
+ }
}
+ requestBuilders.add(requestBuilder);
}
}
}
@@ -515,8 +529,11 @@ public class CloudRestoreJob extends RestoreJob {
+ "vault name=%s, errMsg=%s", dbId, olapTable.getName(),
storageVaultName, e.getMessage());
throw new DdlException(errMsg);
}
- LOG.info("cloud restore job restore tablets, dbId: {}, tableName: {},
vault name: {}", dbId,
- olapTable.getName(), storageVaultName);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("cloud restore job restore tablets, dbId: {}, tableName:
{}, vault name: {},"
+ + "tablet created: {}", dbId, olapTable.getName(),
storageVaultName,
+ requestBuilder.getTabletMetasCount());
+ }
return resp;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org