This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-4.0-preview
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0-preview by this
push:
new 433a8d610c4 [branch-4.0] Cherry-pick pipeline tracing part 2 and fix (#34563)
433a8d610c4 is described below
commit 433a8d610c42f5810086ece7e12bd3a26de5a84b
Author: zclllyybb <[email protected]>
AuthorDate: Thu May 9 09:35:27 2024 +0800
[branch-4.0] Cherry-pick pipeline tracing part 2 and fix (#34563)
---
be/src/pipeline/pipeline_tracing.cpp | 117 ++++++++++++++++---------------
be/src/pipeline/pipeline_tracing.h | 14 ++--
be/src/pipeline/task_scheduler.cpp | 3 +-
tools/pipeline-tracing/README.md | 36 ++++++++++
tools/pipeline-tracing/origin-to-show.py | 85 ++++++++++++++++++++++
5 files changed, 191 insertions(+), 64 deletions(-)
diff --git a/be/src/pipeline/pipeline_tracing.cpp b/be/src/pipeline/pipeline_tracing.cpp
index efe9667f2b5..047e3c3a01d 100644
--- a/be/src/pipeline/pipeline_tracing.cpp
+++ b/be/src/pipeline/pipeline_tracing.cpp
@@ -19,6 +19,7 @@
#include <absl/time/clock.h>
#include <fcntl.h>
+#include <sys/stat.h>
#include <boost/algorithm/string/predicate.hpp>
#include <chrono>
@@ -41,6 +42,7 @@ void PipelineTracerContext::record(ScheduleRecord record) {
if (_datas.contains(record.query_id)) {
_datas[record.query_id].enqueue(record);
} else {
+        // dumping per timeslice may cause this and leave previous records broken. that's acceptable
std::unique_lock<std::mutex> l(_data_lock); // add new item, may rehash
_datas[record.query_id].enqueue(record);
}
@@ -52,12 +54,12 @@ void PipelineTracerContext::end_query(TUniqueId query_id, uint64_t workload_grou
_id_to_workload_group[query_id] = workload_group;
}
if (_dump_type == RecordType::PerQuery) {
- _dump(query_id);
+ _dump_query(query_id);
} else if (_dump_type == RecordType::Periodic) {
auto now = MonotonicSeconds();
auto interval = now - _last_dump_time;
if (interval > _dump_interval_s) {
- _dump(query_id);
+ _dump_timeslice();
}
}
}
@@ -90,75 +92,76 @@ Status PipelineTracerContext::change_record_params(
"No qualified param in changing tracing record
method");
}
-void PipelineTracerContext::_dump(TUniqueId query_id) {
- if (_dump_type == RecordType::None) {
- return;
- }
-
-    std::filesystem::path log_dir = fmt::format("{}/pipe_tracing", getenv("LOG_DIR"));
+void PipelineTracerContext::_dump_query(TUniqueId query_id) {
//TODO: while dumping, records can still be appended but new queries can't be added. try finer-grained locks.
std::unique_lock<std::mutex> l(_data_lock); // can't rehash
- if (_dump_type == RecordType::PerQuery) {
- auto path = log_dir / fmt::format("query{}", to_string(query_id));
- int fd = ::open(
- path.c_str(), O_CREAT | O_WRONLY | O_TRUNC,
-                S_ISGID | S_ISUID | S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
- if (fd < 0) [[unlikely]] {
- throw Exception(Status::Error<ErrorCode::CREATE_FILE_ERROR>(
- "create tracing log file {} failed", path.c_str()));
+ auto path = _log_dir / fmt::format("query{}", to_string(query_id));
+ int fd = ::open(path.c_str(), O_CREAT | O_WRONLY | O_TRUNC,
+                    S_ISGID | S_ISUID | S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
+ if (fd < 0) [[unlikely]] {
+ throw Exception(Status::Error<ErrorCode::CREATE_FILE_ERROR>(
+ "create tracing log file {} failed", path.c_str()));
+ }
+ auto writer = io::LocalFileWriter {path, fd};
+
+ ScheduleRecord record;
+ while (_datas[query_id].try_dequeue(record)) {
+ uint64_t v = 0;
+ {
+ std::unique_lock<std::mutex> l(_tg_lock);
+ v = _id_to_workload_group.at(query_id);
}
- auto writer = io::LocalFileWriter {path, fd};
+ auto tmp_str = record.to_string(v);
+ auto text = Slice {tmp_str};
+ THROW_IF_ERROR(writer.appendv(&text, 1));
+ }
+
+ THROW_IF_ERROR(writer.finalize());
+ THROW_IF_ERROR(writer.close());
+
+ _last_dump_time = MonotonicSeconds();
+
+ _datas.erase(query_id);
+ {
+ std::unique_lock<std::mutex> l(_tg_lock);
+ _id_to_workload_group.erase(query_id);
+ }
+}
+
+void PipelineTracerContext::_dump_timeslice() {
+ std::unique_lock<std::mutex> l(_data_lock); // can't rehash
+ //TODO: if long time, per timeslice per file
+ auto path = _log_dir /
+                fmt::format("until{}", std::chrono::steady_clock::now().time_since_epoch().count());
+ int fd = ::open(path.c_str(), O_CREAT | O_WRONLY | O_TRUNC,
+                    S_ISGID | S_ISUID | S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
+ if (fd < 0) [[unlikely]] {
+ throw Exception(Status::Error<ErrorCode::CREATE_FILE_ERROR>(
+ "create tracing log file {} failed", path.c_str()));
+ }
+ auto writer = io::LocalFileWriter {path, fd};
+
+ // dump all query traces in this time window to one file.
+ for (auto& [query_id, trace] : _datas) {
ScheduleRecord record;
- while (_datas[query_id].try_dequeue(record)) {
+ while (trace.try_dequeue(record)) {
uint64_t v = 0;
{
std::unique_lock<std::mutex> l(_tg_lock);
- v = _id_to_workload_group[query_id];
+ v = _id_to_workload_group.at(query_id);
}
auto tmp_str = record.to_string(v);
auto text = Slice {tmp_str};
THROW_IF_ERROR(writer.appendv(&text, 1));
}
-
- THROW_IF_ERROR(writer.finalize());
- THROW_IF_ERROR(writer.close());
- } else if (_dump_type == RecordType::Periodic) {
- auto path =
- log_dir /
-                fmt::format("until{}", std::chrono::steady_clock::now().time_since_epoch().count());
- int fd = ::open(
- path.c_str(), O_CREAT | O_WRONLY | O_TRUNC,
-                S_ISGID | S_ISUID | S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH);
- if (fd < 0) [[unlikely]] {
- throw Exception(Status::Error<ErrorCode::CREATE_FILE_ERROR>(
- "create tracing log file {} failed", path.c_str()));
- }
- auto writer = io::LocalFileWriter {path, fd};
-
- for (auto& [id, trace] : _datas) {
- ScheduleRecord record;
- while (trace.try_dequeue(record)) {
- uint64_t v = 0;
- {
- std::unique_lock<std::mutex> l(_tg_lock);
- v = _id_to_workload_group[query_id];
- }
- auto tmp_str = record.to_string(v);
- auto text = Slice {tmp_str};
- THROW_IF_ERROR(writer.appendv(&text, 1));
- }
- }
- THROW_IF_ERROR(writer.finalize());
- THROW_IF_ERROR(writer.close());
-
- _last_dump_time = MonotonicSeconds();
}
+ THROW_IF_ERROR(writer.finalize());
+ THROW_IF_ERROR(writer.close());
- _datas.erase(query_id);
- {
- std::unique_lock<std::mutex> l(_tg_lock);
- _id_to_workload_group.erase(query_id);
- }
+ _last_dump_time = MonotonicSeconds();
+
+ _datas.clear();
+ _id_to_workload_group.clear();
}
} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_tracing.h b/be/src/pipeline/pipeline_tracing.h
index e4e4d4e4e7d..27dda6b85f1 100644
--- a/be/src/pipeline/pipeline_tracing.h
+++ b/be/src/pipeline/pipeline_tracing.h
@@ -18,7 +18,7 @@
#pragma once
#include <concurrentqueue.h>
-#include <fmt/core.h>
+#include <fmt/format.h>
#include <gen_cpp/Types_types.h>
#include <parallel_hashmap/phmap.h>
@@ -51,8 +51,7 @@ struct ScheduleRecord {
// all tracing datas of ONE specific query
using OneQueryTraces = moodycamel::ConcurrentQueue<ScheduleRecord>;
-// belongs to exec_env, for all query, if enable
-// curl http://{host}:{web_server_port}/api/running_pipeline_tasks
+// belongs to exec_env, for all query, if enabled
class PipelineTracerContext {
public:
enum class RecordType {
@@ -68,12 +67,17 @@ public:
bool enabled() const { return !(_dump_type == RecordType::None); }
private:
- void _dump(TUniqueId query_id); // dump data to disk. one query or all.
+ // dump data to disk. one query or all.
+ void _dump_query(TUniqueId query_id);
+ void _dump_timeslice();
+
+    std::filesystem::path _log_dir = fmt::format("{}/pipe_tracing", getenv("LOG_DIR"));
std::mutex _data_lock; // lock for map, not map items.
phmap::flat_hash_map<TUniqueId, OneQueryTraces> _datas;
std::mutex _tg_lock; //TODO: use an lockfree DS
- phmap::flat_hash_map<TUniqueId, uint64_t> _id_to_workload_group;
+ phmap::flat_hash_map<TUniqueId, uint64_t>
+ _id_to_workload_group; // save query's workload group number
RecordType _dump_type = RecordType::None;
decltype(MonotonicSeconds()) _last_dump_time;
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 8981a7e621c..f858bbda1d3 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -334,8 +334,7 @@ void TaskScheduler::_do_work(size_t index) {
uint64_t end_time = MonotonicMicros();
auto state = task->get_state();
- std::string state_name =
-                    state == PipelineTaskState::RUNNABLE ? get_state_name(state) : "";
+ std::string state_name = get_state_name(state);
ExecEnv::GetInstance()->pipeline_tracer_context()->record(
{query_id, task_name, core_id, thread_id, start_time, end_time, state_name});
diff --git a/tools/pipeline-tracing/README.md b/tools/pipeline-tracing/README.md
new file mode 100644
index 00000000000..f047d61a452
--- /dev/null
+++ b/tools/pipeline-tracing/README.md
@@ -0,0 +1,36 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Pipeline Tracing and Display Tool
+
+In the Pipeline execution engine, we split the execution plan tree of each Instance into multiple small Pipeline Tasks and execute them under our custom Pipeline scheduler. Therefore, in an environment with a large number of Pipeline Tasks executing, how these Tasks are scheduled across threads and CPU cores is an important factor for execution performance. We have developed a specialised tool to observe the scheduling process on a particular query or time period, which we call "Pipeline [...]
+
+This tool converts record files to proper JSON format for visualization.
+
+## How to Use
+
+```shell
+python3 origin-to-show.py -s <SOURCE_FILE> -d <DEST>.json
+```
+to convert the record file `<SOURCE_FILE>` into `<DEST>.json`, which can then be visualized.
+
+```shell
+python3 origin-to-show.py --help
+```
+for help details.
\ No newline at end of file
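
For orientation, below is a minimal sketch of the record format that origin-to-show.py (added in the next file of this diff) expects, and the trace event it builds per record. The field order and the '|' separator are taken from the script's Record class; the sample line, and the assumption that the timestamps are the microsecond values produced by MonotonicMicros() in task_scheduler.cpp, are illustrative only and not part of the commit.

```python
# Sketch only: a hypothetical record line in the format origin-to-show.py parses.
# Field order follows Record.__init__:
#   query_id|task_name|core_id|thread_id|start_time|end_time|state_name|group_id
sample = "a1b2c3d4-5e6f|HASH_JOIN_SINK_OPERATOR|3|140221|1715218000000|1715218000350|RUNNABLE|0"

(query_id, task_name, core_id, thread_id,
 start_time, end_time, state_name, group_id) = sample.strip().split('|')

# Per record, the script emits one "complete" event ("ph": "X"), wrapped in a
# top-level {"traceEvents": [...]} object, which looks like the Chrome Trace Event format.
event = {
    "name": task_name,                       # pipeline task name
    "cat": state_name,                       # task state when it left the core
    "ph": "X",                               # complete event with a duration
    "ts": int(start_time),                   # schedule-in time
    "dur": int(end_time) - int(start_time),  # time spent on the core
    "pid": int(core_id),                     # or 1 when --no-core is passed
    "tid": int(thread_id),
}
print(event)
```

If the output is indeed Chrome-trace compatible, events that share a pid/tid land on the same track in a trace viewer, which is what makes per-core and per-thread scheduling gaps visible.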
diff --git a/tools/pipeline-tracing/origin-to-show.py b/tools/pipeline-tracing/origin-to-show.py
new file mode 100644
index 00000000000..cbe68545e36
--- /dev/null
+++ b/tools/pipeline-tracing/origin-to-show.py
@@ -0,0 +1,85 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import argparse
+import sys
+from typing import List
+import json
+
+class Record:
+    def __init__(self, query_id, task_name, core_id, thread_id, start_time, end_time, state_name, group_id) -> None:
+ self.query_id : str = query_id
+ self.task_name : str = task_name
+ self.core_id : int = int(core_id)
+ self.thread_id : int = int(thread_id)
+ self.start_time : int = int(start_time)
+ self.end_time : int = int(end_time)
+ self.state_name : str = state_name
+ self.group_id : int = int(group_id)
+
+ def print(self) :
+        print(f"query_id = {self.query_id}, task_name = {self.task_name}, start_time={self.start_time}")
+
+ def get_core(self) :
+ return 1 if same_core else self.core_id
+
+ def to_json(self) :
+        json = {"name": self.task_name, "cat": self.state_name, "ph": "X", "ts": self.start_time, "dur": self.end_time - self.start_time,
+ "pid": self.get_core(), "tid": self.thread_id}
+ return json
+
+def get_key(record : Record) -> int:
+ return record.start_time
+
+def print_header(file):
+ print(r'{"traceEvents":[', file=file)
+
+def print_footer(file):
+ print(r"]}", file=file)
+
+parser = argparse.ArgumentParser(description='Accept a file to analyse. Use parameters to specify how to illustrate it.')
+parser.add_argument('-s', '--source', help='SOURCE as the path of tracing record file to analyze')
+parser.add_argument('-d', '--dest', help='DEST as the path of json result file you want to save')
+parser.add_argument('-n', '--no-core', action='store_true', help='combine the threads in one core group for display')
+args = parser.parse_args()
+
+records : List[Record] = []
+same_core : bool = args.no_core
+
+### get records
+try:
+ with open(args.source, "r") as file:
+ for line in file :
+ record = Record(*line.strip().split('|'))
+ records.append(record)
+except FileNotFoundError:
+ sys.exit(f"File '{args.source}' doesn't exist. Please check the path.")
+except Exception as e:
+    sys.exit(f"Error occurred! {e}")
+
+records.sort(key=get_key)
+
+### print json
+try:
+ with open(args.dest, "w") as file: # overwrite file
+ print_header(file)
+ for record in records:
+            print(json.dumps(record.to_json(), sort_keys=True, indent=4, separators=(',', ':')), end=',\n', file=file)
+ print_footer(file)
+    print(f"Generated json to {args.dest} successfully!")
+except Exception as e:
+    print(f"Error occurred! {e}")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]