This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 9219689ffb5 branch-3.0: [Enhancement](cloud) Add concurrency for tablet stats processing #43903 (#44763)
9219689ffb5 is described below
commit 9219689ffb539e20b3f7830830fd9cc7c4315c7d
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Dec 24 18:01:20 2024 +0800
branch-3.0: [Enhancement](cloud) Add concurrency for tablet stats processing #43903 (#44763)
Cherry-picked from #43903
Co-authored-by: abmdocrt <[email protected]>
---
.../main/java/org/apache/doris/common/Config.java | 4 ++
.../apache/doris/catalog/CloudTabletStatMgr.java | 54 ++++++++++++++--------
2 files changed, 39 insertions(+), 19 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 3ad6986c417..01d981efdd9 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3202,6 +3202,10 @@ public class Config extends ConfigBase {
+ "model is set to 300 times, which is approximately 5 minutes by
default."})
public static int auto_start_wait_to_resume_times = 300;
+ @ConfField(description = {"Get tablet stat task的最大并发数。",
+ "Maximal concurrent num of get tablet stat job."})
+ public static int max_get_tablet_stat_task_threads_num = 4;
+
// ATTN: DONOT add any config not related to cloud mode here
// ATTN: DONOT add any config not related to cloud mode here
// ATTN: DONOT add any config not related to cloud mode here
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java
index d92af142242..3babb0e001a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java
@@ -36,6 +36,10 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
/*
* CloudTabletStatMgr is for collecting tablet(replica) statistics from
backends.
@@ -47,6 +51,9 @@ public class CloudTabletStatMgr extends MasterDaemon {
// <(dbId, tableId) -> OlapTable.Statistics>
private volatile Map<Pair<Long, Long>, OlapTable.Statistics>
cloudTableStatsMap = new HashMap<>();
+ private static final ExecutorService GET_TABLET_STATS_THREAD_POOL =
Executors.newFixedThreadPool(
+ Config.max_get_tablet_stat_task_threads_num);
+
public CloudTabletStatMgr() {
super("cloud tablet stat mgr",
Config.tablet_stat_update_interval_second * 1000);
}
@@ -103,28 +110,37 @@ public class CloudTabletStatMgr extends MasterDaemon {
reqList.add(builder.build());
}
+ List<Future<Void>> futures = new ArrayList<>();
for (GetTabletStatsRequest req : reqList) {
- GetTabletStatsResponse resp;
- try {
- resp = getTabletStats(req);
- } catch (RpcException e) {
- LOG.info("get tablet stats exception:", e);
- continue;
- }
-
- if (resp.getStatus().getCode() != MetaServiceCode.OK) {
- continue;
- }
-
- if (LOG.isDebugEnabled()) {
- int i = 0;
- for (TabletIndexPB idx : req.getTabletIdxList()) {
- LOG.debug("db_id: {} table_id: {} index_id: {} tablet_id:
{} size: {}",
- idx.getDbId(), idx.getTableId(), idx.getIndexId(),
idx.getTabletId(),
- resp.getTabletStats(i++).getDataSize());
+ futures.add(GET_TABLET_STATS_THREAD_POOL.submit(() -> {
+ GetTabletStatsResponse resp =
GetTabletStatsResponse.newBuilder().build();
+ try {
+ resp = getTabletStats(req);
+ } catch (RpcException e) {
+ LOG.warn("get tablet stats exception:", e);
+ }
+ if (resp.getStatus().getCode() != MetaServiceCode.OK) {
+ LOG.warn("get tablet stats return failed.");
+ }
+ if (LOG.isDebugEnabled()) {
+ int i = 0;
+ for (TabletIndexPB idx : req.getTabletIdxList()) {
+ LOG.debug("db_id: {} table_id: {} index_id: {}
tablet_id: {} size: {}",
+ idx.getDbId(), idx.getTableId(),
idx.getIndexId(),
+ idx.getTabletId(),
resp.getTabletStats(i++).getDataSize());
+ }
}
+ updateTabletStat(resp);
+ return null;
+ }));
+ }
+
+ try {
+ for (Future<Void> future : futures) {
+ future.get();
}
- updateTabletStat(resp);
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Error waiting for get tablet stats tasks to complete",
e);
}
LOG.info("finished to get tablet stat of all backends. cost: {} ms",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]