This is an automated email from the ASF dual-hosted git repository.

w41ter pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new c1bc0615f92 [improve](restore) Split creating replica task by table id (#42239)
c1bc0615f92 is described below

commit c1bc0615f927d6978dc23d7e6070586a80c656a3
Author: walter <[email protected]>
AuthorDate: Wed Oct 23 18:52:13 2024 +0800

    [improve](restore) Split creating replica task by table id (#42239)
    
    pick #42235
---
 .../java/org/apache/doris/backup/RestoreJob.java   | 36 ++++++++++++++++------
 1 file changed, 26 insertions(+), 10 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 6e2fb3bcb6a..28bcba4adf3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -611,7 +611,7 @@ public class RestoreJob extends AbstractJob {
         Map<Long, TabletRef> tabletBases = new HashMap<>();
 
         // Check and prepare meta objects.
-        AgentBatchTask batchTask = new AgentBatchTask();
+        Map<Long, AgentBatchTask> batchTaskPerTable = new HashMap<>();
         db.readLock();
         try {
             for (Map.Entry<String, BackupOlapTableInfo> olapTableEntry : 
jobInfo.backupOlapTableObjects.entrySet()) {
@@ -846,6 +846,11 @@ public class RestoreJob extends AbstractJob {
                 BackupPartitionInfo backupPartitionInfo
                         = 
jobInfo.getOlapTableInfo(entry.first).getPartInfo(restorePart.getName());
 
+                AgentBatchTask batchTask = 
batchTaskPerTable.get(localTbl.getId());
+                if (batchTask == null) {
+                    batchTask = new AgentBatchTask();
+                    batchTaskPerTable.put(localTbl.getId(), batchTask);
+                }
                 createReplicas(db, batchTask, localTbl, restorePart);
 
                 genFileMapping(localTbl, restorePart, remoteTbl.getId(), 
backupPartitionInfo,
@@ -857,6 +862,11 @@ public class RestoreJob extends AbstractJob {
                 if (restoreTbl.getType() == TableType.OLAP) {
                     OlapTable restoreOlapTable = (OlapTable) restoreTbl;
                     for (Partition restorePart : 
restoreOlapTable.getPartitions()) {
+                        AgentBatchTask batchTask = 
batchTaskPerTable.get(restoreTbl.getId());
+                        if (batchTask == null) {
+                            batchTask = new AgentBatchTask();
+                            batchTaskPerTable.put(restoreTbl.getId(), 
batchTask);
+                        }
                         createReplicas(db, batchTask, restoreOlapTable, 
restorePart, tabletBases);
                         BackupOlapTableInfo backupOlapTableInfo = 
jobInfo.getOlapTableInfo(restoreOlapTable.getName());
                         genFileMapping(restoreOlapTable, restorePart, 
backupOlapTableInfo.id,
@@ -890,20 +900,26 @@ public class RestoreJob extends AbstractJob {
 
         // Send create replica task to BE outside the db lock
         boolean ok = false;
-        MarkedCountDownLatch<Long, Long> latch = new 
MarkedCountDownLatch<Long, Long>(batchTask.getTaskNum());
-        if (batchTask.getTaskNum() > 0) {
-            for (AgentTask task : batchTask.getAllTasks()) {
-                latch.addMark(task.getBackendId(), task.getTabletId());
-                ((CreateReplicaTask) task).setLatch(latch);
-                AgentTaskQueue.addTask(task);
+        int numBatchTasks = batchTaskPerTable.values()
+                .stream()
+                .mapToInt(AgentBatchTask::getTaskNum)
+                .sum();
+        MarkedCountDownLatch<Long, Long> latch = new 
MarkedCountDownLatch<Long, Long>(numBatchTasks);
+        if (batchTaskPerTable.size() > 0) {
+            for (AgentBatchTask batchTask : batchTaskPerTable.values()) {
+                for (AgentTask task : batchTask.getAllTasks()) {
+                    latch.addMark(task.getBackendId(), task.getTabletId());
+                    ((CreateReplicaTask) task).setLatch(latch);
+                    AgentTaskQueue.addTask(task);
+                }
+                AgentTaskExecutor.submit(batchTask);
             }
-            AgentTaskExecutor.submit(batchTask);
 
             // estimate timeout
-            long timeout = 
DbUtil.getCreateReplicasTimeoutMs(batchTask.getTaskNum());
+            long timeout = DbUtil.getCreateReplicasTimeoutMs(numBatchTasks);
             try {
                 LOG.info("begin to send create replica tasks to BE for 
restore. total {} tasks. timeout: {}",
-                        batchTask.getTaskNum(), timeout);
+                        numBatchTasks, timeout);
                 ok = latch.await(timeout, TimeUnit.MILLISECONDS);
             } catch (InterruptedException e) {
                 LOG.warn("InterruptedException: ", e);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to