This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 0eb92562744d23d7caacca2dabd0d863dced6af7
Author: Calvin Kirs <[email protected]>
AuthorDate: Mon Mar 25 07:29:38 2024 +0800

    [Fix](TransientTask)Export tasks should only be run on the master node 
(#32700)
    
    * [Fix](TransientTask)Export tasks should only be run on the master node
    Add thread name
    
    Export Task runs only on the master node, so it is necessary to explicitly 
start the corresponding resources. At the same time, refactor some code to 
avoid circular dependencies.
    
    * TransientTaskManager is initialized twice. Therefore, the second 
initialization needs to be deleted.
---
 .../src/main/java/org/apache/doris/catalog/Env.java |  3 ++-
 .../doris/catalog/InternalSchemaInitializer.java    |  4 ++++
 .../java/org/apache/doris/master/ReportHandler.java |  1 +
 .../resource/workloadgroup/WorkloadGroupMgr.java    | 17 ++++++++---------
 .../doris/scheduler/disruptor/TaskDisruptor.java    | 21 +++++++--------------
 .../doris/scheduler/disruptor/TaskHandler.java      | 12 ++++--------
 .../scheduler/manager/TransientTaskManager.java     |  5 ++++-
 7 files changed, 30 insertions(+), 33 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index f6fd621e4eb..bc48e61ac4e 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -659,7 +659,6 @@ public class Env {
         this.labelProcessor = new LabelProcessor();
         this.transientTaskManager = new TransientTaskManager();
         this.exportTaskRegister = new ExportTaskRegister(transientTaskManager);
-        this.transientTaskManager = new TransientTaskManager();
 
         this.replayedJournalId = new AtomicLong(0L);
         this.stmtIdCounter = new AtomicLong(0L);
@@ -1626,6 +1625,8 @@ public class Env {
         // Start txn cleaner
         txnCleaner.start();
         jobManager.start();
+        // transient task manager
+        transientTaskManager.start();
         // Alter
         getAlterInstance().start();
         // Consistency checker
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
index 3e3ab3f4ce8..93617ac0f5e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java
@@ -57,6 +57,10 @@ public class InternalSchemaInitializer extends Thread {
 
     private static final Logger LOG = 
LogManager.getLogger(InternalSchemaInitializer.class);
 
+    public InternalSchemaInitializer() {
+        super("InternalSchemaInitializer");
+    }
+
     public void run() {
         if (!FeConstants.enableInternalSchemaDb) {
             return;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index 6b24bea1ed5..207efa2dfd7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -121,6 +121,7 @@ public class ReportHandler extends Daemon {
     }
 
     public ReportHandler() {
+        super("report-thread");
         GaugeMetric<Long> gauge = new GaugeMetric<Long>(
                 "report_queue_size", MetricUnit.NOUNIT, "report queue size") {
             @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
index 967efd26e65..08de0ce338a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
@@ -88,15 +88,14 @@ public class WorkloadGroupMgr implements Writable, 
GsonPostProcessable {
 
     public void startUpdateThread() {
         WorkloadGroupMgr wgMgr = this;
-        updatePropThread = new Thread(new Runnable() {
-            public void run() {
-                while (true) {
-                    try {
-                        wgMgr.resetQueryQueueProp();
-                        Thread.sleep(Config.query_queue_update_interval_ms);
-                    } catch (Throwable e) {
-                        LOG.warn("reset query queue failed ", e);
-                    }
+        updatePropThread = new Thread(() -> {
+            Thread.currentThread().setName("reset-query-queue-prop");
+            while (true) {
+                try {
+                    wgMgr.resetQueryQueueProp();
+                    Thread.sleep(Config.query_queue_update_interval_ms);
+                } catch (Throwable e) {
+                    LOG.warn("reset query queue failed ", e);
                 }
             }
         });
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
index dc065360d09..57df84a0e89 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
@@ -18,8 +18,8 @@
 package org.apache.doris.scheduler.disruptor;
 
 import org.apache.doris.common.Config;
+import org.apache.doris.common.CustomThreadFactory;
 import org.apache.doris.scheduler.constants.TaskType;
-import org.apache.doris.scheduler.manager.TransientTaskManager;
 
 import com.lmax.disruptor.BlockingWaitStrategy;
 import com.lmax.disruptor.EventTranslatorThreeArg;
@@ -27,11 +27,9 @@ import com.lmax.disruptor.TimeoutException;
 import com.lmax.disruptor.WorkHandler;
 import com.lmax.disruptor.dsl.Disruptor;
 import com.lmax.disruptor.dsl.ProducerType;
-import com.lmax.disruptor.util.DaemonThreadFactory;
-import lombok.extern.slf4j.Slf4j;
+import lombok.extern.log4j.Log4j2;
 
 import java.io.Closeable;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -43,11 +41,10 @@ import java.util.concurrent.TimeUnit;
  *
  * <p>The work handler also handles system events by scheduling batch 
scheduler tasks.
  */
-@Slf4j
+@Log4j2
 public class TaskDisruptor implements Closeable {
 
     private  Disruptor<TaskEvent> disruptor;
-    private TransientTaskManager transientTaskManager;
     private static final int DEFAULT_RING_BUFFER_SIZE = 
Config.async_task_queen_size;
 
     private static final int consumerThreadCount = 
Config.async_task_consumer_thread_num;
@@ -74,17 +71,13 @@ public class TaskDisruptor implements Closeable {
                 event.setTaskType(taskType);
             };
 
-    public TaskDisruptor(TransientTaskManager transientTaskManager) {
-        this.transientTaskManager = transientTaskManager;
-    }
-
     public void start() {
-        ThreadFactory producerThreadFactory = DaemonThreadFactory.INSTANCE;
-        disruptor = new Disruptor<>(TaskEvent.FACTORY, 
DEFAULT_RING_BUFFER_SIZE, producerThreadFactory,
-                ProducerType.MULTI, new BlockingWaitStrategy());
+        CustomThreadFactory exportTaskThreadFactory = new 
CustomThreadFactory("export-task-consumer");
+        disruptor = new Disruptor<>(TaskEvent.FACTORY, 
DEFAULT_RING_BUFFER_SIZE, exportTaskThreadFactory,
+                ProducerType.SINGLE, new BlockingWaitStrategy());
         WorkHandler<TaskEvent>[] workers = new 
TaskHandler[consumerThreadCount];
         for (int i = 0; i < consumerThreadCount; i++) {
-            workers[i] = new TaskHandler(transientTaskManager);
+            workers[i] = new TaskHandler();
         }
         disruptor.handleEventsWithWorkerPool(workers);
         disruptor.start();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
index 6a0b9f92c5f..de889c1b2e4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
@@ -17,12 +17,13 @@
 
 package org.apache.doris.scheduler.disruptor;
 
+import org.apache.doris.catalog.Env;
 import org.apache.doris.scheduler.exception.JobException;
 import org.apache.doris.scheduler.executor.TransientTaskExecutor;
 import org.apache.doris.scheduler.manager.TransientTaskManager;
 
 import com.lmax.disruptor.WorkHandler;
-import lombok.extern.slf4j.Slf4j;
+import lombok.extern.log4j.Log4j2;
 
 /**
  * This class represents a work handler for processing event tasks consumed by 
a Disruptor.
@@ -31,16 +32,10 @@ import lombok.extern.slf4j.Slf4j;
  * If the event job execution fails, the work handler logs an error message 
and pauses the event job.
  * The work handler also handles system events by scheduling batch scheduler 
tasks.
  */
-@Slf4j
+@Log4j2
 public class TaskHandler implements WorkHandler<TaskEvent> {
 
 
-    private TransientTaskManager transientTaskManager;
-
-    public TaskHandler(TransientTaskManager transientTaskManager) {
-        this.transientTaskManager = transientTaskManager;
-    }
-
     /**
      * Processes an event task by retrieving the associated event job and 
executing it if it is running.
      * If the event job is not running, it logs an error message.
@@ -62,6 +57,7 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
 
     public void onTransientTaskHandle(TaskEvent taskEvent) {
         Long taskId = taskEvent.getId();
+        TransientTaskManager transientTaskManager = 
Env.getCurrentEnv().getTransientTaskManager();
         TransientTaskExecutor taskExecutor = 
transientTaskManager.getMemoryTaskExecutor(taskId);
         if (taskExecutor == null) {
             log.info("Memory task executor is null, task id: {}", taskId);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java
index 5f94fb5d998..51edd4af318 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TransientTaskManager.java
@@ -42,7 +42,10 @@ public class TransientTaskManager {
     private TaskDisruptor disruptor;
 
     public TransientTaskManager() {
-        disruptor = new TaskDisruptor(this);
+        disruptor = new TaskDisruptor();
+    }
+
+    public void start() {
         disruptor.start();
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to