This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 22edd1c3684 [fix](export) The export task thread pool uses a blocking queue strategy (#51609)
22edd1c3684 is described below

commit 22edd1c3684c8728f52daedf8f2ffc9e91290e45
Author: zy-kkk <[email protected]>
AuthorDate: Wed Jun 18 20:35:37 2025 +0800

    [fix](export) The export task thread pool uses a blocking queue strategy (#51609)
    
    ### What problem does this PR solve?
    
    When export parallelism exceeds `maximum_parallelism_of_export_job`,
    excess tasks are discarded by `LogDiscardPolicy`.
---
 .../main/java/org/apache/doris/task/ExportExportingTask.java | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java
index d5a18f19d56..ecc0ca26af0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java
@@ -56,9 +56,6 @@ public class ExportExportingTask extends MasterTask {
 
     protected final ExportJob job;
 
-    ThreadPoolExecutor exportExecPool = 
ThreadPoolManager.newDaemonCacheThreadPool(
-            Config.maximum_parallelism_of_export_job, "exporting-pool-", 
false);
-
     public ExportExportingTask(ExportJob job) {
         this.job = job;
         this.signature = job.getId();
@@ -115,6 +112,11 @@ public class ExportExportingTask extends MasterTask {
         List<ExportJob.OutfileInfo> outfileInfoList = Lists.newArrayList();
 
         int parallelNum = selectStmtList.size();
+        // Create thread pool with queue size based on actual parallelism
+        // Queue size = max(parallelNum, maximum_parallelism_of_export_job) to 
ensure all tasks can be queued
+        int queueSize = Math.max(parallelNum, 
Config.maximum_parallelism_of_export_job);
+        ThreadPoolExecutor exportExecPool = 
ThreadPoolManager.newDaemonFixedThreadPool(
+                Config.maximum_parallelism_of_export_job, queueSize, 
"exporting-pool-", false);
         CompletionService<ExportResult> completionService = new 
ExecutorCompletionService<>(exportExecPool);
 
         // begin exporting
@@ -213,8 +215,10 @@ public class ExportExportingTask extends MasterTask {
                         job.getStmtExecutor(idx).cancel();
                     }
                 }
+                exportExecPool.shutdownNow();
+            } else {
+                exportExecPool.shutdown();
             }
-            exportExecPool.shutdownNow();
         }
 
         if (isFailed) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to