This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 77ac531786e [fix](export) fix concurrent modification issue with
export job (#43051)
77ac531786e is described below
commit 77ac531786eceb270d394c468112d615f4a8143f
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Fri Nov 1 18:07:16 2024 +0800
[fix](export) fix concurrent modification issue with export job (#43051)
### What problem does this PR solve?
Related PR: #42950
Problem Summary:
PR #42950 change some logic in ExportJob, by removing the
`taskIdToExecutor`, which is
a thread safe ConcurrentHashMap.
But there is a problem that, when cancelling a export job, it will clear
the `jobExecutorList` in ExportJob,
and meanwhile, this `jobExecutorList` may being traversed when creating
the export job,
causing concurrent modification exception.
This PR fix it by locking the writeLock of ExportMgr when cancelling the
export job.
---
.../src/main/java/org/apache/doris/load/ExportMgr.java | 15 +++++++++++----
1 file changed, 11 insertions(+), 4 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
index 5636f1aaad3..80f738a4cdf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
@@ -67,8 +67,8 @@ public class ExportMgr {
// dbid -> <label -> job>
private Map<Long, Map<String, Long>> dbTolabelToExportJobId =
Maps.newHashMap();
- // lock for export job
- // lock is private and must use after db lock
+ // lock for protecting export jobs.
+ // need to be added when creating or cancelling export job.
private final ReentrantReadWriteLock lock = new
ReentrantReadWriteLock(true);
public ExportMgr() {
@@ -141,6 +141,11 @@ public class ExportMgr {
// check auth
checkCancelExportJobAuth(InternalCatalog.INTERNAL_CATALOG_NAME,
stmt.getDbName(), matchExportJobs);
+ // Must add lock to protect export job.
+ // Because job may be cancelled when generating task executors,
+ // the cancel process may clear the task executor list at same time,
+ // which will cause ConcurrentModificationException
+ writeLock();
try {
for (ExportJob exportJob : matchExportJobs) {
// exportJob.cancel(ExportFailMsg.CancelType.USER_CANCEL,
"user cancel");
@@ -149,6 +154,8 @@ public class ExportMgr {
}
} catch (JobException e) {
throw new AnalysisException(e.getMessage());
+ } finally {
+ writeUnlock();
}
}
@@ -463,7 +470,7 @@ public class ExportMgr {
}
public void replayUpdateJobState(ExportJobStateTransfer stateTransfer) {
- readLock();
+ writeLock();
try {
ExportJob job = exportIdToJob.get(stateTransfer.getJobId());
job.replayExportJobState(stateTransfer.getState());
@@ -472,7 +479,7 @@ public class ExportMgr {
job.setFailMsg(stateTransfer.getFailMsg());
job.setOutfileInfo(stateTransfer.getOutFileInfo());
} finally {
- readUnlock();
+ writeUnlock();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]