This is an automated email from the ASF dual-hosted git repository.
w41ter pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new d27cab04dcb [improve](restore) Split batch creating replica task by table id #42235 (#42343)
d27cab04dcb is described below
commit d27cab04dcbebbfceaeb237cfe02bbfc371bead0
Author: walter <[email protected]>
AuthorDate: Thu Oct 24 11:34:39 2024 +0800
[improve](restore) Split batch creating replica task by table id #42235 (#42343)
cherry pick from #42235
---
.../java/org/apache/doris/backup/RestoreJob.java | 36 ++++++++++++++++------
1 file changed, 26 insertions(+), 10 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index a589b7cef33..c287ca78038 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -656,7 +656,7 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
Map<Long, TabletRef> tabletBases = new HashMap<>();
// Check and prepare meta objects.
- AgentBatchTask batchTask = new AgentBatchTask();
+ Map<Long, AgentBatchTask> batchTaskPerTable = new HashMap<>();
db.readLock();
try {
for (Map.Entry<String, BackupOlapTableInfo> olapTableEntry :
jobInfo.backupOlapTableObjects.entrySet()) {
@@ -892,6 +892,11 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
BackupPartitionInfo backupPartitionInfo
=
jobInfo.getOlapTableInfo(entry.first).getPartInfo(restorePart.getName());
+ AgentBatchTask batchTask =
batchTaskPerTable.get(localTbl.getId());
+ if (batchTask == null) {
+ batchTask = new AgentBatchTask();
+ batchTaskPerTable.put(localTbl.getId(), batchTask);
+ }
createReplicas(db, batchTask, localTbl, restorePart);
genFileMapping(localTbl, restorePart, remoteTbl.getId(),
backupPartitionInfo,
@@ -903,6 +908,11 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
if (restoreTbl.getType() == TableType.OLAP) {
OlapTable restoreOlapTable = (OlapTable) restoreTbl;
for (Partition restorePart :
restoreOlapTable.getPartitions()) {
+ AgentBatchTask batchTask =
batchTaskPerTable.get(restoreTbl.getId());
+ if (batchTask == null) {
+ batchTask = new AgentBatchTask();
+ batchTaskPerTable.put(restoreTbl.getId(),
batchTask);
+ }
createReplicas(db, batchTask, restoreOlapTable,
restorePart, tabletBases);
BackupOlapTableInfo backupOlapTableInfo =
jobInfo.getOlapTableInfo(restoreOlapTable.getName());
genFileMapping(restoreOlapTable, restorePart,
backupOlapTableInfo.id,
@@ -940,20 +950,26 @@ public class RestoreJob extends AbstractJob implements
GsonPostProcessable {
// Send create replica task to BE outside the db lock
boolean ok = false;
- MarkedCountDownLatch<Long, Long> latch = new
MarkedCountDownLatch<Long, Long>(batchTask.getTaskNum());
- if (batchTask.getTaskNum() > 0) {
- for (AgentTask task : batchTask.getAllTasks()) {
- latch.addMark(task.getBackendId(), task.getTabletId());
- ((CreateReplicaTask) task).setLatch(latch);
- AgentTaskQueue.addTask(task);
+ int numBatchTasks = batchTaskPerTable.values()
+ .stream()
+ .mapToInt(AgentBatchTask::getTaskNum)
+ .sum();
+ MarkedCountDownLatch<Long, Long> latch = new
MarkedCountDownLatch<Long, Long>(numBatchTasks);
+ if (batchTaskPerTable.size() > 0) {
+ for (AgentBatchTask batchTask : batchTaskPerTable.values()) {
+ for (AgentTask task : batchTask.getAllTasks()) {
+ latch.addMark(task.getBackendId(), task.getTabletId());
+ ((CreateReplicaTask) task).setLatch(latch);
+ AgentTaskQueue.addTask(task);
+ }
+ AgentTaskExecutor.submit(batchTask);
}
- AgentTaskExecutor.submit(batchTask);
// estimate timeout
- long timeout =
DbUtil.getCreateReplicasTimeoutMs(batchTask.getTaskNum());
+ long timeout = DbUtil.getCreateReplicasTimeoutMs(numBatchTasks);
try {
LOG.info("begin to send create replica tasks to BE for
restore. total {} tasks. timeout: {}",
- batchTask.getTaskNum(), timeout);
+ numBatchTasks, timeout);
ok = latch.await(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.warn("InterruptedException: ", e);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]