This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit e72da105bf28cb7bc799f645f0230e022ed391fc
Author: xy720 <[email protected]>
AuthorDate: Mon May 27 18:17:50 2024 +0800

    [bug](Fe) fix potential deadlock in show proc statement (#34988)
---
 .../common/proc/DiagnoseClusterBalanceProcDir.java      | 17 +++++++++++------
 .../org/apache/doris/common/proc/StatisticProcNode.java | 17 +++++++++++------
 .../apache/doris/common/proc/TabletHealthProcDir.java   | 17 +++++++++++------
 3 files changed, 33 insertions(+), 18 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/DiagnoseClusterBalanceProcDir.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/DiagnoseClusterBalanceProcDir.java
index e234291ab77..c848b369369 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/DiagnoseClusterBalanceProcDir.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/DiagnoseClusterBalanceProcDir.java
@@ -44,6 +44,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ForkJoinPool;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -52,6 +53,8 @@ import java.util.stream.Stream;
  */
 public class DiagnoseClusterBalanceProcDir extends SubProcDir {
 
+    private ForkJoinPool taskPool = new ForkJoinPool();
+
     @Override
     public List<DiagnoseItem> getDiagnoseResult() {
         long now = System.currentTimeMillis();
@@ -89,12 +92,14 @@ public class DiagnoseClusterBalanceProcDir extends 
SubProcDir {
         tabletHealth.status = DiagnoseStatus.OK;
 
         Env env = Env.getCurrentEnv();
-        List<DBTabletStatistic> statistics = 
env.getInternalCatalog().getDbIds().parallelStream()
-                // skip information_schema database
-                .flatMap(id -> Stream.of(id == 0 ? null : 
env.getInternalCatalog().getDbNullable(id)))
-                .filter(Objects::nonNull).map(DBTabletStatistic::new)
-                // sort by dbName
-                .sorted(Comparator.comparing(db -> 
db.db.getFullName())).collect(Collectors.toList());
+        List<DBTabletStatistic> statistics = taskPool.submit(() ->
+                env.getInternalCatalog().getDbIds().parallelStream()
+                    // skip information_schema database
+                    .flatMap(id -> Stream.of(id == 0 ? null : 
env.getInternalCatalog().getDbNullable(id)))
+                    .filter(Objects::nonNull).map(DBTabletStatistic::new)
+                    // sort by dbName
+                    .sorted(Comparator.comparing(db -> 
db.db.getFullName())).collect(Collectors.toList())
+        ).join();
 
         DBTabletStatistic total = statistics.stream().reduce(new 
DBTabletStatistic(), DBTabletStatistic::reduce);
         if (total.tabletNum != total.healthyNum) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/StatisticProcNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/StatisticProcNode.java
index c240b7a6813..f12717fb570 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/StatisticProcNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/StatisticProcNode.java
@@ -34,6 +34,7 @@ import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.ForkJoinPool;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -44,6 +45,8 @@ public class StatisticProcNode implements ProcNodeInterface {
             .build();
     private Env env;
 
+    private ForkJoinPool taskPool = new ForkJoinPool();
+
     public StatisticProcNode(Env env) {
         Preconditions.checkNotNull(env);
         this.env = env;
@@ -51,12 +54,14 @@ public class StatisticProcNode implements ProcNodeInterface 
{
 
     @Override
     public ProcResult fetchResult() throws AnalysisException {
-        List<DBStatistic> statistics = 
env.getInternalCatalog().getDbIds().parallelStream()
-                // skip information_schema database
-                .flatMap(id -> Stream.of(id == 0 ? null : 
env.getCatalogMgr().getDbNullable(id)))
-                .filter(Objects::nonNull).map(DBStatistic::new)
-                // sort by dbName
-                .sorted(Comparator.comparing(db -> 
db.db.getFullName())).collect(Collectors.toList());
+        List<DBStatistic> statistics = taskPool.submit(() ->
+                env.getInternalCatalog().getDbIds().parallelStream()
+                    // skip information_schema database
+                    .flatMap(id -> Stream.of(id == 0 ? null : 
env.getCatalogMgr().getDbNullable(id)))
+                    .filter(Objects::nonNull).map(DBStatistic::new)
+                    // sort by dbName
+                    .sorted(Comparator.comparing(db -> 
db.db.getFullName())).collect(Collectors.toList())
+        ).join();
 
         List<List<String>> rows = new ArrayList<>(statistics.size() + 1);
         for (DBStatistic statistic : statistics) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
index ce88a52082e..e28c74c327e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletHealthProcDir.java
@@ -47,6 +47,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ForkJoinPool;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -66,6 +67,8 @@ public class TabletHealthProcDir implements ProcDirInterface {
 
     private Env env;
 
+    private ForkJoinPool taskPool = new ForkJoinPool();
+
     public TabletHealthProcDir(Env env) {
         Preconditions.checkNotNull(env);
         this.env = env;
@@ -88,12 +91,14 @@ public class TabletHealthProcDir implements 
ProcDirInterface {
 
     @Override
     public ProcResult fetchResult() throws AnalysisException {
-        List<DBTabletStatistic> statistics = 
env.getInternalCatalog().getDbIds().parallelStream()
-                // skip information_schema database
-                .flatMap(id -> Stream.of(id == 0 ? null : 
env.getInternalCatalog().getDbNullable(id)))
-                .filter(Objects::nonNull).map(DBTabletStatistic::new)
-                // sort by dbName
-                .sorted(Comparator.comparing(db -> 
db.db.getFullName())).collect(Collectors.toList());
+        List<DBTabletStatistic> statistics = taskPool.submit(() ->
+                env.getInternalCatalog().getDbIds().parallelStream()
+                    // skip information_schema database
+                    .flatMap(id -> Stream.of(id == 0 ? null : 
env.getInternalCatalog().getDbNullable(id)))
+                    .filter(Objects::nonNull).map(DBTabletStatistic::new)
+                    // sort by dbName
+                    .sorted(Comparator.comparing(db -> 
db.db.getFullName())).collect(Collectors.toList())
+        ).join();
 
         List<List<String>> rows = new ArrayList<>(statistics.size() + 1);
         for (DBTabletStatistic statistic : statistics) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to