This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new fb406cc1885 [Feature](executor)Support ShowProcessStmt Show all Fe
connection (#30907)
fb406cc1885 is described below
commit fb406cc188503256b1ccff112656bf96e93f4a75
Author: wangbo <[email protected]>
AuthorDate: Thu Feb 8 15:51:47 2024 +0800
[Feature](executor)Support ShowProcessStmt Show all Fe connection (#30907)
---
.../apache/doris/analysis/ShowProcesslistStmt.java | 24 +++++++++-
.../doris/httpv2/controller/SessionController.java | 3 +-
.../java/org/apache/doris/qe/ConnectContext.java | 8 ++--
.../java/org/apache/doris/qe/ConnectScheduler.java | 11 +++++
.../java/org/apache/doris/qe/SessionVariable.java | 12 +++++
.../java/org/apache/doris/qe/ShowExecutor.java | 51 ++++++++++++++++++++--
.../apache/doris/service/FrontendServiceImpl.java | 16 +++++++
gensrc/thrift/FrontendService.thrift | 10 +++++
8 files changed, 127 insertions(+), 8 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowProcesslistStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowProcesslistStmt.java
index 3b6c67b1bba..96e8a082249 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowProcesslistStmt.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowProcesslistStmt.java
@@ -20,6 +20,7 @@ package org.apache.doris.analysis;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSetMetaData;
// SHOW PROCESSLIST statement.
@@ -39,7 +40,23 @@ public class ShowProcesslistStmt extends ShowStmt {
.addColumn(new Column("QueryId", ScalarType.createVarchar(64)))
.addColumn(new Column("Info", ScalarType.STRING)).build();
+ private static final ShowResultSetMetaData ALL_META_DATA =
ShowResultSetMetaData.builder()
+ .addColumn(new Column("CurrentConnected",
ScalarType.createVarchar(16)))
+ .addColumn(new Column("Id",
ScalarType.createType(PrimitiveType.BIGINT)))
+ .addColumn(new Column("User", ScalarType.createVarchar(16)))
+ .addColumn(new Column("Host", ScalarType.createVarchar(16)))
+ .addColumn(new Column("LoginTime", ScalarType.createVarchar(16)))
+ .addColumn(new Column("Catalog", ScalarType.createVarchar(16)))
+ .addColumn(new Column("Db", ScalarType.createVarchar(16)))
+ .addColumn(new Column("Command", ScalarType.createVarchar(16)))
+ .addColumn(new Column("Time",
ScalarType.createType(PrimitiveType.INT)))
+ .addColumn(new Column("State", ScalarType.createVarchar(64)))
+ .addColumn(new Column("QueryId", ScalarType.createVarchar(64)))
+ .addColumn(new Column("Info", ScalarType.STRING))
+ .addColumn(new Column("FE", ScalarType.createVarchar(16))).build();
+
private boolean isFull;
+ private boolean isShowAllFe;
public ShowProcesslistStmt(boolean isFull) {
this.isFull = isFull;
@@ -51,6 +68,11 @@ public class ShowProcesslistStmt extends ShowStmt {
@Override
public void analyze(Analyzer analyzer) {
+ this.isShowAllFe =
ConnectContext.get().getSessionVariable().getShowAllFeConnection();
+ }
+
+ public boolean isShowAllFe() {
+ return isShowAllFe;
}
@Override
@@ -65,6 +87,6 @@ public class ShowProcesslistStmt extends ShowStmt {
@Override
public ShowResultSetMetaData getMetaData() {
- return META_DATA;
+ return isShowAllFe ? ALL_META_DATA : META_DATA;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/SessionController.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/SessionController.java
index 27a28dbb4f4..440e9b40433 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/SessionController.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/SessionController.java
@@ -53,7 +53,7 @@ public class SessionController extends RestBaseController {
private static final List<String> SESSION_TABLE_HEADER =
Lists.newArrayList();
- private static final List<String> ALL_SESSION_TABLE_HEADER =
Lists.newArrayList("FE");
+ private static final List<String> ALL_SESSION_TABLE_HEADER =
Lists.newArrayList();
private static final Logger LOG =
LogManager.getLogger(SessionController.class);
@@ -71,6 +71,7 @@ public class SessionController extends RestBaseController {
SESSION_TABLE_HEADER.add("QueryId");
SESSION_TABLE_HEADER.add("Info");
ALL_SESSION_TABLE_HEADER.addAll(SESSION_TABLE_HEADER);
+ ALL_SESSION_TABLE_HEADER.add("FE");
}
@RequestMapping(path = "/session/all", method = RequestMethod.GET)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 6e32a896895..a10a80e6d20 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -1006,9 +1006,6 @@ public class ConnectContext {
public List<String> toRow(int connId, long nowMs, boolean showFe) {
List<String> row = Lists.newArrayList();
- if (showFe) {
- row.add(Env.getCurrentEnv().getSelfNode().getHost());
- }
if (connId == connectionId) {
row.add("Yes");
} else {
@@ -1035,6 +1032,11 @@ public class ConnectContext {
} else {
row.add("");
}
+
+ if (showFe) {
+ row.add(Env.getCurrentEnv().getSelfNode().getHost());
+ }
+
return row;
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
index 60658310ffe..31a55649b50 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
@@ -29,6 +29,7 @@ import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TimerTask;
@@ -171,6 +172,16 @@ public class ConnectScheduler {
return infos;
}
+ // used for thrift
+ public List<List<String>> listConnectionWithoutAuth(boolean isShowFullSql,
boolean isShowFeHost) {
+ List<List<String>> list = new ArrayList<>();
+ long nowMs = System.currentTimeMillis();
+ for (ConnectContext ctx : connectionMap.values()) {
+ list.add(ctx.toThreadInfo(isShowFullSql).toRow(-1, nowMs,
isShowFeHost));
+ }
+ return list;
+ }
+
public void putTraceId2QueryId(String traceId, TUniqueId queryId) {
traceId2QueryId.put(traceId, queryId);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 8effd6adae2..74ec4b94b91 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -507,6 +507,8 @@ public class SessionVariable implements Serializable,
Writable {
public static final String FORCE_JNI_SCANNER = "force_jni_scanner";
+ public static final String SHOW_ALL_FE_CONNECTION =
"show_all_fe_connection";
+
public static final List<String> DEBUG_VARIABLES = ImmutableList.of(
SKIP_DELETE_PREDICATE,
SKIP_DELETE_BITMAP,
@@ -1610,6 +1612,11 @@ public class SessionVariable implements Serializable,
Writable {
"use other health replica when the use_fix_replica meet error"
})
public boolean fallbackOtherReplicaWhenFixedCorrupt = false;
+ @VariableMgr.VarAttr(name = SHOW_ALL_FE_CONNECTION,
+ description = {"when it's true show processlist statement list all
fe's connection",
+ "当变量为true时,show processlist命令展示所有fe的连接"})
+ public boolean showAllFeConnection = false;
+
// CLOUD_VARIABLES_BEGIN
@VariableMgr.VarAttr(name = CLOUD_CLUSTER)
public String cloudCluster = "";
@@ -3383,4 +3390,9 @@ public class SessionVariable implements Serializable,
Writable {
public void setForceToLocalShuffle(boolean forceToLocalShuffle) {
this.forceToLocalShuffle = forceToLocalShuffle;
}
+
+ public boolean getShowAllFeConnection() {
+ return this.showAllFeConnection;
+ }
+
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index a628132e3aa..275d55c7315 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -145,6 +145,7 @@ import org.apache.doris.clone.DynamicPartitionScheduler;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.CaseSensibility;
+import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.ConfigBase;
import org.apache.doris.common.DdlException;
@@ -208,7 +209,11 @@ import org.apache.doris.task.AgentClient;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.SnapshotTask;
+import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.TCheckStorageFormatResult;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TShowProcessListRequest;
+import org.apache.doris.thrift.TShowProcessListResult;
import org.apache.doris.thrift.TTaskType;
import org.apache.doris.thrift.TUnit;
import org.apache.doris.transaction.GlobalTransactionMgrIface;
@@ -454,13 +459,53 @@ public class ShowExecutor {
// Handle show processlist
private void handleShowProcesslist() {
ShowProcesslistStmt showStmt = (ShowProcesslistStmt) stmt;
- List<List<String>> rowSet = Lists.newArrayList();
+ boolean isShowFullSql = showStmt.isFull();
+ boolean isShowAllFe = showStmt.isShowAllFe();
+ List<List<String>> rowSet = Lists.newArrayList();
List<ConnectContext.ThreadInfo> threadInfos = ctx.getConnectScheduler()
- .listConnection(ctx.getQualifiedUser(), showStmt.isFull());
+ .listConnection(ctx.getQualifiedUser(), isShowFullSql);
long nowMs = System.currentTimeMillis();
for (ConnectContext.ThreadInfo info : threadInfos) {
- rowSet.add(info.toRow(ctx.getConnectionId(), nowMs, false));
+ rowSet.add(info.toRow(ctx.getConnectionId(), nowMs, isShowAllFe));
+ }
+
+ if (isShowAllFe) {
+ try {
+ TShowProcessListRequest request = new
TShowProcessListRequest();
+ request.setShowFullSql(isShowFullSql);
+ List<Pair<String, Integer>> frontends =
FrontendsProcNode.getFrontendWithRpcPort(Env.getCurrentEnv(),
+ false);
+ FrontendService.Client client = null;
+ for (Pair<String, Integer> fe : frontends) {
+ TNetworkAddress thriftAddress = new
TNetworkAddress(fe.key(), fe.value());
+ try {
+ client =
ClientPool.frontendPool.borrowObject(thriftAddress, 3000);
+ } catch (Exception e) {
+ LOG.warn("Failed to get frontend {} client. exception:
{}", fe.key(), e);
+ continue;
+ }
+
+ boolean isReturnToPool = false;
+ try {
+ TShowProcessListResult result =
client.showProcessList(request);
+ if (result.process_list != null &&
result.process_list.size() > 0) {
+ rowSet.addAll(result.process_list);
+ }
+ isReturnToPool = true;
+ } catch (Exception e) {
+ LOG.warn("Failed to request processlist to fe: {} .
exception: {}", fe.key(), e);
+ } finally {
+ if (isReturnToPool) {
+
ClientPool.frontendPool.returnObject(thriftAddress, client);
+ } else {
+
ClientPool.frontendPool.invalidateObject(thriftAddress, client);
+ }
+ }
+ }
+ } catch (Throwable t) {
+ LOG.warn(" fetch process list from other fe failed, ", t);
+ }
}
resultSet = new ShowResultSet(showStmt.getMetaData(), rowSet);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 16a4bfa8145..78eb936e4d2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -210,6 +210,8 @@ import org.apache.doris.thrift.TRestoreSnapshotRequest;
import org.apache.doris.thrift.TRestoreSnapshotResult;
import org.apache.doris.thrift.TRollbackTxnRequest;
import org.apache.doris.thrift.TRollbackTxnResult;
+import org.apache.doris.thrift.TShowProcessListRequest;
+import org.apache.doris.thrift.TShowProcessListResult;
import org.apache.doris.thrift.TShowVariableRequest;
import org.apache.doris.thrift.TShowVariableResult;
import org.apache.doris.thrift.TSnapshotLoaderReportRequest;
@@ -3567,4 +3569,18 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
throw e;
}
}
+
+ @Override
+ public TShowProcessListResult showProcessList(TShowProcessListRequest
request) {
+ boolean isShowFullSql = false;
+ if (request.isSetShowFullSql()) {
+ isShowFullSql = request.isShowFullSql();
+ }
+ List<List<String>> processList =
ExecuteEnv.getInstance().getScheduler()
+ .listConnectionWithoutAuth(isShowFullSql, true);
+ TShowProcessListResult result = new TShowProcessListResult();
+ result.setProcessList(processList);
+ return result;
+ }
+
}
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 15f424736ef..2fdea36f88c 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1390,6 +1390,14 @@ struct TGetColumnInfoResult {
2: optional list<TColumnInfo> columns
}
+struct TShowProcessListRequest {
+ 1: optional bool show_full_sql
+}
+
+struct TShowProcessListResult {
+ 1: optional list<list<string>> process_list
+}
+
service FrontendService {
TGetDbsResult getDbNames(1: TGetDbsParams params)
TGetTablesResult getTableNames(1: TGetTablesParams params)
@@ -1472,4 +1480,6 @@ service FrontendService {
TGetColumnInfoResult getColumnInfo(1: TGetColumnInfoRequest request)
Status.TStatus invalidateStatsCache(1:
TInvalidateFollowerStatsCacheRequest request)
+
+ TShowProcessListResult showProcessList(1: TShowProcessListRequest request)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]