This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 7e290c7d4f4 branch-3.1: [fix](cloud) Fix the issue where it takes a long time to come alive on first boot #58152 (#58195)
7e290c7d4f4 is described below

commit 7e290c7d4f42ce4c785abc7b16ccfbf4345abcb4
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Nov 25 15:21:33 2025 +0800

    branch-3.1: [fix](cloud) Fix the issue where it takes a long time to come alive on first boot #58152 (#58195)
    
    Cherry-picked from #58152
    
    Co-authored-by: deardeng <[email protected]>
---
 .../transaction/CloudGlobalTransactionMgr.java     |  8 ++++
 .../java/org/apache/doris/system/HeartbeatMgr.java | 38 +++++++++++++---
 .../test_cloud_add_backend_heartbeat.groovy        | 52 ++++++++++++++++++++++
 3 files changed, 92 insertions(+), 6 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index c3a8bcc450c..98ea8b211d8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -1973,6 +1973,14 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface {
             response = MetaServiceProxy
                 .getInstance().abortTxnWithCoordinator(request);
             LOG.info("AbortTxnWithCoordinatorResponse: {}", response);
+            if 
(DebugPointUtil.isEnable("FE.abortTxnWhenCoordinateBeRestart.slow")) {
+                LOG.info("debug point FE.abortTxnWhenCoordinateBeRestart.slow 
enabled, sleep 15s");
+                try {
+                    Thread.sleep(15 * 1000);
+                } catch (InterruptedException ie) {
+                    LOG.info("error ", ie);
+                }
+            }
         } catch (RpcException e) {
             LOG.warn("Abort txn on coordinate BE {} failed, msg={}", 
coordinateHost, e.getMessage());
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
index 8b0e351f306..077db66712a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
@@ -78,6 +78,7 @@ public class HeartbeatMgr extends MasterDaemon {
     private final ExecutorService executor;
     private SystemInfoService nodeMgr;
     private HeartbeatFlags heartbeatFlags;
+    private final ExecutorService abortTxnExecutor;
 
     private static volatile AtomicReference<TMasterInfo> masterInfo = new 
AtomicReference<>();
 
@@ -86,6 +87,8 @@ public class HeartbeatMgr extends MasterDaemon {
         this.nodeMgr = nodeMgr;
         this.executor = 
ThreadPoolManager.newDaemonFixedThreadPool(Config.heartbeat_mgr_threads_num,
                 Config.heartbeat_mgr_blocking_queue_size, 
"heartbeat-mgr-pool", needRegisterMetric);
+        this.abortTxnExecutor = ThreadPoolManager.newDaemonFixedThreadPool(1,
+                Config.heartbeat_mgr_blocking_queue_size, 
"abort-txn-executor", needRegisterMetric);
         this.heartbeatFlags = new HeartbeatFlags();
     }
 
@@ -192,18 +195,21 @@ public class HeartbeatMgr extends MasterDaemon {
                     boolean isChanged = be.handleHbResponse(hbResponse, 
isReplay);
                     if (hbResponse.getStatus() == HbStatus.OK) {
                         long newStartTime = be.getLastStartTime();
+                        // oldStartTime > 0 means it is not the first heartbeat
                         if (!isReplay && 
Config.enable_abort_txn_by_checking_coordinator_be
-                                && oldStartTime != newStartTime) {
-                            
Env.getCurrentGlobalTransactionMgr().abortTxnWhenCoordinateBeRestart(
-                                    be.getId(), be.getHost(), newStartTime);
+                                && oldStartTime != newStartTime && 
oldStartTime > 0) {
+                            submitAbortTxnTaskByExecutor(() -> 
Env.getCurrentGlobalTransactionMgr()
+                                    
.abortTxnWhenCoordinateBeRestart(be.getId(), be.getHost(), newStartTime),
+                                    "restart");
                         }
                     } else {
                         // invalid all connections cached in ClientPool
                         ClientPool.backendPool.clearPool(new 
TNetworkAddress(be.getHost(), be.getBePort()));
                         if (!isReplay && System.currentTimeMillis() - 
be.getLastUpdateMs()
-                                >= 
Config.abort_txn_after_lost_heartbeat_time_second * 1000L) {
-                            
Env.getCurrentGlobalTransactionMgr().abortTxnWhenCoordinateBeDown(
-                                    be.getId(), be.getHost(), 100);
+                                >= 
Config.abort_txn_after_lost_heartbeat_time_second * 1000L
+                                && be.getLastUpdateMs() > 0) {
+                            submitAbortTxnTaskByExecutor(() -> 
Env.getCurrentGlobalTransactionMgr()
+                                    .abortTxnWhenCoordinateBeDown(be.getId(), 
be.getHost(), 100), "down");
                         }
                     }
                     return isChanged;
@@ -230,6 +236,26 @@ public class HeartbeatMgr extends MasterDaemon {
         return false;
     }
 
+    private void submitAbortTxnTaskByExecutor(Runnable task, String reason) {
+        long start = System.currentTimeMillis();
+        try {
+            abortTxnExecutor.submit(() -> {
+                LOG.info("start abort txn task, reason={}, start_ts={}", 
reason, start);
+                try {
+                    task.run();
+                    long duration = System.currentTimeMillis() - start;
+                    LOG.info("finish abort txn task, reason={}, start_ts={}, 
cost_ms={}", reason, start, duration);
+                } catch (Exception e) {
+                    long duration = System.currentTimeMillis() - start;
+                    LOG.warn("abort txn task({}) failed, start_ts={}, 
cost_ms={}", reason, start, duration, e);
+                }
+            });
+        } catch (Exception e) {
+            long duration = System.currentTimeMillis() - start;
+            LOG.warn("failed to submit abort txn task({}), start_ts={}, 
cost_ms={}", reason, start, duration, e);
+        }
+    }
+
     // backend heartbeat
     private class BackendHeartbeatHandler implements 
Callable<HeartbeatResponse> {
         private Backend backend;
diff --git a/regression-test/suites/cloud_p0/node_mgr/test_cloud_add_backend_heartbeat.groovy b/regression-test/suites/cloud_p0/node_mgr/test_cloud_add_backend_heartbeat.groovy
new file mode 100644
index 00000000000..bd1e5b9d0b5
--- /dev/null
+++ b/regression-test/suites/cloud_p0/node_mgr/test_cloud_add_backend_heartbeat.groovy
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+import groovy.json.JsonSlurper
+import groovy.json.JsonOutput
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite("test_cloud_add_backend_heartbeat", 'p0, docker') {
+    if (!isCloudMode()) {
+        return
+    }
+
+    def options = new ClusterOptions()
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+        'sys_log_verbose_modules=org',
+        'heartbeat_interval_second=1'
+    ]
+    options.setFeNum(1)
+    options.setBeNum(1)
+    options.enableDebugPoints()
+    options.cloudMode = true
+
+    docker(options) {
+        def ms = cluster.getAllMetaservices().get(0)
+        def msHttpPort = ms.host + ":" + ms.httpPort
+        logger.info("ms1 addr={}, port={}, ms endpoint={}", ms.host, 
ms.httpPort, msHttpPort)
+
+        
GetDebugPoint().enableDebugPointForAllFEs("FE.abortTxnWhenCoordinateBeRestart.slow")
+
+        cluster.addBackend(10, "new_cluster")
+
+        sql """admin set frontend 
config("cloud_tablet_rebalancer_interval_second"="3");"""
+
+        cluster.restartBackends();
+
+    }
+
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to