This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 67fe2c86dc5 [Feature](executor)Support ShowProcessStmt Show all Fe connection (#30907) (#30996)
67fe2c86dc5 is described below
commit 67fe2c86dc520d62946ccf6a8d37d14922e1466b
Author: wangbo <[email protected]>
AuthorDate: Thu Feb 8 20:01:38 2024 +0800
[Feature](executor)Support ShowProcessStmt Show all Fe connection (#30907) (#30996)
---
.../apache/doris/analysis/ShowProcesslistStmt.java | 24 +++++++++-
.../doris/common/proc/FrontendsProcNode.java | 18 ++++++++
.../doris/httpv2/controller/SessionController.java | 2 +-
.../java/org/apache/doris/qe/ConnectContext.java | 7 ++-
.../java/org/apache/doris/qe/ConnectScheduler.java | 11 +++++
.../java/org/apache/doris/qe/SessionVariable.java | 11 +++++
.../java/org/apache/doris/qe/ShowExecutor.java | 51 ++++++++++++++++++++--
.../apache/doris/service/FrontendServiceImpl.java | 16 +++++++
.../org/apache/doris/qe/ConnectContextTest.java | 2 +-
gensrc/thrift/FrontendService.thrift | 10 +++++
10 files changed, 145 insertions(+), 7 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowProcesslistStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowProcesslistStmt.java
index 3b6c67b1bba..96e8a082249 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowProcesslistStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowProcesslistStmt.java
@@ -20,6 +20,7 @@ package org.apache.doris.analysis;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSetMetaData;
// SHOW PROCESSLIST statement.
@@ -39,7 +40,23 @@ public class ShowProcesslistStmt extends ShowStmt {
.addColumn(new Column("QueryId", ScalarType.createVarchar(64)))
.addColumn(new Column("Info", ScalarType.STRING)).build();
+ private static final ShowResultSetMetaData ALL_META_DATA =
ShowResultSetMetaData.builder()
+ .addColumn(new Column("CurrentConnected",
ScalarType.createVarchar(16)))
+ .addColumn(new Column("Id",
ScalarType.createType(PrimitiveType.BIGINT)))
+ .addColumn(new Column("User", ScalarType.createVarchar(16)))
+ .addColumn(new Column("Host", ScalarType.createVarchar(16)))
+ .addColumn(new Column("LoginTime", ScalarType.createVarchar(16)))
+ .addColumn(new Column("Catalog", ScalarType.createVarchar(16)))
+ .addColumn(new Column("Db", ScalarType.createVarchar(16)))
+ .addColumn(new Column("Command", ScalarType.createVarchar(16)))
+ .addColumn(new Column("Time",
ScalarType.createType(PrimitiveType.INT)))
+ .addColumn(new Column("State", ScalarType.createVarchar(64)))
+ .addColumn(new Column("QueryId", ScalarType.createVarchar(64)))
+ .addColumn(new Column("Info", ScalarType.STRING))
+ .addColumn(new Column("FE", ScalarType.createVarchar(16))).build();
+
private boolean isFull;
+ private boolean isShowAllFe;
public ShowProcesslistStmt(boolean isFull) {
this.isFull = isFull;
@@ -51,6 +68,11 @@ public class ShowProcesslistStmt extends ShowStmt {
@Override
public void analyze(Analyzer analyzer) {
+ this.isShowAllFe =
ConnectContext.get().getSessionVariable().getShowAllFeConnection();
+ }
+
+ public boolean isShowAllFe() {
+ return isShowAllFe;
}
@Override
@@ -65,6 +87,6 @@ public class ShowProcesslistStmt extends ShowStmt {
@Override
public ShowResultSetMetaData getMetaData() {
- return META_DATA;
+ return isShowAllFe ? ALL_META_DATA : META_DATA;
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java
index a88315ee858..b5d2fa7d704 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/FrontendsProcNode.java
@@ -19,6 +19,7 @@ package org.apache.doris.common.proc;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
+import org.apache.doris.common.Pair;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.system.Frontend;
@@ -71,6 +72,23 @@ public class FrontendsProcNode implements ProcNodeInterface {
return result;
}
+ public static List<Pair<String, Integer>> getFrontendWithRpcPort(Env env,
boolean includeSelf) {
+ List<Pair<String, Integer>> allFe = new ArrayList<>();
+ List<Frontend> frontends = env.getFrontends(null);
+
+ String selfNode = Env.getCurrentEnv().getSelfNode().getHost();
+ if (ConnectContext.get() != null &&
!Strings.isNullOrEmpty(ConnectContext.get().getCurrentConnectedFEIp())) {
+ selfNode = ConnectContext.get().getCurrentConnectedFEIp();
+ }
+
+ String finalSelfNode = selfNode;
+ frontends.stream()
+ .filter(fe -> (!fe.getHost().equals(finalSelfNode) ||
includeSelf))
+ .map(fe -> Pair.of(fe.getHost(), fe.getRpcPort()))
+ .forEach(allFe::add);
+ return allFe;
+ }
+
public static void getFrontendsInfo(Env env, List<List<String>> infos) {
InetSocketAddress master = null;
try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/SessionController.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/SessionController.java
index 4f20fc8c9ef..10577421982 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/SessionController.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/SessionController.java
@@ -74,7 +74,7 @@ public class SessionController extends BaseController {
long nowMs = System.currentTimeMillis();
for (ConnectContext.ThreadInfo info : threadInfos) {
- rows.add(info.toRow(-1, nowMs));
+ rows.add(info.toRow(-1, nowMs, false));
}
for (List<String> row : rows) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index aeb230557fd..6f9527f274b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -822,7 +822,7 @@ public class ConnectContext {
public class ThreadInfo {
public boolean isFull;
- public List<String> toRow(int connId, long nowMs) {
+ public List<String> toRow(int connId, long nowMs, boolean showFe) {
List<String> row = Lists.newArrayList();
if (connId == connectionId) {
row.add("Yes");
@@ -850,6 +850,11 @@ public class ConnectContext {
} else {
row.add("");
}
+
+ if (showFe) {
+ row.add(Env.getCurrentEnv().getSelfNode().getHost());
+ }
+
return row;
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
index bbc31a32dc7..b8f2c64390f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
@@ -29,6 +29,7 @@ import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TimerTask;
@@ -159,6 +160,16 @@ public class ConnectScheduler {
return infos;
}
+ // used for thrift
+ public List<List<String>> listConnectionWithoutAuth(boolean isShowFullSql,
boolean isShowFeHost) {
+ List<List<String>> list = new ArrayList<>();
+ long nowMs = System.currentTimeMillis();
+ for (ConnectContext ctx : connectionMap.values()) {
+ list.add(ctx.toThreadInfo(isShowFullSql).toRow(-1, nowMs,
isShowFeHost));
+ }
+ return list;
+ }
+
public void putTraceId2QueryId(String traceId, TUniqueId queryId) {
traceId2QueryId.put(traceId, queryId);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 09fd164eb2d..f6f232fb92e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -443,6 +443,8 @@ public class SessionVariable implements Serializable, Writable {
public static final String FORCE_JNI_SCANNER = "force_jni_scanner";
+ public static final String SHOW_ALL_FE_CONNECTION =
"show_all_fe_connection";
+
public static final List<String> DEBUG_VARIABLES = ImmutableList.of(
SKIP_DELETE_PREDICATE,
SKIP_DELETE_BITMAP,
@@ -1331,6 +1333,11 @@ public class SessionVariable implements Serializable, Writable {
public static final String IGNORE_RUNTIME_FILTER_IDS =
"ignore_runtime_filter_ids";
+ @VariableMgr.VarAttr(name = SHOW_ALL_FE_CONNECTION,
+ description = {"when it's true show processlist statement list all
fe's connection",
+ "当变量为true时,show processlist命令展示所有fe的连接"})
+ public boolean showAllFeConnection = false;
+
public Set<Integer> getIgnoredRuntimeFilterIds() {
return Arrays.stream(ignoreRuntimeFilterIds.split(",[\\s]*"))
.map(v -> {
@@ -2847,5 +2854,9 @@ public class SessionVariable implements Serializable, Writable {
public void setForceJniScanner(boolean force) {
forceJniScanner = force;
}
+
+ public boolean getShowAllFeConnection() {
+ return this.showAllFeConnection;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index abbaed302c8..c110fdd3797 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -144,6 +144,7 @@ import org.apache.doris.clone.DynamicPartitionScheduler;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.CaseSensibility;
+import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.ConfigBase;
import org.apache.doris.common.DdlException;
@@ -209,7 +210,11 @@ import org.apache.doris.task.AgentClient;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.SnapshotTask;
+import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.TCheckStorageFormatResult;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TShowProcessListRequest;
+import org.apache.doris.thrift.TShowProcessListResult;
import org.apache.doris.thrift.TTaskType;
import org.apache.doris.thrift.TUnit;
import org.apache.doris.transaction.GlobalTransactionMgr;
@@ -453,13 +458,53 @@ public class ShowExecutor {
// Handle show processlist
private void handleShowProcesslist() {
ShowProcesslistStmt showStmt = (ShowProcesslistStmt) stmt;
- List<List<String>> rowSet = Lists.newArrayList();
+ boolean isShowFullSql = showStmt.isFull();
+ boolean isShowAllFe = showStmt.isShowAllFe();
+ List<List<String>> rowSet = Lists.newArrayList();
List<ConnectContext.ThreadInfo> threadInfos = ctx.getConnectScheduler()
- .listConnection(ctx.getQualifiedUser(), showStmt.isFull());
+ .listConnection(ctx.getQualifiedUser(), isShowFullSql);
long nowMs = System.currentTimeMillis();
for (ConnectContext.ThreadInfo info : threadInfos) {
- rowSet.add(info.toRow(ctx.getConnectionId(), nowMs));
+ rowSet.add(info.toRow(ctx.getConnectionId(), nowMs, isShowAllFe));
+ }
+
+ if (isShowAllFe) {
+ try {
+ TShowProcessListRequest request = new
TShowProcessListRequest();
+ request.setShowFullSql(isShowFullSql);
+ List<Pair<String, Integer>> frontends =
FrontendsProcNode.getFrontendWithRpcPort(Env.getCurrentEnv(),
+ false);
+ FrontendService.Client client = null;
+ for (Pair<String, Integer> fe : frontends) {
+ TNetworkAddress thriftAddress = new
TNetworkAddress(fe.key(), fe.value());
+ try {
+ client =
ClientPool.frontendPool.borrowObject(thriftAddress, 3000);
+ } catch (Exception e) {
+ LOG.warn("Failed to get frontend {} client. exception:
{}", fe.key(), e);
+ continue;
+ }
+
+ boolean isReturnToPool = false;
+ try {
+ TShowProcessListResult result =
client.showProcessList(request);
+ if (result.process_list != null &&
result.process_list.size() > 0) {
+ rowSet.addAll(result.process_list);
+ }
+ isReturnToPool = true;
+ } catch (Exception e) {
+ LOG.warn("Failed to request processlist to fe: {} .
exception: {}", fe.key(), e);
+ } finally {
+ if (isReturnToPool) {
+
ClientPool.frontendPool.returnObject(thriftAddress, client);
+ } else {
+
ClientPool.frontendPool.invalidateObject(thriftAddress, client);
+ }
+ }
+ }
+ } catch (Throwable t) {
+ LOG.warn(" fetch process list from other fe failed, ", t);
+ }
}
resultSet = new ShowResultSet(showStmt.getMetaData(), rowSet);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index d70243804a6..d1cad65d49d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -172,6 +172,8 @@ import org.apache.doris.thrift.TRestoreSnapshotRequest;
import org.apache.doris.thrift.TRestoreSnapshotResult;
import org.apache.doris.thrift.TRollbackTxnRequest;
import org.apache.doris.thrift.TRollbackTxnResult;
+import org.apache.doris.thrift.TShowProcessListRequest;
+import org.apache.doris.thrift.TShowProcessListResult;
import org.apache.doris.thrift.TShowVariableRequest;
import org.apache.doris.thrift.TShowVariableResult;
import org.apache.doris.thrift.TSnapshotLoaderReportRequest;
@@ -3284,4 +3286,18 @@ public class FrontendServiceImpl implements FrontendService.Iface {
throw e;
}
}
+
+ @Override
+ public TShowProcessListResult showProcessList(TShowProcessListRequest
request) {
+ boolean isShowFullSql = false;
+ if (request.isSetShowFullSql()) {
+ isShowFullSql = request.isShowFullSql();
+ }
+ List<List<String>> processList =
ExecuteEnv.getInstance().getScheduler()
+ .listConnectionWithoutAuth(isShowFullSql, true);
+ TShowProcessListResult result = new TShowProcessListResult();
+ result.setProcessList(processList);
+ return result;
+ }
+
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java
index 85b60f53e3a..a15bdd8178a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java
@@ -104,7 +104,7 @@ public class ConnectContextTest {
// Thread info
Assert.assertNotNull(ctx.toThreadInfo(false));
- List<String> row = ctx.toThreadInfo(false).toRow(101, 1000);
+ List<String> row = ctx.toThreadInfo(false).toRow(101, 1000, false);
Assert.assertEquals(12, row.size());
Assert.assertEquals("Yes", row.get(0));
Assert.assertEquals("101", row.get(1));
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 910a3763028..eaf8d0cd330 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1245,6 +1245,14 @@ struct TGetBackendMetaResult {
3: optional Types.TNetworkAddress master_address
}
+struct TShowProcessListRequest {
+ 1: optional bool show_full_sql
+}
+
+struct TShowProcessListResult {
+ 1: optional list<list<string>> process_list
+}
+
service FrontendService {
TGetDbsResult getDbNames(1: TGetDbsParams params)
TGetTablesResult getTableNames(1: TGetTablesParams params)
@@ -1318,4 +1326,6 @@ service FrontendService {
TGetBackendMetaResult getBackendMeta(1: TGetBackendMetaRequest request)
Status.TStatus invalidateStatsCache(1:
TInvalidateFollowerStatsCacheRequest request)
+
+ TShowProcessListResult showProcessList(1: TShowProcessListRequest request)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]