This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 24e80b23a500334df16b4d699509a40cb7156752
Author: wangbo <[email protected]>
AuthorDate: Thu Feb 8 15:51:47 2024 +0800

     [Feature](executor)Support ShowProcessStmt Show all Fe connection (#30907)
---
 .../apache/doris/analysis/ShowProcesslistStmt.java | 24 +++++++++-
 .../doris/httpv2/controller/SessionController.java |  3 +-
 .../java/org/apache/doris/qe/ConnectContext.java   |  8 ++--
 .../java/org/apache/doris/qe/ConnectScheduler.java | 11 +++++
 .../java/org/apache/doris/qe/SessionVariable.java  | 12 +++++
 .../java/org/apache/doris/qe/ShowExecutor.java     | 51 ++++++++++++++++++++--
 .../apache/doris/service/FrontendServiceImpl.java  | 16 +++++++
 gensrc/thrift/FrontendService.thrift               | 10 +++++
 8 files changed, 127 insertions(+), 8 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowProcesslistStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowProcesslistStmt.java
index 3b6c67b1bba..96e8a082249 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowProcesslistStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowProcesslistStmt.java
@@ -20,6 +20,7 @@ package org.apache.doris.analysis;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.ShowResultSetMetaData;
 
 // SHOW PROCESSLIST statement.
@@ -39,7 +40,23 @@ public class ShowProcesslistStmt extends ShowStmt {
             .addColumn(new Column("QueryId", ScalarType.createVarchar(64)))
             .addColumn(new Column("Info", ScalarType.STRING)).build();
 
+    private static final ShowResultSetMetaData ALL_META_DATA = 
ShowResultSetMetaData.builder()
+            .addColumn(new Column("CurrentConnected", 
ScalarType.createVarchar(16)))
+            .addColumn(new Column("Id", 
ScalarType.createType(PrimitiveType.BIGINT)))
+            .addColumn(new Column("User", ScalarType.createVarchar(16)))
+            .addColumn(new Column("Host", ScalarType.createVarchar(16)))
+            .addColumn(new Column("LoginTime", ScalarType.createVarchar(16)))
+            .addColumn(new Column("Catalog", ScalarType.createVarchar(16)))
+            .addColumn(new Column("Db", ScalarType.createVarchar(16)))
+            .addColumn(new Column("Command", ScalarType.createVarchar(16)))
+            .addColumn(new Column("Time", 
ScalarType.createType(PrimitiveType.INT)))
+            .addColumn(new Column("State", ScalarType.createVarchar(64)))
+            .addColumn(new Column("QueryId", ScalarType.createVarchar(64)))
+            .addColumn(new Column("Info", ScalarType.STRING))
+            .addColumn(new Column("FE", ScalarType.createVarchar(16))).build();
+
     private boolean isFull;
+    private boolean isShowAllFe;
 
     public ShowProcesslistStmt(boolean isFull) {
         this.isFull = isFull;
@@ -51,6 +68,11 @@ public class ShowProcesslistStmt extends ShowStmt {
 
     @Override
     public void analyze(Analyzer analyzer) {
+        this.isShowAllFe = 
ConnectContext.get().getSessionVariable().getShowAllFeConnection();
+    }
+
+    public boolean isShowAllFe() {
+        return isShowAllFe;
     }
 
     @Override
@@ -65,6 +87,6 @@ public class ShowProcesslistStmt extends ShowStmt {
 
     @Override
     public ShowResultSetMetaData getMetaData() {
-        return META_DATA;
+        return isShowAllFe ? ALL_META_DATA : META_DATA;
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/SessionController.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/SessionController.java
index 27a28dbb4f4..440e9b40433 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/SessionController.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/SessionController.java
@@ -53,7 +53,7 @@ public class SessionController extends RestBaseController {
 
     private static final List<String> SESSION_TABLE_HEADER = 
Lists.newArrayList();
 
-    private static final List<String> ALL_SESSION_TABLE_HEADER = 
Lists.newArrayList("FE");
+    private static final List<String> ALL_SESSION_TABLE_HEADER = 
Lists.newArrayList();
 
     private static final Logger LOG = 
LogManager.getLogger(SessionController.class);
 
@@ -71,6 +71,7 @@ public class SessionController extends RestBaseController {
         SESSION_TABLE_HEADER.add("QueryId");
         SESSION_TABLE_HEADER.add("Info");
         ALL_SESSION_TABLE_HEADER.addAll(SESSION_TABLE_HEADER);
+        ALL_SESSION_TABLE_HEADER.add("FE");
     }
 
     @RequestMapping(path = "/session/all", method = RequestMethod.GET)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 5dc43c3a635..09db53e4735 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -980,9 +980,6 @@ public class ConnectContext {
 
         public List<String> toRow(int connId, long nowMs, boolean showFe) {
             List<String> row = Lists.newArrayList();
-            if (showFe) {
-                row.add(Env.getCurrentEnv().getSelfNode().getHost());
-            }
             if (connId == connectionId) {
                 row.add("Yes");
             } else {
@@ -1009,6 +1006,11 @@ public class ConnectContext {
             } else {
                 row.add("");
             }
+
+            if (showFe) {
+                row.add(Env.getCurrentEnv().getSelfNode().getHost());
+            }
+
             return row;
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
index 60658310ffe..31a55649b50 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
@@ -29,6 +29,7 @@ import com.google.common.collect.Maps;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.TimerTask;
@@ -171,6 +172,16 @@ public class ConnectScheduler {
         return infos;
     }
 
+    // Used by the thrift showProcessList handler: lists every connection, with no per-user filtering.
+    public List<List<String>> listConnectionWithoutAuth(boolean isShowFullSql, 
boolean isShowFeHost) {
+        List<List<String>> list = new ArrayList<>();
+        long nowMs = System.currentTimeMillis();
+        for (ConnectContext ctx : connectionMap.values()) {
+            list.add(ctx.toThreadInfo(isShowFullSql).toRow(-1, nowMs, 
isShowFeHost));
+        }
+        return list;
+    }
+
     public void putTraceId2QueryId(String traceId, TUniqueId queryId) {
         traceId2QueryId.put(traceId, queryId);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 7e332b91d2c..acf5bbf8a9d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -507,6 +507,8 @@ public class SessionVariable implements Serializable, Writable {
 
     public static final String FORCE_JNI_SCANNER = "force_jni_scanner";
 
+    public static final String SHOW_ALL_FE_CONNECTION = 
"show_all_fe_connection";
+
     public static final List<String> DEBUG_VARIABLES = ImmutableList.of(
             SKIP_DELETE_PREDICATE,
             SKIP_DELETE_BITMAP,
@@ -1610,6 +1612,11 @@ public class SessionVariable implements Serializable, Writable {
                 "use other health replica when the use_fix_replica meet error" 
})
     public boolean fallbackOtherReplicaWhenFixedCorrupt = false;
 
+    @VariableMgr.VarAttr(name = SHOW_ALL_FE_CONNECTION,
+            description = {"when it's true show processlist statement list all 
fe's connection",
+                    "当变量为true时,show processlist命令展示所有fe的连接"})
+    public boolean showAllFeConnection = false;
+
     // CLOUD_VARIABLES_BEGIN
     @VariableMgr.VarAttr(name = CLOUD_CLUSTER)
     public String cloudCluster = "";
@@ -3365,4 +3372,9 @@ public class SessionVariable implements Serializable, Writable {
     public void setForceToLocalShuffle(boolean forceToLocalShuffle) {
         this.forceToLocalShuffle = forceToLocalShuffle;
     }
+
+    public boolean getShowAllFeConnection() {
+        return this.showAllFeConnection;
+    }
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index 66e155a2e8f..84657e0d9aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -145,6 +145,7 @@ import org.apache.doris.clone.DynamicPartitionScheduler;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.CaseSensibility;
+import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.ConfigBase;
 import org.apache.doris.common.DdlException;
@@ -208,7 +209,11 @@ import org.apache.doris.task.AgentClient;
 import org.apache.doris.task.AgentTaskExecutor;
 import org.apache.doris.task.AgentTaskQueue;
 import org.apache.doris.task.SnapshotTask;
+import org.apache.doris.thrift.FrontendService;
 import org.apache.doris.thrift.TCheckStorageFormatResult;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TShowProcessListRequest;
+import org.apache.doris.thrift.TShowProcessListResult;
 import org.apache.doris.thrift.TTaskType;
 import org.apache.doris.thrift.TUnit;
 import org.apache.doris.transaction.GlobalTransactionMgr;
@@ -454,13 +459,53 @@ public class ShowExecutor {
     // Handle show processlist
     private void handleShowProcesslist() {
         ShowProcesslistStmt showStmt = (ShowProcesslistStmt) stmt;
-        List<List<String>> rowSet = Lists.newArrayList();
+        boolean isShowFullSql = showStmt.isFull();
+        boolean isShowAllFe = showStmt.isShowAllFe();
 
+        List<List<String>> rowSet = Lists.newArrayList();
         List<ConnectContext.ThreadInfo> threadInfos = ctx.getConnectScheduler()
-                .listConnection(ctx.getQualifiedUser(), showStmt.isFull());
+                .listConnection(ctx.getQualifiedUser(), isShowFullSql);
         long nowMs = System.currentTimeMillis();
         for (ConnectContext.ThreadInfo info : threadInfos) {
-            rowSet.add(info.toRow(ctx.getConnectionId(), nowMs, false));
+            rowSet.add(info.toRow(ctx.getConnectionId(), nowMs, isShowAllFe));
+        }
+
+        if (isShowAllFe) {
+            try {
+                TShowProcessListRequest request = new 
TShowProcessListRequest();
+                request.setShowFullSql(isShowFullSql);
+                List<Pair<String, Integer>> frontends = 
FrontendsProcNode.getFrontendWithRpcPort(Env.getCurrentEnv(),
+                        false);
+                FrontendService.Client client = null;
+                for (Pair<String, Integer> fe : frontends) {
+                    TNetworkAddress thriftAddress = new 
TNetworkAddress(fe.key(), fe.value());
+                    try {
+                        client = 
ClientPool.frontendPool.borrowObject(thriftAddress, 3000);
+                    } catch (Exception e) {
+                        LOG.warn("Failed to get frontend {} client. exception: 
{}", fe.key(), e);
+                        continue;
+                    }
+
+                    boolean isReturnToPool = false;
+                    try {
+                        TShowProcessListResult result = 
client.showProcessList(request);
+                        if (result.process_list != null && 
result.process_list.size() > 0) {
+                            rowSet.addAll(result.process_list);
+                        }
+                        isReturnToPool = true;
+                    } catch (Exception e) {
+                        LOG.warn("Failed to request processlist to fe: {} . 
exception: {}", fe.key(), e);
+                    } finally {
+                        if (isReturnToPool) {
+                            
ClientPool.frontendPool.returnObject(thriftAddress, client);
+                        } else {
+                            
ClientPool.frontendPool.invalidateObject(thriftAddress, client);
+                        }
+                    }
+                }
+            } catch (Throwable t) {
+                LOG.warn(" fetch process list from other fe failed, ", t);
+            }
         }
 
         resultSet = new ShowResultSet(showStmt.getMetaData(), rowSet);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 9f6ee5baf68..7b970605dc8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -209,6 +209,8 @@ import org.apache.doris.thrift.TRestoreSnapshotRequest;
 import org.apache.doris.thrift.TRestoreSnapshotResult;
 import org.apache.doris.thrift.TRollbackTxnRequest;
 import org.apache.doris.thrift.TRollbackTxnResult;
+import org.apache.doris.thrift.TShowProcessListRequest;
+import org.apache.doris.thrift.TShowProcessListResult;
 import org.apache.doris.thrift.TShowVariableRequest;
 import org.apache.doris.thrift.TShowVariableResult;
 import org.apache.doris.thrift.TSnapshotLoaderReportRequest;
@@ -3555,4 +3557,18 @@ public class FrontendServiceImpl implements FrontendService.Iface {
             throw e;
         }
     }
+
+    @Override
+    public TShowProcessListResult showProcessList(TShowProcessListRequest 
request) {
+        boolean isShowFullSql = false;
+        if (request.isSetShowFullSql()) {
+            isShowFullSql = request.isShowFullSql();
+        }
+        List<List<String>> processList = 
ExecuteEnv.getInstance().getScheduler()
+                .listConnectionWithoutAuth(isShowFullSql, true);
+        TShowProcessListResult result = new TShowProcessListResult();
+        result.setProcessList(processList);
+        return result;
+    }
+
 }
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 93b229a11d7..88cf25b9c67 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1372,6 +1372,14 @@ struct TGetColumnInfoResult {
     2: optional list<TColumnInfo> columns
 }
 
+struct TShowProcessListRequest {
+    1: optional bool show_full_sql
+}
+
+struct TShowProcessListResult {
+    1: optional list<list<string>> process_list
+}
+
 service FrontendService {
     TGetDbsResult getDbNames(1: TGetDbsParams params)
     TGetTablesResult getTableNames(1: TGetTablesParams params)
@@ -1454,4 +1462,6 @@ service FrontendService {
     TGetColumnInfoResult getColumnInfo(1: TGetColumnInfoRequest request)
 
     Status.TStatus invalidateStatsCache(1: 
TInvalidateFollowerStatsCacheRequest request)
+
+    TShowProcessListResult showProcessList(1: TShowProcessListRequest request)
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to