This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new cb64eac292b [fix](scheduler) Fix TaskDisruptor event loss due to wrong ProducerType (#60238)
cb64eac292b is described below

commit cb64eac292bd2ee5ed8ff73d1b93f4c28f63505b
Author: Xin Liao <[email protected]>
AuthorDate: Thu Jan 29 17:39:04 2026 +0800

    [fix](scheduler) Fix TaskDisruptor event loss due to wrong ProducerType (#60238)
    
    Change ProducerType from SINGLE to MULTI in TaskDisruptor to fix a
    concurrent event publishing issue. Multiple thrift-server-pool threads
    can submit export tasks simultaneously, but SINGLE producer mode is not
    thread-safe, so events are lost when those threads publish
    concurrently.
---
 .../main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
index 8144ca22ea2..9be30124c9e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
@@ -75,7 +75,7 @@ public class TaskDisruptor implements Closeable {
     public void start() {
         CustomThreadFactory exportTaskThreadFactory = new CustomThreadFactory("export-task-consumer");
         disruptor = new Disruptor<>(TaskEvent.FACTORY, DEFAULT_RING_BUFFER_SIZE, exportTaskThreadFactory,
-                ProducerType.SINGLE, new LiteTimeoutBlockingWaitStrategy(10, TimeUnit.MILLISECONDS));
+                ProducerType.MULTI, new LiteTimeoutBlockingWaitStrategy(10, TimeUnit.MILLISECONDS));
         WorkHandler<TaskEvent>[] workers = new TaskHandler[consumerThreadCount];
         for (int i = 0; i < consumerThreadCount; i++) {
             workers[i] = new TaskHandler();
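
The commit message attributes the lost events to concurrent publishing under SINGLE producer mode. The sketch below is a simplified model of that failure, not the Disruptor's actual sequencer code: it assumes the single-producer path claims a slot with a plain read-then-write of a cursor, while the multi-producer path claims it atomically. The class and method names (ProducerClaimSketch, SingleClaimer, MultiClaimer) are hypothetical and exist only for illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified model of ring-buffer slot claiming.
// It is NOT the LMAX Disruptor implementation.
class ProducerClaimSketch {

    /** Single-producer style: a plain read followed by a plain write. */
    static final class SingleClaimer {
        private long cursor = -1;

        long peekNext() {          // step 1: read the cursor
            return cursor + 1;
        }

        void commit(long seq) {    // step 2: write the claimed sequence back
            cursor = seq;
        }
    }

    /** Multi-producer style: the claim is one atomic increment. */
    static final class MultiClaimer {
        private final AtomicLong cursor = new AtomicLong(-1);

        long next() {
            return cursor.incrementAndGet();
        }
    }

    /**
     * Replays, on one thread, the interleaving in which two producers both
     * read the cursor before either writes it. Both end up with the same
     * slot, so one event would overwrite the other.
     */
    static boolean singleClaimerCollides() {
        SingleClaimer claimer = new SingleClaimer();
        long a = claimer.peekNext(); // producer A reads slot 0
        long b = claimer.peekNext(); // producer B also reads slot 0
        claimer.commit(a);
        claimer.commit(b);
        return a == b;
    }

    /** Runs real threads against the atomic claimer and counts distinct slots. */
    static int distinctSlotsClaimed(int producers, int perProducer) {
        MultiClaimer claimer = new MultiClaimer();
        Set<Long> seen = ConcurrentHashMap.newKeySet();
        Thread[] threads = new Thread[producers];
        for (int i = 0; i < producers; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < perProducer; j++) {
                    seen.add(claimer.next());
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println("single claimer collides: " + singleClaimerCollides());
        System.out.println("distinct slots (atomic): " + distinctSlotsClaimed(4, 10_000));
    }
}
```

In this model the replayed interleaving hands both producers slot 0, whereas four threads claiming 10,000 slots each through the atomic claimer end up with 40,000 distinct slots. That is the property the switch to ProducerType.MULTI is meant to restore for the thrift-server-pool threads.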

