This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 49948419391 branch-3.0: [fix](backup) Automatic adapt upload/download snapshot batch size #44560 (#44639)
49948419391 is described below
commit 499484193918c714c820a5ba471f59372c65186b
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Nov 27 10:21:58 2024 +0800
branch-3.0: [fix](backup) Automatic adapt upload/download snapshot batch size #44560 (#44639)
Cherry-picked from #44560
Co-authored-by: walter <[email protected]>
---
.../main/java/org/apache/doris/common/Config.java | 13 ++++----
.../java/org/apache/doris/backup/BackupJob.java | 17 ++++------
.../java/org/apache/doris/backup/RestoreJob.java | 37 +++++++---------------
3 files changed, 25 insertions(+), 42 deletions(-)
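
In short, the patch replaces the per-backend task-count knobs (backup_upload_task_num_per_be and restore_download_task_num_per_be, default 3) with per-task snapshot batch sizes (backup_upload_snapshot_batch_size and restore_download_snapshot_batch_size, default 10), so the number of upload/download tasks now adapts to how many snapshots a backend holds. A minimal, self-contained sketch of the new batching shape, assuming a plain list of snapshot identifiers (SnapshotBatcher and its method are illustrative names, not the actual Doris classes):

    import java.util.ArrayList;
    import java.util.List;

    public class SnapshotBatcher {
        // Split one backend's snapshots into fixed-size batches; the last batch may be
        // smaller, mirroring the "index += taskNumPerBatch" loop introduced by this patch.
        public static List<List<String>> splitIntoBatches(List<String> snapshots, int batchSize) {
            List<List<String>> batches = new ArrayList<>();
            for (int index = 0; index < snapshots.size(); index += batchSize) {
                int end = Math.min(index + batchSize, snapshots.size());
                batches.add(new ArrayList<>(snapshots.subList(index, end)));
            }
            return batches;
        }
    }

With 25 snapshots and the default batch size of 10, this yields three tasks carrying 10, 10, and 5 snapshots; before the change, each backend got at most 3 tasks and every task grew with the snapshot count.
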
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 7b23272bbb5..4b3e5bc0a3e 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2724,16 +2724,17 @@ public class Config extends ConfigBase {
public static String nereids_trace_log_dir = System.getenv("LOG_DIR") + "/nereids_trace";
@ConfField(mutable = true, masterOnly = true, description = {
- "备份过程中,分配给每个be的upload任务最大个数,默认值为3个。",
- "The max number of upload tasks assigned to each be during the
backup process, the default value is 3."
+ "备份过程中,一个 upload 任务上传的快照数量上限,默认值为10个",
+ "The max number of snapshots assigned to a upload task during the
backup process, the default value is 10."
})
- public static int backup_upload_task_num_per_be = 3;
+ public static int backup_upload_snapshot_batch_size = 10;
@ConfField(mutable = true, masterOnly = true, description = {
- "恢复过程中,分配给每个be的download任务最大个数,默认值为3个。",
- "The max number of download tasks assigned to each be during the
restore process, the default value is 3."
+ "恢复过程中,一个 download 任务下载的快照数量上限,默认值为10个",
+ "The max number of snapshots assigned to a download task during
the restore process, "
+ + "the default value is 10."
})
- public static int restore_download_task_num_per_be = 3;
+ public static int restore_download_snapshot_batch_size = 10;
@ConfField(mutable = true, masterOnly = true, description = {
"备份恢复过程中,单次 RPC 分配给每个be的任务最大个数,默认值为10000个。",
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index 478e8902d7d..621a2b1d9f7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -751,13 +751,10 @@ public class BackupJob extends AbstractJob implements GsonPostProcessable {
for (Long beId : beToSnapshots.keySet()) {
List<SnapshotInfo> infos = beToSnapshots.get(beId);
int totalNum = infos.size();
- int batchNum = totalNum;
- if (Config.backup_upload_task_num_per_be > 0) {
- batchNum = Math.min(totalNum, Config.backup_upload_task_num_per_be);
- }
// each task contains several upload sub tasks
- int taskNumPerBatch = Math.max(totalNum / batchNum, 1);
- LOG.info("backend {} has {} batch, total {} tasks, {}", beId, batchNum, totalNum, this);
+ int taskNumPerBatch = Config.backup_upload_snapshot_batch_size;
+ LOG.info("backend {} has total {} snapshots, per task batch size {}, {}",
+ beId, totalNum, taskNumPerBatch, this);
List<FsBroker> brokers = Lists.newArrayList();
Status st = repo.getBrokerAddress(beId, env, brokers);
@@ -768,12 +765,10 @@ public class BackupJob extends AbstractJob implements GsonPostProcessable {
Preconditions.checkState(brokers.size() == 1);
// allot tasks
- int index = 0;
- for (int batch = 0; batch < batchNum; batch++) {
+ for (int index = 0; index < totalNum; index += taskNumPerBatch) {
Map<String, String> srcToDest = Maps.newHashMap();
- int currentBatchTaskNum = (batch == batchNum - 1) ? totalNum - index : taskNumPerBatch;
- for (int j = 0; j < currentBatchTaskNum; j++) {
- SnapshotInfo info = infos.get(index++);
+ for (int j = 0; j < taskNumPerBatch && index + j < totalNum; j++) {
+ SnapshotInfo info = infos.get(index + j);
String src = info.getTabletPath();
String dest = repo.getRepoTabletPathBySnapshotInfo(label, info);
if (dest == null) {
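
The inner bound "j < taskNumPerBatch && index + j < totalNum" above is what keeps the final, possibly partial, batch in range. A hedged illustration of the resulting task count per backend (BatchMath is a made-up helper, not part of the patch):

    public class BatchMath {
        // One upload/download task per batch, i.e. ceil(totalSnapshots / batchSize).
        public static int taskCount(int totalSnapshots, int batchSize) {
            return (totalSnapshots + batchSize - 1) / batchSize;
        }

        public static void main(String[] args) {
            System.out.println(taskCount(25, 10)); // 3 tasks: 10 + 10 + 5 snapshots
        }
    }
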
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index e922955c5aa..a984eed5950 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -1686,16 +1686,10 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
for (Long beId : beToSnapshots.keySet()) {
List<SnapshotInfo> beSnapshotInfos = beToSnapshots.get(beId);
int totalNum = beSnapshotInfos.size();
- int batchNum = totalNum;
- if (Config.restore_download_task_num_per_be > 0) {
- batchNum = Math.min(totalNum, Config.restore_download_task_num_per_be);
- }
// each task contains several upload sub tasks
- int taskNumPerBatch = Math.max(totalNum / batchNum, 1);
- if (LOG.isDebugEnabled()) {
- LOG.debug("backend {} has {} batch, total {} tasks, {}",
- beId, batchNum, totalNum, this);
- }
+ int taskNumPerBatch = Config.restore_download_snapshot_batch_size;
+ LOG.info("backend {} has total {} snapshots, per task batch size {}, {}",
+ beId, totalNum, taskNumPerBatch, this);
List<FsBroker> brokerAddrs = null;
brokerAddrs = Lists.newArrayList();
@@ -1707,12 +1701,10 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
Preconditions.checkState(brokerAddrs.size() == 1);
// allot tasks
- int index = 0;
- for (int batch = 0; batch < batchNum; batch++) {
+ for (int index = 0; index < totalNum; index += taskNumPerBatch) {
Map<String, String> srcToDest = Maps.newHashMap();
- int currentBatchTaskNum = (batch == batchNum - 1) ? totalNum - index : taskNumPerBatch;
- for (int j = 0; j < currentBatchTaskNum; j++) {
- SnapshotInfo info = beSnapshotInfos.get(index++);
+ for (int j = 0; j < taskNumPerBatch && index + j < totalNum; j++) {
+ SnapshotInfo info = beSnapshotInfos.get(index + j);
Table tbl = db.getTableNullable(info.getTblId());
if (tbl == null) {
status = new Status(ErrCode.NOT_FOUND, "restored table "
@@ -1846,22 +1838,17 @@ public class RestoreJob extends AbstractJob implements GsonPostProcessable {
for (Long beId : beToSnapshots.keySet()) {
List<SnapshotInfo> beSnapshotInfos = beToSnapshots.get(beId);
int totalNum = beSnapshotInfos.size();
- int batchNum = totalNum;
- if (Config.restore_download_task_num_per_be > 0) {
- batchNum = Math.min(totalNum, Config.restore_download_task_num_per_be);
- }
// each task contains several upload sub tasks
- int taskNumPerBatch = Math.max(totalNum / batchNum, 1);
+ int taskNumPerBatch = Config.restore_download_snapshot_batch_size;
+ LOG.info("backend {} has total {} snapshots, per task batch size {}, {}",
+ beId, totalNum, taskNumPerBatch, this);
// allot tasks
- int index = 0;
- for (int batch = 0; batch < batchNum; batch++) {
+ for (int index = 0; index < totalNum; index += taskNumPerBatch) {
List<TRemoteTabletSnapshot> remoteTabletSnapshots = Lists.newArrayList();
- int currentBatchTaskNum = (batch == batchNum - 1) ? totalNum - index : taskNumPerBatch;
- for (int j = 0; j < currentBatchTaskNum; j++) {
+ for (int j = 0; j < taskNumPerBatch && index + j < totalNum; j++) {
TRemoteTabletSnapshot remoteTabletSnapshot = new TRemoteTabletSnapshot();
-
- SnapshotInfo info = beSnapshotInfos.get(index++);
+ SnapshotInfo info = beSnapshotInfos.get(index + j);
Table tbl = db.getTableNullable(info.getTblId());
if (tbl == null) {
status = new Status(ErrCode.NOT_FOUND, "restored table "
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]