This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7754791146e [improvement](disk balance) Prevent duplicate disk balance tasks afte… (#25990)
7754791146e is described below

commit 7754791146e36efd53b5fe271189615c1206f9e2
Author: deardeng <[email protected]>
AuthorDate: Fri Nov 10 10:14:42 2023 +0800

    [improvement](disk balance) Prevent duplicate disk balance tasks afte… (#25990)
---
 be/src/agent/task_worker_pool.cpp                  | 10 ++++++---
 .../org/apache/doris/clone/DiskRebalancer.java     | 26 ++++++++++++++++++++++
 .../org/apache/doris/clone/TabletScheduler.java    | 13 +++++++++++
 3 files changed, 46 insertions(+), 3 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index 6717af3ce4b..698a71aec16 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1949,6 +1949,10 @@ void 
StorageMediumMigrateTaskPool::_storage_medium_migrate_worker_thread_callbac
             EngineStorageMigrationTask engine_task(tablet, dest_store);
             status = StorageEngine::instance()->execute_task(&engine_task);
         }
+        // fe should ignore this err
+        if (status.is<FILE_ALREADY_EXIST>()) {
+            status = Status::OK();
+        }
         if (!status.ok()) {
             LOG_WARNING("failed to migrate storage medium")
                     .tag("signature", agent_task_req.signature)
@@ -2011,8 +2015,9 @@ Status 
StorageMediumMigrateTaskPool::_check_migrate_request(const TStorageMedium
         *dest_store = stores[0];
     }
     if (tablet->data_dir()->path() == (*dest_store)->path()) {
-        return Status::InternalError("tablet is already on specified path {}",
-                                     tablet->data_dir()->path());
+        LOG_WARNING("tablet is already on specified path").tag("path", 
tablet->data_dir()->path());
+        return Status::Error<FILE_ALREADY_EXIST, false>("tablet is already on 
specified path: {}",
+                                                        
tablet->data_dir()->path());
     }
 
     // check local disk capacity
@@ -2021,7 +2026,6 @@ Status 
StorageMediumMigrateTaskPool::_check_migrate_request(const TStorageMedium
         return Status::InternalError("reach the capacity limit of path {}, 
tablet_size={}",
                                      (*dest_store)->path(), tablet_size);
     }
-
     return Status::OK();
 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
index 5edca914441..0a7ce1b8f54 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DiskRebalancer.java
@@ -120,6 +120,7 @@ public class DiskRebalancer extends Rebalancer {
     @Override
     protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
             LoadStatisticForTag clusterStat, TStorageMedium medium) {
+        LOG.info("dx test enter selectAlternativeTabletsForCluster");
         List<TabletSchedCtx> alternativeTablets = Lists.newArrayList();
 
         // get classification of backends
@@ -185,7 +186,17 @@ public class DiskRebalancer extends Rebalancer {
             Set<Long> pathHigh = Sets.newHashSet();
             // we only select tablets from available high load path
             beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, medium);
+            LOG.info("dx test select before low={} mid={} high={} medium={}", 
pathLow, pathMid, pathHigh, medium);
             // check if BE has low and high paths for balance after 
reclassification
+            pathHigh.add(-2606726262674133323L);
+            pathHigh.add(384536254535458899L);
+            pathHigh.add(528047762753362128L);
+            pathLow.add(1252949013258184268L);
+            pathMid.remove(384536254535458899L);
+            pathMid.remove(528047762753362128L);
+            pathMid.remove(-2606726262674133323L);
+            pathMid.remove(1252949013258184268L);
+            LOG.info("dx test select after low={} mid={} high={} medium={}", 
pathLow, pathMid, pathHigh, medium);
             if (!checkAndReclassifyPaths(pathLow, pathMid, pathHigh)) {
                 continue;
             }
@@ -273,6 +284,7 @@ public class DiskRebalancer extends Rebalancer {
                     medium, alternativeTablets.size(),
                     
alternativeTablets.stream().mapToLong(TabletSchedCtx::getTabletId).toArray());
         }
+        LOG.info("dx test out selectAlternativeTabletsForCluster, 
alternativeTablets={}", alternativeTablets);
         return alternativeTablets;
     }
 
@@ -284,6 +296,7 @@ public class DiskRebalancer extends Rebalancer {
      */
     @Override
     public void completeSchedCtx(TabletSchedCtx tabletCtx) throws 
SchedException {
+        LOG.info("dx test enter completeSchedCtx");
         LoadStatisticForTag clusterStat = statisticMap.get(tabletCtx.getTag());
         if (clusterStat == null) {
             throw new SchedException(Status.UNRECOVERABLE,
@@ -340,6 +353,18 @@ public class DiskRebalancer extends Rebalancer {
         Set<Long> pathMid = Sets.newHashSet();
         Set<Long> pathHigh = Sets.newHashSet();
         beStat.getPathStatisticByClass(pathLow, pathMid, pathHigh, 
tabletCtx.getStorageMedium());
+        LOG.info("dx test complete before low={} mid={} high={} medium={}",
+                pathLow, pathMid, pathHigh, tabletCtx.getStorageMedium());
+        pathHigh.add(-2606726262674133323L);
+        pathHigh.add(384536254535458899L);
+        pathHigh.add(528047762753362128L);
+        pathLow.add(1252949013258184268L);
+        pathMid.remove(384536254535458899L);
+        pathMid.remove(528047762753362128L);
+        pathMid.remove(-2606726262674133323L);
+        pathMid.remove(1252949013258184268L);
+        LOG.info("dx test complete after low={} mid={} high={} medium={}",
+                pathLow, pathMid, pathHigh, tabletCtx.getStorageMedium());
         if (pathHigh.contains(replica.getPathHash())) {
             pathLow.addAll(pathMid);
         } else if (!pathMid.contains(replica.getPathHash())) {
@@ -382,5 +407,6 @@ public class DiskRebalancer extends Rebalancer {
         if (!setDest) {
             throw new SchedException(Status.UNRECOVERABLE, "unable to find low 
load path");
         }
+        LOG.info("dx test out completeSchedCtx");
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
index 1d4592501f2..beee677d2cc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java
@@ -79,6 +79,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.PriorityQueue;
 import java.util.Queue;
 import java.util.Set;
@@ -615,6 +616,17 @@ public class TabletScheduler extends MasterDaemon {
         }
     }
 
+    public void updateDestPathHash(TabletSchedCtx tabletCtx) {
+        // find the replica on the dest backend; if found and dest path hash != -1, copy that hash onto it
+        Optional<Replica> destReplica = tabletCtx.getReplicas()
+                .stream().filter(replica -> replica.getBackendId() == 
tabletCtx.getDestBackendId()).findAny();
+        if (destReplica.isPresent() && tabletCtx.getDestPathHash() != -1) {
+            LOG.info("dx test success report old {} : new {}",
+                    destReplica.get().getPathHash(), 
tabletCtx.getDestPathHash());
+            destReplica.get().setPathHash(tabletCtx.getDestPathHash());
+        }
+    }
+
     public void updateDiskBalanceLastSuccTime(long beId, long pathHash) {
         PathSlot pathSlot = backendsWorkingSlots.get(beId);
         if (pathSlot == null) {
@@ -1642,6 +1654,7 @@ public class TabletScheduler extends MasterDaemon {
             // if we have a success task, then stat must be refreshed before 
schedule a new task
             updateDiskBalanceLastSuccTime(tabletCtx.getSrcBackendId(), 
tabletCtx.getSrcPathHash());
             updateDiskBalanceLastSuccTime(tabletCtx.getDestBackendId(), 
tabletCtx.getDestPathHash());
+            updateDestPathHash(tabletCtx);
             finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.FINISHED, 
Status.FINISHED, "finished");
         } else {
             finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.CANCELLED, 
Status.UNRECOVERABLE,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to