This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 7e290c7d4f4 branch-3.1: [fix](cloud) Fix the issue where it takes a
long time to come alive on first boot #58152 (#58195)
7e290c7d4f4 is described below
commit 7e290c7d4f42ce4c785abc7b16ccfbf4345abcb4
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Nov 25 15:21:33 2025 +0800
branch-3.1: [fix](cloud) Fix the issue where it takes a long time to come
alive on first boot #58152 (#58195)
Cherry-picked from #58152
Co-authored-by: deardeng <[email protected]>
---
.../transaction/CloudGlobalTransactionMgr.java | 8 ++++
.../java/org/apache/doris/system/HeartbeatMgr.java | 38 +++++++++++++---
.../test_cloud_add_backend_heartbeat.groovy | 52 ++++++++++++++++++++++
3 files changed, 92 insertions(+), 6 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index c3a8bcc450c..98ea8b211d8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -1973,6 +1973,14 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
response = MetaServiceProxy
.getInstance().abortTxnWithCoordinator(request);
LOG.info("AbortTxnWithCoordinatorResponse: {}", response);
+ if
(DebugPointUtil.isEnable("FE.abortTxnWhenCoordinateBeRestart.slow")) {
+ LOG.info("debug point FE.abortTxnWhenCoordinateBeRestart.slow
enabled, sleep 15s");
+ try {
+ Thread.sleep(15 * 1000);
+ } catch (InterruptedException ie) {
+ LOG.info("error ", ie);
+ }
+ }
} catch (RpcException e) {
LOG.warn("Abort txn on coordinate BE {} failed, msg={}",
coordinateHost, e.getMessage());
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
index 8b0e351f306..077db66712a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
@@ -78,6 +78,7 @@ public class HeartbeatMgr extends MasterDaemon {
private final ExecutorService executor;
private SystemInfoService nodeMgr;
private HeartbeatFlags heartbeatFlags;
+ private final ExecutorService abortTxnExecutor;
private static volatile AtomicReference<TMasterInfo> masterInfo = new
AtomicReference<>();
@@ -86,6 +87,8 @@ public class HeartbeatMgr extends MasterDaemon {
this.nodeMgr = nodeMgr;
this.executor =
ThreadPoolManager.newDaemonFixedThreadPool(Config.heartbeat_mgr_threads_num,
Config.heartbeat_mgr_blocking_queue_size,
"heartbeat-mgr-pool", needRegisterMetric);
+ this.abortTxnExecutor = ThreadPoolManager.newDaemonFixedThreadPool(1,
+ Config.heartbeat_mgr_blocking_queue_size,
"abort-txn-executor", needRegisterMetric);
this.heartbeatFlags = new HeartbeatFlags();
}
@@ -192,18 +195,21 @@ public class HeartbeatMgr extends MasterDaemon {
boolean isChanged = be.handleHbResponse(hbResponse,
isReplay);
if (hbResponse.getStatus() == HbStatus.OK) {
long newStartTime = be.getLastStartTime();
+ // oldStartTime > 0 means it is not the first heartbeat
if (!isReplay &&
Config.enable_abort_txn_by_checking_coordinator_be
- && oldStartTime != newStartTime) {
-
Env.getCurrentGlobalTransactionMgr().abortTxnWhenCoordinateBeRestart(
- be.getId(), be.getHost(), newStartTime);
+ && oldStartTime != newStartTime &&
oldStartTime > 0) {
+ submitAbortTxnTaskByExecutor(() ->
Env.getCurrentGlobalTransactionMgr()
+
.abortTxnWhenCoordinateBeRestart(be.getId(), be.getHost(), newStartTime),
+ "restart");
}
} else {
// invalid all connections cached in ClientPool
ClientPool.backendPool.clearPool(new
TNetworkAddress(be.getHost(), be.getBePort()));
if (!isReplay && System.currentTimeMillis() -
be.getLastUpdateMs()
- >=
Config.abort_txn_after_lost_heartbeat_time_second * 1000L) {
-
Env.getCurrentGlobalTransactionMgr().abortTxnWhenCoordinateBeDown(
- be.getId(), be.getHost(), 100);
+ >=
Config.abort_txn_after_lost_heartbeat_time_second * 1000L
+ && be.getLastUpdateMs() > 0) {
+ submitAbortTxnTaskByExecutor(() ->
Env.getCurrentGlobalTransactionMgr()
+ .abortTxnWhenCoordinateBeDown(be.getId(),
be.getHost(), 100), "down");
}
}
return isChanged;
@@ -230,6 +236,26 @@ public class HeartbeatMgr extends MasterDaemon {
return false;
}
+    /**
+     * Hands an abort-txn task to {@code abortTxnExecutor} instead of running it on the calling
+     * thread, so the heartbeat-response handling that calls this does not wait for the abort.
+     *
+     * <p>{@code start} is captured at submission time, so the logged {@code cost_ms} covers both
+     * the time the task spent queued behind earlier abort tasks and its own run time.
+     *
+     * @param task   the abort work to run; an {@code Exception} it throws is logged, not propagated
+     * @param reason short label (callers pass "restart" or "down") used only in log messages
+     */
+    private void submitAbortTxnTaskByExecutor(Runnable task, String reason) {
+        long start = System.currentTimeMillis();
+        try {
+            // execute() rather than submit(): nobody reads the returned Future, so with submit()
+            // any Error escaping the task (not caught by catch (Exception) below) was stored in
+            // the discarded Future and silently lost. execute() lets it reach the worker thread.
+            abortTxnExecutor.execute(() -> {
+                LOG.info("start abort txn task, reason={}, start_ts={}", reason, start);
+                try {
+                    task.run();
+                    long duration = System.currentTimeMillis() - start;
+                    LOG.info("finish abort txn task, reason={}, start_ts={}, cost_ms={}",
+                            reason, start, duration);
+                } catch (Exception e) {
+                    long duration = System.currentTimeMillis() - start;
+                    LOG.warn("abort txn task({}) failed, start_ts={}, cost_ms={}",
+                            reason, start, duration, e);
+                }
+            });
+        } catch (Exception e) {
+            // NOTE(review): presumably a RejectedExecutionException (bounded queue full or pool
+            // shut down), depending on the executor's rejection policy — confirm.
+            long duration = System.currentTimeMillis() - start;
+            LOG.warn("failed to submit abort txn task({}), start_ts={}, cost_ms={}",
+                    reason, start, duration, e);
+        }
+    }
+
// backend heartbeat
private class BackendHeartbeatHandler implements
Callable<HeartbeatResponse> {
private Backend backend;
diff --git
a/regression-test/suites/cloud_p0/node_mgr/test_cloud_add_backend_heartbeat.groovy
b/regression-test/suites/cloud_p0/node_mgr/test_cloud_add_backend_heartbeat.groovy
new file mode 100644
index 00000000000..bd1e5b9d0b5
--- /dev/null
+++
b/regression-test/suites/cloud_p0/node_mgr/test_cloud_add_backend_heartbeat.groovy
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+import groovy.json.JsonSlurper
+import groovy.json.JsonOutput
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite("test_cloud_add_backend_heartbeat", 'p0, docker') {
+    if (!isCloudMode()) {
+        return
+    }
+
+    def options = new ClusterOptions()
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+        'sys_log_verbose_modules=org',
+        'heartbeat_interval_second=1'
+    ]
+    options.setFeNum(1)
+    options.setBeNum(1)
+    options.enableDebugPoints()
+    options.cloudMode = true
+
+    // Upper bound for every backend to report Alive=true. The debug point enabled below
+    // makes each coordinator-BE abort call sleep 15s. If heartbeat handling ever blocks on
+    // that call again (the bug this suite guards), the new backends should take far longer
+    // than this to come alive and the suite fails instead of silently passing.
+    def aliveTimeoutSeconds = 90
+
+    docker(options) {
+        // Polls "show backends" until every backend is alive; fails the suite if that does
+        // not happen within timeoutSeconds.
+        def waitAllBackendsAlive = { String phase, int timeoutSeconds ->
+            long deadline = System.currentTimeMillis() + timeoutSeconds * 1000L
+            def backends = []
+            boolean allAlive = false
+            while (!allAlive && System.currentTimeMillis() < deadline) {
+                backends = sql_return_maparray("show backends")
+                allAlive = !backends.isEmpty() && backends.every { it.Alive.toString().toBoolean() }
+                if (!allAlive) {
+                    sleep(1000)
+                }
+            }
+            assert allAlive : "not all backends alive ${phase} within ${timeoutSeconds}s: ${backends}"
+            logger.info("all {} backends alive {}", backends.size(), phase)
+        }
+
+        def ms = cluster.getAllMetaservices().get(0)
+        def msHttpPort = ms.host + ":" + ms.httpPort
+        logger.info("ms1 addr={}, port={}, ms endpoint={}", ms.host, ms.httpPort, msHttpPort)
+
+        GetDebugPoint().enableDebugPointForAllFEs("FE.abortTxnWhenCoordinateBeRestart.slow")
+
+        cluster.addBackend(10, "new_cluster")
+        waitAllBackendsAlive("after adding backends", aliveTimeoutSeconds)
+
+        sql """admin set frontend config("cloud_tablet_rebalancer_interval_second"="3");"""
+
+        cluster.restartBackends();
+        waitAllBackendsAlive("after restarting backends", aliveTimeoutSeconds)
+    }
+
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]