This is an automated email from the ASF dual-hosted git repository.
wangbo pushed a commit to branch 2.0.10-decimal-patch
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/2.0.10-decimal-patch by this
push:
new 25fc5d24826 Fix insert select missing audit log when connect follower
FE (#36597)
25fc5d24826 is described below
commit 25fc5d24826481ce080c7b36ed2ea8f6cc97a883
Author: wangbo <[email protected]>
AuthorDate: Thu Jun 20 15:23:24 2024 +0800
Fix insert select missing audit log when connect follower FE (#36597)
## Proposed changes
pick #36454
---
be/src/runtime/fragment_mgr.cpp | 4 +-
be/src/runtime/query_context.h | 7 +-
be/src/runtime/runtime_query_statistics_mgr.cpp | 34 ++++--
.../apache/doris/planner/StreamLoadPlanner.java | 2 +
.../java/org/apache/doris/qe/ConnectContext.java | 9 ++
.../main/java/org/apache/doris/qe/Coordinator.java | 14 +++
.../WorkloadRuntimeStatusMgr.java | 121 +++++++++++----------
gensrc/thrift/PaloInternalService.thrift | 3 +
8 files changed, 119 insertions(+), 75 deletions(-)
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 1529d66def2..66538529c3f 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -692,9 +692,11 @@ Status FragmentMgr::_get_query_ctx(const Params& params,
TUniqueId query_id, boo
}
query_ctx->coord_addr = params.coord;
+ query_ctx->current_connect_fe = params.current_connect_fe;
LOG(INFO) << "query_id: " << UniqueId(query_ctx->query_id.hi,
query_ctx->query_id.lo)
<< " coord_addr " << query_ctx->coord_addr
- << " total fragment num on current host: " <<
params.fragment_num_on_host;
+ << " total fragment num on current host: " <<
params.fragment_num_on_host
+ << " report audit fe:" << params.current_connect_fe;
query_ctx->query_globals = params.query_globals;
if (params.__isset.resource_info) {
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 8746483df4c..e47e09e5921 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -182,7 +182,7 @@ public:
void register_query_statistics(std::shared_ptr<QueryStatistics> qs) {
_exec_env->runtime_query_statistics_mgr()->register_query_statistics(print_id(query_id),
qs,
-
coord_addr);
+
current_connect_fe);
}
std::shared_ptr<QueryStatistics> get_query_statistics() {
@@ -198,7 +198,7 @@ public:
if (_exec_env &&
_exec_env->runtime_query_statistics_mgr()) { // for ut
FragmentMgrTest.normal
_exec_env->runtime_query_statistics_mgr()->register_query_statistics(
- query_id_str, qs, coord_addr);
+ query_id_str, qs, current_connect_fe);
}
} else {
LOG(INFO) << " query " << query_id_str << " get memory query
statistics failed ";
@@ -212,7 +212,7 @@ public:
if (_exec_env &&
_exec_env->runtime_query_statistics_mgr()) { // for ut
FragmentMgrTest.normal
_exec_env->runtime_query_statistics_mgr()->register_query_statistics(
- print_id(query_id), _cpu_statistics, coord_addr);
+ print_id(query_id), _cpu_statistics,
current_connect_fe);
}
}
}
@@ -226,6 +226,7 @@ public:
std::string user;
std::string group;
TNetworkAddress coord_addr;
+ TNetworkAddress current_connect_fe;
TQueryGlobals query_globals;
/// In the current implementation, for multiple fragments executed by a
query on the same BE node,
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp
b/be/src/runtime/runtime_query_statistics_mgr.cpp
index 5c40ea61763..0ed8cbeb79c 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -83,8 +83,8 @@ void
RuntimeQueryStatiticsMgr::report_runtime_query_statistics() {
if (!coord_status.ok()) {
std::stringstream ss;
- LOG(WARNING) << "could not get client " << add_str
- << " when report workload runtime stats, reason is "
+ LOG(WARNING) << "[report_query_statistics]could not get client "
<< add_str
+ << " when report workload runtime stats, reason:"
<< coord_status.to_string();
continue;
}
@@ -103,26 +103,38 @@ void
RuntimeQueryStatiticsMgr::report_runtime_query_statistics() {
coord->reportExecStatus(res, params);
rpc_result[addr] = true;
} catch (apache::thrift::TApplicationException& e) {
- LOG(WARNING) << "fe " << add_str
- << " throw exception when report statistics, reason="
<< e.what()
+ LOG(WARNING) << "[report_query_statistics]fe " << add_str
+ << " throw exception when report statistics, reason:"
<< e.what()
<< " , you can see fe log for details.";
} catch (apache::thrift::transport::TTransportException& e) {
- LOG(WARNING) << "report workload runtime statistics to " << add_str
- << " failed, err: " << e.what();
+ LOG(WARNING) << "[report_query_statistics]report workload runtime
statistics to "
+ << add_str << " failed, reason: " << e.what();
rpc_status = coord.reopen();
if (!rpc_status.ok()) {
- LOG(WARNING)
- << "reopen thrift client failed when report workload
runtime statistics to"
- << add_str;
+ LOG(WARNING) << "[report_query_statistics]reopen thrift client
failed when report "
+ "workload runtime statistics to"
+ << add_str;
} else {
try {
coord->reportExecStatus(res, params);
rpc_result[addr] = true;
} catch (apache::thrift::transport::TTransportException& e2) {
- LOG(WARNING) << "retry report workload runtime stats to "
<< add_str
- << " failed, err: " << e2.what();
+ LOG(WARNING)
+ << "[report_query_statistics]retry report workload
runtime stats to "
+ << add_str << " failed, reason: " << e2.what();
+ } catch (std::exception& e) {
+ LOG_WARNING(
+ "[report_query_statistics]unknow exception when
report workload "
+ "runtime statistics to {}, "
+ "reason:{}. ",
+ add_str, e.what());
}
}
+ } catch (std::exception& e) {
+ LOG_WARNING(
+ "[report_query_statistics]unknown exception when report
workload runtime "
+ "statistics to {}, reason:{}. ",
+ add_str, e.what());
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 16c9e8f32eb..26c0f1fae65 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -283,6 +283,7 @@ public class StreamLoadPlanner {
params.setDescTbl(analyzer.getDescTbl().toThrift());
params.setCoord(new
TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
+ params.setCurrentConnectFe(new
TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
TPlanFragmentExecParams execParams = new TPlanFragmentExecParams();
// user load id (streamLoadTask.id) as query id
@@ -501,6 +502,7 @@ public class StreamLoadPlanner {
pipParams.setDescTbl(analyzer.getDescTbl().toThrift());
pipParams.setCoord(new
TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
+ pipParams.setCurrentConnectFe(new
TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
pipParams.setQueryId(loadId);
pipParams.per_exch_num_senders = Maps.newHashMap();
pipParams.destinations = Lists.newArrayList();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 2840021aea9..217c9633153 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -187,6 +187,10 @@ public class ConnectContext {
private String workloadGroupName = "";
+ // isProxy marks a context that serves a request forwarded from another FE;
+ // it is only used in one thread, so it is thread-safe by default
+ private boolean isProxy = false;
+
public void setUserQueryTimeout(int queryTimeout) {
if (queryTimeout > 0) {
sessionVariable.setQueryTimeoutS(queryTimeout);
@@ -298,6 +302,7 @@ public class ConnectContext {
mysqlChannel = new MysqlChannel(connection, this);
} else if (isProxy) {
mysqlChannel = new ProxyMysqlChannel();
+ this.isProxy = isProxy;
} else {
mysqlChannel = new DummyMysqlChannel();
}
@@ -914,5 +919,9 @@ public class ConnectContext {
public void setUserVars(Map<String, LiteralExpr> userVars) {
this.userVars = userVars;
}
+
+ public boolean isProxy() {
+ return isProxy;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index e2889310428..e50bf77650d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -121,6 +121,7 @@ import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutableTriple;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.logging.log4j.LogManager;
@@ -178,6 +179,10 @@ public class Coordinator implements CoordInterface {
private final TQueryGlobals queryGlobals = new TQueryGlobals();
private TQueryOptions queryOptions;
private TNetworkAddress coordAddress;
+ // The FE audit log is written on the FE the client is connected to. If a query is forwarded,
+ // we should send the connected FE's address to the BE,
+ // so that the BE reports query statistics to the connected FE
+ private TNetworkAddress currentConnectFE;
// protects all fields below
private final Lock lock = new ReentrantLock();
@@ -497,6 +502,13 @@ public class Coordinator implements CoordInterface {
}
coordAddress = new TNetworkAddress(localIP, Config.rpc_port);
+ if (ConnectContext.get() != null && ConnectContext.get().isProxy() &&
!StringUtils.isEmpty(
+ ConnectContext.get().getCurrentConnectedFEIp())) {
+ currentConnectFE = new
TNetworkAddress(ConnectContext.get().getCurrentConnectedFEIp(),
+ Config.rpc_port);
+ } else {
+ currentConnectFE = coordAddress;
+ }
this.idToBackend = Env.getCurrentSystemInfo().getIdToBackend();
if (LOG.isDebugEnabled()) {
@@ -3206,6 +3218,7 @@ public class Coordinator implements CoordInterface {
params.params.setSenderId(i);
params.params.setNumSenders(instanceExecParams.size());
params.setCoord(coordAddress);
+ params.setCurrentConnectFe(currentConnectFE);
params.setBackendNum(backendNum++);
params.setQueryGlobals(queryGlobals);
params.setQueryOptions(queryOptions);
@@ -3292,6 +3305,7 @@ public class Coordinator implements CoordInterface {
params.setDestinations(destinations);
params.setNumSenders(instanceExecParams.size());
params.setCoord(coordAddress);
+ params.setCurrentConnectFe(currentConnectFE);
params.setQueryGlobals(queryGlobals);
params.setQueryOptions(queryOptions);
params.query_options.setEnablePipelineEngine(true);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
index 623c3c9aaa9..3e9f7a381c4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
@@ -19,6 +19,7 @@ package org.apache.doris.resource.workloadschedpolicy;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
+import org.apache.doris.common.Pair;
import org.apache.doris.common.util.Daemon;
import org.apache.doris.plugin.AuditEvent;
import org.apache.doris.thrift.TQueryStatistics;
@@ -30,22 +31,36 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+// NOTE: beToQueryStatsMap is updated without a lock, to avoid a global lock across all BEs.
+// In some corner cases this may lose a statistics update, for example:
+// time1: the cleanup logic decides that query 1 has timed out
+// time2: query 1 is updated by a BE report
+// time3: the cleanup logic removes query 1
+// Losing query stats in this case is acceptable, because the query report timeout is 60s by default;
+// when this happens, first find out why the BE has not reported for so long.
public class WorkloadRuntimeStatusMgr {
private static final Logger LOG =
LogManager.getLogger(WorkloadRuntimeStatusMgr.class);
- private Map<Long, Map<String, TQueryStatistics>> beToQueryStatsMap =
Maps.newConcurrentMap();
- private Map<Long, Long> beLastReportTime = Maps.newConcurrentMap();
- private Map<String, Long> queryLastReportTime = Maps.newConcurrentMap();
+ private Map<Long, BeReportInfo> beToQueryStatsMap =
Maps.newConcurrentMap();
private final ReentrantReadWriteLock queryAuditEventLock = new
ReentrantReadWriteLock();
private List<AuditEvent> queryAuditEventList = Lists.newLinkedList();
+ private class BeReportInfo {
+ volatile long beLastReportTime;
+
+ BeReportInfo(long beLastReportTime) {
+ this.beLastReportTime = beLastReportTime;
+ }
+
+ Map<String, Pair<Long, TQueryStatistics>> queryStatsMap =
Maps.newConcurrentMap();
+ }
+
class WorkloadRuntimeStatsThread extends Daemon {
WorkloadRuntimeStatusMgr workloadStatsMgr;
@@ -130,41 +145,65 @@ public class WorkloadRuntimeStatusMgr {
return;
}
long beId = params.backend_id;
- Map<String, TQueryStatistics> queryIdMap = beToQueryStatsMap.get(beId);
- beLastReportTime.put(beId, System.currentTimeMillis());
- if (queryIdMap == null) {
- queryIdMap = Maps.newConcurrentMap();
- queryIdMap.putAll(params.query_statistics_map);
- beToQueryStatsMap.put(beId, queryIdMap);
+ // NOTE(wb) one be sends update request one by one,
+ // so there is no need a global lock for beToQueryStatsMap here,
+ // just keep one be's put/remove/get is atomic operation is enough
+ long currentTime = System.currentTimeMillis();
+ BeReportInfo beReportInfo = beToQueryStatsMap.get(beId);
+ if (beReportInfo == null) {
+ beReportInfo = new BeReportInfo(currentTime);
+ beToQueryStatsMap.put(beId, beReportInfo);
} else {
- long currentTime = System.currentTimeMillis();
- for (Map.Entry<String, TQueryStatistics> entry :
params.query_statistics_map.entrySet()) {
- queryIdMap.put(entry.getKey(), entry.getValue());
- queryLastReportTime.put(entry.getKey(), currentTime);
+ beReportInfo.beLastReportTime = currentTime;
+ }
+ for (Map.Entry<String, TQueryStatistics> entry :
params.query_statistics_map.entrySet()) {
+ beReportInfo.queryStatsMap.put(entry.getKey(),
Pair.of(currentTime, (TQueryStatistics) entry.getValue()));
+ }
+ }
+
+ void clearReportTimeoutBeStatistics() {
+ // 1 clear report timeout be
+ Set<Long> currentBeIdSet = beToQueryStatsMap.keySet();
+ Long currentTime = System.currentTimeMillis();
+ for (Long beId : currentBeIdSet) {
+ BeReportInfo beReportInfo = beToQueryStatsMap.get(beId);
+ if (currentTime - beReportInfo.beLastReportTime >
Config.be_report_query_statistics_timeout_ms) {
+ beToQueryStatsMap.remove(beId);
+ continue;
+ }
+ Set<String> queryIdSet = beReportInfo.queryStatsMap.keySet();
+ for (String queryId : queryIdSet) {
+ Pair<Long, TQueryStatistics> pair =
beReportInfo.queryStatsMap.get(queryId);
+ long queryLastReportTime = pair.first;
+ if (currentTime - queryLastReportTime >
Config.be_report_query_statistics_timeout_ms) {
+ beReportInfo.queryStatsMap.remove(queryId);
+ }
}
}
}
+ // NOTE: currently getQueryStatisticsMap must be called before beToQueryStatsMap is cleared,
+ // so no lock or null check is needed when visiting beToQueryStatsMap
public Map<String, TQueryStatistics> getQueryStatisticsMap() {
// 1 merge query stats in all be
Set<Long> beIdSet = beToQueryStatsMap.keySet();
- Map<String, TQueryStatistics> retQueryMap = Maps.newHashMap();
+ Map<String, TQueryStatistics> resultQueryMap = Maps.newHashMap();
for (Long beId : beIdSet) {
- Map<String, TQueryStatistics> currentQueryMap =
beToQueryStatsMap.get(beId);
- Set<String> queryIdSet = currentQueryMap.keySet();
+ BeReportInfo beReportInfo = beToQueryStatsMap.get(beId);
+ Set<String> queryIdSet = beReportInfo.queryStatsMap.keySet();
for (String queryId : queryIdSet) {
- TQueryStatistics retQuery = retQueryMap.get(queryId);
+ TQueryStatistics curQueryStats =
beReportInfo.queryStatsMap.get(queryId).second;
+
+ TQueryStatistics retQuery = resultQueryMap.get(queryId);
if (retQuery == null) {
retQuery = new TQueryStatistics();
- retQueryMap.put(queryId, retQuery);
+ resultQueryMap.put(queryId, retQuery);
}
-
- TQueryStatistics curQueryStats = currentQueryMap.get(queryId);
mergeQueryStatistics(retQuery, curQueryStats);
}
}
- return retQueryMap;
+ return resultQueryMap;
}
private void mergeQueryStatistics(TQueryStatistics dst, TQueryStatistics
src) {
@@ -176,44 +215,6 @@ public class WorkloadRuntimeStatusMgr {
}
}
- void clearReportTimeoutBeStatistics() {
- // 1 clear report timeout be
- Set<Long> beNeedToRemove = new HashSet<>();
- Set<Long> currentBeIdSet = beToQueryStatsMap.keySet();
- Long currentTime = System.currentTimeMillis();
- for (Long beId : currentBeIdSet) {
- Long lastReportTime = beLastReportTime.get(beId);
- if (lastReportTime != null
- && currentTime - lastReportTime >
Config.be_report_query_statistics_timeout_ms) {
- beNeedToRemove.add(beId);
- }
- }
- for (Long beId : beNeedToRemove) {
- beToQueryStatsMap.remove(beId);
- beLastReportTime.remove(beId);
- }
-
- // 2 clear report timeout query
- Set<String> queryNeedToClear = new HashSet<>();
- Long newCurrentTime = System.currentTimeMillis();
- Set<String> queryLastReportTimeKeySet = queryLastReportTime.keySet();
- for (String queryId : queryLastReportTimeKeySet) {
- Long lastReportTime = queryLastReportTime.get(queryId);
- if (lastReportTime != null
- && newCurrentTime - lastReportTime >
Config.be_report_query_statistics_timeout_ms) {
- queryNeedToClear.add(queryId);
- }
- }
-
- Set<Long> beIdSet = beToQueryStatsMap.keySet();
- for (String queryId : queryNeedToClear) {
- for (Long beId : beIdSet) {
- beToQueryStatsMap.get(beId).remove(queryId);
- }
- queryLastReportTime.remove(queryId);
- }
- }
-
private void queryAuditEventLogWriteLock() {
queryAuditEventLock.writeLock().lock();
}
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index 22422aeabac..4b6a19783f7 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -453,6 +453,8 @@ struct TExecPlanFragmentParams {
// scan node id -> scan range params, only for external file scan
24: optional map<Types.TPlanNodeId, PlanNodes.TFileScanRangeParams>
file_scan_params
+
+ 32: optional Types.TNetworkAddress current_connect_fe
}
struct TExecPlanFragmentParamsList {
@@ -667,6 +669,7 @@ struct TPipelineFragmentParams {
28: optional string table_name
// scan node id -> scan range params, only for external file scan
29: optional map<Types.TPlanNodeId, PlanNodes.TFileScanRangeParams>
file_scan_params
+ 43: optional Types.TNetworkAddress current_connect_fe
}
struct TPipelineFragmentParamsList {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]