This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 43c8ec58925 [fix](cloud) port CloudLoadManager#removeCopyJobs() to master (#35915)
43c8ec58925 is described below

commit 43c8ec58925377a4b6687cd261622695326121fa
Author: Kaijie Chen <[email protected]>
AuthorDate: Sat Jun 8 23:51:45 2024 +0800

    [fix](cloud) port CloudLoadManager#removeCopyJobs() to master (#35915)
    
    Remove completed CopyJobs when the number of copy jobs per table exceeds
    `cloud_max_copy_job_per_table`, to prevent a memory leak.
---
 .../apache/doris/cloud/load/CloudLoadManager.java  | 101 +++++++++++++++++++++
 1 file changed, 101 insertions(+)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java
index f4543c6b066..9a8ea0fa4d2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CloudLoadManager.java
@@ -28,6 +28,7 @@ import org.apache.doris.common.CaseSensibility;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.PatternMatcher;
 import org.apache.doris.common.PatternMatcherWrapper;
 import org.apache.doris.common.UserException;
@@ -42,13 +43,18 @@ import org.apache.doris.qe.ConnectContext;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.DataInput;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -256,5 +262,100 @@ public class CloudLoadManager extends LoadManager {
         }
         return loadJobList2;
     }
+
+    /**
+     * Reads the manager's state from {@code in} through the superclass, then trims surplus
+     * completed copy jobs so the freshly loaded job indexes respect
+     * {@code Config.cloud_max_copy_job_per_table}.
+     *
+     * @param in source of the serialized state
+     * @throws IOException if the superclass fails to read from {@code in}
+     */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        super.readFields(in);
+        // trim immediately after loading, before any other caller sees the job maps
+        this.removeCopyJobs();
+    }
+
+    /**
+     * Runs the superclass's old-job cleanup, then additionally trims completed copy jobs
+     * beyond {@code Config.cloud_max_copy_job_per_table} per table.
+     */
+    @Override
+    public void removeOldLoadJob() {
+        super.removeOldLoadJob();
+        // copy jobs get an extra, per-table bound on top of the generic cleanup
+        this.removeCopyJobs();
+    }
+
+    /**
+     * Trims completed {@link CopyJob}s so that each table keeps at most
+     * {@code Config.cloud_max_copy_job_per_table} of its most recently FINISHED copy jobs.
+     *
+     * <p>Copy jobs are grouped by database id and table name and ordered by finish timestamp, newest
+     * first. Once the newest {@code cloud_max_copy_job_per_table} FINISHED jobs of a table have been
+     * seen, every older completed job of that table is removed. Selection runs under the read lock and
+     * records job ids. Removal runs under the write lock and drops only those exact jobs, so other jobs
+     * registered under the same label are left untouched.
+     *
+     * <p>Best effort: any failure is logged and swallowed. Does nothing when the limit is not positive.
+     */
+    private void removeCopyJobs() {
+        if (Config.cloud_max_copy_job_per_table <= 0) {
+            return;
+        }
+        // dbId -> label -> ids of the copy jobs selected for removal
+        Map<Long, Map<String, Set<Long>>> jobsToRemove = new HashMap<>();
+        readLock();
+        long start = System.currentTimeMillis();
+        try {
+            selectCopyJobsToRemove(jobsToRemove);
+        } catch (Throwable e) {
+            LOG.warn("Failed to remove copy jobs", e);
+        } finally {
+            readUnlock();
+        }
+        if (jobsToRemove.isEmpty()) {
+            return;
+        }
+        writeLock();
+        try {
+            long jobNumBefore = idToLoadJob.size();
+            dropSelectedCopyJobs(jobsToRemove);
+            LOG.info("remove copy jobs from {} to {}, cost={}ms", jobNumBefore, idToLoadJob.size(),
+                    System.currentTimeMillis() - start);
+        } catch (Throwable e) {
+            LOG.warn("Failed to remove copy jobs", e);
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    // Records, per table, the completed copy jobs that come after the newest
+    // cloud_max_copy_job_per_table FINISHED ones. Caller must hold the read lock.
+    private void selectCopyJobsToRemove(Map<Long, Map<String, Set<Long>>> jobsToRemove) {
+        int maxJobsPerTable = Config.cloud_max_copy_job_per_table;
+        // group copy jobs by "dbId#tableName"
+        Map<String, List<LoadJob>> tableToLoadJobs = dbIdToLabelToLoadJobs.values().stream()
+                .flatMap(labelToJobs -> labelToJobs.values().stream())
+                .flatMap(List::stream)
+                .filter(loadJob -> (loadJob instanceof CopyJob)
+                        && StringUtils.isNotEmpty(((CopyJob) loadJob).getTableName()))
+                .map(copyJob -> Pair.of(copyJob.getDbId() + "#" + ((CopyJob) copyJob).getTableName(), copyJob))
+                .collect(Collectors.groupingBy(pair -> pair.first,
+                        Collectors.mapping(pair -> pair.second, Collectors.toList())));
+        for (List<LoadJob> jobs : tableToLoadJobs.values()) {
+            if (jobs.size() <= maxJobsPerTable) {
+                continue;
+            }
+            // newest finish timestamp first
+            jobs.sort((o1, o2) -> Long.compare(o2.getFinishTimestamp(), o1.getFinishTimestamp()));
+            int finishedJobCount = 0;
+            for (LoadJob job : jobs) {
+                if (finishedJobCount < maxJobsPerTable) {
+                    // still inside the retention window: keep, counting only FINISHED jobs
+                    if (job.getState() == JobState.FINISHED) {
+                        finishedJobCount++;
+                    }
+                } else if (job.isCompleted()) {
+                    jobsToRemove.computeIfAbsent(job.getDbId(), k -> new HashMap<>())
+                            .computeIfAbsent(job.getLabel(), k -> new HashSet<>())
+                            .add(job.getId());
+                }
+            }
+        }
+    }
+
+    // Removes exactly the selected jobs from both job indexes and recycles their progress.
+    // Caller must hold the write lock.
+    private void dropSelectedCopyJobs(Map<Long, Map<String, Set<Long>>> jobsToRemove) {
+        for (Map.Entry<Long, Map<String, Set<Long>>> dbEntry : jobsToRemove.entrySet()) {
+            Map<String, List<LoadJob>> labelToJobs = dbIdToLabelToLoadJobs.get(dbEntry.getKey());
+            if (labelToJobs == null) {
+                continue;
+            }
+            for (Map.Entry<String, Set<Long>> labelEntry : dbEntry.getValue().entrySet()) {
+                String label = labelEntry.getKey();
+                List<LoadJob> jobs = labelToJobs.get(label);
+                if (jobs == null) {
+                    continue;
+                }
+                Set<Long> selectedIds = labelEntry.getValue();
+                Iterator<LoadJob> iter = jobs.iterator();
+                while (iter.hasNext()) {
+                    LoadJob job = iter.next();
+                    if (!selectedIds.contains(job.getId())) {
+                        // another job registered under the same label that was not selected; keep it
+                        continue;
+                    }
+                    iter.remove();
+                    idToLoadJob.remove(job.getId());
+                    // safe cast: only CopyJobs pass the selection filter
+                    ((CopyJob) job).recycleProgress();
+                }
+                if (jobs.isEmpty()) {
+                    labelToJobs.remove(label);
+                }
+            }
+        }
+    }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to