This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 77ac531786e [fix](export) fix concurrent modification issue with 
export job (#43051)
77ac531786e is described below

commit 77ac531786eceb270d394c468112d615f4a8143f
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Fri Nov 1 18:07:16 2024 +0800

    [fix](export) fix concurrent modification issue with export job (#43051)
    
    ### What problem does this PR solve?
    
    Related PR: #42950
    
    Problem Summary:
    
    PR #42950 change some logic in ExportJob, by removing the
    `taskIdToExecutor`, which is
    a thread safe ConcurrentHashMap.
    But there is a problem that, when cancelling a export job, it will clear
    the `jobExecutorList` in ExportJob,
    and meanwhile, this `jobExecutorList` may being traversed when creating
    the export job,
    causing concurrent modification exception.
    
    This PR fix it by locking the writeLock of ExportMgr when cancelling the
    export job.
---
 .../src/main/java/org/apache/doris/load/ExportMgr.java    | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
index 5636f1aaad3..80f738a4cdf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
@@ -67,8 +67,8 @@ public class ExportMgr {
     // dbid -> <label -> job>
     private Map<Long, Map<String, Long>> dbTolabelToExportJobId = 
Maps.newHashMap();
 
-    // lock for export job
-    // lock is private and must use after db lock
+    // lock for protecting export jobs.
+    // need to be added when creating or cancelling export job.
     private final ReentrantReadWriteLock lock = new 
ReentrantReadWriteLock(true);
 
     public ExportMgr() {
@@ -141,6 +141,11 @@ public class ExportMgr {
 
         // check auth
         checkCancelExportJobAuth(InternalCatalog.INTERNAL_CATALOG_NAME, 
stmt.getDbName(), matchExportJobs);
+        // Must add lock to protect export job.
+        // Because job may be cancelled when generating task executors,
+        // the cancel process may clear the task executor list at same time,
+        // which will cause ConcurrentModificationException
+        writeLock();
         try {
             for (ExportJob exportJob : matchExportJobs) {
                 // exportJob.cancel(ExportFailMsg.CancelType.USER_CANCEL, 
"user cancel");
@@ -149,6 +154,8 @@ public class ExportMgr {
             }
         } catch (JobException e) {
             throw new AnalysisException(e.getMessage());
+        } finally {
+            writeUnlock();
         }
     }
 
@@ -463,7 +470,7 @@ public class ExportMgr {
     }
 
     public void replayUpdateJobState(ExportJobStateTransfer stateTransfer) {
-        readLock();
+        writeLock();
         try {
             ExportJob job = exportIdToJob.get(stateTransfer.getJobId());
             job.replayExportJobState(stateTransfer.getState());
@@ -472,7 +479,7 @@ public class ExportMgr {
             job.setFailMsg(stateTransfer.getFailMsg());
             job.setOutfileInfo(stateTransfer.getOutFileInfo());
         } finally {
-            readUnlock();
+            writeUnlock();
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to