This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 9f0d822be01 [branch-2.1](tracing) Pick pipeline tracing and related
bugfix (#31367)
9f0d822be01 is described below
commit 9f0d822be0106bc4692b782e18a189ca386aee7d
Author: zclllyybb <[email protected]>
AuthorDate: Wed Feb 28 22:35:41 2024 +0800
[branch-2.1](tracing) Pick pipeline tracing and related bugfix (#31367)
* [Feature](pipeline) Trace pipeline scheduling (part I) (#31027)
* [fix](compile) Fix performance compile fail #31305
* [fix](compile) Fix macOS compilation issues for PURE macro and CPU core
identification (#31357)
* [fix](compile) Correct PURE macro definition to fix compilation on macOS
* 2
---------
Co-authored-by: zy-kkk <[email protected]>
---
be/src/common/compiler_util.h | 2 +
be/src/common/config.cpp | 1 +
be/src/common/config.h | 1 +
be/src/exec/data_sink.cpp | 19 ---
be/src/exec/data_sink.h | 3 -
.../action/adjust_tracing_dump.cpp} | 47 ++----
.../action/adjust_tracing_dump.h} | 32 +---
be/src/io/fs/file_system.h | 13 +-
be/src/io/fs/local_file_system.h | 13 +-
be/src/io/fs/local_file_writer.h | 9 +-
be/src/pipeline/pipeline.cpp | 13 +-
be/src/pipeline/pipeline.h | 18 ++-
be/src/pipeline/pipeline_fragment_context.cpp | 32 +---
be/src/pipeline/pipeline_fragment_context.h | 11 +-
be/src/pipeline/pipeline_task.h | 11 +-
be/src/pipeline/pipeline_tracing.cpp | 162 +++++++++++++++++++++
be/src/pipeline/pipeline_tracing.h | 83 +++++++++++
.../pipeline_x/pipeline_x_fragment_context.cpp | 63 ++++----
.../pipeline_x/pipeline_x_fragment_context.h | 2 -
be/src/pipeline/task_scheduler.cpp | 84 +++++++----
be/src/pipeline/task_scheduler.h | 3 -
be/src/runtime/exec_env.h | 25 ++--
be/src/runtime/exec_env_init.cpp | 2 +
be/src/runtime/fragment_mgr.cpp | 2 +-
be/src/runtime/query_context.cpp | 24 ++-
be/src/runtime/query_context.h | 7 +-
be/src/service/http_service.cpp | 14 +-
be/src/util/thrift_util.cpp | 34 ++++-
be/src/util/thrift_util.h | 20 +--
build.sh | 1 +
30 files changed, 501 insertions(+), 250 deletions(-)
diff --git a/be/src/common/compiler_util.h b/be/src/common/compiler_util.h
index 4b684659ed9..79588cade91 100644
--- a/be/src/common/compiler_util.h
+++ b/be/src/common/compiler_util.h
@@ -49,3 +49,5 @@
#define MAY_ALIAS __attribute__((__may_alias__))
#define ALIGN_CACHE_LINE __attribute__((aligned(CACHE_LINE_SIZE)))
+
+#define PURE __attribute__((pure))
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 46c98bb763b..b49b95e8e5d 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -199,6 +199,7 @@ DEFINE_mInt32(download_low_speed_time, "300");
// log dir
DEFINE_String(sys_log_dir, "${DORIS_HOME}/log");
DEFINE_String(user_function_dir, "${DORIS_HOME}/lib/udf");
+DEFINE_String(pipeline_tracing_log_dir, "${DORIS_HOME}/log/tracing");
// INFO, WARNING, ERROR, FATAL
DEFINE_mString(sys_log_level, "INFO");
// TIME-DAY, TIME-HOUR, SIZE-MB-nnn
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 8878cba0e64..c82d42a9ef4 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -247,6 +247,7 @@ DECLARE_mInt32(download_low_speed_time);
// log dir
DECLARE_String(sys_log_dir);
DECLARE_String(user_function_dir);
+DECLARE_String(pipeline_tracing_log_dir);
// INFO, WARNING, ERROR, FATAL
DECLARE_String(sys_log_level);
// TIME-DAY, TIME-HOUR, SIZE-MB-nnn
diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp
index 5809912f4a2..5047ae8fc78 100644
--- a/be/src/exec/data_sink.cpp
+++ b/be/src/exec/data_sink.cpp
@@ -346,23 +346,4 @@ Status DataSink::init(const TDataSink& thrift_sink) {
Status DataSink::prepare(RuntimeState* state) {
return Status::OK();
}
-
-bool DataSink::_has_inverted_index_or_partial_update(TOlapTableSink sink) {
- OlapTableSchemaParam schema;
- if (!schema.init(sink.schema).ok()) {
- return false;
- }
- if (schema.is_partial_update()) {
- return true;
- }
- for (const auto& index_schema : schema.indexes()) {
- for (const auto& index : index_schema->indexes) {
- if (index->index_type() == INVERTED) {
- return true;
- }
- }
- }
- return false;
-}
-
} // namespace doris
diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h
index e08d40ab023..5258929ba79 100644
--- a/be/src/exec/data_sink.h
+++ b/be/src/exec/data_sink.h
@@ -106,9 +106,6 @@ public:
std::shared_ptr<QueryStatistics> get_query_statistics_ptr();
-private:
- static bool _has_inverted_index_or_partial_update(TOlapTableSink sink);
-
protected:
// Set to true after close() has been called. subclasses should check and
set this in
// close().
diff --git a/be/src/io/fs/local_file_writer.h
b/be/src/http/action/adjust_tracing_dump.cpp
similarity index 52%
copy from be/src/io/fs/local_file_writer.h
copy to be/src/http/action/adjust_tracing_dump.cpp
index b138879e358..55d1526d82b 100644
--- a/be/src/io/fs/local_file_writer.h
+++ b/be/src/http/action/adjust_tracing_dump.cpp
@@ -15,38 +15,23 @@
// specific language governing permissions and limitations
// under the License.
-#pragma once
+#include "adjust_tracing_dump.h"
-#include <cstddef>
-
-#include "common/status.h"
-#include "io/fs/file_system.h"
-#include "io/fs/file_writer.h"
-#include "io/fs/path.h"
-#include "util/slice.h"
+#include "common/logging.h"
+#include "http/http_channel.h"
+#include "http/http_request.h"
+#include "http/http_status.h"
+#include "runtime/exec_env.h"
namespace doris {
-namespace io {
-
-class LocalFileWriter final : public FileWriter {
-public:
- LocalFileWriter(Path path, int fd, FileSystemSPtr fs, bool sync_data =
true);
- LocalFileWriter(Path path, int fd);
- ~LocalFileWriter() override;
-
- Status close() override;
- Status appendv(const Slice* data, size_t data_cnt) override;
- Status finalize() override;
-
-private:
- void _abort();
- Status _close(bool sync);
-
-private:
- int _fd; // owned
- bool _dirty = false;
- const bool _sync_data;
-};
-
-} // namespace io
+void AdjustTracingDump::handle(HttpRequest* req) {
+ auto* ctx = ExecEnv::GetInstance()->pipeline_tracer_context();
+ auto* params = req->params();
+ if (auto status = ctx->change_record_params(*params); status.ok()) {
+ HttpChannel::send_reply(req, "change record type succeed!\n"); // ok
+ } else { // not
ok
+ LOG(WARNING) << "adjust pipeline tracing dump method failed:" <<
status.msg() << '\n';
+ HttpChannel::send_reply(req, HttpStatus::NOT_FOUND,
status.msg().data());
+ }
+}
} // namespace doris
diff --git a/be/src/io/fs/local_file_writer.h
b/be/src/http/action/adjust_tracing_dump.h
similarity index 56%
copy from be/src/io/fs/local_file_writer.h
copy to be/src/http/action/adjust_tracing_dump.h
index b138879e358..d82d8b1e55b 100644
--- a/be/src/io/fs/local_file_writer.h
+++ b/be/src/http/action/adjust_tracing_dump.h
@@ -17,36 +17,18 @@
#pragma once
-#include <cstddef>
-
-#include "common/status.h"
-#include "io/fs/file_system.h"
-#include "io/fs/file_writer.h"
-#include "io/fs/path.h"
-#include "util/slice.h"
+#include "http/http_handler.h"
namespace doris {
-namespace io {
-class LocalFileWriter final : public FileWriter {
-public:
- LocalFileWriter(Path path, int fd, FileSystemSPtr fs, bool sync_data =
true);
- LocalFileWriter(Path path, int fd);
- ~LocalFileWriter() override;
+class HttpRequest;
- Status close() override;
- Status appendv(const Slice* data, size_t data_cnt) override;
- Status finalize() override;
+class AdjustTracingDump : public HttpHandler {
+public:
+ AdjustTracingDump() = default;
-private:
- void _abort();
- Status _close(bool sync);
+ ~AdjustTracingDump() override = default;
-private:
- int _fd; // owned
- bool _dirty = false;
- const bool _sync_data;
+ void handle(HttpRequest* req) override;
};
-
-} // namespace io
} // namespace doris
diff --git a/be/src/io/fs/file_system.h b/be/src/io/fs/file_system.h
index 82d3911ae9d..a8cdf5f4eb6 100644
--- a/be/src/io/fs/file_system.h
+++ b/be/src/io/fs/file_system.h
@@ -19,8 +19,8 @@
#include <butil/macros.h>
#include <glog/logging.h>
-#include <stdint.h>
+#include <cstdint>
#include <memory>
#include <string>
#include <vector>
@@ -29,8 +29,7 @@
#include "io/fs/file_reader_writer_fwd.h"
#include "io/fs/path.h"
-namespace doris {
-namespace io {
+namespace doris::io {
#ifndef FILESYSTEM_M
#define FILESYSTEM_M(stmt) \
@@ -83,7 +82,6 @@ public:
std::shared_ptr<FileSystem> getSPtr() { return shared_from_this(); }
-public:
// the root path of this fs.
// if not empty, all given Path will be "_root_path/path"
const Path& root_path() const { return _root_path; }
@@ -97,7 +95,8 @@ public:
virtual ~FileSystem() = default;
// Each derived class should implement create method to create fs.
- DISALLOW_COPY_AND_ASSIGN(FileSystem);
+ FileSystem(const FileSystem&) = delete;
+ const FileSystem& operator=(const FileSystem&) = delete;
protected:
/// create file and return a FileWriter
@@ -152,7 +151,6 @@ protected:
return _root_path / path;
}
-protected:
FileSystem(Path&& root_path, std::string&& id, FileSystemType type)
: _root_path(std::move(root_path)), _id(std::move(id)),
_type(type) {}
@@ -163,5 +161,4 @@ protected:
using FileSystemSPtr = std::shared_ptr<FileSystem>;
-} // namespace io
-} // namespace doris
+} // namespace doris::io
diff --git a/be/src/io/fs/local_file_system.h b/be/src/io/fs/local_file_system.h
index 705f0ed36e9..8578b9f5ac2 100644
--- a/be/src/io/fs/local_file_system.h
+++ b/be/src/io/fs/local_file_system.h
@@ -17,9 +17,8 @@
#pragma once
-#include <stdint.h>
-#include <time.h>
-
+#include <cstdint>
+#include <ctime>
#include <functional>
#include <memory>
#include <string>
@@ -29,8 +28,7 @@
#include "io/fs/file_system.h"
#include "io/fs/path.h"
-namespace doris {
-namespace io {
+namespace doris::io {
class LocalFileSystem final : public FileSystem {
public:
@@ -106,7 +104,6 @@ private:
LocalFileSystem(Path&& root_path, std::string&& id = "");
};
-const std::shared_ptr<LocalFileSystem>& global_local_filesystem();
+PURE const std::shared_ptr<LocalFileSystem>& global_local_filesystem();
-} // namespace io
-} // namespace doris
+} // namespace doris::io
diff --git a/be/src/io/fs/local_file_writer.h b/be/src/io/fs/local_file_writer.h
index b138879e358..8c8d436d54e 100644
--- a/be/src/io/fs/local_file_writer.h
+++ b/be/src/io/fs/local_file_writer.h
@@ -25,8 +25,7 @@
#include "io/fs/path.h"
#include "util/slice.h"
-namespace doris {
-namespace io {
+namespace doris::io {
class LocalFileWriter final : public FileWriter {
public:
@@ -42,11 +41,9 @@ private:
void _abort();
Status _close(bool sync);
-private:
int _fd; // owned
bool _dirty = false;
- const bool _sync_data;
+ const bool _sync_data = false;
};
-} // namespace io
-} // namespace doris
+} // namespace doris::io
diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index 9a30cee5ab9..9c9d7cd6099 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -17,7 +17,8 @@
#include "pipeline.h"
-#include <ostream>
+#include <memory>
+#include <string>
#include <utility>
#include "pipeline/exec/operator.h"
@@ -26,10 +27,13 @@ namespace doris::pipeline {
void Pipeline::_init_profile() {
auto s = fmt::format("Pipeline (pipeline id={})", _pipeline_id);
- _pipeline_profile.reset(new RuntimeProfile(std::move(s)));
+ _pipeline_profile = std::make_unique<RuntimeProfile>(std::move(s));
}
Status Pipeline::build_operators() {
+ _name.reserve(_operator_builders.size() * 10);
+ _name.append(std::to_string(id()));
+
OperatorPtr pre;
for (auto& operator_t : _operator_builders) {
auto o = operator_t->build_operator();
@@ -37,6 +41,11 @@ Status Pipeline::build_operators() {
static_cast<void>(o->set_child(pre));
}
_operators.emplace_back(o);
+
+ _name.push_back('-');
+ _name.append(std::to_string(operator_t->id()));
+ _name.append(o->get_name());
+
pre = std::move(o);
}
return Status::OK();
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 148191f4e2d..ab5b7e36bc2 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -18,11 +18,11 @@
#pragma once
#include <glog/logging.h>
-#include <stdint.h>
-#include <algorithm>
-#include <atomic>
+#include <cstdint>
#include <memory>
+#include <string_view>
+#include <utility>
#include <vector>
#include "common/status.h"
@@ -42,18 +42,19 @@ using PipelineId = uint32_t;
class Pipeline : public std::enable_shared_from_this<Pipeline> {
friend class PipelineTask;
friend class PipelineXTask;
+ friend class PipelineXFragmentContext;
public:
Pipeline() = delete;
explicit Pipeline(PipelineId pipeline_id, int num_tasks,
std::weak_ptr<PipelineFragmentContext> context)
- : _pipeline_id(pipeline_id), _context(context),
_num_tasks(num_tasks) {
+ : _pipeline_id(pipeline_id), _context(std::move(context)),
_num_tasks(num_tasks) {
_init_profile();
}
void add_dependency(std::shared_ptr<Pipeline>& pipeline) {
- pipeline->_parents.push_back({_operator_builders.size(),
weak_from_this()});
- _dependencies.push_back({_operator_builders.size(), pipeline});
+ pipeline->_parents.emplace_back(_operator_builders.size(),
weak_from_this());
+ _dependencies.emplace_back(_operator_builders.size(), pipeline);
}
// If all dependencies are finished, this pipeline task should be
scheduled.
@@ -192,6 +193,11 @@ private:
std::weak_ptr<PipelineFragmentContext> _context;
int _previous_schedule_id = -1;
+ // pipline id + operator names. init when:
+ // build_operators(), if pipeline;
+ // _build_pipelines() and _create_tree_helper(), if pipelineX.
+ std::string _name;
+
std::unique_ptr<RuntimeProfile> _pipeline_profile;
// Operators for pipelineX. All pipeline tasks share operators from this.
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 358086e94eb..239173fa781 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -117,10 +117,10 @@ namespace doris::pipeline {
bvar::Adder<int64_t> g_pipeline_tasks_count("doris_pipeline_tasks_count");
PipelineFragmentContext::PipelineFragmentContext(
- const TUniqueId& query_id, const TUniqueId& instance_id, const int
fragment_id,
- int backend_num, std::shared_ptr<QueryContext> query_ctx, ExecEnv*
exec_env,
+ const TUniqueId& query_id, const TUniqueId& instance_id, int
fragment_id, int backend_num,
+ std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
const std::function<void(RuntimeState*, Status*)>& call_back,
- const report_status_callback& report_status_cb)
+ report_status_callback report_status_cb)
: _query_id(query_id),
_fragment_instance_id(instance_id),
_fragment_id(fragment_id),
@@ -129,7 +129,7 @@ PipelineFragmentContext::PipelineFragmentContext(
_query_ctx(std::move(query_ctx)),
_call_back(call_back),
_is_report_on_cancel(true),
- _report_status_cb(report_status_cb),
+ _report_status_cb(std::move(report_status_cb)),
_create_time(MonotonicNanos()) {
_fragment_watcher.start();
}
@@ -951,31 +951,13 @@ Status PipelineFragmentContext::send_report(bool done) {
_fragment_instance_id,
_backend_num,
_runtime_state.get(),
- [this](auto&& PH1) { return
update_status(std::forward<decltype(PH1)>(PH1)); },
- [this](auto&& PH1, auto&& PH2) {
- cancel(std::forward<decltype(PH1)>(PH1),
std::forward<decltype(PH2)>(PH2));
+ [this](Status st) { return update_status(st); },
+ [this](const PPlanFragmentCancelReason& reason, const
std::string& msg) {
+ cancel(reason, msg);
}},
std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
}
-bool
PipelineFragmentContext::_has_inverted_index_or_partial_update(TOlapTableSink
sink) {
- OlapTableSchemaParam schema;
- if (!schema.init(sink.schema).ok()) {
- return false;
- }
- if (schema.is_partial_update()) {
- return true;
- }
- for (const auto& index_schema : schema.indexes()) {
- for (const auto& index : index_schema->indexes) {
- if (index->index_type() == INVERTED) {
- return true;
- }
- }
- }
- return false;
-}
-
std::string PipelineFragmentContext::debug_string() {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "PipelineFragmentContext Info: QueryId
= {}\n",
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index 4c805b50582..9925689cb2c 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -19,13 +19,11 @@
#include <gen_cpp/Types_types.h>
#include <gen_cpp/types.pb.h>
-#include <stddef.h>
-#include <stdint.h>
#include <atomic>
-#include <condition_variable>
+#include <cstddef>
+#include <cstdint>
#include <functional>
-#include <future>
#include <memory>
#include <mutex>
#include <string>
@@ -62,10 +60,10 @@ public:
using report_status_callback = std::function<Status(
const ReportStatusRequest,
std::shared_ptr<pipeline::PipelineFragmentContext>&&)>;
PipelineFragmentContext(const TUniqueId& query_id, const TUniqueId&
instance_id,
- const int fragment_id, int backend_num,
+ int fragment_id, int backend_num,
std::shared_ptr<QueryContext> query_ctx, ExecEnv*
exec_env,
const std::function<void(RuntimeState*, Status*)>&
call_back,
- const report_status_callback& report_status_cb);
+ report_status_callback report_status_cb);
~PipelineFragmentContext() override;
@@ -210,7 +208,6 @@ protected:
int _num_instances = 1;
private:
- static bool _has_inverted_index_or_partial_update(TOlapTableSink sink);
std::vector<std::unique_ptr<PipelineTask>> _tasks;
uint64_t _create_time;
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index b54a6de593d..517d6b8a8de 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -17,11 +17,10 @@
#pragma once
-#include <stdint.h>
-
+#include <cstdint>
#include <memory>
#include <string>
-#include <vector>
+#include <string_view>
#include "common/config.h"
#include "common/status.h"
@@ -266,7 +265,7 @@ public:
}
// If enable_debug_log_timeout_secs <= 0, then disable the log
if (_pipeline_task_watcher.elapsed_time() >
- config::enable_debug_log_timeout_secs * 1000l * 1000l * 1000l) {
+ config::enable_debug_log_timeout_secs * 1000L * 1000L * 1000L) {
_has_exceed_timeout = true;
return true;
}
@@ -286,7 +285,9 @@ public:
}
}
- RuntimeState* runtime_state() { return _state; }
+ RuntimeState* runtime_state() const { return _state; }
+
+ std::string task_name() const { return fmt::format("task{}({})", _index,
_pipeline->_name); }
protected:
void _finish_p_dependency() {
diff --git a/be/src/pipeline/pipeline_tracing.cpp
b/be/src/pipeline/pipeline_tracing.cpp
new file mode 100644
index 00000000000..94675f77f63
--- /dev/null
+++ b/be/src/pipeline/pipeline_tracing.cpp
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "pipeline_tracing.h"
+
+#include <absl/time/clock.h>
+#include <fcntl.h>
+
+#include <boost/algorithm/string/predicate.hpp>
+#include <chrono>
+#include <cstdint>
+#include <mutex>
+#include <string>
+
+#include "common/config.h"
+#include "common/exception.h"
+#include "common/status.h"
+#include "io/fs/local_file_writer.h"
+#include "util/time.h"
+
+namespace doris::pipeline {
+
+void PipelineTracerContext::record(ScheduleRecord record) {
+ if (_dump_type == RecordType::None) [[unlikely]] {
+ return;
+ }
+ if (_datas.contains(record.query_id)) {
+ _datas[record.query_id].enqueue(record);
+ } else {
+ std::unique_lock<std::mutex> l(_data_lock); // add new item, may rehash
+ _datas[record.query_id].enqueue(record);
+ }
+}
+
+void PipelineTracerContext::end_query(TUniqueId query_id, uint64_t task_group)
{
+ {
+ std::unique_lock<std::mutex> l(_tg_lock);
+ _id_to_taskgroup[query_id] = task_group;
+ }
+ if (_dump_type == RecordType::PerQuery) {
+ _dump(query_id);
+ } else if (_dump_type == RecordType::Periodic) {
+ auto now = MonotonicSeconds();
+ auto interval = now - _last_dump_time;
+ if (interval > _dump_interval_s) {
+ _dump(query_id);
+ }
+ }
+}
+
+Status PipelineTracerContext::change_record_params(
+ const std::map<std::string, std::string>& params) {
+ bool effective = false;
+ if (auto it = params.find("type"); it != params.end()) {
+ if (boost::iequals(it->second, "disable") ||
boost::iequals(it->second, "none")) {
+ _dump_type = RecordType::None;
+ effective = true;
+ } else if (boost::iequals(it->second, "per_query") ||
+ boost::iequals(it->second, "perquery")) {
+ _dump_type = RecordType::PerQuery;
+ effective = true;
+ } else if (boost::iequals(it->second, "periodic")) {
+ _dump_type = RecordType::Periodic;
+ _last_dump_time = MonotonicSeconds();
+ effective = true;
+ }
+ }
+
+ if (auto it = params.find("dump_interval"); it != params.end()) {
+ _dump_interval_s = std::stoll(it->second); // s as unit
+ effective = true;
+ }
+
+ return effective ? Status::OK()
+ : Status::InvalidArgument(
+ "No qualified param in changing tracing record
method");
+}
+
+void PipelineTracerContext::_dump(TUniqueId query_id) {
+ if (_dump_type == RecordType::None) {
+ return;
+ }
+
+ //TODO: when dump, now could append records but can't add new query. try
use better grained locks.
+ std::unique_lock<std::mutex> l(_data_lock); // can't rehash
+ if (_dump_type == RecordType::PerQuery) {
+ auto path = _dir / fmt::format("query{}", to_string(query_id));
+ int fd = ::open(
+ path.c_str(), O_CREAT | O_WRONLY | O_TRUNC,
+ S_ISGID | S_ISUID | S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP |
S_IWOTH | S_IROTH);
+ if (fd < 0) [[unlikely]] {
+ throw Exception(Status::Error<ErrorCode::CREATE_FILE_ERROR>(
+ "create tracing log file {} failed", path.c_str()));
+ }
+ auto writer = io::LocalFileWriter {path, fd};
+
+ ScheduleRecord record;
+ while (_datas[query_id].try_dequeue(record)) {
+ uint64_t v = 0;
+ {
+ std::unique_lock<std::mutex> l(_tg_lock);
+ v = _id_to_taskgroup[query_id];
+ }
+ auto tmp_str = record.to_string(v);
+ auto text = Slice {tmp_str};
+ THROW_IF_ERROR(writer.appendv(&text, 1));
+ }
+
+ THROW_IF_ERROR(writer.finalize());
+ THROW_IF_ERROR(writer.close());
+ } else if (_dump_type == RecordType::Periodic) {
+ auto path = _dir / fmt::format("until{}",
+
std::chrono::steady_clock::now().time_since_epoch().count());
+ int fd = ::open(
+ path.c_str(), O_CREAT | O_WRONLY | O_TRUNC,
+ S_ISGID | S_ISUID | S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP |
S_IWOTH | S_IROTH);
+ if (fd < 0) [[unlikely]] {
+ throw Exception(Status::Error<ErrorCode::CREATE_FILE_ERROR>(
+ "create tracing log file {} failed", path.c_str()));
+ }
+ auto writer = io::LocalFileWriter {path, fd};
+
+ for (auto& [id, trace] : _datas) {
+ ScheduleRecord record;
+ while (trace.try_dequeue(record)) {
+ uint64_t v = 0;
+ {
+ std::unique_lock<std::mutex> l(_tg_lock);
+ v = _id_to_taskgroup[query_id];
+ }
+ auto tmp_str = record.to_string(v);
+ auto text = Slice {tmp_str};
+ THROW_IF_ERROR(writer.appendv(&text, 1));
+ }
+ }
+ THROW_IF_ERROR(writer.finalize());
+ THROW_IF_ERROR(writer.close());
+
+ _last_dump_time = MonotonicSeconds();
+ }
+
+ _datas.erase(query_id);
+ {
+ std::unique_lock<std::mutex> l(_tg_lock);
+ _id_to_taskgroup.erase(query_id);
+ }
+}
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_tracing.h
b/be/src/pipeline/pipeline_tracing.h
new file mode 100644
index 00000000000..3160148c570
--- /dev/null
+++ b/be/src/pipeline/pipeline_tracing.h
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <concurrentqueue.h>
+#include <fmt/core.h>
+#include <gen_cpp/Types_types.h>
+#include <parallel_hashmap/phmap.h>
+
+#include <cstdint>
+#include <filesystem>
+
+#include "common/config.h"
+#include "util/hash_util.hpp" // IWYU pragma: keep
+#include "util/thrift_util.h"
+#include "util/time.h"
+
+namespace doris::pipeline {
+
+struct ScheduleRecord {
+ TUniqueId query_id;
+ std::string task_id;
+ uint32_t core_id;
+ uint64_t thread_id;
+ uint64_t start_time;
+ uint64_t end_time;
+ std::string state_name;
+
+ bool operator<(const ScheduleRecord& rhs) const { return start_time <
rhs.start_time; }
+ std::string to_string(uint64_t append_value) const {
+ return fmt::format("{}|{}|{}|{}|{}|{}|{}|{}\n",
doris::to_string(query_id), task_id,
+ core_id, thread_id, start_time, end_time,
state_name, append_value);
+ }
+};
+
+// all tracing datas of ONE specific query
+using OneQueryTraces = moodycamel::ConcurrentQueue<ScheduleRecord>;
+
+// belongs to exec_env, for all query, if enable
+class PipelineTracerContext {
+public:
+ enum class RecordType {
+ None, // disable
+ PerQuery, // record per query. one query one file.
+ Periodic // record per times. one timeslice one file.
+ };
+ void record(ScheduleRecord record); // record one schedule record
+ void end_query(TUniqueId query_id,
+ uint64_t task_group); // tell context this query is end.
may leads to dump.
+ Status change_record_params(const std::map<std::string, std::string>&
params);
+
+ bool enabled() const { return !(_dump_type == RecordType::None); }
+
+private:
+ void _dump(TUniqueId query_id); // dump data to disk. one query or all.
+
+ std::mutex _data_lock; // lock for map, not map items.
+ phmap::flat_hash_map<TUniqueId, OneQueryTraces> _datas;
+ std::mutex _tg_lock; //TODO: use an lockfree DS
+ phmap::flat_hash_map<TUniqueId, uint64_t> _id_to_taskgroup;
+
+ RecordType _dump_type = RecordType::None;
+ std::filesystem::path _dir = config::pipeline_tracing_log_dir;
+ decltype(MonotonicSeconds()) _last_dump_time;
+ decltype(MonotonicSeconds()) _dump_interval_s =
+ 60; // effective iff Periodic mode. 1 minute default.
+};
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 696fcfefba5..f13cf37b1fb 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -23,12 +23,13 @@
#include <gen_cpp/Planner_types.h>
#include <pthread.h>
#include <runtime/result_buffer_mgr.h>
-#include <stdlib.h>
+
+#include <cstdlib>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <map>
+#include <memory>
#include <ostream>
-#include <typeinfo>
#include <utility>
#include "common/config.h"
@@ -42,7 +43,6 @@
#include "pipeline/exec/analytic_sink_operator.h"
#include "pipeline/exec/analytic_source_operator.h"
#include "pipeline/exec/assert_num_rows_operator.h"
-#include "pipeline/exec/data_queue.h"
#include "pipeline/exec/datagen_operator.h"
#include "pipeline/exec/distinct_streaming_aggregation_operator.h"
#include "pipeline/exec/empty_set_operator.h"
@@ -67,7 +67,6 @@
#include "pipeline/exec/repeat_operator.h"
#include "pipeline/exec/result_file_sink_operator.h"
#include "pipeline/exec/result_sink_operator.h"
-#include "pipeline/exec/scan_operator.h"
#include "pipeline/exec/schema_scan_operator.h"
#include "pipeline/exec/select_operator.h"
#include "pipeline/exec/set_probe_sink_operator.h"
@@ -93,7 +92,6 @@
#include "util/container_util.hpp"
#include "util/debug_util.h"
#include "util/uid_util.h"
-#include "vec/common/assert_cast.h"
#include "vec/runtime/vdata_stream_mgr.h"
namespace doris::pipeline {
@@ -162,7 +160,7 @@ Status PipelineXFragmentContext::prepare(const
doris::TPipelineFragmentParams& r
}
_num_instances = request.local_params.size();
_total_instances = request.__isset.total_instances ?
request.total_instances : _num_instances;
- _runtime_profile.reset(new RuntimeProfile("PipelineContext"));
+ _runtime_profile = std::make_unique<RuntimeProfile>("PipelineContext");
_prepare_timer = ADD_TIMER(_runtime_profile, "PrepareTime");
SCOPED_TIMER(_prepare_timer);
@@ -378,7 +376,7 @@ Status
PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
// TODO: figure out good buffer size based on size of output row
// Result file sink is not the top sink
- if (params.__isset.destinations && params.destinations.size() > 0) {
+ if (params.__isset.destinations && !params.destinations.empty()) {
_sink.reset(new ResultFileSinkOperatorX(next_sink_operator_id(),
row_desc,
thrift_sink.result_file_sink,
params.destinations,
output_exprs, desc_tbl));
@@ -408,7 +406,7 @@ Status
PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
auto new_pipeline = add_pipeline();
RowDescriptor* _row_desc = nullptr;
{
- auto& tmp_row_desc =
+ const auto& tmp_row_desc =
!thrift_sink.multi_cast_stream_sink.sinks[i].output_exprs.empty()
? RowDescriptor(state->desc_tbl(),
{thrift_sink.multi_cast_stream_sink.sinks[i]
@@ -602,14 +600,14 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
return Status::OK();
};
- for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
- if (pipeline_id_to_task.contains(_pipelines[pip_idx]->id())) {
- auto* task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
+ for (auto& _pipeline : _pipelines) {
+ if (pipeline_id_to_task.contains(_pipeline->id())) {
+ auto* task = pipeline_id_to_task[_pipeline->id()];
DCHECK(task != nullptr);
// if this task has upstream dependency, then record them.
- if (_dag.find(_pipelines[pip_idx]->id()) != _dag.end()) {
- auto& deps = _dag[_pipelines[pip_idx]->id()];
+ if (_dag.find(_pipeline->id()) != _dag.end()) {
+ auto& deps = _dag[_pipeline->id()];
for (auto& dep : deps) {
if (pipeline_id_to_task.contains(dep)) {
task->add_upstream_dependency(
@@ -642,11 +640,14 @@ Status
PipelineXFragmentContext::_build_pipelines(ObjectPool* pool,
const
doris::TPipelineFragmentParams& request,
const DescriptorTbl& descs,
OperatorXPtr* root,
PipelinePtr cur_pipe) {
- if (request.fragment.plan.nodes.size() == 0) {
+ if (request.fragment.plan.nodes.empty()) {
throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid plan which has no
plan node!");
}
int node_idx = 0;
+
+ cur_pipe->_name.append(std::to_string(cur_pipe->id()));
+
RETURN_IF_ERROR(_create_tree_helper(pool, request.fragment.plan.nodes,
request, descs, nullptr,
&node_idx, root, cur_pipe, 0));
@@ -688,6 +689,10 @@ Status
PipelineXFragmentContext::_create_tree_helper(ObjectPool* pool,
*root = op;
}
+ cur_pipe->_name.push_back('-');
+ cur_pipe->_name.append(std::to_string(op->id()));
+ cur_pipe->_name.append(op->get_name());
+
// rely on that tnodes is preorder of the plan
for (int i = 0; i < num_children; i++) {
++*node_idx;
@@ -875,6 +880,8 @@ Status PipelineXFragmentContext::_add_local_exchange(
return Status::OK();
}
+// NOLINTBEGIN(readability-function-size)
+// NOLINTBEGIN(readability-function-cognitive-complexity)
Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const
TPlanNode& tnode,
const
doris::TPipelineFragmentParams& request,
const DescriptorTbl& descs,
OperatorXPtr& op,
@@ -1150,6 +1157,9 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
return Status::OK();
}
+// NOLINTEND(readability-function-cognitive-complexity)
+// NOLINTEND(readability-function-size)
+
template <bool is_intersect>
Status PipelineXFragmentContext::_build_operators_for_set_operation_node(
ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs,
OperatorXPtr& op,
@@ -1295,30 +1305,13 @@ Status PipelineXFragmentContext::send_report(bool done)
{
{true, exec_status, runtime_states, nullptr,
_runtime_state->load_channel_profile(),
done || !exec_status.ok(), _query_ctx->coord_addr, _query_id,
_fragment_id,
TUniqueId(), _backend_num, _runtime_state.get(),
- std::bind(&PipelineFragmentContext::update_status, this,
std::placeholders::_1),
- std::bind(&PipelineFragmentContext::cancel, this,
std::placeholders::_1,
- std::placeholders::_2)},
+ [this](Status st) { return update_status(st); },
+ [this](const PPlanFragmentCancelReason& reason, const
std::string& msg) {
+ cancel(reason, msg);
+ }},
std::dynamic_pointer_cast<PipelineXFragmentContext>(shared_from_this()));
}
-bool
PipelineXFragmentContext::_has_inverted_index_or_partial_update(TOlapTableSink
sink) {
- OlapTableSchemaParam schema;
- if (!schema.init(sink.schema).ok()) {
- return false;
- }
- if (schema.is_partial_update()) {
- return true;
- }
- for (const auto& index_schema : schema.indexes()) {
- for (const auto& index : index_schema->indexes) {
- if (index->index_type() == INVERTED) {
- return true;
- }
- }
- }
- return false;
-}
-
std::string PipelineXFragmentContext::debug_string() {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "PipelineXFragmentContext Info:\n");
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 102dd854998..9630484f443 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -169,8 +169,6 @@ private:
const std::map<int, int>&
shuffle_idx_to_instance_idx,
const bool ignore_data_distribution);
- bool _has_inverted_index_or_partial_update(TOlapTableSink sink);
-
bool _enable_local_shuffle() const { return
_runtime_state->enable_local_shuffle(); }
OperatorXPtr _root_op = nullptr;
diff --git a/be/src/pipeline/task_scheduler.cpp
b/be/src/pipeline/task_scheduler.cpp
index 98678685d3f..5032bdef6b7 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -23,33 +23,33 @@
#include <glog/logging.h>
#include <sched.h>
-#include <algorithm>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <functional>
#include <ostream>
#include <string>
#include <thread>
+#include <utility>
#include "common/logging.h"
-#include "common/signal_handler.h"
#include "pipeline/pipeline_task.h"
-#include "pipeline/pipeline_x/dependency.h"
#include "pipeline/pipeline_x/pipeline_x_task.h"
#include "pipeline/task_queue.h"
#include "pipeline_fragment_context.h"
+#include "runtime/exec_env.h"
#include "runtime/query_context.h"
#include "util/debug_util.h"
#include "util/sse_util.hpp"
#include "util/thread.h"
#include "util/threadpool.h"
+#include "util/time.h"
#include "util/uid_util.h"
#include "vec/runtime/vdatetime_value.h"
namespace doris::pipeline {
BlockedTaskScheduler::BlockedTaskScheduler(std::string name)
- : _name(name), _started(false), _shutdown(false) {}
+ : _name(std::move(name)), _started(false), _shutdown(false) {}
Status BlockedTaskScheduler::start() {
LOG(INFO) << "BlockedTaskScheduler start";
@@ -192,7 +192,7 @@ void BlockedTaskScheduler::_schedule() {
void BlockedTaskScheduler::_make_task_run(std::list<PipelineTask*>&
local_tasks,
std::list<PipelineTask*>::iterator&
task_itr,
PipelineTaskState t_state) {
- auto task = *task_itr;
+ auto* task = *task_itr;
task->set_state(t_state);
local_tasks.erase(task_itr++);
static_cast<void>(task->get_task_queue()->push_back(task));
@@ -215,8 +215,7 @@ Status TaskScheduler::start() {
_markers.reserve(cores);
for (size_t i = 0; i < cores; ++i) {
_markers.push_back(std::make_unique<std::atomic<bool>>(true));
- RETURN_IF_ERROR(
-
_fix_thread_pool->submit_func(std::bind(&TaskScheduler::_do_work, this, i)));
+ RETURN_IF_ERROR(_fix_thread_pool->submit_func([this, i] { _do_work(i);
}));
}
return Status::OK();
}
@@ -226,6 +225,29 @@ Status TaskScheduler::schedule_task(PipelineTask* task) {
// TODO control num of task
}
+// after _close_task, task maybe destructed.
+void _close_task(PipelineTask* task, PipelineTaskState state, Status
exec_status) {
+ // close_a_pipeline may delete fragment context and will core in some defer
+ // code, because the defer code will access fragment context itself.
+ auto lock_for_context = task->fragment_context()->shared_from_this();
+ // is_pending_finish does not check status, so has to check status in
close API.
+ // For example, in async writer, the writer may fail during dealing with
eos_block
+ // but it does not return error status. Has to check the error status in
close API.
+ // We have already refactored all source and sink api, the close API does
not need waiting
+ // for pending finish now. So that could call close directly.
+ Status status = task->close(exec_status);
+ if (!status.ok() && state != PipelineTaskState::CANCELED) {
+ task->query_context()->cancel(true, status.to_string(),
+ Status::Cancelled(status.to_string()));
+ state = PipelineTaskState::CANCELED;
+ }
+ task->set_state(state);
+ task->set_close_pipeline_time();
+ task->finalize();
+ task->set_running(false);
+ task->fragment_context()->close_a_pipeline();
+}
+
void TaskScheduler::_do_work(size_t index) {
const auto& marker = _markers[index];
while (*marker) {
@@ -286,7 +308,31 @@ void TaskScheduler::_do_work(size_t index) {
auto status = Status::OK();
try {
- status = task->execute(&eos);
+ //TODO: use a better enclose to abstracting these
+ if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) {
+ TUniqueId query_id = task->query_context()->query_id();
+ std::string task_name = task->task_name();
+#ifdef __APPLE__
+ uint32_t core_id = 0;
+#else
+ uint32_t core_id = sched_getcpu();
+#endif
+ std::thread::id tid = std::this_thread::get_id();
+ uint64_t thread_id = *reinterpret_cast<uint64_t*>(&tid);
+ uint64_t start_time = MonotonicMicros();
+
+ status = task->execute(&eos);
+
+ uint64_t end_time = MonotonicMicros();
+ auto state = task->get_state();
+ std::string state_name =
+ state == PipelineTaskState::RUNNABLE ?
get_state_name(state) : "";
+ ExecEnv::GetInstance()->pipeline_tracer_context()->record(
+ {query_id, task_name, core_id, thread_id, start_time,
end_time,
+ state_name});
+ } else {
+ status = task->execute(&eos);
+ }
} catch (const Exception& e) {
status = e.to_status();
}
@@ -372,28 +418,6 @@ void TaskScheduler::_do_work(size_t index) {
}
}
-void TaskScheduler::_close_task(PipelineTask* task, PipelineTaskState state,
Status exec_status) {
- // close_a_pipeline may delete fragment context and will core in some defer
- // code, because the defer code will access fragment context it self.
- auto lock_for_context = task->fragment_context()->shared_from_this();
- // is_pending_finish does not check status, so has to check status in
close API.
- // For example, in async writer, the writer may failed during dealing with
eos_block
- // but it does not return error status. Has to check the error status in
close API.
- // We have already refactor all source and sink api, the close API does
not need waiting
- // for pending finish now. So that could call close directly.
- Status status = task->close(exec_status);
- if (!status.ok() && state != PipelineTaskState::CANCELED) {
- task->query_context()->cancel(true, status.to_string(),
- Status::Cancelled(status.to_string()));
- state = PipelineTaskState::CANCELED;
- }
- task->set_state(state);
- task->set_close_pipeline_time();
- task->finalize();
- task->set_running(false);
- task->fragment_context()->close_a_pipeline();
-}
-
void TaskScheduler::stop() {
if (!this->_shutdown.load()) {
if (_task_queue) {
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index 41ac8c0c098..3074cf02afc 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -103,8 +103,5 @@ private:
CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
void _do_work(size_t index);
- // after _close_task, task maybe destructed.
- void _close_task(PipelineTask* task, PipelineTaskState state,
- Status exec_status = Status::OK());
};
} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 1f6b1b7e8b9..89d871be293 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -34,6 +34,7 @@
#include "olap/olap_define.h"
#include "olap/options.h"
#include "olap/tablet_fwd.h"
+#include "pipeline/pipeline_tracing.h"
#include "runtime/frontend_info.h" // TODO(zhiqiang): find a way to remove
this include header
#include "util/threadpool.h"
@@ -143,7 +144,7 @@ public:
static bool ready() { return _s_ready.load(std::memory_order_acquire); }
const std::string& token() const;
ExternalScanContextMgr* external_scan_context_mgr() { return
_external_scan_context_mgr; }
- doris::vectorized::VDataStreamMgr* vstream_mgr() { return _vstream_mgr; }
+ vectorized::VDataStreamMgr* vstream_mgr() { return _vstream_mgr; }
ResultBufferMgr* result_mgr() { return _result_mgr; }
ResultQueueMgr* result_queue_mgr() { return _result_queue_mgr; }
ClientCache<BackendServiceClient>* client_cache() { return
_backend_client_cache; }
@@ -206,7 +207,7 @@ public:
std::shared_ptr<StreamLoadExecutor> stream_load_executor() { return
_stream_load_executor; }
RoutineLoadTaskExecutor* routine_load_task_executor() { return
_routine_load_task_executor; }
HeartbeatFlags* heartbeat_flags() { return _heartbeat_flags; }
- doris::vectorized::ScannerScheduler* scanner_scheduler() { return
_scanner_scheduler; }
+ vectorized::ScannerScheduler* scanner_scheduler() { return
_scanner_scheduler; }
FileMetaCache* file_meta_cache() { return _file_meta_cache; }
MemTableMemoryLimiter* memtable_memory_limiter() { return
_memtable_memory_limiter.get(); }
WalManager* wal_mgr() { return _wal_manager.get(); }
@@ -264,14 +265,18 @@ public:
}
std::shared_ptr<DummyLRUCache> get_dummy_lru_cache() { return
_dummy_lru_cache; }
- std::shared_ptr<doris::pipeline::BlockedTaskScheduler>
get_global_block_scheduler() {
+ std::shared_ptr<pipeline::BlockedTaskScheduler>
get_global_block_scheduler() {
return _global_block_scheduler;
}
- doris::pipeline::RuntimeFilterTimerQueue* runtime_filter_timer_queue() {
+ pipeline::RuntimeFilterTimerQueue* runtime_filter_timer_queue() {
return _runtime_filter_timer_queue;
}
+ pipeline::PipelineTracerContext* pipeline_tracer_context() {
+ return _pipeline_tracer_ctx.get();
+ }
+
private:
ExecEnv();
@@ -291,7 +296,7 @@ private:
UserFunctionCache* _user_function_cache = nullptr;
// Leave protected so that subclasses can override
ExternalScanContextMgr* _external_scan_context_mgr = nullptr;
- doris::vectorized::VDataStreamMgr* _vstream_mgr = nullptr;
+ vectorized::VDataStreamMgr* _vstream_mgr = nullptr;
ResultBufferMgr* _result_mgr = nullptr;
ResultQueueMgr* _result_queue_mgr = nullptr;
ClientCache<BackendServiceClient>* _backend_client_cache = nullptr;
@@ -342,7 +347,7 @@ private:
RoutineLoadTaskExecutor* _routine_load_task_executor = nullptr;
SmallFileMgr* _small_file_mgr = nullptr;
HeartbeatFlags* _heartbeat_flags = nullptr;
- doris::vectorized::ScannerScheduler* _scanner_scheduler = nullptr;
+ vectorized::ScannerScheduler* _scanner_scheduler = nullptr;
BlockSpillManager* _block_spill_mgr = nullptr;
// To save meta info of external file, such as parquet footer.
@@ -374,15 +379,17 @@ private:
std::shared_ptr<DummyLRUCache> _dummy_lru_cache = nullptr;
// used for query with group cpu hard limit
- std::shared_ptr<doris::pipeline::BlockedTaskScheduler>
_global_block_scheduler;
+ std::shared_ptr<pipeline::BlockedTaskScheduler> _global_block_scheduler;
// used for query without workload group
- std::shared_ptr<doris::pipeline::BlockedTaskScheduler>
_without_group_block_scheduler;
+ std::shared_ptr<pipeline::BlockedTaskScheduler>
_without_group_block_scheduler;
- doris::pipeline::RuntimeFilterTimerQueue* _runtime_filter_timer_queue =
nullptr;
+ pipeline::RuntimeFilterTimerQueue* _runtime_filter_timer_queue = nullptr;
WorkloadSchedPolicyMgr* _workload_sched_mgr = nullptr;
RuntimeQueryStatiticsMgr* _runtime_query_statistics_mgr = nullptr;
+
+ std::unique_ptr<pipeline::PipelineTracerContext> _pipeline_tracer_ctx;
};
template <>
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index f5f08a71064..2f6ac61d646 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -50,6 +50,7 @@
#include "olap/storage_engine.h"
#include "olap/tablet_schema_cache.h"
#include "olap/wal/wal_manager.h"
+#include "pipeline/pipeline_tracing.h"
#include "pipeline/task_queue.h"
#include "pipeline/task_scheduler.h"
#include "runtime/block_spill_manager.h"
@@ -201,6 +202,7 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths,
// so it should be created before all query begin and deleted after all
query and daemon thread stoppped
_runtime_query_statistics_mgr = new RuntimeQueryStatiticsMgr();
init_file_cache_factory();
+ _pipeline_tracer_ctx =
std::make_unique<pipeline::PipelineTracerContext>(); // before query
RETURN_IF_ERROR(init_pipeline_task_scheduler());
_task_group_manager = new taskgroup::TaskGroupManager();
_scanner_scheduler = new doris::vectorized::ScannerScheduler();
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index e52b71d277b..c722fb24bb7 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -612,7 +612,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params,
TUniqueId query_id, boo
// This may be a first fragment request of the query.
// Create the query fragments context.
query_ctx = QueryContext::create_shared(query_id,
params.fragment_num_on_host, _exec_env,
- params.query_options,
params.coord);
+ params.query_options,
params.coord, pipeline);
RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool),
params.desc_tbl,
&(query_ctx->desc_tbl)));
// set file scan range params
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 2c25d37d14b..bca8b409c02 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -17,6 +17,9 @@
#include "runtime/query_context.h"
+#include <exception>
+#include <memory>
+
#include "pipeline/pipeline_fragment_context.h"
#include "pipeline/pipeline_x/dependency.h"
#include "runtime/runtime_query_statistics_mgr.h"
@@ -34,11 +37,13 @@ public:
};
QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num,
ExecEnv* exec_env,
- const TQueryOptions& query_options, TNetworkAddress
coord_addr)
+ const TQueryOptions& query_options, TNetworkAddress
coord_addr,
+ bool is_pipeline)
: fragment_num(total_fragment_num),
timeout_second(-1),
_query_id(query_id),
_exec_env(exec_env),
+ _is_pipeline(is_pipeline),
_query_options(query_options) {
this->coord_addr = coord_addr;
_start_time = VecDateTimeValue::local_time();
@@ -46,8 +51,8 @@ QueryContext::QueryContext(TUniqueId query_id, int
total_fragment_num, ExecEnv*
_shared_scanner_controller.reset(new
vectorized::SharedScannerController());
_execution_dependency =
pipeline::Dependency::create_unique(-1, -1, "ExecutionDependency",
this);
- _runtime_filter_mgr.reset(
- new RuntimeFilterMgr(TUniqueId(),
RuntimeFilterParamsContext::create(this)));
+ _runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(
+ TUniqueId(), RuntimeFilterParamsContext::create(this));
timeout_second = query_options.execution_timeout;
@@ -86,7 +91,7 @@ QueryContext::~QueryContext() {
// it is found that query already exists in _query_ctx_map, and query mem
tracker is not used.
// query mem tracker consumption is not equal to 0 after use, because
there is memory consumed
// on query mem tracker, released on other trackers.
- std::string mem_tracker_msg {""};
+ std::string mem_tracker_msg;
if (query_mem_tracker->peak_consumption() != 0) {
mem_tracker_msg = fmt::format(
", deregister query/load memory tracker, queryId={}, Limit={},
CurrUsed={}, "
@@ -95,7 +100,9 @@ QueryContext::~QueryContext() {
MemTracker::print_bytes(query_mem_tracker->consumption()),
MemTracker::print_bytes(query_mem_tracker->peak_consumption()));
}
+ uint64_t group_id = 0;
if (_task_group) {
+ group_id = _task_group->id(); // before remove
_task_group->remove_mem_tracker_limiter(query_mem_tracker);
_task_group->remove_query(_query_id);
}
@@ -110,6 +117,15 @@ QueryContext::~QueryContext() {
static_cast<void>(ExecEnv::GetInstance()->lazy_release_obj_pool()->submit(
std::make_shared<DelayReleaseToken>(std::move(_thread_token))));
}
+
+ //TODO: check if pipeline and tracing both enabled
+ if (_is_pipeline &&
ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) [[unlikely]] {
+ try {
+
ExecEnv::GetInstance()->pipeline_tracer_context()->end_query(_query_id,
group_id);
+ } catch (std::exception& e) {
+ LOG(WARNING) << "Dump trace log failed because " << e.what();
+ }
+ }
}
void QueryContext::set_ready_to_execute(bool is_cancelled) {
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index a639268c552..a7632a443bf 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -33,7 +33,6 @@
#include "runtime/runtime_filter_mgr.h"
#include "runtime/runtime_predicate.h"
#include "task_group/task_group.h"
-#include "util/pretty_printer.h"
#include "util/threadpool.h"
#include "vec/exec/scan/scanner_scheduler.h"
#include "vec/runtime/shared_hash_table_controller.h"
@@ -61,6 +60,7 @@ struct ReportStatusRequest {
std::function<Status(Status)> update_fn;
std::function<void(const PPlanFragmentCancelReason&, const std::string&)>
cancel_fn;
};
+
// Save the common components of fragments in a query.
// Some components like DescriptorTbl may be very large
// that will slow down each execution of fragments when DeSer them every time.
@@ -70,7 +70,7 @@ class QueryContext {
public:
QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv* exec_env,
- const TQueryOptions& query_options, TNetworkAddress
coord_addr);
+ const TQueryOptions& query_options, TNetworkAddress
coord_addr, bool is_pipeline);
~QueryContext();
@@ -216,7 +216,7 @@ public:
ThreadPool* get_non_pipe_exec_thread_pool();
- int64_t mem_limit() { return _bytes_limit; }
+ int64_t mem_limit() const { return _bytes_limit; }
void set_merge_controller_handler(
std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
@@ -256,6 +256,7 @@ private:
ExecEnv* _exec_env = nullptr;
VecDateTimeValue _start_time;
int64_t _bytes_limit = 0;
+ bool _is_pipeline = false;
// A token used to submit olap scanner to the "_limited_scan_thread_pool",
// This thread pool token is created from "_limited_scan_thread_pool" from
exec env.
diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp
index 6c961959aec..219a9534831 100644
--- a/be/src/service/http_service.cpp
+++ b/be/src/service/http_service.cpp
@@ -20,13 +20,13 @@
#include <event2/bufferevent.h>
#include <event2/http.h>
-#include <algorithm>
#include <string>
#include <vector>
#include "common/config.h"
#include "common/status.h"
#include "http/action/adjust_log_level.h"
+#include "http/action/adjust_tracing_dump.h"
#include "http/action/check_rpc_channel_action.h"
#include "http/action/check_tablet_segment_action.h"
#include "http/action/checksum_action.h"
@@ -99,6 +99,7 @@ HttpService::~HttpService() {
stop();
}
+// NOLINTBEGIN(readability-function-size)
Status HttpService::start() {
add_default_path_handlers(_web_page_handler.get());
@@ -162,6 +163,11 @@ Status HttpService::start() {
AdjustLogLevelAction* adjust_log_level_action = _pool.add(new
AdjustLogLevelAction());
_ev_http_server->register_handler(HttpMethod::POST, "api/glog/adjust",
adjust_log_level_action);
+ //TODO: add query GET interface
+ auto* adjust_tracing_dump = _pool.add(new AdjustTracingDump());
+ _ev_http_server->register_handler(HttpMethod::POST, "api/pipeline/tracing",
+ adjust_tracing_dump);
+
// Register BE version action
VersionAction* version_action =
_pool.add(new VersionAction(_env, TPrivilegeHier::GLOBAL,
TPrivilegeType::NONE));
@@ -201,8 +207,9 @@ Status HttpService::start() {
// register metrics
{
- auto action = _pool.add(new
MetricsAction(DorisMetrics::instance()->metric_registry(), _env,
- TPrivilegeHier::GLOBAL,
TPrivilegeType::NONE));
+ auto* action =
+ _pool.add(new
MetricsAction(DorisMetrics::instance()->metric_registry(), _env,
+ TPrivilegeHier::GLOBAL,
TPrivilegeType::NONE));
_ev_http_server->register_handler(HttpMethod::GET, "/metrics", action);
}
@@ -310,6 +317,7 @@ Status HttpService::start() {
_ev_http_server->start();
return Status::OK();
}
+// NOLINTEND(readability-function-size)
void HttpService::stop() {
if (stopped) {
diff --git a/be/src/util/thrift_util.cpp b/be/src/util/thrift_util.cpp
index fd141f3c74b..395c01ec390 100644
--- a/be/src/util/thrift_util.cpp
+++ b/be/src/util/thrift_util.cpp
@@ -24,18 +24,17 @@
#include <thrift/transport/TTransportException.h>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
+#include <string>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/logging.h"
+#include "exec/tablet_info.h"
+#include "olap/tablet_schema.h"
#include "util/thrift_server.h"
-namespace apache {
-namespace thrift {
-namespace protocol {
+namespace apache::thrift::protocol {
class TProtocol;
-} // namespace protocol
-} // namespace thrift
-} // namespace apache
+} // namespace apache::thrift::protocol
// TCompactProtocol requires some #defines to work right. They also define
UNLIKELY
// so we need to undef this.
@@ -152,4 +151,27 @@ bool t_network_address_comparator(const TNetworkAddress&
a, const TNetworkAddres
return false;
}
+
+std::string to_string(const TUniqueId& id) {
+ return std::to_string(id.hi).append(std::to_string(id.lo));
+}
+
+bool _has_inverted_index_or_partial_update(TOlapTableSink sink) {
+ OlapTableSchemaParam schema;
+ if (!schema.init(sink.schema).ok()) {
+ return false;
+ }
+ if (schema.is_partial_update()) {
+ return true;
+ }
+ for (const auto& index_schema : schema.indexes()) {
+ for (const auto& index : index_schema->indexes) {
+ if (index->index_type() == INVERTED) {
+ return true;
+ }
+ }
+ }
+ return false;
+}
+
} // namespace doris
diff --git a/be/src/util/thrift_util.h b/be/src/util/thrift_util.h
index b16482df610..aff3a9ab101 100644
--- a/be/src/util/thrift_util.h
+++ b/be/src/util/thrift_util.h
@@ -17,11 +17,13 @@
#pragma once
-#include <stdint.h>
-#include <string.h>
+#include <gen_cpp/DataSinks_types.h>
+#include <gen_cpp/Types_types.h>
#include <thrift/TApplicationException.h>
#include <thrift/transport/TBufferTransports.h>
+#include <cstdint>
+#include <cstring>
#include <exception>
#include <memory>
#include <string>
@@ -29,14 +31,10 @@
#include "common/status.h"
-namespace apache {
-namespace thrift {
-namespace protocol {
+namespace apache::thrift::protocol {
class TProtocol;
class TProtocolFactory;
-} // namespace protocol
-} // namespace thrift
-} // namespace apache
+} // namespace apache::thrift::protocol
namespace doris {
@@ -61,7 +59,7 @@ public:
uint8_t* buffer = nullptr;
RETURN_IF_ERROR(serialize<T>(obj, &len, &buffer));
result->resize(len);
- memcpy(&((*result)[0]), buffer, len);
+ memcpy(result->data(), buffer, len);
return Status::OK();
}
@@ -177,4 +175,8 @@ void t_network_address_to_string(const TNetworkAddress&
address, std::string* ou
// string representation
bool t_network_address_comparator(const TNetworkAddress& a, const
TNetworkAddress& b);
+PURE std::string to_string(const TUniqueId& id);
+
+PURE bool _has_inverted_index_or_partial_update(TOlapTableSink sink);
+
} // namespace doris
diff --git a/build.sh b/build.sh
index 7b67a303db1..ffc73dcab23 100755
--- a/build.sh
+++ b/build.sh
@@ -727,6 +727,7 @@ EOF
cp -r -p "${DORIS_THIRDPARTY}/installed/webroot"/*
"${DORIS_OUTPUT}/be/www"/
copy_common_files "${DORIS_OUTPUT}/be/"
mkdir -p "${DORIS_OUTPUT}/be/log"
+ mkdir -p "${DORIS_OUTPUT}/be/log/tracing"
mkdir -p "${DORIS_OUTPUT}/be/storage"
fi
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]