This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 76a8b5911a8 [improvement](disk balance) Prevent duplicate disk balance tasks afte… (#25990) (#26745)
76a8b5911a8 is described below

commit 76a8b5911a80d5942e4959f13104e1df3b9e822e
Author: deardeng <[email protected]>
AuthorDate: Sat Nov 11 15:20:21 2023 +0800

    [improvement](disk balance) Prevent duplicate disk balance tasks afte… (#25990) (#26745)
---
 be/src/agent/task_worker_pool.cpp                             | 10 +++++++---
 .../src/main/java/org/apache/doris/clone/DiskRebalancer.java  |  2 +-
 .../src/main/java/org/apache/doris/clone/TabletScheduler.java | 11 +++++++++++
 3 files changed, 19 insertions(+), 4 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index 09c46461aca..24c51ad69e7 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1924,6 +1924,10 @@ void StorageMediumMigrateTaskPool::_storage_medium_migrate_worker_thread_callbac
             EngineStorageMigrationTask engine_task(tablet, dest_store);
             status = _env->storage_engine()->execute_task(&engine_task);
         }
+        // fe should ignore this err
+        if (status.is<FILE_ALREADY_EXIST>()) {
+            status = Status::OK();
+        }
         if (!status.ok()) {
             LOG_WARNING("failed to migrate storage medium")
                     .tag("signature", agent_task_req.signature)
@@ -1986,8 +1990,9 @@ Status StorageMediumMigrateTaskPool::_check_migrate_request(const TStorageMedium
         *dest_store = stores[0];
     }
     if (tablet->data_dir()->path() == (*dest_store)->path()) {
-        return Status::InternalError("tablet is already on specified path {}",
-                                     tablet->data_dir()->path());
+        LOG_WARNING("tablet is already on specified path").tag("path", tablet->data_dir()->path());
+        return Status::Error<FILE_ALREADY_EXIST, false>("tablet is already on specified path: {}",
+                                                        tablet->data_dir()->path());
     }
 
     // check local disk capacity
@@ -1996,7 +2001,6 @@ Status StorageMediumMigrateTaskPool::_check_migrate_request(const TStorageMedium
         return Status::InternalError("reach the capacity limit of path {}, tablet_size={}",
                                      (*dest_store)->path(), tablet_size);
     }
-
     return Status::OK();
 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
index 5edca914441..c6ac727a405 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
@@ -185,7 +185,6 @@ public class DiskRebalancer extends Rebalancer {
             Set<Long> pathHigh = Sets.newHashSet();
             // we only select tablets from available high load path
             beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, medium);
-            // check if BE has low and high paths for balance after reclassification
             if (!checkAndReclassifyPaths(pathLow, pathMid, pathHigh)) {
                 continue;
             }
@@ -382,5 +381,6 @@ public class DiskRebalancer extends Rebalancer {
         if (!setDest) {
             throw new SchedException(Status.UNRECOVERABLE, "unable to find low load path");
         }
+        LOG.info("dx test out completeSchedCtx");
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index f2542243a75..a286e8bb5f7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -78,6 +78,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.PriorityQueue;
 import java.util.Queue;
 import java.util.Set;
@@ -608,6 +609,15 @@ public class TabletScheduler extends MasterDaemon {
         }
     }
 
+    public void updateDestPathHash(TabletSchedCtx tabletCtx) {
+        // find dest replica
+        Optional<Replica> destReplica = tabletCtx.getReplicas()
+                .stream().filter(replica -> replica.getBackendId() == tabletCtx.getDestBackendId()).findAny();
+        if (destReplica.isPresent() && tabletCtx.getDestPathHash() != -1) {
+            destReplica.get().setPathHash(tabletCtx.getDestPathHash());
+        }
+    }
+
     public void updateDiskBalanceLastSuccTime(long beId, long pathHash) {
         PathSlot pathSlot = backendsWorkingSlots.get(beId);
         if (pathSlot == null) {
@@ -1630,6 +1640,7 @@ public class TabletScheduler extends MasterDaemon {
             // if we have a success task, then stat must be refreshed before schedule a new task
             updateDiskBalanceLastSuccTime(tabletCtx.getSrcBackendId(), tabletCtx.getSrcPathHash());
             updateDiskBalanceLastSuccTime(tabletCtx.getDestBackendId(), tabletCtx.getDestPathHash());
+            updateDestPathHash(tabletCtx);
             finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED, Status.FINISHED, "finished");
         } else {
             finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, Status.UNRECOVERABLE,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to