This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ea7eca9345c [pipelineX](bug) Add some logs (#27596)
ea7eca9345c is described below
commit ea7eca9345c4af3f486d0cefac21ebc485beebdb
Author: Gabriel <[email protected]>
AuthorDate: Tue Nov 28 10:02:13 2023 +0800
[pipelineX](bug) Add some logs (#27596)
---
be/src/pipeline/exec/exchange_sink_buffer.h | 1 +
be/src/pipeline/exec/exchange_sink_operator.cpp | 9 +++++++++
be/src/pipeline/exec/exchange_sink_operator.h | 1 +
be/src/pipeline/exec/exchange_source_operator.cpp | 24 +++++++++++++++++++++++
be/src/pipeline/exec/exchange_source_operator.h | 3 +++
be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 10 ++++++----
be/src/vec/runtime/vdata_stream_recvr.h | 2 ++
7 files changed, 46 insertions(+), 4 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h
b/be/src/pipeline/exec/exchange_sink_buffer.h
index a04b3b29b32..d59872e2a1f 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -201,6 +201,7 @@ public:
}
private:
+ friend class ExchangeSinkLocalState;
void _set_ready_to_finish(bool all_done);
phmap::flat_hash_map<InstanceLoId, std::unique_ptr<std::mutex>>
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 71517f377f0..1c66ec02207 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -518,6 +518,15 @@ Status ExchangeSinkOperatorX::try_close(RuntimeState*
state, Status exec_status)
return final_st;
}
+std::string ExchangeSinkLocalState::debug_string(int indentation_level) const {
+ fmt::memory_buffer debug_string_buffer;
+ fmt::format_to(debug_string_buffer, "{}",
+ PipelineXSinkLocalState<>::debug_string(indentation_level));
+ fmt::format_to(debug_string_buffer, ", Sink Buffer: (_should_stop = {},
_busy_channels = {})",
+ _sink_buffer->_should_stop.load(),
_sink_buffer->_busy_channels.load());
+ return fmt::to_string(debug_string_buffer);
+}
+
Status ExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) {
if (_closed) {
return Status::OK();
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h
b/be/src/pipeline/exec/exchange_sink_operator.h
index 19a326d5c6e..751c2768e82 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -186,6 +186,7 @@ public:
std::string id_name() override;
segment_v2::CompressionTypePB& compression_type();
+ std::string debug_string(int indentation_level) const override;
std::vector<vectorized::PipChannel<ExchangeSinkLocalState>*> channels;
std::vector<std::shared_ptr<vectorized::PipChannel<ExchangeSinkLocalState>>>
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp
b/be/src/pipeline/exec/exchange_source_operator.cpp
index 3b630cfcfcf..1c766f06a82 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -43,6 +43,30 @@ bool ExchangeSourceOperator::is_pending_finish() const {
ExchangeLocalState::ExchangeLocalState(RuntimeState* state, OperatorXBase*
parent)
: PipelineXLocalState<>(state, parent), num_rows_skipped(0),
is_ready(false) {}
+std::string ExchangeLocalState::debug_string(int indentation_level) const {
+ fmt::memory_buffer debug_string_buffer;
+ fmt::format_to(debug_string_buffer, "{}",
+ PipelineXLocalState<>::debug_string(indentation_level));
+ fmt::format_to(debug_string_buffer, ", Queues: (");
+ const auto& queues = stream_recvr->sender_queues();
+ for (size_t i = 0; i < queues.size(); i++) {
+ fmt::format_to(debug_string_buffer,
+ "No. {} queue: (_num_remaining_senders = {},
block_queue size = {})", i,
+ queues[i]->_num_remaining_senders,
queues[i]->_block_queue.size());
+ }
+ fmt::format_to(debug_string_buffer, ")");
+ return fmt::to_string(debug_string_buffer);
+}
+
+std::string ExchangeSourceOperatorX::debug_string(int indentation_level) const
{
+ fmt::memory_buffer debug_string_buffer;
+ fmt::format_to(debug_string_buffer, "{}",
+
OperatorX<ExchangeLocalState>::debug_string(indentation_level));
+ fmt::format_to(debug_string_buffer, ", Info: (_num_senders = {},
_is_merging = {})",
+ _num_senders, _is_merging);
+ return fmt::to_string(debug_string_buffer);
+}
+
Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
SCOPED_TIMER(exec_time_counter());
diff --git a/be/src/pipeline/exec/exchange_source_operator.h
b/be/src/pipeline/exec/exchange_source_operator.h
index 8e745def1ba..abac33001bb 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -67,6 +67,7 @@ class ExchangeLocalState final : public PipelineXLocalState<>
{
Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override;
Dependency* dependency() override { return source_dependency.get(); }
+ std::string debug_string(int indentation_level) const override;
std::shared_ptr<doris::vectorized::VDataStreamRecvr> stream_recvr;
doris::vectorized::VSortExecExprs vsort_exec_exprs;
int64_t num_rows_skipped;
@@ -89,6 +90,8 @@ public:
Status get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) override;
+ std::string debug_string(int indentation_level = 0) const override;
+
Status close(RuntimeState* state) override;
[[nodiscard]] bool is_source() const override { return true; }
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index c96f34b213b..ed2af3f7854 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -342,10 +342,12 @@ std::string PipelineXTask::debug_string() {
fmt::format_to(debug_string_buffer, "InstanceId: {}\n",
print_id(_state->fragment_instance_id()));
- fmt::format_to(
- debug_string_buffer,
- "PipelineTask[this = {}, state = {}, data state = {}, dry run =
{}]\noperators: ",
- (void*)this, get_state_name(_cur_state), (int)_data_state,
_dry_run);
+ fmt::format_to(debug_string_buffer,
+ "PipelineTask[this = {}, state = {}, data state = {}, dry
run = {}, elapse time "
+ "= {}ns], block dependency = {}, _use_blocking_queue =
{}\noperators: ",
+ (void*)this, get_state_name(_cur_state), (int)_data_state,
_dry_run,
+ MonotonicNanos() - _fragment_context->create_time(),
+ _blocked_dep ? _blocked_dep->debug_string() : "NULL",
_use_blocking_queue);
for (size_t i = 0; i < _operators.size(); i++) {
fmt::format_to(
debug_string_buffer, "\n{}",
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h
b/be/src/vec/runtime/vdata_stream_recvr.h
index e31b433491a..bfdd1dd351a 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -60,6 +60,7 @@ class RuntimeState;
namespace pipeline {
struct ExchangeDataDependency;
class LocalExchangeChannelDependency;
+class ExchangeLocalState;
} // namespace pipeline
namespace vectorized {
@@ -235,6 +236,7 @@ public:
}
protected:
+ friend class pipeline::ExchangeLocalState;
Status _inner_get_batch_without_lock(Block* block, bool* eos);
// Not managed by this class
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]