This is an automated email from the ASF dual-hosted git repository.
xuyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f9cb405b1f7 [Bug](dead lock) Fix dead lock in Tablet Stat Mgr (#46959)
f9cb405b1f7 is described below
commit f9cb405b1f7f914d142108736bf85da6028983fe
Author: xy720 <[email protected]>
AuthorDate: Mon Jan 20 13:49:52 2025 +0800
[Bug](dead lock) Fix dead lock in Tablet Stat Mgr (#46959)
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #28608
Problem Summary:
In TabletStatMgr, we use stream().parallel() or parallelStream() inside a
ForkJoinTask. When such a stream runs, it splits its `forEach` work into
subtasks for multiple threads. Because the stream is started from within a
ForkJoinTask, those subtasks are forked into the same ForkJoinPool and picked
up by its worker threads through work stealing. When the ForkJoinPool has only
a few threads, its workers can end up blocked waiting on one another,
ultimately leading to a deadlock.
This commit abandons the ForkJoinPool and uses a regular fixed-size thread
pool instead.
---
.../main/java/org/apache/doris/common/Config.java | 4 ++
.../org/apache/doris/catalog/TabletStatMgr.java | 58 +++++++++++++++++++---
.../java/org/apache/doris/metric/MetricRepo.java | 6 +++
3 files changed, 62 insertions(+), 6 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 8bda7f6e754..a162a0c4878 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -303,6 +303,10 @@ public class Config extends ConfigBase {
"Queue size to store heartbeat task in heartbeat_mgr"})
public static int heartbeat_mgr_blocking_queue_size = 1024;
+ @ConfField(masterOnly = true, description = {"TabletStatMgr线程数",
+ "Num of thread to update tablet stat"})
+ public static int tablet_stat_mgr_threads_num = -1;
+
@ConfField(masterOnly = true, description = {"Agent任务线程池的线程数",
"Num of thread to handle agent task in agent task thread-pool"})
public static int max_agent_task_threads_num = 4096;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
index f79ed89215b..14dc88eb509 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java
@@ -21,7 +21,11 @@ import
org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
+import org.apache.doris.common.MarkedCountDownLatch;
+import org.apache.doris.common.Status;
+import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.metric.MetricRepo;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TNetworkAddress;
@@ -34,7 +38,9 @@ import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
/*
* TabletStatMgr is for collecting tablet(replica) statistics from backends.
@@ -43,7 +49,13 @@ import java.util.concurrent.ForkJoinPool;
public class TabletStatMgr extends MasterDaemon {
private static final Logger LOG =
LogManager.getLogger(TabletStatMgr.class);
- private ForkJoinPool taskPool = new
ForkJoinPool(Runtime.getRuntime().availableProcessors());
+ private final ExecutorService executor =
ThreadPoolManager.newDaemonFixedThreadPool(
+ Config.tablet_stat_mgr_threads_num > 0
+ ? Config.tablet_stat_mgr_threads_num
+ : Runtime.getRuntime().availableProcessors(),
+ 1024, "tablet-stat-mgr", true);
+
+ private MarkedCountDownLatch<Long, Backend> updateTabletStatsLatch = null;
public TabletStatMgr() {
super("tablet stat mgr", Config.tablet_stat_update_interval_second *
1000);
@@ -59,9 +71,13 @@ public class TabletStatMgr extends MasterDaemon {
return;
}
long start = System.currentTimeMillis();
- taskPool.submit(() -> {
- // no need to get tablet stat if backend is not alive
-
backends.values().stream().filter(Backend::isAlive).parallel().forEach(backend
-> {
+ // no need to get tablet stat if backend is not alive
+ List<Backend> aliveBackends =
backends.values().stream().filter(Backend::isAlive)
+ .collect(Collectors.toList());
+ updateTabletStatsLatch = new
MarkedCountDownLatch<>(aliveBackends.size());
+ aliveBackends.forEach(backend -> {
+ updateTabletStatsLatch.addMark(backend.getId(), backend);
+ executor.submit(() -> {
BackendService.Client client = null;
TNetworkAddress address = null;
boolean ok = false;
@@ -74,8 +90,10 @@ public class TabletStatMgr extends MasterDaemon {
result.getTabletsStatsSize());
}
updateTabletStat(backend.getId(), result);
+ updateTabletStatsLatch.markedCountDown(backend.getId(),
backend);
ok = true;
} catch (Throwable e) {
+
updateTabletStatsLatch.markedCountDownWithStatus(backend.getId(), backend,
Status.CANCELLED);
LOG.warn("task exec error. backend[{}]", backend.getId(),
e);
}
@@ -89,7 +107,9 @@ public class TabletStatMgr extends MasterDaemon {
LOG.warn("client pool recyle error. backend[{}]",
backend.getId(), e);
}
});
- }).join();
+ });
+ waitForTabletStatUpdate();
+
if (LOG.isDebugEnabled()) {
LOG.debug("finished to get tablet stat of all backends. cost: {}
ms",
(System.currentTimeMillis() - start));
@@ -222,6 +242,32 @@ public class TabletStatMgr extends MasterDaemon {
(System.currentTimeMillis() - start));
}
+ public void waitForTabletStatUpdate() {
+ boolean ok = true;
+ try {
+ if (!updateTabletStatsLatch.await(600, TimeUnit.SECONDS)) {
+ LOG.info("timeout waiting {} update tablet stats tasks finish
after {} seconds.",
+ updateTabletStatsLatch.getCount(), 600);
+ ok = false;
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("InterruptedException, {}", this, e);
+ }
+ if (!ok || !updateTabletStatsLatch.getStatus().ok()) {
+ List<Long> unfinishedBackendIds =
updateTabletStatsLatch.getLeftMarks().stream()
+ .map(Map.Entry::getKey).collect(Collectors.toList());
+ Status status = Status.TIMEOUT;
+ if (!updateTabletStatsLatch.getStatus().ok()) {
+ status = updateTabletStatsLatch.getStatus();
+ }
+ LOG.warn("Failed to update tablet stats reason: {}, unfinished
backends: {}",
+ status.getErrorMsg(), unfinishedBackendIds);
+ if (MetricRepo.isInit) {
+ MetricRepo.COUNTER_UPDATE_TABLET_STAT_FAILED.increase(1L);
+ }
+ }
+ }
+
private void updateTabletStat(Long beId, TTabletStatResult result) {
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
if (result.isSetTabletStatList()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
index 2f87515509a..9705e6a0542 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
@@ -101,6 +101,8 @@ public final class MetricRepo {
public static LongCounterMetric COUNTER_CACHE_HIT_SQL;
public static LongCounterMetric COUNTER_CACHE_HIT_PARTITION;
+ public static LongCounterMetric COUNTER_UPDATE_TABLET_STAT_FAILED;
+
public static LongCounterMetric COUNTER_EDIT_LOG_WRITE;
public static LongCounterMetric COUNTER_EDIT_LOG_READ;
public static LongCounterMetric COUNTER_EDIT_LOG_CURRENT;
@@ -495,6 +497,10 @@ public final class MetricRepo {
"counter of failed transactions");
COUNTER_TXN_FAILED.addLabel(new MetricLabel("type", "failed"));
DORIS_METRIC_REGISTER.addMetrics(COUNTER_TXN_FAILED);
+ COUNTER_UPDATE_TABLET_STAT_FAILED = new
LongCounterMetric("update_tablet_stat_failed", MetricUnit.REQUESTS,
+ "counter of failed to update tablet stat");
+ COUNTER_UPDATE_TABLET_STAT_FAILED.addLabel(new MetricLabel("type",
"failed"));
+ DORIS_METRIC_REGISTER.addMetrics(COUNTER_UPDATE_TABLET_STAT_FAILED);
HISTO_TXN_EXEC_LATENCY = METRIC_REGISTER.histogram(
MetricRegistry.name("txn", "exec", "latency", "ms"));
HISTO_TXN_PUBLISH_LATENCY = METRIC_REGISTER.histogram(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]