This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a78a237997d [fix](merge-cloud) Remove expired loadJob when
replayEndLoadJob (#34020)
a78a237997d is described below
commit a78a237997da88872057feb41d2e5309964410ad
Author: Lei Zhang <[email protected]>
AuthorDate: Tue Apr 23 21:02:17 2024 +0800
[fix](merge-cloud) Remove expired loadJob when replayEndLoadJob (#34020)
* When starting fe with lots of finished job, it maybe `OOM`, so
we remove expired loadJob when replaying editlog
---
.../main/java/org/apache/doris/load/loadv2/LoadManager.java | 12 ++++++++++++
1 file changed, 12 insertions(+)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 0b67ba544c6..4249aa88444 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -60,6 +60,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.StopWatch;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -343,6 +344,11 @@ public class LoadManager implements Writable {
job.unprotectReadEndOperation(operation);
LOG.info(new LogBuilder(LogKey.LOAD_JOB,
operation.getId()).add("operation", operation)
.add("msg", "replay end load job").build());
+
+ // When idToLoadJob size increase 10000 roughly, we run
removeOldLoadJob to reduce mem used
+ if ((idToLoadJob.size() > 0) && (idToLoadJob.size() % 10000 == 0)) {
+ removeOldLoadJob();
+ }
}
/**
@@ -465,6 +471,8 @@ public class LoadManager implements Writable {
}
private void removeLoadJobIf(Predicate<LoadJob> pred) {
+ long removeJobNum = 0;
+ StopWatch stopWatch = StopWatch.createStarted();
writeLock();
try {
Iterator<Map.Entry<Long, LoadJob>> iter =
idToLoadJob.entrySet().iterator();
@@ -473,10 +481,14 @@ public class LoadManager implements Writable {
if (pred.test(job)) {
iter.remove();
jobRemovedTrigger(job);
+ removeJobNum++;
}
}
} finally {
writeUnlock();
+ stopWatch.stop();
+ LOG.info("end to removeOldLoadJob, removeJobNum:{} cost:{} ms",
+ removeJobNum, stopWatch.getTime());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]