This is an automated email from the ASF dual-hosted git repository.
w41ter pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 4da628deb77 [improve](task) Support splitting agent batch tasks automatically #42703 (#42989)
4da628deb77 is described below
commit 4da628deb77bbfd7df8206034b8866dc0660098b
Author: walter <[email protected]>
AuthorDate: Thu Oct 31 17:38:30 2024 +0800
[improve](task) Support splitting agent batch tasks automatically #42703 (#42989)
Cherry-picked from #42703
---
.../main/java/org/apache/doris/common/Config.java | 7 +++++
.../java/org/apache/doris/backup/BackupJob.java | 9 +++---
.../java/org/apache/doris/backup/RestoreJob.java | 14 +++++-----
.../java/org/apache/doris/task/AgentBatchTask.java | 32 ++++++++++++++++++----
4 files changed, 44 insertions(+), 18 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 819f2247158..c91a487466b 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2687,6 +2687,13 @@ public class Config extends ConfigBase {
})
public static int restore_download_task_num_per_be = 3;
+ @ConfField(mutable = true, masterOnly = true, description = {
+ "备份恢复过程中,单次 RPC 分配给每个be的任务最大个数,默认值为10000个。",
+ "The max number of batched tasks per RPC assigned to each be
during the backup/restore process, "
+ + "the default value is 10000."
+ })
+ public static int backup_restore_batch_task_num_per_rpc = 10000;
+
@ConfField(description = {"是否开启通过http接口获取log文件的功能",
"Whether to enable the function of getting log files through http
interface"})
public static boolean enable_get_log_file_api = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index 1ba9fb129b2..b14fae1ed3f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -208,11 +208,10 @@ public class BackupJob extends AbstractJob {
task.getIndexId(), task.getTabletId(),
task.getVersion(),
task.getSchemaHash(), timeoutMs, false /* not restore task
*/);
- AgentBatchTask batchTask = new AgentBatchTask();
- batchTask.addTask(newTask);
unfinishedTaskIds.put(tablet.getId(),
replica.getBackendIdWithoutException());
//send task
+ AgentBatchTask batchTask = new AgentBatchTask(newTask);
AgentTaskQueue.addTask(newTask);
AgentTaskExecutor.submit(batchTask);
@@ -474,7 +473,7 @@ public class BackupJob extends AbstractJob {
// copy all related schema at this moment
List<Table> copiedTables = Lists.newArrayList();
List<Resource> copiedResources = Lists.newArrayList();
- AgentBatchTask batchTask = new AgentBatchTask();
+ AgentBatchTask batchTask = new
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
for (TableRef tableRef : tableRefs) {
String tblName = tableRef.getName().getTbl();
Table tbl = db.getTableNullable(tblName);
@@ -729,7 +728,7 @@ public class BackupJob extends AbstractJob {
beToSnapshots.put(info.getBeId(), info);
}
- AgentBatchTask batchTask = new AgentBatchTask();
+ AgentBatchTask batchTask = new
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
for (Long beId : beToSnapshots.keySet()) {
List<SnapshotInfo> infos = beToSnapshots.get(beId);
int totalNum = infos.size();
@@ -892,7 +891,7 @@ public class BackupJob extends AbstractJob {
}
// we do not care about the release snapshot tasks' success or failure,
// the GC thread on BE will sweep the snapshot, finally.
- AgentBatchTask batchTask = new AgentBatchTask();
+ AgentBatchTask batchTask = new
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
for (SnapshotInfo info : snapshotInfos.values()) {
ReleaseSnapshotTask releaseTask = new ReleaseSnapshotTask(null,
info.getBeId(), info.getDbId(),
info.getTabletId(), info.getPath());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 94df414a96c..cf43afc6597 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -894,7 +894,7 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
AgentBatchTask batchTask =
batchTaskPerTable.get(localTbl.getId());
if (batchTask == null) {
- batchTask = new AgentBatchTask();
+ batchTask = new
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
batchTaskPerTable.put(localTbl.getId(), batchTask);
}
createReplicas(db, batchTask, localTbl, restorePart);
@@ -910,7 +910,7 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
for (Partition restorePart :
restoreOlapTable.getPartitions()) {
AgentBatchTask batchTask =
batchTaskPerTable.get(restoreTbl.getId());
if (batchTask == null) {
- batchTask = new AgentBatchTask();
+ batchTask = new
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
batchTaskPerTable.put(restoreTbl.getId(),
batchTask);
}
createReplicas(db, batchTask, restoreOlapTable,
restorePart, tabletBases);
@@ -1167,7 +1167,7 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
taskProgress.clear();
taskErrMsg.clear();
Multimap<Long, Long> bePathsMap = HashMultimap.create();
- AgentBatchTask batchTask = new AgentBatchTask();
+ AgentBatchTask batchTask = new
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
db.readLock();
try {
for (Map.Entry<IdChain, IdChain> entry :
fileMapping.getMapping().entrySet()) {
@@ -1652,7 +1652,7 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
unfinishedSignatureToId.clear();
taskProgress.clear();
taskErrMsg.clear();
- AgentBatchTask batchTask = new AgentBatchTask();
+ AgentBatchTask batchTask = new
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
for (long dbId : dbToSnapshotInfos.keySet()) {
List<SnapshotInfo> infos = dbToSnapshotInfos.get(dbId);
@@ -1812,7 +1812,7 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
unfinishedSignatureToId.clear();
taskProgress.clear();
taskErrMsg.clear();
- AgentBatchTask batchTask = new AgentBatchTask();
+ AgentBatchTask batchTask = new
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
for (long dbId : dbToSnapshotInfos.keySet()) {
List<SnapshotInfo> infos = dbToSnapshotInfos.get(dbId);
@@ -1992,7 +1992,7 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
unfinishedSignatureToId.clear();
taskProgress.clear();
taskErrMsg.clear();
- AgentBatchTask batchTask = new AgentBatchTask();
+ AgentBatchTask batchTask = new
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
// tablet id->(be id -> download info)
for (Cell<Long, Long, SnapshotInfo> cell : snapshotInfos.cellSet()) {
SnapshotInfo info = cell.getValue();
@@ -2182,7 +2182,7 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
}
// we do not care about the release snapshot tasks' success or failure,
// the GC thread on BE will sweep the snapshot, finally.
- AgentBatchTask batchTask = new AgentBatchTask();
+ AgentBatchTask batchTask = new
AgentBatchTask(Config.backup_restore_batch_task_num_per_rpc);
for (SnapshotInfo info : snapshotInfos.values()) {
ReleaseSnapshotTask releaseTask = new ReleaseSnapshotTask(null,
info.getBeId(), info.getDbId(),
info.getTabletId(), info.getPath());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
index bf73f9b83fe..be698776cac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
@@ -55,6 +55,7 @@ import org.apache.doris.thrift.TVisibleVersionReq;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
import java.util.HashMap;
import java.util.LinkedList;
@@ -67,6 +68,8 @@ import java.util.Map;
public class AgentBatchTask implements Runnable {
private static final Logger LOG =
LogManager.getLogger(AgentBatchTask.class);
+ private int batchSize = Integer.MAX_VALUE;
+
// backendId -> AgentTask List
private Map<Long, List<AgentTask>> backendIdToTasks;
@@ -74,6 +77,12 @@ public class AgentBatchTask implements Runnable {
this.backendIdToTasks = new HashMap<Long, List<AgentTask>>();
}
+ public AgentBatchTask(int batchSize) {
+ this.backendIdToTasks = new HashMap<Long, List<AgentTask>>();
+ this.batchSize = batchSize;
+ assert batchSize > 0;
+ }
+
public AgentBatchTask(AgentTask singleTask) {
this();
addTask(singleTask);
@@ -172,14 +181,12 @@ public class AgentBatchTask implements Runnable {
List<TAgentTaskRequest> agentTaskRequests = new
LinkedList<TAgentTaskRequest>();
for (AgentTask task : tasks) {
agentTaskRequests.add(toAgentTaskRequest(task));
- }
- client.submitTasks(agentTaskRequests);
- if (LOG.isDebugEnabled()) {
- for (AgentTask task : tasks) {
- LOG.debug("send task: type[{}], backend[{}],
signature[{}]",
- task.getTaskType(), backendId,
task.getSignature());
+ if (agentTaskRequests.size() >= batchSize) {
+ submitTasks(backendId, client, agentTaskRequests);
+ agentTaskRequests.clear();
}
}
+ submitTasks(backendId, client, agentTaskRequests);
ok = true;
} catch (Exception e) {
LOG.warn("task exec error. backend[{}]", backendId, e);
@@ -198,6 +205,19 @@ public class AgentBatchTask implements Runnable {
} // end for backend
}
+ private static void submitTasks(long backendId,
+ BackendService.Client client, List<TAgentTaskRequest>
agentTaskRequests) throws TException {
+ if (!agentTaskRequests.isEmpty()) {
+ client.submitTasks(agentTaskRequests);
+ }
+ if (LOG.isDebugEnabled()) {
+ for (TAgentTaskRequest req : agentTaskRequests) {
+ LOG.debug("send task: type[{}], backend[{}], signature[{}]",
+ req.getTaskType(), backendId, req.getSignature());
+ }
+ }
+ }
+
private TAgentTaskRequest toAgentTaskRequest(AgentTask task) {
TAgentTaskRequest tAgentTaskRequest = new TAgentTaskRequest();
tAgentTaskRequest.setProtocolVersion(TAgentServiceVersion.V1);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]