This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 2b0ead71e33 [pipeline](tracing) Fix pipeline tracing tools (#35595)
2b0ead71e33 is described below
commit 2b0ead71e3347593458adfdec3185276083a1476
Author: Gabriel <[email protected]>
AuthorDate: Thu May 30 09:08:17 2024 +0800
[pipeline](tracing) Fix pipeline tracing tools (#35595)
---
be/src/pipeline/pipeline.cpp | 9 +++++
be/src/pipeline/pipeline_fragment_context.cpp | 6 ----
be/src/pipeline/pipeline_tracing.cpp | 48 ++++++++++++++++++---------
be/src/pipeline/pipeline_tracing.h | 16 +++++++--
build.sh | 2 +-
tools/pipeline-tracing/origin-to-show.py | 47 ++++++++++++++------------
6 files changed, 83 insertions(+), 45 deletions(-)
diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index 6ea81b90eeb..4b16552531c 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -44,6 +44,15 @@ Status Pipeline::prepare(RuntimeState* state) {
RETURN_IF_ERROR(operatorXs.back()->open(state));
RETURN_IF_ERROR(_sink_x->prepare(state));
RETURN_IF_ERROR(_sink_x->open(state));
+ _name.append(std::to_string(id()));
+ _name.push_back('-');
+ for (auto& op : operatorXs) {
+ _name.append(std::to_string(op->node_id()));
+ _name.append(op->get_name());
+ }
+ _name.push_back('-');
+ _name.append(std::to_string(_sink_x->node_id()));
+ _name.append(_sink_x->get_name());
return Status::OK();
}
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 961d71c11e8..dbfdaba6d91 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -611,8 +611,6 @@ Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool,
int node_idx = 0;
- cur_pipe->_name.append(std::to_string(cur_pipe->id()));
-
RETURN_IF_ERROR(_create_tree_helper(pool, request.fragment.plan.nodes, request, descs, nullptr,
&node_idx, root, cur_pipe, 0));
@@ -652,10 +650,6 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
*root = op;
}
- cur_pipe->_name.push_back('-');
- cur_pipe->_name.append(std::to_string(op->node_id()));
- cur_pipe->_name.append(op->get_name());
-
// rely on that tnodes is preorder of the plan
for (int i = 0; i < num_children; i++) {
++*node_idx;
diff --git a/be/src/pipeline/pipeline_tracing.cpp b/be/src/pipeline/pipeline_tracing.cpp
index d9029951682..fb8a53925f6 100644
--- a/be/src/pipeline/pipeline_tracing.cpp
+++ b/be/src/pipeline/pipeline_tracing.cpp
@@ -39,12 +39,31 @@ void PipelineTracerContext::record(ScheduleRecord record) {
if (_dump_type == RecordType::None) [[unlikely]] {
return;
}
- if (_datas.contains(record.query_id)) {
- _datas[record.query_id].enqueue(record);
+
+ auto map_ptr = std::atomic_load_explicit(&_data, std::memory_order_relaxed);
+ auto it = map_ptr->find({record.query_id});
+ if (it != map_ptr->end()) {
+ it->second->enqueue(record);
} else {
- // dump per timeslice may cause this. lead perv records broken. that's acceptable
- std::unique_lock<std::mutex> l(_data_lock); // add new item, may rehash
- _datas[record.query_id].enqueue(record);
+ _update([&](QueryTracesMap& new_map) {
+ if (!new_map.contains({record.query_id})) {
+ new_map[{record.query_id}].reset(new OneQueryTraces());
+ }
+ new_map[{record.query_id}]->enqueue(record);
+ });
+ }
+}
+
+void PipelineTracerContext::_update(std::function<void(QueryTracesMap&)>&& handler) {
+ auto map_ptr = std::atomic_load_explicit(&_data, std::memory_order_relaxed);
+ while (true) {
+ auto new_map = std::make_shared<QueryTracesMap>(*map_ptr);
+ handler(*new_map);
+ if (std::atomic_compare_exchange_strong_explicit(&_data, &map_ptr, new_map,
+ std::memory_order_relaxed,
+ std::memory_order_relaxed)) {
+ break;
+ }
}
}
@@ -93,8 +112,7 @@ Status PipelineTracerContext::change_record_params(
}
void PipelineTracerContext::_dump_query(TUniqueId query_id) {
- //TODO: when dump, now could append records but can't add new query. try use better grained locks.
- std::unique_lock<std::mutex> l(_data_lock); // can't rehash
+ auto map_ptr = std::atomic_load_explicit(&_data, std::memory_order_relaxed);
auto path = _log_dir / fmt::format("query{}", to_string(query_id));
int fd = ::open(path.c_str(), O_CREAT | O_WRONLY | O_TRUNC,
S_ISGID | S_ISUID | S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
@@ -105,7 +123,7 @@ void PipelineTracerContext::_dump_query(TUniqueId query_id) {
auto writer = io::LocalFileWriter {path, fd};
ScheduleRecord record;
- while (_datas[query_id].try_dequeue(record)) {
+ while ((*map_ptr)[QueryID {query_id}]->try_dequeue(record)) {
uint64_t v = 0;
{
std::unique_lock<std::mutex> l(_tg_lock);
@@ -120,7 +138,8 @@ void PipelineTracerContext::_dump_query(TUniqueId query_id) {
_last_dump_time = MonotonicSeconds();
- _datas.erase(query_id);
+ _update([&](QueryTracesMap& new_map) { _data->erase(QueryID {query_id}); });
+
{
std::unique_lock<std::mutex> l(_tg_lock);
_id_to_workload_group.erase(query_id);
@@ -128,8 +147,8 @@ void PipelineTracerContext::_dump_query(TUniqueId query_id) {
}
void PipelineTracerContext::_dump_timeslice() {
- std::unique_lock<std::mutex> l(_data_lock); // can't rehash
-
+ auto new_map = std::make_shared<QueryTracesMap>();
+ new_map.swap(_data);
//TODO: if long time, per timeslice per file
auto path = _log_dir /
fmt::format("until{}", std::chrono::steady_clock::now().time_since_epoch().count());
@@ -142,13 +161,13 @@ void PipelineTracerContext::_dump_timeslice() {
auto writer = io::LocalFileWriter {path, fd};
// dump all query traces in this time window to one file.
- for (auto& [query_id, trace] : _datas) {
+ for (auto& [query_id, trace] : (*new_map)) {
ScheduleRecord record;
- while (trace.try_dequeue(record)) {
+ while (trace->try_dequeue(record)) {
uint64_t v = 0;
{
std::unique_lock<std::mutex> l(_tg_lock);
- v = _id_to_workload_group.at(query_id);
+ v = _id_to_workload_group.at(query_id.query_id);
}
auto tmp_str = record.to_string(v);
auto text = Slice {tmp_str};
@@ -159,7 +178,6 @@ void PipelineTracerContext::_dump_timeslice() {
_last_dump_time = MonotonicSeconds();
- _datas.clear();
_id_to_workload_group.clear();
}
} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_tracing.h b/be/src/pipeline/pipeline_tracing.h
index 5d446e6f3d3..48d8cd548d8 100644
--- a/be/src/pipeline/pipeline_tracing.h
+++ b/be/src/pipeline/pipeline_tracing.h
@@ -47,12 +47,24 @@ struct ScheduleRecord {
}
};
+struct QueryID {
+ TUniqueId query_id;
+ bool operator<(const QueryID& query_id_) const {
+ return query_id.hi < query_id_.query_id.hi ||
+ (query_id.hi == query_id_.query_id.hi && query_id.lo < query_id_.query_id.lo);
+ }
+ bool operator==(const QueryID& query_id_) const { return query_id == query_id_.query_id; }
+};
+
// all tracing datas of ONE specific query
using OneQueryTraces = moodycamel::ConcurrentQueue<ScheduleRecord>;
+using OneQueryTracesSPtr = std::shared_ptr<moodycamel::ConcurrentQueue<ScheduleRecord>>;
+using QueryTracesMap = std::map<QueryID, OneQueryTracesSPtr>;
// belongs to exec_env, for all query, if enabled
class PipelineTracerContext {
public:
+ PipelineTracerContext() : _data(std::make_shared<QueryTracesMap>()) {}
enum class RecordType {
None, // disable
PerQuery, // record per query. one query one file.
@@ -69,11 +81,11 @@ private:
// dump data to disk. one query or all.
void _dump_query(TUniqueId query_id);
void _dump_timeslice();
+ void _update(std::function<void(QueryTracesMap&)>&& handler);
std::filesystem::path _log_dir = fmt::format("{}/pipe_tracing", getenv("LOG_DIR"));
- std::mutex _data_lock; // lock for map, not map items.
- phmap::flat_hash_map<TUniqueId, OneQueryTraces> _datas;
+ std::shared_ptr<QueryTracesMap> _data;
std::mutex _tg_lock; //TODO: use an lockfree DS
phmap::flat_hash_map<TUniqueId, uint64_t>
_id_to_workload_group; // save query's workload group number
diff --git a/build.sh b/build.sh
index f209b6273c1..e4fd7d48e9c 100755
--- a/build.sh
+++ b/build.sh
@@ -877,7 +877,7 @@ EOF
cp -r -p "${DORIS_THIRDPARTY}/installed/webroot"/* "${DORIS_OUTPUT}/be/www"/
copy_common_files "${DORIS_OUTPUT}/be/"
mkdir -p "${DORIS_OUTPUT}/be/log"
- mkdir -p "${DORIS_OUTPUT}/be/log/tracing"
+ mkdir -p "${DORIS_OUTPUT}/be/log/pipe_tracing"
mkdir -p "${DORIS_OUTPUT}/be/storage"
mkdir -p "${DORIS_OUTPUT}/be/connectors"
fi
diff --git a/tools/pipeline-tracing/origin-to-show.py b/tools/pipeline-tracing/origin-to-show.py
index cbe68545e36..155c199f54c 100644
--- a/tools/pipeline-tracing/origin-to-show.py
+++ b/tools/pipeline-tracing/origin-to-show.py
@@ -20,50 +20,55 @@ import sys
from typing import List
import json
+
class Record:
- def __init__(self, query_id, task_name, core_id, thread_id, start_time, end_time, state_name, group_id) -> None:
- self.query_id : str = query_id
- self.task_name : str = task_name
- self.core_id : int = int(core_id)
- self.thread_id : int = int(thread_id)
- self.start_time : int = int(start_time)
- self.end_time : int = int(end_time)
- self.state_name : str = state_name
- self.group_id : int = int(group_id)
-
- def print(self) :
+ def __init__(self, query_id, task_name, core_id, thread_id, start_time, end_time, group_id) -> None:
+ self.query_id: str = query_id
+ self.task_name: str = task_name
+ self.core_id: int = int(core_id)
+ self.thread_id: int = int(thread_id)
+ self.start_time: int = int(start_time)
+ self.end_time: int = int(end_time)
+ self.group_id: int = int(group_id)
+
+ def print(self):
print(f"query_id = {self.query_id}, task_name = {self.task_name}, start_time={self.start_time}")
- def get_core(self) :
+ def get_core(self):
return 1 if same_core else self.core_id
- def to_json(self) :
- json = {"name": self.task_name, "cat": self.state_name, "ph": "X", "ts": self.start_time, "dur": self.end_time - self.start_time,
- "pid": self.get_core(), "tid": self.thread_id}
+ def to_json(self):
+ json = {"name": self.task_name, "ph": "X", "ts": self.start_time, "dur": self.end_time - self.start_time,
+ "pid": self.get_core(), "tid": self.thread_id, "args": {"tags": self.query_id}}
return json
-def get_key(record : Record) -> int:
+
+def get_key(record: Record) -> int:
return record.start_time
+
def print_header(file):
print(r'{"traceEvents":[', file=file)
+
def print_footer(file):
print(r"]}", file=file)
-parser = argparse.ArgumentParser(description='Accept file to analyse. Use parameters to sepecific how to illustrate it.')
+
+parser = argparse.ArgumentParser(
+ description='Accept file to analyse. Use parameters to sepecific how to illustrate it.')
parser.add_argument('-s', '--source', help='SOURCE as the path of tracing record file to analyze')
parser.add_argument('-d', '--dest', help='DEST as the path of json result file you want to save')
parser.add_argument('-n', '--no-core', action='store_true', help='combine the thread in one core group to display')
args = parser.parse_args()
-records : List[Record] = []
-same_core : bool = args.no_core
+records: List[Record] = []
+same_core: bool = args.no_core
### get records
try:
with open(args.source, "r") as file:
- for line in file :
+ for line in file:
record = Record(*line.strip().split('|'))
records.append(record)
except FileNotFoundError:
@@ -75,7 +80,7 @@ records.sort(key=get_key)
### print json
try:
- with open(args.dest, "w") as file: # overwrite file
+ with open(args.dest, "w") as file: # overwrite file
print_header(file)
for record in records:
print(json.dumps(record.to_json(), sort_keys=True, indent=4,
separators=(',', ':')), end=',\n', file=file)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]