This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 4fda6b051ec branch-3.1: [opt](agent-task) Add a daemon thread to clean 
up agent tasks on dead BEs #57591 (#58231)
4fda6b051ec is described below

commit 4fda6b051ecd0ad11ee00bc754884ff7095c7779
Author: Siyang Tang <[email protected]>
AuthorDate: Wed Dec 17 17:21:21 2025 +0800

    branch-3.1: [opt](agent-task) Add a daemon thread to clean up agent tasks 
on dead BEs #57591 (#58231)
    
    picked from #57591
---
 .../main/java/org/apache/doris/common/Config.java  |  6 ++
 .../main/java/org/apache/doris/catalog/Env.java    |  7 ++
 .../apache/doris/task/AgentTaskCleanupDaemon.java  | 84 ++++++++++++++++++++++
 .../java/org/apache/doris/task/AgentTaskQueue.java | 13 ++++
 .../main/java/org/apache/doris/task/PushTask.java  |  2 +-
 .../test_sc_fail_when_be_down.groovy               | 64 +++++++++++++++++
 .../test_sc_success_when_be_down.groovy            | 56 +++++++++++++++
 7 files changed, 231 insertions(+), 1 deletion(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index eb7d9b8acc3..ff0aa27f0f0 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3505,4 +3505,10 @@ public class Config extends ConfigBase {
 
     @ConfField(mutable = true)
     public static String aws_credentials_provider_version = "v2";
+
+    @ConfField(description = {
+            "agent tasks 健康检查的时间间隔,默认五分钟,小于等于0时不做健康检查",
+            "agent tasks health check interval, default is five minutes, no 
health check when less than or equal to 0"
+    })
+    public static long agent_task_health_check_intervals_ms = 5 * 60 * 1000L; 
// 5 min
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 4357b34e790..e28a88c8a6e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -278,6 +278,7 @@ import org.apache.doris.system.HeartbeatMgr;
 import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.system.SystemInfoService.HostInfo;
 import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTaskCleanupDaemon;
 import org.apache.doris.task.AgentTaskExecutor;
 import org.apache.doris.task.CleanTrashTask;
 import org.apache.doris.task.CleanUDFCacheTask;
@@ -594,6 +595,8 @@ public class Env {
 
     private KeyManagerInterface keyManager;
 
+    private AgentTaskCleanupDaemon agentTaskCleanupDaemon;
+
     // if a config is relative to a daemon thread. record the relation here. 
we will proactively change interval of it.
     private final Map<String, Supplier<MasterDaemon>> configtoThreads = 
ImmutableMap
             .of("dynamic_partition_check_interval_seconds", 
this::getDynamicPartitionScheduler);
@@ -852,6 +855,9 @@ public class Env {
         this.tokenManager = new TokenManager();
         this.keyManagerStore = new KeyManagerStore();
         this.keyManager = KeyManagerFactory.getKeyManager();
+        if (Config.agent_task_health_check_intervals_ms > 0) {
+            this.agentTaskCleanupDaemon = new AgentTaskCleanupDaemon();
+        }
     }
 
     public static Map<String, Long> getSessionReportTimeMap() {
@@ -1958,6 +1964,7 @@ public class Env {
         if (keyManager != null) {
             keyManager.init();
         }
+        // The cleanup daemon is only constructed when
+        // Config.agent_task_health_check_intervals_ms > 0, so it is null when the
+        // health check is disabled; starting it unguarded would throw a NullPointerException.
+        if (agentTaskCleanupDaemon != null) {
+            agentTaskCleanupDaemon.start();
+        }
     }
 
     // start threads that should run on all FE
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskCleanupDaemon.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskCleanupDaemon.java
new file mode 100644
index 00000000000..505f5a3b64e
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskCleanupDaemon.java
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.task;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.Status;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TStatusCode;
+
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Map;
+
+/**
+ * Daemon that periodically inspects all backends and aborts the agent tasks queued for
+ * backends that were found not alive in several consecutive checks.
+ *
+ * <p>The run interval is {@code Config.agent_task_health_check_intervals_ms}, which is passed
+ * to the superclass constructor when the daemon is created.
+ */
+public class AgentTaskCleanupDaemon extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(AgentTaskCleanupDaemon.class);
+
+    /** Number of consecutive failed liveness checks after which a backend's tasks are removed. */
+    public static final Integer MAX_FAILURE_TIMES = 3;
+
+    /** Backend id -> number of consecutive checks in which that backend was not alive. */
+    private final Map<Long, Integer> beInactiveCheckFailures = Maps.newHashMap();
+
+    public AgentTaskCleanupDaemon() {
+        super("agent-task-cleanup", Config.agent_task_health_check_intervals_ms);
+    }
+
+    /**
+     * Runs one check round: resets the failure counter of every alive backend, bumps the
+     * counter of every backend that is not alive, and removes the queued agent tasks of a
+     * backend once its counter reaches {@link #MAX_FAILURE_TIMES}.
+     */
+    @Override
+    protected void runAfterCatalogReady() {
+        LOG.info("Begin to clean up inactive agent tasks");
+        SystemInfoService infoService = Env.getCurrentSystemInfo();
+        infoService.getAllClusterBackendsNoException().values().forEach(backend -> {
+            long id = backend.getId();
+            if (backend.isAlive()) {
+                // Backend is healthy again: forget earlier failures.
+                beInactiveCheckFailures.remove(id);
+                return;
+            }
+            int failureTimes = beInactiveCheckFailures.merge(id, 1, Integer::sum);
+            // The counter is only reset when the backend is alive, so the cleanup is
+            // repeated on every later round for as long as the backend stays down.
+            if (failureTimes >= MAX_FAILURE_TIMES) {
+                removeInactiveBeAgentTasks(id);
+            }
+            // Log the backend id together with its failure count.
+            LOG.info("Check failure on be={}, times={}", id, failureTimes);
+        });
+
+        LOG.info("Finish to clean up inactive agent tasks");
+    }
+
+    /**
+     * Removes all queued agent tasks of the given backend and marks each of them as
+     * finished with an ABORTED error. For a {@link PushTask} the latch is counted down with
+     * an ABORTED status as well.
+     *
+     * @param beId id of the backend whose tasks are aborted
+     */
+    private void removeInactiveBeAgentTasks(Long beId) {
+        AgentTaskQueue.removeTask(beId, (agentTask -> {
+            String errMsg = "BE down, this agent task is aborted";
+            if (agentTask instanceof PushTask) {
+                PushTask task = ((PushTask) agentTask);
+                task.countDownLatchWithStatus(beId, agentTask.getTabletId(), new Status(TStatusCode.ABORTED, errMsg));
+            }
+            agentTask.setFinished(true);
+            agentTask.setErrorCode(TStatusCode.ABORTED);
+            agentTask.setErrorMsg(errMsg);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("BE down, remove agent task: {}", agentTask);
+            }
+        }));
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
index 97e1a3cc676..b61a3aa708d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTaskQueue.java
@@ -33,6 +33,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Consumer;
 
 /**
  * Task queue
@@ -96,6 +97,18 @@ public class AgentTaskQueue {
         --taskNum;
     }
 
+    /**
+     * Removes every queued task of the given backend and invokes {@code onTaskRemoved} for
+     * each task right after it has been taken out of the queue.
+     *
+     * @param backendId id of the backend whose tasks are dropped
+     * @param onTaskRemoved callback run once per removed task; it runs inside this
+     *                      synchronized method
+     */
+    public static synchronized void removeTask(long backendId, Consumer<AgentTask> onTaskRemoved) {
+        // Local name differs from the static field 'tasks' so the field is not shadowed.
+        Map<TTaskType, Map<Long, AgentTask>> tasksOfBackend = tasks.row(backendId);
+        for (Map<Long, AgentTask> signatureToTask : tasksOfBackend.values()) {
+            Iterator<Map.Entry<Long, AgentTask>> it = signatureToTask.entrySet().iterator();
+            while (it.hasNext()) {
+                AgentTask removed = it.next().getValue();
+                it.remove();
+                // Keep the task counter in step with the queue content, like the other
+                // removal path in this class that does --taskNum.
+                --taskNum;
+                onTaskRemoved.accept(removed);
+            }
+        }
+    }
+
     /*
      * we cannot define a push task with only 'backendId', 'signature' and 
'TTaskType'
      * add version and TPushType to help
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
index af98e5bd2b3..a5e3b0f4032 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/PushTask.java
@@ -61,7 +61,7 @@ public class PushTask extends AgentTask {
     private TPushType pushType;
     private List<Predicate> conditions;
     // for synchronous delete
-    private MarkedCountDownLatch latch;
+    private MarkedCountDownLatch<Long, Long> latch;
 
     // lzop decompress or not
     private boolean needDecompress;
diff --git 
a/regression-test/suites/fault_injection_p0/test_sc_fail_when_be_down.groovy 
b/regression-test/suites/fault_injection_p0/test_sc_fail_when_be_down.groovy
new file mode 100644
index 00000000000..411ffa893a1
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_sc_fail_when_be_down.groovy
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+// Verifies that a schema change does NOT finish successfully when two of the three
+// backends are stopped while the alter tasks are still running.
+suite("test_sc_fail_when_be_down", "docker") {
+    def options = new ClusterOptions()
+    options.cloudMode = false
+    options.beNum = 3
+    options.feNum = 2
+    options.enableDebugPoints()
+    // Shorten the agent task health check interval so the cleanup runs during the test.
+    options.feConfigs += ["agent_task_health_check_intervals_ms=5000"]
+
+    docker(options) {
+        GetDebugPoint().clearDebugPointsForAllBEs()
+
+        def tblName = "test_sc_fail_when_be_down"
+        sql """ DROP TABLE IF EXISTS ${tblName} """
+        sql """
+                CREATE TABLE IF NOT EXISTS ${tblName} (
+                    `k` int NOT NULL,
+                    `v0` int NOT NULL,
+                    `v1` int NOT NULL
+                ) 
+                DUPLICATE KEY(`k`)
+                DISTRIBUTED BY HASH(`k`) BUCKETS 24
+                PROPERTIES (
+                    "replication_allocation" = "tag.location.default: 3"
+                )
+        """
+        sql """ INSERT INTO ${tblName} SELECT number, number, number from 
numbers("number" = "1024") """
+
+        GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob._do_process_alter_tablet.sleep")
+
+        // Set only when the wait below returns normally. The expectation is checked
+        // AFTER the try/catch: an assertion placed inside the try block would be
+        // swallowed by catch (Throwable) and the test could never fail.
+        def schemaChangeFinished = false
+        try {
+            sql """ ALTER TABLE ${tblName} MODIFY COLUMN v1 VARCHAR(100) """
+            sleep(1000)
+            cluster.stopBackends(1, 2)
+            sleep(10000)
+            // Query the table created by this suite (the filter used a different name before).
+            def ret = sql """ SHOW ALTER TABLE COLUMN WHERE 
TableName='${tblName}' ORDER BY createtime DESC LIMIT 1 """
+            println(ret)
+            waitForSchemaChangeDone {
+                sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${tblName}' 
ORDER BY createtime DESC LIMIT 1 """
+                time 600
+            }
+            schemaChangeFinished = true
+        } catch (Throwable t) {
+            // Expected path: the schema change cannot complete with two backends down.
+            println("schema change did not finish: ${t.message}")
+        }
+        assertTrue(!schemaChangeFinished)
+    }
+}
diff --git 
a/regression-test/suites/fault_injection_p0/test_sc_success_when_be_down.groovy 
b/regression-test/suites/fault_injection_p0/test_sc_success_when_be_down.groovy
new file mode 100644
index 00000000000..e5eb2bd3028
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/test_sc_success_when_be_down.groovy
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+// Verifies that a schema change still completes when one of the three backends is
+// stopped while the alter tasks are running.
+suite("test_sc_success_when_be_down", "docker") {
+    // Cluster layout: non-cloud mode, 3 BEs, 2 FEs, debug points enabled.
+    def clusterOptions = new ClusterOptions()
+    clusterOptions.cloudMode = false
+    clusterOptions.beNum = 3
+    clusterOptions.feNum = 2
+    clusterOptions.enableDebugPoints()
+    // Run the agent task health check every 5 seconds instead of the default interval.
+    clusterOptions.feConfigs += ["agent_task_health_check_intervals_ms=5000"]
+
+    docker(clusterOptions) {
+        GetDebugPoint().clearDebugPointsForAllBEs()
+
+        // Prepare a 3-replica table with some rows.
+        def tblName = "test_sc_success_when_be_down"
+        sql """ DROP TABLE IF EXISTS ${tblName} """
+        sql """
+                CREATE TABLE IF NOT EXISTS ${tblName} (
+                    `k` int NOT NULL,
+                    `v0` int NOT NULL,
+                    `v1` int NOT NULL
+                ) 
+                DUPLICATE KEY(`k`)
+                DISTRIBUTED BY HASH(`k`) BUCKETS 24
+                PROPERTIES (
+                    "replication_allocation" = "tag.location.default: 3"
+                )
+        """
+        sql """ INSERT INTO ${tblName} SELECT number, number, number from 
numbers("number" = "1024") """
+
+        // Enable the BE-side sleep debug point, start the schema change, then stop one backend.
+        GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob._do_process_alter_tablet.sleep")
+        sql """ ALTER TABLE ${tblName} MODIFY COLUMN v0 VARCHAR(100) """
+        sleep(3000)
+        cluster.stopBackends(1)
+
+        // Wait up to 600 for the latest alter job of this table to be done.
+        waitForSchemaChangeDone {
+            sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${tblName}' ORDER BY 
createtime DESC LIMIT 1 """
+            time 600
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to