477972324wlx opened a new issue, #62118:
URL: https://github.com/apache/doris/issues/62118

   ### Search before asking
   
   - [x] I had searched in the [issues](https://github.com/apache/doris/issues?q=is%3Aissue) and found no similar issues.
   
   
   ### Version
   
   **Doris 4.0.3-rc03**
   The problem occurs in both Cloud Mode and non-Cloud Mode.
   
   
   ### What's Wrong?
   
   The Stream Load API with group commit enabled may cause a slow memory leak in `LoadManager`:
   **http://FE/api/db/table/_stream_load&group_commit=async_mode**
   
   
   `LoadManager` accumulates a large number of PENDING load jobs (over 4 million here):
   ```bash
   # Check the count of PENDING tasks in LoadManager
   [arthas@2844843]$ ognl '@org.apache.doris.catalog.Env@getCurrentEnv().getLoadManager().idToLoadJob.values().{? #this.getState().name() == "PENDING"}.size()'
   @Integer[4048847]
   ```
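   For readers less familiar with OGNL: the selection `{? ...}` filters `idToLoadJob.values()` down to the jobs whose state name is `PENDING`, and `.size()` counts them. A rough Java equivalent is sketched below; `JobState` and `LoadJobView` are simplified, hypothetical stand-ins for illustration, not Doris's actual classes.

   ```java
   import java.util.Map;

   // Simplified stand-in: only the states mentioned in this issue.
   enum JobState { PENDING, FINISHED, CANCELLED, UNKNOWN }

   // Hypothetical minimal view of a load job with just the fields the diagnostics read.
   record LoadJobView(long id, String label, JobState state, long createTimestamp) {}

   final class PendingJobCounter {
       private PendingJobCounter() {}

       // Same idea as: idToLoadJob.values().{? #this.getState().name() == "PENDING"}.size()
       static long countPending(Map<Long, LoadJobView> idToLoadJob) {
           return idToLoadJob.values().stream()
                   .filter(job -> job.state().name().equals("PENDING"))
                   .count();
       }
   }
   ```

   Run periodically, a count like this should stay roughly flat on a healthy FE; in this report it only grows.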
   
   Looking at the job details, many of these PENDING jobs were created several days ago and have never been purged, even though **label_keep_max_second** is set to 600 seconds.
   ```bash
   # Sampling PENDING tasks (showing State--CreateTimestamp:Label#Id)
   [arthas@2844843]$ ognl '#list=@org.apache.doris.catalog.Env@getCurrentEnv().getLoadManager().idToLoadJob.values().{? #this.getState().name() == "PENDING"}, #size=#list.size(), #end=#size > 10 ? 10 : #size, new java.util.ArrayList(#list.subList(0, #end)).{#this.getState().name()+"--"+#this.getCreateTimestamp() + ":" + #this.getLabel()+"#"+#this.getId()}'
   
   @ArrayList[
       @String[PENDING--1774695394786:group_commit_834ce3a6d7aa549c_0ec5c85be1a6128d#1774602956553],
       @String[PENDING--1774695393686:group_commit_1f4c5fa642f4e779_ddcc3cc03a68428a#1774602956552],
       @String[PENDING--1774695395590:group_commit_274264d3e3c52a75_c3abcdab7dc1d7bd#1774602956555],
       ...
   ]
   ```
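   The `CreateTimestamp` values in this sample are epoch milliseconds. A small sketch using only `java.time` decodes one of them and compares a job's age against a retention window such as the 600-second **label_keep_max_second** above; `JobAge` and its methods are illustrative names, not Doris code.

   ```java
   import java.time.Duration;
   import java.time.Instant;

   final class JobAge {
       private JobAge() {}

       // Converts a CreateTimestamp (epoch milliseconds) into a UTC instant.
       static Instant createdAt(long createTimestampMs) {
           return Instant.ofEpochMilli(createTimestampMs);
       }

       // Age of the job relative to a given "now" (also epoch ms), in whole seconds.
       static long ageSeconds(long createTimestampMs, long nowMs) {
           return Duration.ofMillis(nowMs - createTimestampMs).toSeconds();
       }

       // True if the job is older than the retention window, e.g. 600 seconds.
       static boolean olderThan(long createTimestampMs, long nowMs, long keepMaxSeconds) {
           return ageSeconds(createTimestampMs, nowMs) > keepMaxSeconds;
       }
   }
   ```

   For example, `JobAge.createdAt(1774695394786L)` yields `2026-03-28T10:56:34.786Z`, so a job still PENDING a few days later is far past a 600-second window.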
   
   In the source code, it appears that a **PENDING** job is never removed:
   ```java
       // LoadManager
       public void removeOldLoadJob() {
           long currentTimeMs = System.currentTimeMillis();
           removeLoadJobIf(job -> job.isExpired(currentTimeMs));
       }
   
       // LoadJob
       public boolean isExpired(long currentTimeMs) {
           // Logic issue: while the job stays in PENDING, isCompleted() is false,
           // so the cleaner will NEVER remove it.
           if (!isCompleted()) {
               return false;
           }
           long expireTime = Config.label_keep_max_second;
           if (jobType == EtlJobType.INSERT) {
               expireTime = Config.streaming_label_keep_max_second;
           }
   
           return (currentTimeMs - getFinishTimestamp()) / 1000 > expireTime;
       }
   
       public boolean isCompleted() {
           return state == JobState.FINISHED || state == JobState.CANCELLED || state == JobState.UNKNOWN;
       }
   ```
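   One possible direction for cleaning these jobs up, sketched under loud assumptions: let the cleaner also expire jobs that have stayed PENDING longer than some timeout, measured from the create timestamp because a pending job has no finish timestamp. `SimpleLoadJob`, `LoadJobCleaner` and the `pendingTimeoutSecond` parameter are hypothetical; this is not the upstream fix, and in the real `LoadManager` removing a PENDING job that an in-flight transaction still references could be unsafe, so the timeout would have to exceed any legitimate load duration.

   ```java
   import java.util.Map;

   // Simplified stand-in: only the states mentioned in this issue.
   enum JobState { PENDING, FINISHED, CANCELLED, UNKNOWN }

   // Hypothetical, heavily simplified load job used only to illustrate the expiry rule.
   final class SimpleLoadJob {
       final long id;
       final JobState state;
       final long createTimestampMs;
       final long finishTimestampMs; // only meaningful once the job is completed

       SimpleLoadJob(long id, JobState state, long createTimestampMs, long finishTimestampMs) {
           this.id = id;
           this.state = state;
           this.createTimestampMs = createTimestampMs;
           this.finishTimestampMs = finishTimestampMs;
       }

       boolean isCompleted() {
           return state == JobState.FINISHED || state == JobState.CANCELLED || state == JobState.UNKNOWN;
       }

       // keepMaxSecond: label_keep_max_second (or streaming_label_keep_max_second for
       // INSERT jobs in the original code). pendingTimeoutSecond is a hypothetical knob.
       boolean isExpired(long nowMs, long keepMaxSecond, long pendingTimeoutSecond) {
           if (isCompleted()) {
               // Unchanged rule: completed jobs expire relative to their finish time.
               return (nowMs - finishTimestampMs) / 1000 > keepMaxSecond;
           }
           if (state == JobState.PENDING) {
               // Added rule: a PENDING job has no finish time, so age it from creation.
               return (nowMs - createTimestampMs) / 1000 > pendingTimeoutSecond;
           }
           return false;
       }
   }

   final class LoadJobCleaner {
       private LoadJobCleaner() {}

       // Drops every expired job and returns how many were removed (single-threaded sketch).
       static int removeOldLoadJobs(Map<Long, SimpleLoadJob> idToLoadJob, long nowMs,
                                    long keepMaxSecond, long pendingTimeoutSecond) {
           int before = idToLoadJob.size();
           idToLoadJob.values().removeIf(job -> job.isExpired(nowMs, keepMaxSecond, pendingTimeoutSecond));
           return before - idToLoadJob.size();
       }
   }
   ```

   The trade-off is that the timeout must be tuned above any legitimate PENDING phase; never registering group commit jobs at all, the other option raised in this issue, avoids that tuning problem.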
   
   I found where the job is added to `LoadManager`, in the `AbstractInsertExecutor` constructor:
   ```java
       public AbstractInsertExecutor(ConnectContext ctx, TableIf table, String labelName, NereidsPlanner planner,
               Optional<InsertCommandContext> insertCtx, boolean emptyInsert, long jobId) {
           this.ctx = ctx;
           this.database = table.getDatabase();
           this.insertLoadJob = new InsertLoadJob(database.getId(), labelName, jobId);
           // Do not add load job if job id is -1.
           if (jobId != -1) {
               ctx.getEnv().getLoadManager().addLoadJob(insertLoadJob);
           }
           this.coordinator = EnvFactory.getInstance().createCoordinator(
                   ctx, planner, ctx.getStatsErrorEstimator(), insertLoadJob.getId());
           this.labelName = labelName;
           this.table = table;
           this.insertCtx = insertCtx;
           this.emptyInsert = emptyInsert;
           this.jobId = jobId;
       }
   ```
   
   I tried skipping the load job registration for group commit, but it did not help:
   ```java
           if (jobId != -1 && !ctx.isGroupCommit()) {
               ctx.getEnv().getLoadManager().addLoadJob(insertLoadJob);
           } 
   /*
           if (jobId != -1) {
               ctx.getEnv().getLoadManager().addLoadJob(insertLoadJob);
           }
   */
   ```
   
   I then added some logging and found the following:
   ```java
       private void httpStreamPutImpl(TStreamLoadPutRequest request, TStreamLoadPutResult result) {
           // ...
           LOG.info("[DEBUG-request] groupCommitMode={}", request.getGroupCommitMode());
           ctx.getSessionVariable().groupCommit = request.getGroupCommitMode();
           LOG.info("[DEBUG-session] groupCommitMode={}", ctx.getSessionVariable().groupCommit);
           // ...
       }
   ```
   `groupCommitMode` is always `null` here:
   ```bash
   [DEBUG-request] groupCommitMode=null
   [DEBUG-session] groupCommitMode=null
   ```
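   Assuming `ctx.isGroupCommit()` ultimately reflects the session variable set above, a `null` mode would make the attempted guard treat the request as a normal load and register the job anyway, which is consistent with that change having no effect. The sketch below models this with hypothetical helpers `isGroupCommit` and `shouldRegisterLoadJob`; the mode strings `off_mode`, `sync_mode` and `async_mode` are the documented Doris group commit modes, but the helpers and their null handling are illustrative, not Doris code.

   ```java
   import java.util.Locale;

   final class GroupCommitGuard {
       private GroupCommitGuard() {}

       // Hypothetical: null, blank, or "off_mode" all mean group commit is disabled.
       static boolean isGroupCommit(String groupCommitMode) {
           if (groupCommitMode == null || groupCommitMode.isBlank()) {
               return false;
           }
           String mode = groupCommitMode.trim().toLowerCase(Locale.ROOT);
           return mode.equals("sync_mode") || mode.equals("async_mode");
       }

       // Mirrors the attempted change: jobId != -1 && !ctx.isGroupCommit()
       static boolean shouldRegisterLoadJob(long jobId, String groupCommitMode) {
           return jobId != -1 && !isGroupCommit(groupCommitMode);
       }
   }
   ```

   With the mode arriving as `null`, `shouldRegisterLoadJob(jobId, null)` is true for any real job id, so the job is added exactly as before the change.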
   
   I'm not quite clear about
   
   ### What You Expected?
   
   1. There shouldn't be so many stale PENDING jobs in `LoadManager` when group commit is enabled. They should either be cleaned up automatically or never be added via `addLoadJob` in the first place.
   2. Inside **_httpStreamPutImpl_**, `groupCommitMode` shouldn't be null.
   
   ### How to Reproduce?
   
   Very easy to reproduce:
   1. Download and extract apache-doris-4.0.3, keep all default configurations, and start the FE and BE.
   2. Load data through the HTTP Stream Load API **http://FE/api/db/table/_stream_load&group_commit=async_mode**.
   3. Use Arthas to watch the number of PENDING jobs in `LoadManager` grow steadily:
   ```bash
   [arthas@2844843]$ ognl '@org.apache.doris.catalog.Env@getCurrentEnv().getLoadManager().idToLoadJob.values().{? #this.getState().name() == "PENDING"}.size()'
   ```
   
   
   ### Anything Else?
   
   The problem occurs in both Cloud Mode and non-Cloud Mode.
   The problem does not occur in 3.x.x.
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
