This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a78a237997d [fix](merge-cloud) Remove expired loadJob when 
replayEndLoadJob (#34020)
a78a237997d is described below

commit a78a237997da88872057feb41d2e5309964410ad
Author: Lei Zhang <[email protected]>
AuthorDate: Tue Apr 23 21:02:17 2024 +0800

    [fix](merge-cloud) Remove expired loadJob when replayEndLoadJob (#34020)
    
    * When starting fe with lots of finished job, it maybe `OOM`, so
      we remove expired loadJob when replaying editlog
---
 .../main/java/org/apache/doris/load/loadv2/LoadManager.java  | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 0b67ba544c6..4249aa88444 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -60,6 +60,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.StopWatch;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -343,6 +344,11 @@ public class LoadManager implements Writable {
         job.unprotectReadEndOperation(operation);
         LOG.info(new LogBuilder(LogKey.LOAD_JOB, 
operation.getId()).add("operation", operation)
                 .add("msg", "replay end load job").build());
+
+        // When idToLoadJob size increase 10000 roughly, we run 
removeOldLoadJob to reduce mem used
+        if ((idToLoadJob.size() > 0) && (idToLoadJob.size() % 10000 == 0)) {
+            removeOldLoadJob();
+        }
     }
 
     /**
@@ -465,6 +471,8 @@ public class LoadManager implements Writable {
     }
 
     private void removeLoadJobIf(Predicate<LoadJob> pred) {
+        long removeJobNum = 0;
+        StopWatch stopWatch = StopWatch.createStarted();
         writeLock();
         try {
             Iterator<Map.Entry<Long, LoadJob>> iter = 
idToLoadJob.entrySet().iterator();
@@ -473,10 +481,14 @@ public class LoadManager implements Writable {
                 if (pred.test(job)) {
                     iter.remove();
                     jobRemovedTrigger(job);
+                    removeJobNum++;
                 }
             }
         } finally {
             writeUnlock();
+            stopWatch.stop();
+            LOG.info("end to removeOldLoadJob, removeJobNum:{} cost:{} ms",
+                    removeJobNum, stopWatch.getTime());
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to