This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new fc0222a64cb [opt](info) processlist schema table support show all fe
(#38701) (#38953)
fc0222a64cb is described below
commit fc0222a64cb58415f05d913c724929b04fab476c
Author: wangbo <[email protected]>
AuthorDate: Wed Aug 7 11:01:46 2024 +0800
[opt](info) processlist schema table support show all fe (#38701) (#38953)
pick #38701
---
be/src/exec/schema_scanner.h | 1 +
.../schema_scanner/schema_processlist_scanner.cpp | 11 ++++++++---
be/src/pipeline/exec/schema_scan_operator.cpp | 11 +++++++++++
.../java/org/apache/doris/catalog/SchemaTable.java | 17 ++++++++++++++++-
.../org/apache/doris/planner/SchemaScanNode.java | 20 ++++++++++++++++++++
gensrc/thrift/PlanNodes.thrift | 1 +
6 files changed, 57 insertions(+), 4 deletions(-)
diff --git a/be/src/exec/schema_scanner.h b/be/src/exec/schema_scanner.h
index 4666657af21..da61d58b943 100644
--- a/be/src/exec/schema_scanner.h
+++ b/be/src/exec/schema_scanner.h
@@ -69,6 +69,7 @@ struct SchemaScannerCommonParam {
int32_t port; // frontend thrift port
int64_t thread_id;
const std::string* catalog = nullptr;
+ std::set<TNetworkAddress> fe_addr_list;
};
// scanner parameter from frontend
diff --git a/be/src/exec/schema_scanner/schema_processlist_scanner.cpp
b/be/src/exec/schema_scanner/schema_processlist_scanner.cpp
index f1071359d0a..c65e1d14c2c 100644
--- a/be/src/exec/schema_scanner/schema_processlist_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_processlist_scanner.cpp
@@ -55,9 +55,14 @@ Status SchemaProcessListScanner::start(RuntimeState* state) {
TShowProcessListRequest request;
request.__set_show_full_sql(true);
-
RETURN_IF_ERROR(SchemaHelper::show_process_list(*(_param->common_param->ip),
-
_param->common_param->port, request,
- &_process_list_result));
+ for (const auto& fe_addr : _param->common_param->fe_addr_list) {
+ TShowProcessListResult tmp_ret;
+ RETURN_IF_ERROR(
+ SchemaHelper::show_process_list(fe_addr.hostname,
fe_addr.port, request, &tmp_ret));
+
_process_list_result.process_list.insert(_process_list_result.process_list.end(),
+ tmp_ret.process_list.begin(),
+ tmp_ret.process_list.end());
+ }
return Status::OK();
}
diff --git a/be/src/pipeline/exec/schema_scan_operator.cpp
b/be/src/pipeline/exec/schema_scan_operator.cpp
index 8ff05cc41b7..d5353655ab0 100644
--- a/be/src/pipeline/exec/schema_scan_operator.cpp
+++ b/be/src/pipeline/exec/schema_scan_operator.cpp
@@ -133,6 +133,17 @@ Status SchemaScanOperatorX::init(const TPlanNode& tnode,
RuntimeState* state) {
_common_scanner_param->catalog =
state->obj_pool()->add(new
std::string(tnode.schema_scan_node.catalog));
}
+
+ if (tnode.schema_scan_node.__isset.fe_addr_list) {
+ for (const auto& fe_addr : tnode.schema_scan_node.fe_addr_list) {
+ _common_scanner_param->fe_addr_list.insert(fe_addr);
+ }
+ } else if (tnode.schema_scan_node.__isset.ip &&
tnode.schema_scan_node.__isset.port) {
+ TNetworkAddress fe_addr;
+ fe_addr.hostname = tnode.schema_scan_node.ip;
+ fe_addr.port = tnode.schema_scan_node.port;
+ _common_scanner_param->fe_addr_list.insert(fe_addr);
+ }
return Status::OK();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
index 0995884dc6f..53b00b0880a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
@@ -504,7 +504,7 @@ public class SchemaTable extends Table {
.column("QUERY_ID",
ScalarType.createVarchar(256))
.column("INFO",
ScalarType.createVarchar(ScalarType.MAX_VARCHAR_LENGTH))
.column("FE",
-
ScalarType.createVarchar(64)).build()))
+
ScalarType.createVarchar(64)).build(), true))
.put("workload_policy",
new SchemaTable(SystemIdGenerator.getNextId(),
"workload_policy", TableType.SCHEMA,
builder().column("ID",
ScalarType.createType(PrimitiveType.BIGINT))
@@ -518,10 +518,17 @@ public class SchemaTable extends Table {
.build()))
.build();
+ private boolean fetchAllFe = false;
+
protected SchemaTable(long id, String name, TableType type, List<Column>
baseSchema) {
super(id, name, type, baseSchema);
}
+ protected SchemaTable(long id, String name, TableType type, List<Column>
baseSchema, boolean fetchAllFe) {
+ this(id, name, type, baseSchema);
+ this.fetchAllFe = fetchAllFe;
+ }
+
@Override
public void write(DataOutput out) throws IOException {
throw new UnsupportedOperationException("Do not allow to write
SchemaTable to image.");
@@ -535,6 +542,14 @@ public class SchemaTable extends Table {
return new Builder();
}
+ public static boolean isShouldFetchAllFe(String schemaTableName) {
+ Table table = TABLE_MAP.get(schemaTableName);
+ if (table != null && table instanceof SchemaTable) {
+ return ((SchemaTable) table).fetchAllFe;
+ }
+ return false;
+ }
+
/**
* For TABLE_MAP.
**/
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java
index 5a52c79e953..9418f4f6cf3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java
@@ -19,6 +19,7 @@ package org.apache.doris.planner;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.SchemaTable;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
@@ -27,6 +28,8 @@ import org.apache.doris.datasource.FederationBackendPolicy;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.statistics.StatisticalType;
+import org.apache.doris.system.Frontend;
+import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPlanNodeType;
import org.apache.doris.thrift.TScanRangeLocations;
@@ -38,6 +41,7 @@ import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.ArrayList;
import java.util.List;
/**
@@ -84,6 +88,21 @@ public class SchemaScanNode extends ScanNode {
frontendPort = Config.rpc_port;
}
+ private void setFeAddrList(TPlanNode msg) {
+ if (SchemaTable.isShouldFetchAllFe(tableName)) {
+ List<TNetworkAddress> feAddrList = new ArrayList();
+ if (ConnectContext.get().getSessionVariable().showAllFeConnection)
{
+ List<Frontend> feList = Env.getCurrentEnv().getFrontends(null);
+ for (Frontend fe : feList) {
+ feAddrList.add(new TNetworkAddress(fe.getHost(),
fe.getRpcPort()));
+ }
+ } else {
+ feAddrList.add(new TNetworkAddress(frontendIP, frontendPort));
+ }
+ msg.schema_scan_node.setFeAddrList(feAddrList);
+ }
+ }
+
@Override
protected void toThrift(TPlanNode msg) {
msg.node_type = TPlanNodeType.SCHEMA_SCAN_NODE;
@@ -116,6 +135,7 @@ public class SchemaScanNode extends ScanNode {
TUserIdentity tCurrentUser =
ConnectContext.get().getCurrentUserIdentity().toThrift();
msg.schema_scan_node.setCurrentUserIdent(tCurrentUser);
+ setFeAddrList(msg);
}
@Override
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index f8a08dd708f..fc1a6e6baf5 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -672,6 +672,7 @@ struct TSchemaScanNode {
12: optional bool show_hidden_cloumns = false
// 13: optional list<TSchemaTableStructure> table_structure // deprecated
14: optional string catalog
+ 15: optional list<Types.TNetworkAddress> fe_addr_list
}
struct TMetaScanNode {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]