This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ea7eca9345c [pipelineX](bug) Add some logs (#27596)
ea7eca9345c is described below

commit ea7eca9345c4af3f486d0cefac21ebc485beebdb
Author: Gabriel <[email protected]>
AuthorDate: Tue Nov 28 10:02:13 2023 +0800

    [pipelineX](bug) Add some logs (#27596)
---
 be/src/pipeline/exec/exchange_sink_buffer.h       |  1 +
 be/src/pipeline/exec/exchange_sink_operator.cpp   |  9 +++++++++
 be/src/pipeline/exec/exchange_sink_operator.h     |  1 +
 be/src/pipeline/exec/exchange_source_operator.cpp | 24 +++++++++++++++++++++++
 be/src/pipeline/exec/exchange_source_operator.h   |  3 +++
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp    | 10 ++++++----
 be/src/vec/runtime/vdata_stream_recvr.h           |  2 ++
 7 files changed, 46 insertions(+), 4 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h
index a04b3b29b32..d59872e2a1f 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -201,6 +201,7 @@ public:
     }
 
 private:
+    friend class ExchangeSinkLocalState;
     void _set_ready_to_finish(bool all_done);
 
     phmap::flat_hash_map<InstanceLoId, std::unique_ptr<std::mutex>>
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 71517f377f0..1c66ec02207 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -518,6 +518,15 @@ Status ExchangeSinkOperatorX::try_close(RuntimeState* state, Status exec_status)
     return final_st;
 }
 
+std::string ExchangeSinkLocalState::debug_string(int indentation_level) const {
+    fmt::memory_buffer debug_string_buffer;
+    fmt::format_to(debug_string_buffer, "{}",
+                   PipelineXSinkLocalState<>::debug_string(indentation_level));
+    fmt::format_to(debug_string_buffer, ", Sink Buffer: (_should_stop = {}, _busy_channels = {})",
+                   _sink_buffer->_should_stop.load(), _sink_buffer->_busy_channels.load());
+    return fmt::to_string(debug_string_buffer);
+}
+
 Status ExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) {
     if (_closed) {
         return Status::OK();
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index 19a326d5c6e..751c2768e82 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -186,6 +186,7 @@ public:
 
     std::string id_name() override;
     segment_v2::CompressionTypePB& compression_type();
+    std::string debug_string(int indentation_level) const override;
 
     std::vector<vectorized::PipChannel<ExchangeSinkLocalState>*> channels;
     std::vector<std::shared_ptr<vectorized::PipChannel<ExchangeSinkLocalState>>>
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp
index 3b630cfcfcf..1c766f06a82 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -43,6 +43,30 @@ bool ExchangeSourceOperator::is_pending_finish() const {
 ExchangeLocalState::ExchangeLocalState(RuntimeState* state, OperatorXBase* parent)
         : PipelineXLocalState<>(state, parent), num_rows_skipped(0), is_ready(false) {}
 
+std::string ExchangeLocalState::debug_string(int indentation_level) const {
+    fmt::memory_buffer debug_string_buffer;
+    fmt::format_to(debug_string_buffer, "{}",
+                   PipelineXLocalState<>::debug_string(indentation_level));
+    fmt::format_to(debug_string_buffer, ", Queues: (");
+    const auto& queues = stream_recvr->sender_queues();
+    for (size_t i = 0; i < queues.size(); i++) {
+        fmt::format_to(debug_string_buffer,
+                       "No. {} queue: (_num_remaining_senders = {}, block_queue size = {})", i,
+                       queues[i]->_num_remaining_senders, queues[i]->_block_queue.size());
+    }
+    fmt::format_to(debug_string_buffer, ")");
+    return fmt::to_string(debug_string_buffer);
+}
+
+std::string ExchangeSourceOperatorX::debug_string(int indentation_level) const {
+    fmt::memory_buffer debug_string_buffer;
+    fmt::format_to(debug_string_buffer, "{}",
+                   OperatorX<ExchangeLocalState>::debug_string(indentation_level));
+    fmt::format_to(debug_string_buffer, ", Info: (_num_senders = {}, _is_merging = {})",
+                   _num_senders, _is_merging);
+    return fmt::to_string(debug_string_buffer);
+}
+
 Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
     SCOPED_TIMER(exec_time_counter());
diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h
index 8e745def1ba..abac33001bb 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -67,6 +67,7 @@ class ExchangeLocalState final : public PipelineXLocalState<> {
     Status open(RuntimeState* state) override;
     Status close(RuntimeState* state) override;
     Dependency* dependency() override { return source_dependency.get(); }
+    std::string debug_string(int indentation_level) const override;
     std::shared_ptr<doris::vectorized::VDataStreamRecvr> stream_recvr;
     doris::vectorized::VSortExecExprs vsort_exec_exprs;
     int64_t num_rows_skipped;
@@ -89,6 +90,8 @@ public:
     Status get_block(RuntimeState* state, vectorized::Block* block,
                      SourceState& source_state) override;
 
+    std::string debug_string(int indentation_level = 0) const override;
+
     Status close(RuntimeState* state) override;
     [[nodiscard]] bool is_source() const override { return true; }
 
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index c96f34b213b..ed2af3f7854 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -342,10 +342,12 @@ std::string PipelineXTask::debug_string() {
     fmt::format_to(debug_string_buffer, "InstanceId: {}\n",
                    print_id(_state->fragment_instance_id()));
 
-    fmt::format_to(
-            debug_string_buffer,
-            "PipelineTask[this = {}, state = {}, data state = {}, dry run = {}]\noperators: ",
-            (void*)this, get_state_name(_cur_state), (int)_data_state, _dry_run);
+    fmt::format_to(debug_string_buffer,
+                   "PipelineTask[this = {}, state = {}, data state = {}, dry run = {}, elapse time "
+                   "= {}ns], block dependency = {}, _use_blocking_queue = {}\noperators: ",
+                   (void*)this, get_state_name(_cur_state), (int)_data_state, _dry_run,
+                   MonotonicNanos() - _fragment_context->create_time(),
+                   _blocked_dep ? _blocked_dep->debug_string() : "NULL", _use_blocking_queue);
     for (size_t i = 0; i < _operators.size(); i++) {
         fmt::format_to(
                 debug_string_buffer, "\n{}",
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h
index e31b433491a..bfdd1dd351a 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -60,6 +60,7 @@ class RuntimeState;
 namespace pipeline {
 struct ExchangeDataDependency;
 class LocalExchangeChannelDependency;
+class ExchangeLocalState;
 } // namespace pipeline
 
 namespace vectorized {
@@ -235,6 +236,7 @@ public:
     }
 
 protected:
+    friend class pipeline::ExchangeLocalState;
     Status _inner_get_batch_without_lock(Block* block, bool* eos);
 
     // Not managed by this class


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to