This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b0ead71e33 [pipeline](tracing) Fix pipeline tracing tools (#35595)
2b0ead71e33 is described below

commit 2b0ead71e3347593458adfdec3185276083a1476
Author: Gabriel <[email protected]>
AuthorDate: Thu May 30 09:08:17 2024 +0800

    [pipeline](tracing) Fix pipeline tracing tools (#35595)
---
 be/src/pipeline/pipeline.cpp                  |  9 +++++
 be/src/pipeline/pipeline_fragment_context.cpp |  6 ----
 be/src/pipeline/pipeline_tracing.cpp          | 48 ++++++++++++++++++---------
 be/src/pipeline/pipeline_tracing.h            | 16 +++++++--
 build.sh                                      |  2 +-
 tools/pipeline-tracing/origin-to-show.py      | 47 ++++++++++++++------------
 6 files changed, 83 insertions(+), 45 deletions(-)

diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index 6ea81b90eeb..4b16552531c 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -44,6 +44,15 @@ Status Pipeline::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(operatorXs.back()->open(state));
     RETURN_IF_ERROR(_sink_x->prepare(state));
     RETURN_IF_ERROR(_sink_x->open(state));
+    _name.append(std::to_string(id()));
+    _name.push_back('-');
+    for (auto& op : operatorXs) {
+        _name.append(std::to_string(op->node_id()));
+        _name.append(op->get_name());
+    }
+    _name.push_back('-');
+    _name.append(std::to_string(_sink_x->node_id()));
+    _name.append(_sink_x->get_name());
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 961d71c11e8..dbfdaba6d91 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -611,8 +611,6 @@ Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool,
 
     int node_idx = 0;
 
-    cur_pipe->_name.append(std::to_string(cur_pipe->id()));
-
     RETURN_IF_ERROR(_create_tree_helper(pool, request.fragment.plan.nodes, request, descs, nullptr,
                                         &node_idx, root, cur_pipe, 0));
 
@@ -652,10 +650,6 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
         *root = op;
     }
 
-    cur_pipe->_name.push_back('-');
-    cur_pipe->_name.append(std::to_string(op->node_id()));
-    cur_pipe->_name.append(op->get_name());
-
     // rely on that tnodes is preorder of the plan
     for (int i = 0; i < num_children; i++) {
         ++*node_idx;
diff --git a/be/src/pipeline/pipeline_tracing.cpp b/be/src/pipeline/pipeline_tracing.cpp
index d9029951682..fb8a53925f6 100644
--- a/be/src/pipeline/pipeline_tracing.cpp
+++ b/be/src/pipeline/pipeline_tracing.cpp
@@ -39,12 +39,31 @@ void PipelineTracerContext::record(ScheduleRecord record) {
     if (_dump_type == RecordType::None) [[unlikely]] {
         return;
     }
-    if (_datas.contains(record.query_id)) {
-        _datas[record.query_id].enqueue(record);
+
+    auto map_ptr = std::atomic_load_explicit(&_data, std::memory_order_relaxed);
+    auto it = map_ptr->find({record.query_id});
+    if (it != map_ptr->end()) {
+        it->second->enqueue(record);
     } else {
-        // dump per timeslice may cause this. lead perv records broken. that's acceptable
-        std::unique_lock<std::mutex> l(_data_lock); // add new item, may rehash
-        _datas[record.query_id].enqueue(record);
+        _update([&](QueryTracesMap& new_map) {
+            if (!new_map.contains({record.query_id})) {
+                new_map[{record.query_id}].reset(new OneQueryTraces());
+            }
+            new_map[{record.query_id}]->enqueue(record);
+        });
+    }
+}
+
+void PipelineTracerContext::_update(std::function<void(QueryTracesMap&)>&& handler) {
+    auto map_ptr = std::atomic_load_explicit(&_data, std::memory_order_relaxed);
+    while (true) {
+        auto new_map = std::make_shared<QueryTracesMap>(*map_ptr);
+        handler(*new_map);
+        if (std::atomic_compare_exchange_strong_explicit(&_data, &map_ptr, new_map,
+                                                         std::memory_order_relaxed,
+                                                         std::memory_order_relaxed)) {
+            break;
+        }
     }
 }
 
@@ -93,8 +112,7 @@ Status PipelineTracerContext::change_record_params(
 }
 
 void PipelineTracerContext::_dump_query(TUniqueId query_id) {
-    //TODO: when dump, now could append records but can't add new query. try use better grained locks.
-    std::unique_lock<std::mutex> l(_data_lock); // can't rehash
+    auto map_ptr = std::atomic_load_explicit(&_data, std::memory_order_relaxed);
     auto path = _log_dir / fmt::format("query{}", to_string(query_id));
     int fd = ::open(path.c_str(), O_CREAT | O_WRONLY | O_TRUNC,
                     S_ISGID | S_ISUID | S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
@@ -105,7 +123,7 @@ void PipelineTracerContext::_dump_query(TUniqueId query_id) {
     auto writer = io::LocalFileWriter {path, fd};
 
     ScheduleRecord record;
-    while (_datas[query_id].try_dequeue(record)) {
+    while ((*map_ptr)[QueryID {query_id}]->try_dequeue(record)) {
         uint64_t v = 0;
         {
             std::unique_lock<std::mutex> l(_tg_lock);
@@ -120,7 +138,8 @@ void PipelineTracerContext::_dump_query(TUniqueId query_id) {
 
     _last_dump_time = MonotonicSeconds();
 
-    _datas.erase(query_id);
+    _update([&](QueryTracesMap& new_map) { _data->erase(QueryID {query_id}); });
+
     {
         std::unique_lock<std::mutex> l(_tg_lock);
         _id_to_workload_group.erase(query_id);
@@ -128,8 +147,8 @@ void PipelineTracerContext::_dump_query(TUniqueId query_id) {
 }
 
 void PipelineTracerContext::_dump_timeslice() {
-    std::unique_lock<std::mutex> l(_data_lock); // can't rehash
-
+    auto new_map = std::make_shared<QueryTracesMap>();
+    new_map.swap(_data);
     //TODO: if long time, per timeslice per file
     auto path = _log_dir /
                 fmt::format("until{}", std::chrono::steady_clock::now().time_since_epoch().count());
@@ -142,13 +161,13 @@ void PipelineTracerContext::_dump_timeslice() {
     auto writer = io::LocalFileWriter {path, fd};
 
     // dump all query traces in this time window to one file.
-    for (auto& [query_id, trace] : _datas) {
+    for (auto& [query_id, trace] : (*new_map)) {
         ScheduleRecord record;
-        while (trace.try_dequeue(record)) {
+        while (trace->try_dequeue(record)) {
             uint64_t v = 0;
             {
                 std::unique_lock<std::mutex> l(_tg_lock);
-                v = _id_to_workload_group.at(query_id);
+                v = _id_to_workload_group.at(query_id.query_id);
             }
             auto tmp_str = record.to_string(v);
             auto text = Slice {tmp_str};
@@ -159,7 +178,6 @@ void PipelineTracerContext::_dump_timeslice() {
 
     _last_dump_time = MonotonicSeconds();
 
-    _datas.clear();
     _id_to_workload_group.clear();
 }
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_tracing.h b/be/src/pipeline/pipeline_tracing.h
index 5d446e6f3d3..48d8cd548d8 100644
--- a/be/src/pipeline/pipeline_tracing.h
+++ b/be/src/pipeline/pipeline_tracing.h
@@ -47,12 +47,24 @@ struct ScheduleRecord {
     }
 };
 
+struct QueryID {
+    TUniqueId query_id;
+    bool operator<(const QueryID& query_id_) const {
+        return query_id.hi < query_id_.query_id.hi ||
+               (query_id.hi == query_id_.query_id.hi && query_id.lo < query_id_.query_id.lo);
+    }
+    bool operator==(const QueryID& query_id_) const { return query_id == query_id_.query_id; }
+};
+
 // all tracing datas of ONE specific query
 using OneQueryTraces = moodycamel::ConcurrentQueue<ScheduleRecord>;
+using OneQueryTracesSPtr = std::shared_ptr<moodycamel::ConcurrentQueue<ScheduleRecord>>;
+using QueryTracesMap = std::map<QueryID, OneQueryTracesSPtr>;
 
 // belongs to exec_env, for all query, if enabled
 class PipelineTracerContext {
 public:
+    PipelineTracerContext() : _data(std::make_shared<QueryTracesMap>()) {}
     enum class RecordType {
         None,     // disable
         PerQuery, // record per query. one query one file.
@@ -69,11 +81,11 @@ private:
     // dump data to disk. one query or all.
     void _dump_query(TUniqueId query_id);
     void _dump_timeslice();
+    void _update(std::function<void(QueryTracesMap&)>&& handler);
 
     std::filesystem::path _log_dir = fmt::format("{}/pipe_tracing", getenv("LOG_DIR"));
 
-    std::mutex _data_lock; // lock for map, not map items.
-    phmap::flat_hash_map<TUniqueId, OneQueryTraces> _datas;
+    std::shared_ptr<QueryTracesMap> _data;
     std::mutex _tg_lock; //TODO: use an lockfree DS
     phmap::flat_hash_map<TUniqueId, uint64_t>
             _id_to_workload_group; // save query's workload group number
diff --git a/build.sh b/build.sh
index f209b6273c1..e4fd7d48e9c 100755
--- a/build.sh
+++ b/build.sh
@@ -877,7 +877,7 @@ EOF
     cp -r -p "${DORIS_THIRDPARTY}/installed/webroot"/* "${DORIS_OUTPUT}/be/www"/
     copy_common_files "${DORIS_OUTPUT}/be/"
     mkdir -p "${DORIS_OUTPUT}/be/log"
-    mkdir -p "${DORIS_OUTPUT}/be/log/tracing"
+    mkdir -p "${DORIS_OUTPUT}/be/log/pipe_tracing"
     mkdir -p "${DORIS_OUTPUT}/be/storage"
     mkdir -p "${DORIS_OUTPUT}/be/connectors"
 fi
diff --git a/tools/pipeline-tracing/origin-to-show.py b/tools/pipeline-tracing/origin-to-show.py
index cbe68545e36..155c199f54c 100644
--- a/tools/pipeline-tracing/origin-to-show.py
+++ b/tools/pipeline-tracing/origin-to-show.py
@@ -20,50 +20,55 @@ import sys
 from typing import List
 import json
 
+
 class Record:
-    def __init__(self, query_id, task_name, core_id, thread_id, start_time, end_time, state_name, group_id) -> None:
-        self.query_id : str = query_id
-        self.task_name : str = task_name
-        self.core_id : int = int(core_id)
-        self.thread_id : int = int(thread_id)
-        self.start_time : int = int(start_time)
-        self.end_time : int = int(end_time)
-        self.state_name : str = state_name
-        self.group_id : int = int(group_id)
-
-    def print(self) :
+    def __init__(self, query_id, task_name, core_id, thread_id, start_time, end_time, group_id) -> None:
+        self.query_id: str = query_id
+        self.task_name: str = task_name
+        self.core_id: int = int(core_id)
+        self.thread_id: int = int(thread_id)
+        self.start_time: int = int(start_time)
+        self.end_time: int = int(end_time)
+        self.group_id: int = int(group_id)
+
+    def print(self):
         print(f"query_id = {self.query_id}, task_name = {self.task_name}, start_time={self.start_time}")
 
-    def get_core(self) :
+    def get_core(self):
         return 1 if same_core else self.core_id
 
-    def to_json(self) :
-        json = {"name": self.task_name, "cat": self.state_name, "ph": "X", "ts": self.start_time, "dur": self.end_time - self.start_time,
-                "pid": self.get_core(), "tid": self.thread_id}
+    def to_json(self):
+        json = {"name": self.task_name, "ph": "X", "ts": self.start_time, "dur": self.end_time - self.start_time,
+                "pid": self.get_core(), "tid": self.thread_id, "args": {"tags": self.query_id}}
         return json
 
-def get_key(record : Record) -> int:
+
+def get_key(record: Record) -> int:
     return record.start_time
 
+
 def print_header(file):
     print(r'{"traceEvents":[', file=file)
 
+
 def print_footer(file):
     print(r"]}", file=file)
 
-parser = argparse.ArgumentParser(description='Accept file to analyse. Use parameters to sepecific how to illustrate it.')
+
+parser = argparse.ArgumentParser(
+    description='Accept file to analyse. Use parameters to sepecific how to illustrate it.')
 parser.add_argument('-s', '--source', help='SOURCE as the path of tracing record file to analyze')
 parser.add_argument('-d', '--dest', help='DEST as the path of json result file you want to save')
 parser.add_argument('-n', '--no-core', action='store_true', help='combine the thread in one core group to display')
 args = parser.parse_args()
 
-records : List[Record] = []
-same_core : bool = args.no_core
+records: List[Record] = []
+same_core: bool = args.no_core
 
 ### get records
 try:
     with open(args.source, "r") as file:
-        for line in file :
+        for line in file:
             record = Record(*line.strip().split('|'))
             records.append(record)
 except FileNotFoundError:
@@ -75,7 +80,7 @@ records.sort(key=get_key)
 
 ### print json
 try:
-    with open(args.dest, "w") as file: # overwrite file
+    with open(args.dest, "w") as file:  # overwrite file
         print_header(file)
         for record in records:
             print(json.dumps(record.to_json(), sort_keys=True, indent=4, separators=(',', ':')), end=',\n', file=file)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to