This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 22edd1c3684 [fix](export) The export task thread pool uses a blocking
queue strategy (#51609)
22edd1c3684 is described below
commit 22edd1c3684c8728f52daedf8f2ffc9e91290e45
Author: zy-kkk <[email protected]>
AuthorDate: Wed Jun 18 20:35:37 2025 +0800
[fix](export) The export task thread pool uses a blocking queue strategy
(#51609)
### What problem does this PR solve?
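When export parallelism exceeds `maximum_parallelism_of_export_job`,
excess tasks are discarded by `LogDiscardPolicy`.
This change creates the export thread pool per task as a fixed-size pool
whose queue size is `max(parallelNum, maximum_parallelism_of_export_job)`,
so excess tasks wait in the queue instead of being discarded.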
---
.../main/java/org/apache/doris/task/ExportExportingTask.java | 12 ++++++++----
1 file changed, 8 insertions(+), 4 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java
index d5a18f19d56..ecc0ca26af0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java
@@ -56,9 +56,6 @@ public class ExportExportingTask extends MasterTask {
protected final ExportJob job;
- ThreadPoolExecutor exportExecPool =
ThreadPoolManager.newDaemonCacheThreadPool(
- Config.maximum_parallelism_of_export_job, "exporting-pool-",
false);
-
public ExportExportingTask(ExportJob job) {
this.job = job;
this.signature = job.getId();
@@ -115,6 +112,11 @@ public class ExportExportingTask extends MasterTask {
List<ExportJob.OutfileInfo> outfileInfoList = Lists.newArrayList();
int parallelNum = selectStmtList.size();
+ // Create thread pool with queue size based on actual parallelism
+ // Queue size = max(parallelNum, maximum_parallelism_of_export_job) to
ensure all tasks can be queued
+ int queueSize = Math.max(parallelNum,
Config.maximum_parallelism_of_export_job);
+ ThreadPoolExecutor exportExecPool =
ThreadPoolManager.newDaemonFixedThreadPool(
+ Config.maximum_parallelism_of_export_job, queueSize,
"exporting-pool-", false);
CompletionService<ExportResult> completionService = new
ExecutorCompletionService<>(exportExecPool);
// begin exporting
@@ -213,8 +215,10 @@ public class ExportExportingTask extends MasterTask {
job.getStmtExecutor(idx).cancel();
}
}
+ exportExecPool.shutdownNow();
+ } else {
+ exportExecPool.shutdown();
}
- exportExecPool.shutdownNow();
}
if (isFailed) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]