This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 06a6d0c5e98 branch-2.1: [fix](agent) cancel agent task when it is 
rejected by agent-task-pool #51138 (#51212)
06a6d0c5e98 is described below

commit 06a6d0c5e98eaebf10d898759e3e4b49098c6080
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed May 28 17:24:15 2025 +0800

    branch-2.1: [fix](agent) cancel agent task when it is rejected by 
agent-task-pool #51138 (#51212)
    
    Cherry-picked from #51138
    
    Co-authored-by: walter <[email protected]>
---
 .../java/org/apache/doris/master/ReportHandler.java   | 10 +---------
 .../main/java/org/apache/doris/task/AgentTask.java    | 11 +++++++++++
 .../java/org/apache/doris/task/AgentTaskExecutor.java | 19 ++++++++++++++++---
 .../java/org/apache/doris/task/AgentTaskQueue.java    | 13 +++++++++++++
 4 files changed, 41 insertions(+), 12 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index e11038a20a5..cb5cc6e8d33 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -606,14 +606,7 @@ public class ReportHandler extends Daemon {
         AgentBatchTask batchTask = new 
AgentBatchTask(Config.report_resend_batch_task_num_per_rpc);
         long taskReportTime = System.currentTimeMillis();
         for (AgentTask task : diffTasks) {
-            // these tasks no need to do diff
-            // 1. CREATE
-            // 2. SYNC DELETE
-            // 3. CHECK_CONSISTENCY
-            // 4. STORAGE_MDEIUM_MIGRATE
-            if (task.getTaskType() == TTaskType.CREATE
-                    || task.getTaskType() == TTaskType.CHECK_CONSISTENCY
-                    || task.getTaskType() == TTaskType.STORAGE_MEDIUM_MIGRATE) 
{
+            if (!task.isNeedResendType()) {
                 continue;
             }
 
@@ -621,7 +614,6 @@ public class ReportHandler extends Daemon {
             if (task.shouldResend(taskReportTime)) {
                 batchTask.addTask(task);
             }
-
         }
 
         if (LOG.isDebugEnabled()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java
index 1294b408754..e22ae1df4a6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java
@@ -144,6 +144,17 @@ public abstract class AgentTask {
         return isFinished;
     }
 
+    public boolean isNeedResendType() {
+        // these tasks no need to do diff
+        // 1. CREATE
+        // 2. SYNC DELETE
+        // 3. CHECK_CONSISTENCY
+        // 4. STORAGE_MEDIUM_MIGRATE
+        return !(taskType == TTaskType.CREATE
+                || taskType == TTaskType.CHECK_CONSISTENCY
+                || taskType == TTaskType.STORAGE_MEDIUM_MIGRATE);
+    }
+
     public boolean shouldResend(long currentTimeMillis) {
         return createTime == -1 || currentTimeMillis - createTime > 
Config.agent_task_resend_wait_time_ms;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskExecutor.java
index 8297ef2fff8..c8f5c977848 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskExecutor.java
@@ -21,21 +21,34 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.ThreadPoolManager;
 
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
 
 public class AgentTaskExecutor {
 
-    private static final ExecutorService EXECUTOR = 
ThreadPoolManager.newDaemonCacheThreadPool(
+    private static final ExecutorService EXECUTOR = 
ThreadPoolManager.newDaemonCacheThreadPoolThrowException(
             Config.max_agent_task_threads_num, "agent-task-pool", true);
 
     public AgentTaskExecutor() {
-
     }
 
     public static void submit(AgentBatchTask task) {
         if (task == null) {
             return;
         }
-        EXECUTOR.submit(task);
+        try {
+            EXECUTOR.submit(task);
+        } catch (RejectedExecutionException e) {
+            String msg = "Task is rejected, because the agent-task-pool is 
full, "
+                    + "consider increasing the max_agent_task_threads_num 
config";
+            for (AgentTask t : task.getAllTasks()) {
+                // Skip the task if it is a resend type and already exists in 
the queue, because it will be
+                // re-submit to the executor later.
+                if (t.isNeedResendType() && AgentTaskQueue.contains(t)) {
+                    continue;
+                }
+                t.failedWithMsg(msg);
+            }
+        }
     }
 
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
index bd68d87f191..97e1a3cc676 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
@@ -132,6 +132,19 @@ public class AgentTaskQueue {
         }
     }
 
+    public static synchronized boolean contains(AgentTask task) {
+        long backendId = task.getBackendId();
+        TTaskType type = task.getTaskType();
+        long signature = task.getSignature();
+
+        if (!tasks.contains(backendId, type)) {
+            return false;
+        }
+
+        Map<Long, AgentTask> signatureMap = tasks.get(backendId, type);
+        return signatureMap.containsKey(signature);
+    }
+
     public static synchronized AgentTask getTask(long backendId, TTaskType 
type, long signature) {
         if (!tasks.contains(backendId, type)) {
             return null;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to