This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 25c89346972 [refactor](profile report) Remove old profile reporting. 
(#40185)
25c89346972 is described below

commit 25c89346972e61201bc9dde60776b9cdb4d84145
Author: zhiqiang <[email protected]>
AuthorDate: Mon Sep 23 08:51:56 2024 +0800

    [refactor](profile report) Remove old profile reporting. (#40185)
    
    1. Remove the old profile report.
    2. Add FragmentLevelProfile to the new profile report.
---
 be/src/pipeline/pipeline_fragment_context.cpp      | 23 +++++----
 be/src/pipeline/pipeline_fragment_context.h        |  2 +-
 be/src/runtime/fragment_mgr.cpp                    | 49 ++------------------
 be/src/runtime/query_context.cpp                   |  3 --
 be/src/runtime/query_context.h                     |  2 -
 be/src/runtime/runtime_query_statistics_mgr.cpp    |  5 +-
 .../doris/common/profile/ExecutionProfile.java     | 54 +---------------------
 .../java/org/apache/doris/qe/QeProcessorImpl.java  | 19 --------
 8 files changed, 24 insertions(+), 133 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 8fb750b9e97..e3d7f56d8f3 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -242,14 +242,14 @@ Status PipelineFragmentContext::prepare(const 
doris::TPipelineFragmentParams& re
         _timeout = request.query_options.execution_timeout;
     }
 
-    _runtime_profile = std::make_unique<RuntimeProfile>("PipelineContext");
-    _prepare_timer = ADD_TIMER(_runtime_profile, "PrepareTime");
+    _fragment_level_profile = 
std::make_unique<RuntimeProfile>("PipelineContext");
+    _prepare_timer = ADD_TIMER(_fragment_level_profile, "PrepareTime");
     SCOPED_TIMER(_prepare_timer);
-    _build_pipelines_timer = ADD_TIMER(_runtime_profile, "BuildPipelinesTime");
-    _init_context_timer = ADD_TIMER(_runtime_profile, "InitContextTime");
-    _plan_local_shuffle_timer = ADD_TIMER(_runtime_profile, 
"PlanLocalShuffleTime");
-    _build_tasks_timer = ADD_TIMER(_runtime_profile, "BuildTasksTime");
-    _prepare_all_pipelines_timer = ADD_TIMER(_runtime_profile, 
"PrepareAllPipelinesTime");
+    _build_pipelines_timer = ADD_TIMER(_fragment_level_profile, 
"BuildPipelinesTime");
+    _init_context_timer = ADD_TIMER(_fragment_level_profile, 
"InitContextTime");
+    _plan_local_shuffle_timer = ADD_TIMER(_fragment_level_profile, 
"PlanLocalShuffleTime");
+    _build_tasks_timer = ADD_TIMER(_fragment_level_profile, "BuildTasksTime");
+    _prepare_all_pipelines_timer = ADD_TIMER(_fragment_level_profile, 
"PrepareAllPipelinesTime");
     {
         SCOPED_TIMER(_init_context_timer);
         _num_instances = request.local_params.size();
@@ -1727,7 +1727,7 @@ void PipelineFragmentContext::_close_fragment_instance() {
         return;
     }
     Defer defer_op {[&]() { _is_fragment_instance_closed = true; }};
-    
_runtime_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
+    
_fragment_level_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
     static_cast<void>(send_report(true));
     // Print profile content in info log is a tempoeray solution for stream 
load and external_connector.
     // Since stream load does not have someting like coordinator on FE, so
@@ -1803,8 +1803,6 @@ Status PipelineFragmentContext::send_report(bool done) {
 
     ReportStatusRequest req {exec_status,
                              runtime_states,
-                             _runtime_profile.get(),
-                             _runtime_state->load_channel_profile(),
                              done || !exec_status.ok(),
                              _query_ctx->coord_addr,
                              _query_id,
@@ -1846,6 +1844,11 @@ PipelineFragmentContext::collect_realtime_profile() 
const {
         return res;
     }
 
+    // Make sure first profile is fragment level profile
+    auto fragment_profile = std::make_shared<TRuntimeProfileTree>();
+    _fragment_level_profile->to_thrift(fragment_profile.get());
+    res.push_back(fragment_profile);
+
     // pipeline_id_to_profile is initialized in prepare stage
     for (auto pipeline_profile : _runtime_state->pipeline_id_to_profile()) {
         auto profile_ptr = std::make_shared<TRuntimeProfileTree>();
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index f95eb03fb12..c221d076455 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -209,7 +209,7 @@ private:
     // When submit fail, `_total_tasks` is equal to the number of tasks 
submitted.
     std::atomic<int> _total_tasks = 0;
 
-    std::unique_ptr<RuntimeProfile> _runtime_profile;
+    std::unique_ptr<RuntimeProfile> _fragment_level_profile;
     bool _is_report_success = false;
 
     std::unique_ptr<RuntimeState> _runtime_state;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 53ff4fea2ca..7a4687b50d1 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -322,57 +322,18 @@ void FragmentMgr::coordinator_callback(const 
ReportStatusRequest& req) {
     params.__set_status(exec_status.to_thrift());
     params.__set_done(req.done);
     params.__set_query_type(req.runtime_state->query_type());
+    params.__isset.profile = false;
 
     DCHECK(req.runtime_state != nullptr);
 
     if (req.runtime_state->query_type() == TQueryType::LOAD) {
         params.__set_loaded_rows(req.runtime_state->num_rows_load_total());
         params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
-    }
-    params.__isset.detailed_report = true;
-    DCHECK(!req.runtime_states.empty());
-    const bool enable_profile = 
(*req.runtime_states.begin())->enable_profile();
-    if (enable_profile) {
-        params.__isset.profile = true;
-        params.__isset.loadChannelProfile = false;
-        for (auto* rs : req.runtime_states) {
-            DCHECK(req.load_channel_profile);
-            TDetailedReportParams detailed_param;
-            
rs->load_channel_profile()->to_thrift(&detailed_param.loadChannelProfile);
-            // merge all runtime_states.loadChannelProfile to 
req.load_channel_profile
-            
req.load_channel_profile->update(detailed_param.loadChannelProfile);
-        }
-        req.load_channel_profile->to_thrift(&params.loadChannelProfile);
     } else {
-        params.__isset.profile = false;
-    }
-
-    if (enable_profile) {
-        DCHECK(req.profile != nullptr);
-        TDetailedReportParams detailed_param;
-        detailed_param.__isset.fragment_instance_id = false;
-        detailed_param.__isset.profile = true;
-        detailed_param.__isset.loadChannelProfile = false;
-        detailed_param.__set_is_fragment_level(true);
-        req.profile->to_thrift(&detailed_param.profile);
-        params.detailed_report.push_back(detailed_param);
-        for (auto pipeline_profile : 
req.runtime_state->pipeline_id_to_profile()) {
-            TDetailedReportParams detailed_param;
-            detailed_param.__isset.fragment_instance_id = false;
-            detailed_param.__isset.profile = true;
-            detailed_param.__isset.loadChannelProfile = false;
-            pipeline_profile->to_thrift(&detailed_param.profile);
-            params.detailed_report.push_back(std::move(detailed_param));
-        }
-    }
-    if (!req.runtime_state->output_files().empty()) {
-        params.__isset.delta_urls = true;
-        for (auto& it : req.runtime_state->output_files()) {
-            params.delta_urls.push_back(to_http_path(it));
-        }
-    } else if (!req.runtime_states.empty()) {
-        for (auto* rs : req.runtime_states) {
-            for (auto& it : rs->output_files()) {
+        DCHECK(!req.runtime_states.empty());
+        if (!req.runtime_state->output_files().empty()) {
+            params.__isset.delta_urls = true;
+            for (auto& it : req.runtime_state->output_files()) {
                 params.delta_urls.push_back(to_http_path(it));
             }
         }
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 10f5ca19add..055a78471e3 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -368,9 +368,6 @@ void QueryContext::add_fragment_profile(
 
 void QueryContext::_report_query_profile() {
     std::lock_guard<std::mutex> lg(_profile_mutex);
-    LOG_INFO(
-            "Pipeline x query context, register query profile, query {}, 
fragment profile count {}",
-            print_id(_query_id), _profile_map.size());
 
     for (auto& [fragment_id, fragment_profile] : _profile_map) {
         std::shared_ptr<TRuntimeProfileTree> load_channel_profile = nullptr;
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 7a6d6d3c53d..d1d78573923 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -50,8 +50,6 @@ class PipelineFragmentContext;
 struct ReportStatusRequest {
     const Status status;
     std::vector<RuntimeState*> runtime_states;
-    RuntimeProfile* profile = nullptr;
-    RuntimeProfile* load_channel_profile = nullptr;
     bool done;
     TNetworkAddress coord_addr;
     TUniqueId query_id;
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp 
b/be/src/runtime/runtime_query_statistics_mgr.cpp
index 77fd80cd528..75dd4ed0321 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -117,7 +117,7 @@ TReportExecStatusParams 
RuntimeQueryStatisticsMgr::create_report_exec_status_par
         int32_t fragment_id = entry.first;
         const std::vector<std::shared_ptr<TRuntimeProfileTree>>& 
fragment_profile = entry.second;
         std::vector<TDetailedReportParams> detailed_params;
-
+        bool is_first = true;
         for (auto pipeline_profile : fragment_profile) {
             if (pipeline_profile == nullptr) {
                 auto msg = fmt::format("Register fragment profile {} {} 
failed, profile is null",
@@ -129,6 +129,9 @@ TReportExecStatusParams 
RuntimeQueryStatisticsMgr::create_report_exec_status_par
 
             TDetailedReportParams tmp;
             THRIFT_MOVE_VALUES(tmp, profile, *pipeline_profile);
+            // First profile is fragment level
+            tmp.__set_is_fragment_level(is_first);
+            is_first = false;
             // tmp.fragment_instance_id is not needed for pipeline x
             detailed_params.push_back(std::move(tmp));
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
index 7828a38e6eb..d2300cd667d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
@@ -17,17 +17,14 @@
 
 package org.apache.doris.common.profile;
 
-import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.Status;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.RuntimeProfile;
 import org.apache.doris.planner.PlanFragmentId;
-import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TDetailedReportParams;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TQueryProfile;
-import org.apache.doris.thrift.TReportExecStatusParams;
 import org.apache.doris.thrift.TRuntimeProfileTree;
 import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TUniqueId;
@@ -235,8 +232,6 @@ public class ExecutionProfile {
             List<TDetailedReportParams> fragmentProfile = entry.getValue();
             int pipelineIdx = 0;
             List<RuntimeProfile> taskProfile = Lists.newArrayList();
-            // The naming rule must be same with the one in 
updateProfile(TReportExecStatusParams params)
-            // Because we relay on the name of RuntimeProfile to eliminate the 
duplicate profile
             String suffix = " (host=" + backendHBAddress + ")";
             for (TDetailedReportParams pipelineProfile : fragmentProfile) {
                 String name = "";
@@ -246,6 +241,7 @@ public class ExecutionProfile {
                     name = "Pipeline :" + pipelineIdx + " " + suffix;
                     pipelineIdx++;
                 }
+
                 RuntimeProfile profileNode = new RuntimeProfile(name);
                 // The taskprofile is used to save the profile of the 
pipeline, without
                 // considering the FragmentLevel.
@@ -273,54 +269,6 @@ public class ExecutionProfile {
         return new Status(TStatusCode.OK, "Success");
     }
 
-    public void updateProfile(TReportExecStatusParams params) {
-        Backend backend  = null;
-        if (params.isSetBackendId()) {
-            backend = 
Env.getCurrentSystemInfo().getBackend(params.getBackendId());
-            if (backend == null) {
-                LOG.warn("could not find backend with id {}", 
params.getBackendId());
-                return;
-            }
-        } else {
-            LOG.warn("backend id is not set in report profile request, bad 
message");
-            return;
-        }
-
-        int pipelineIdx = 0;
-        List<RuntimeProfile> taskProfile = Lists.newArrayList();
-        String suffix = " (host=" + backend.getHeartbeatAddress() + ")";
-        // Each datailed report params is a fragment level profile or a 
pipeline profile
-        for (TDetailedReportParams param : params.detailed_report) {
-            String name = "";
-            if (param.isSetIsFragmentLevel() && param.is_fragment_level) {
-                name = "Fragment Level Profile: " + suffix;
-            } else {
-                name = "Pipeline :" + pipelineIdx + " " + suffix;
-                pipelineIdx++;
-            }
-            RuntimeProfile profile = new RuntimeProfile(name);
-            // The taskprofile is used to save the profile of the pipeline, 
without
-            // considering the FragmentLevel.
-            if (!(param.isSetIsFragmentLevel() && param.is_fragment_level)) {
-                taskProfile.add(profile);
-            }
-            if (param.isSetProfile()) {
-                profile.update(param.profile);
-            }
-            if (params.done) {
-                profile.setIsDone(true);
-            }
-            profile.sortChildren();
-            fragmentProfiles.get(params.fragment_id).addChild(profile);
-        }
-        // TODO ygl: is this right? there maybe multi Backends, what does
-        // update load profile do???
-        if (params.isSetLoadChannelProfile()) {
-            loadChannelProfile.update(params.loadChannelProfile);
-        }
-        setMultiBeProfile(params.fragment_id, backend.getHeartbeatAddress(), 
taskProfile);
-    }
-
     public synchronized void addFragmentBackend(PlanFragmentId fragmentId, 
Long backendId) {
         fragmentIdBeNum.put(fragmentId.asInt(), 
fragmentIdBeNum.get(fragmentId.asInt()) + 1);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index 7d501fb5c11..667d15de167 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -244,25 +244,6 @@ public final class QeProcessorImpl implements QeProcessor {
             }
         }
 
-        if (params.isSetProfile() || params.isSetLoadChannelProfile()) {
-            LOG.info("Reporting profile, query_id={}, fragment {} backend num: 
{}, ip: {}",
-                    DebugUtil.printId(params.query_id), 
params.getFragmentId(), params.backend_num, beAddr);
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("params: {}", params);
-            }
-            ExecutionProfile executionProfile = 
ProfileManager.getInstance().getExecutionProfile(params.query_id);
-            if (executionProfile != null) {
-                // Update profile may cost a lot of time, use a seperate pool 
to deal with it.
-                writeProfileExecutor.submit(new Runnable() {
-                    @Override
-                    public void run() {
-                        executionProfile.updateProfile(params);
-                    }
-                });
-            } else {
-                LOG.info("Could not find execution profile with query id {}", 
DebugUtil.printId(params.query_id));
-            }
-        }
         final TReportExecStatusResult result = new TReportExecStatusResult();
 
         if (params.isSetReportWorkloadRuntimeStatus()) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to