This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 43c8ec58925 [fix](cloud) port CloudLoadManager#removeCopyJobs() to
master (#35915)
43c8ec58925 is described below
commit 43c8ec58925377a4b6687cd261622695326121fa
Author: Kaijie Chen <[email protected]>
AuthorDate: Sat Jun 8 23:51:45 2024 +0800
[fix](cloud) port CloudLoadManager#removeCopyJobs() to master (#35915)
Remove completed CopyJobs when the number of copy jobs per table exceeds
`cloud_max_copy_job_per_table`,
to prevent a memory leak.
---
.../apache/doris/cloud/load/CloudLoadManager.java | 101 +++++++++++++++++++++
1 file changed, 101 insertions(+)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java
index f4543c6b066..9a8ea0fa4d2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java
@@ -28,6 +28,7 @@ import org.apache.doris.common.CaseSensibility;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.Pair;
import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.PatternMatcherWrapper;
import org.apache.doris.common.UserException;
@@ -42,13 +43,18 @@ import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.io.DataInput;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -256,5 +262,100 @@ public class CloudLoadManager extends LoadManager {
}
return loadJobList2;
}
+
+    /**
+     * Restores the load manager state from {@code in} using the base implementation, then
+     * calls {@code removeCopyJobs()} to prune completed copy jobs beyond the per-table limit.
+     *
+     * @param in the stream to deserialize from
+     * @throws IOException if the base deserialization fails
+     */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        // restore the base state first: pruning walks the deserialized job maps
+        super.readFields(in);
+        this.removeCopyJobs();
+    }
+
+    /**
+     * Runs the base old-job cleanup, then additionally calls {@code removeCopyJobs()} to prune
+     * completed copy jobs beyond the per-table limit.
+     */
+    @Override
+    public void removeOldLoadJob() {
+        // base cleanup first, then the cloud-specific copy job pruning
+        super.removeOldLoadJob();
+        this.removeCopyJobs();
+    }
+
+    /**
+     * Prunes completed {@link CopyJob}s so that each table keeps a bounded history.
+     *
+     * <p>Copy jobs with a non-empty table name are grouped by {@code dbId#tableName}. For every
+     * group larger than {@code Config.cloud_max_copy_job_per_table}, the jobs are ordered by
+     * finish timestamp (newest first). The newest {@code cloud_max_copy_job_per_table} FINISHED
+     * jobs, and every job ordered before the last of them, are kept. Completed jobs ordered after
+     * that point are removed from {@code dbIdToLabelToLoadJobs} and {@code idToLoadJob}, and their
+     * progress is recycled. Nothing happens when the limit is not positive.
+     *
+     * <p>Candidates are selected under the read lock and removed under the write lock. Only the
+     * selected jobs are removed, matched by job id. Other jobs sharing the same label are left
+     * untouched, such as a newer retry or a job added after the read lock was released. Failures
+     * are logged and swallowed so that pruning never breaks the caller.
+     */
+    private void removeCopyJobs() {
+        if (Config.cloud_max_copy_job_per_table <= 0) {
+            return;
+        }
+        // dbId -> label -> ids of the jobs under that label that were selected for removal
+        Map<Long, Map<String, Set<Long>>> dbToLabelToJobIds = new HashMap<>();
+        readLock();
+        long start = System.currentTimeMillis();
+        try {
+            // group copy jobs by "dbId#tableName"
+            Map<String, List<LoadJob>> tableToLoadJobs = dbIdToLabelToLoadJobs.values().stream()
+                    .flatMap(loadJobsMap -> loadJobsMap.values().stream())
+                    .flatMap(List::stream)
+                    .filter(loadJob -> (loadJob instanceof CopyJob)
+                            && StringUtils.isNotEmpty(((CopyJob) loadJob).getTableName()))
+                    .map(copyJob -> Pair.of(
+                            copyJob.getDbId() + "#" + ((CopyJob) copyJob).getTableName(), copyJob))
+                    .collect(Collectors.groupingBy(v -> v.first,
+                            Collectors.mapping(jobPair -> jobPair.second, Collectors.toList())));
+            // select the jobs to remove
+            for (List<LoadJob> jobs : tableToLoadJobs.values()) {
+                if (jobs.size() <= Config.cloud_max_copy_job_per_table) {
+                    continue;
+                }
+                // newest first, so the jobs to retain come before the jobs to prune
+                jobs.sort((o1, o2) -> Long.compare(o2.getFinishTimestamp(), o1.getFinishTimestamp()));
+                int finishJobCount = 0;
+                boolean found = false;
+                for (LoadJob job : jobs) {
+                    if (!found) {
+                        if (job.getState() == JobState.FINISHED) {
+                            finishJobCount++;
+                            if (finishJobCount >= Config.cloud_max_copy_job_per_table) {
+                                found = true;
+                            }
+                        }
+                    } else if (job.isCompleted()) {
+                        // remember the job itself (by id), not just its label
+                        dbToLabelToJobIds.computeIfAbsent(job.getDbId(), k -> new HashMap<>())
+                                .computeIfAbsent(job.getLabel(), k -> new HashSet<>())
+                                .add(job.getId());
+                    }
+                }
+            }
+        } catch (Throwable e) {
+            LOG.warn("Failed to remove copy jobs", e);
+        } finally {
+            readUnlock();
+        }
+        if (dbToLabelToJobIds.isEmpty()) {
+            return;
+        }
+        writeLock();
+        try {
+            long copyJobNum = idToLoadJob.size();
+            for (Map.Entry<Long, Map<String, Set<Long>>> dbEntry : dbToLabelToJobIds.entrySet()) {
+                Map<String, List<LoadJob>> labelToJob = dbIdToLabelToLoadJobs.get(dbEntry.getKey());
+                if (labelToJob == null) {
+                    // the db entry may have been removed after the read lock was released
+                    continue;
+                }
+                for (Map.Entry<String, Set<Long>> labelEntry : dbEntry.getValue().entrySet()) {
+                    String label = labelEntry.getKey();
+                    Set<Long> jobIds = labelEntry.getValue();
+                    List<LoadJob> jobs = labelToJob.get(label);
+                    if (jobs == null) {
+                        continue;
+                    }
+                    Iterator<LoadJob> iter = jobs.iterator();
+                    while (iter.hasNext()) {
+                        LoadJob job = iter.next();
+                        // only drop the selected copy jobs; other jobs under this label stay
+                        if (!(job instanceof CopyJob) || !jobIds.contains(job.getId())) {
+                            continue;
+                        }
+                        iter.remove();
+                        idToLoadJob.remove(job.getId());
+                        ((CopyJob) job).recycleProgress();
+                    }
+                    if (jobs.isEmpty()) {
+                        labelToJob.remove(label);
+                    }
+                }
+            }
+            LOG.info("remove copy jobs from {} to {}, cost={}ms", copyJobNum, idToLoadJob.size(),
+                    System.currentTimeMillis() - start);
+        } catch (Throwable e) {
+            LOG.warn("Failed to remove copy jobs", e);
+        } finally {
+            writeUnlock();
+        }
+    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]