This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 25c89346972 [refactor](profile report) Remove old profile reporting.
(#40185)
25c89346972 is described below
commit 25c89346972e61201bc9dde60776b9cdb4d84145
Author: zhiqiang <[email protected]>
AuthorDate: Mon Sep 23 08:51:56 2024 +0800
[refactor](profile report) Remove old profile reporting. (#40185)
1. Remove the old profile reporting path.
2. Add FragmentLevelProfile to the new profile report.
---
be/src/pipeline/pipeline_fragment_context.cpp | 23 +++++----
be/src/pipeline/pipeline_fragment_context.h | 2 +-
be/src/runtime/fragment_mgr.cpp | 49 ++------------------
be/src/runtime/query_context.cpp | 3 --
be/src/runtime/query_context.h | 2 -
be/src/runtime/runtime_query_statistics_mgr.cpp | 5 +-
.../doris/common/profile/ExecutionProfile.java | 54 +---------------------
.../java/org/apache/doris/qe/QeProcessorImpl.java | 19 --------
8 files changed, 24 insertions(+), 133 deletions(-)
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 8fb750b9e97..e3d7f56d8f3 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -242,14 +242,14 @@ Status PipelineFragmentContext::prepare(const
doris::TPipelineFragmentParams& re
_timeout = request.query_options.execution_timeout;
}
- _runtime_profile = std::make_unique<RuntimeProfile>("PipelineContext");
- _prepare_timer = ADD_TIMER(_runtime_profile, "PrepareTime");
+ _fragment_level_profile =
std::make_unique<RuntimeProfile>("PipelineContext");
+ _prepare_timer = ADD_TIMER(_fragment_level_profile, "PrepareTime");
SCOPED_TIMER(_prepare_timer);
- _build_pipelines_timer = ADD_TIMER(_runtime_profile, "BuildPipelinesTime");
- _init_context_timer = ADD_TIMER(_runtime_profile, "InitContextTime");
- _plan_local_shuffle_timer = ADD_TIMER(_runtime_profile,
"PlanLocalShuffleTime");
- _build_tasks_timer = ADD_TIMER(_runtime_profile, "BuildTasksTime");
- _prepare_all_pipelines_timer = ADD_TIMER(_runtime_profile,
"PrepareAllPipelinesTime");
+ _build_pipelines_timer = ADD_TIMER(_fragment_level_profile,
"BuildPipelinesTime");
+ _init_context_timer = ADD_TIMER(_fragment_level_profile,
"InitContextTime");
+ _plan_local_shuffle_timer = ADD_TIMER(_fragment_level_profile,
"PlanLocalShuffleTime");
+ _build_tasks_timer = ADD_TIMER(_fragment_level_profile, "BuildTasksTime");
+ _prepare_all_pipelines_timer = ADD_TIMER(_fragment_level_profile,
"PrepareAllPipelinesTime");
{
SCOPED_TIMER(_init_context_timer);
_num_instances = request.local_params.size();
@@ -1727,7 +1727,7 @@ void PipelineFragmentContext::_close_fragment_instance() {
return;
}
Defer defer_op {[&]() { _is_fragment_instance_closed = true; }};
-
_runtime_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
+
_fragment_level_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
static_cast<void>(send_report(true));
// Print profile content in info log is a tempoeray solution for stream
load and external_connector.
// Since stream load does not have someting like coordinator on FE, so
@@ -1803,8 +1803,6 @@ Status PipelineFragmentContext::send_report(bool done) {
ReportStatusRequest req {exec_status,
runtime_states,
- _runtime_profile.get(),
- _runtime_state->load_channel_profile(),
done || !exec_status.ok(),
_query_ctx->coord_addr,
_query_id,
@@ -1846,6 +1844,11 @@ PipelineFragmentContext::collect_realtime_profile()
const {
return res;
}
+ // Make sure first profile is fragment level profile
+ auto fragment_profile = std::make_shared<TRuntimeProfileTree>();
+ _fragment_level_profile->to_thrift(fragment_profile.get());
+ res.push_back(fragment_profile);
+
// pipeline_id_to_profile is initialized in prepare stage
for (auto pipeline_profile : _runtime_state->pipeline_id_to_profile()) {
auto profile_ptr = std::make_shared<TRuntimeProfileTree>();
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index f95eb03fb12..c221d076455 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -209,7 +209,7 @@ private:
// When submit fail, `_total_tasks` is equal to the number of tasks
submitted.
std::atomic<int> _total_tasks = 0;
- std::unique_ptr<RuntimeProfile> _runtime_profile;
+ std::unique_ptr<RuntimeProfile> _fragment_level_profile;
bool _is_report_success = false;
std::unique_ptr<RuntimeState> _runtime_state;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 53ff4fea2ca..7a4687b50d1 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -322,57 +322,18 @@ void FragmentMgr::coordinator_callback(const
ReportStatusRequest& req) {
params.__set_status(exec_status.to_thrift());
params.__set_done(req.done);
params.__set_query_type(req.runtime_state->query_type());
+ params.__isset.profile = false;
DCHECK(req.runtime_state != nullptr);
if (req.runtime_state->query_type() == TQueryType::LOAD) {
params.__set_loaded_rows(req.runtime_state->num_rows_load_total());
params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
- }
- params.__isset.detailed_report = true;
- DCHECK(!req.runtime_states.empty());
- const bool enable_profile =
(*req.runtime_states.begin())->enable_profile();
- if (enable_profile) {
- params.__isset.profile = true;
- params.__isset.loadChannelProfile = false;
- for (auto* rs : req.runtime_states) {
- DCHECK(req.load_channel_profile);
- TDetailedReportParams detailed_param;
-
rs->load_channel_profile()->to_thrift(&detailed_param.loadChannelProfile);
- // merge all runtime_states.loadChannelProfile to
req.load_channel_profile
-
req.load_channel_profile->update(detailed_param.loadChannelProfile);
- }
- req.load_channel_profile->to_thrift(¶ms.loadChannelProfile);
} else {
- params.__isset.profile = false;
- }
-
- if (enable_profile) {
- DCHECK(req.profile != nullptr);
- TDetailedReportParams detailed_param;
- detailed_param.__isset.fragment_instance_id = false;
- detailed_param.__isset.profile = true;
- detailed_param.__isset.loadChannelProfile = false;
- detailed_param.__set_is_fragment_level(true);
- req.profile->to_thrift(&detailed_param.profile);
- params.detailed_report.push_back(detailed_param);
- for (auto pipeline_profile :
req.runtime_state->pipeline_id_to_profile()) {
- TDetailedReportParams detailed_param;
- detailed_param.__isset.fragment_instance_id = false;
- detailed_param.__isset.profile = true;
- detailed_param.__isset.loadChannelProfile = false;
- pipeline_profile->to_thrift(&detailed_param.profile);
- params.detailed_report.push_back(std::move(detailed_param));
- }
- }
- if (!req.runtime_state->output_files().empty()) {
- params.__isset.delta_urls = true;
- for (auto& it : req.runtime_state->output_files()) {
- params.delta_urls.push_back(to_http_path(it));
- }
- } else if (!req.runtime_states.empty()) {
- for (auto* rs : req.runtime_states) {
- for (auto& it : rs->output_files()) {
+ DCHECK(!req.runtime_states.empty());
+ if (!req.runtime_state->output_files().empty()) {
+ params.__isset.delta_urls = true;
+ for (auto& it : req.runtime_state->output_files()) {
params.delta_urls.push_back(to_http_path(it));
}
}
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 10f5ca19add..055a78471e3 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -368,9 +368,6 @@ void QueryContext::add_fragment_profile(
void QueryContext::_report_query_profile() {
std::lock_guard<std::mutex> lg(_profile_mutex);
- LOG_INFO(
- "Pipeline x query context, register query profile, query {},
fragment profile count {}",
- print_id(_query_id), _profile_map.size());
for (auto& [fragment_id, fragment_profile] : _profile_map) {
std::shared_ptr<TRuntimeProfileTree> load_channel_profile = nullptr;
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 7a6d6d3c53d..d1d78573923 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -50,8 +50,6 @@ class PipelineFragmentContext;
struct ReportStatusRequest {
const Status status;
std::vector<RuntimeState*> runtime_states;
- RuntimeProfile* profile = nullptr;
- RuntimeProfile* load_channel_profile = nullptr;
bool done;
TNetworkAddress coord_addr;
TUniqueId query_id;
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp
b/be/src/runtime/runtime_query_statistics_mgr.cpp
index 77fd80cd528..75dd4ed0321 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -117,7 +117,7 @@ TReportExecStatusParams
RuntimeQueryStatisticsMgr::create_report_exec_status_par
int32_t fragment_id = entry.first;
const std::vector<std::shared_ptr<TRuntimeProfileTree>>&
fragment_profile = entry.second;
std::vector<TDetailedReportParams> detailed_params;
-
+ bool is_first = true;
for (auto pipeline_profile : fragment_profile) {
if (pipeline_profile == nullptr) {
auto msg = fmt::format("Register fragment profile {} {}
failed, profile is null",
@@ -129,6 +129,9 @@ TReportExecStatusParams
RuntimeQueryStatisticsMgr::create_report_exec_status_par
TDetailedReportParams tmp;
THRIFT_MOVE_VALUES(tmp, profile, *pipeline_profile);
+ // First profile is fragment level
+ tmp.__set_is_fragment_level(is_first);
+ is_first = false;
// tmp.fragment_instance_id is not needed for pipeline x
detailed_params.push_back(std::move(tmp));
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
index 7828a38e6eb..d2300cd667d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
@@ -17,17 +17,14 @@
package org.apache.doris.common.profile;
-import org.apache.doris.catalog.Env;
import org.apache.doris.common.Pair;
import org.apache.doris.common.Status;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.planner.PlanFragmentId;
-import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TDetailedReportParams;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TQueryProfile;
-import org.apache.doris.thrift.TReportExecStatusParams;
import org.apache.doris.thrift.TRuntimeProfileTree;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;
@@ -235,8 +232,6 @@ public class ExecutionProfile {
List<TDetailedReportParams> fragmentProfile = entry.getValue();
int pipelineIdx = 0;
List<RuntimeProfile> taskProfile = Lists.newArrayList();
- // The naming rule must be same with the one in
updateProfile(TReportExecStatusParams params)
- // Because we relay on the name of RuntimeProfile to eliminate the
duplicate profile
String suffix = " (host=" + backendHBAddress + ")";
for (TDetailedReportParams pipelineProfile : fragmentProfile) {
String name = "";
@@ -246,6 +241,7 @@ public class ExecutionProfile {
name = "Pipeline :" + pipelineIdx + " " + suffix;
pipelineIdx++;
}
+
RuntimeProfile profileNode = new RuntimeProfile(name);
// The taskprofile is used to save the profile of the
pipeline, without
// considering the FragmentLevel.
@@ -273,54 +269,6 @@ public class ExecutionProfile {
return new Status(TStatusCode.OK, "Success");
}
- public void updateProfile(TReportExecStatusParams params) {
- Backend backend = null;
- if (params.isSetBackendId()) {
- backend =
Env.getCurrentSystemInfo().getBackend(params.getBackendId());
- if (backend == null) {
- LOG.warn("could not find backend with id {}",
params.getBackendId());
- return;
- }
- } else {
- LOG.warn("backend id is not set in report profile request, bad
message");
- return;
- }
-
- int pipelineIdx = 0;
- List<RuntimeProfile> taskProfile = Lists.newArrayList();
- String suffix = " (host=" + backend.getHeartbeatAddress() + ")";
- // Each datailed report params is a fragment level profile or a
pipeline profile
- for (TDetailedReportParams param : params.detailed_report) {
- String name = "";
- if (param.isSetIsFragmentLevel() && param.is_fragment_level) {
- name = "Fragment Level Profile: " + suffix;
- } else {
- name = "Pipeline :" + pipelineIdx + " " + suffix;
- pipelineIdx++;
- }
- RuntimeProfile profile = new RuntimeProfile(name);
- // The taskprofile is used to save the profile of the pipeline,
without
- // considering the FragmentLevel.
- if (!(param.isSetIsFragmentLevel() && param.is_fragment_level)) {
- taskProfile.add(profile);
- }
- if (param.isSetProfile()) {
- profile.update(param.profile);
- }
- if (params.done) {
- profile.setIsDone(true);
- }
- profile.sortChildren();
- fragmentProfiles.get(params.fragment_id).addChild(profile);
- }
- // TODO ygl: is this right? there maybe multi Backends, what does
- // update load profile do???
- if (params.isSetLoadChannelProfile()) {
- loadChannelProfile.update(params.loadChannelProfile);
- }
- setMultiBeProfile(params.fragment_id, backend.getHeartbeatAddress(),
taskProfile);
- }
-
public synchronized void addFragmentBackend(PlanFragmentId fragmentId,
Long backendId) {
fragmentIdBeNum.put(fragmentId.asInt(),
fragmentIdBeNum.get(fragmentId.asInt()) + 1);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index 7d501fb5c11..667d15de167 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -244,25 +244,6 @@ public final class QeProcessorImpl implements QeProcessor {
}
}
- if (params.isSetProfile() || params.isSetLoadChannelProfile()) {
- LOG.info("Reporting profile, query_id={}, fragment {} backend num:
{}, ip: {}",
- DebugUtil.printId(params.query_id),
params.getFragmentId(), params.backend_num, beAddr);
- if (LOG.isDebugEnabled()) {
- LOG.debug("params: {}", params);
- }
- ExecutionProfile executionProfile =
ProfileManager.getInstance().getExecutionProfile(params.query_id);
- if (executionProfile != null) {
- // Update profile may cost a lot of time, use a seperate pool
to deal with it.
- writeProfileExecutor.submit(new Runnable() {
- @Override
- public void run() {
- executionProfile.updateProfile(params);
- }
- });
- } else {
- LOG.info("Could not find execution profile with query id {}",
DebugUtil.printId(params.query_id));
- }
- }
final TReportExecStatusResult result = new TReportExecStatusResult();
if (params.isSetReportWorkloadRuntimeStatus()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]