This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b022236337e [log](minor) Add debug logs (#56339)
b022236337e is described below

commit b022236337ea229c831f9636559fb1dc50adcf55
Author: Gabriel <[email protected]>
AuthorDate: Tue Sep 23 22:02:08 2025 +0800

    [log](minor) Add debug logs (#56339)
---
 be/src/pipeline/exec/exchange_source_operator.cpp | 11 ++-------
 be/src/vec/runtime/vdata_stream_recvr.cpp         | 27 +++++++++++++++++++++++
 be/src/vec/runtime/vdata_stream_recvr.h           |  2 ++
 be/src/vec/sink/vdata_stream_sender.cpp           | 13 +++++++++++
 be/src/vec/sink/vdata_stream_sender.h             |  1 +
 5 files changed, 45 insertions(+), 9 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp 
b/be/src/pipeline/exec/exchange_source_operator.cpp
index 0072a6437bb..b31b193aff2 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -49,15 +49,8 @@ ExchangeLocalState::~ExchangeLocalState() {
 
 std::string ExchangeLocalState::debug_string(int indentation_level) const {
     fmt::memory_buffer debug_string_buffer;
-    fmt::format_to(debug_string_buffer, "{}", 
Base::debug_string(indentation_level));
-    fmt::format_to(debug_string_buffer, ", Queues: (");
-    const auto& queues = stream_recvr->sender_queues();
-    for (size_t i = 0; i < queues.size(); i++) {
-        fmt::format_to(debug_string_buffer,
-                       "No. {} queue: (_num_remaining_senders = {}, 
block_queue size = {})", i,
-                       queues[i]->_num_remaining_senders, 
queues[i]->_block_queue.size());
-    }
-    fmt::format_to(debug_string_buffer, ")");
+    fmt::format_to(debug_string_buffer, "{}, recvr: ({})", 
Base::debug_string(indentation_level),
+                   stream_recvr->debug_string());
     return fmt::to_string(debug_string_buffer);
 }
 
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp 
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 5bcd7dd1ef9..9cd8d4bb8d9 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -143,6 +143,21 @@ void 
VDataStreamRecvr::SenderQueue::set_source_ready(std::lock_guard<std::mutex>
     }
 }
 
+std::string VDataStreamRecvr::SenderQueue::debug_string() {
+    fmt::memory_buffer debug_string_buffer;
+    fmt::format_to(debug_string_buffer,
+                   "_num_remaining_senders = {}, block_queue size = {}, 
_is_cancelled: {}, "
+                   "_cancel_status: {}, _sender_eos_set: (",
+                   _num_remaining_senders, _block_queue.size(), _is_cancelled,
+                   _cancel_status.to_string());
+    std::lock_guard<std::mutex> l(_lock);
+    for (auto& i : _sender_eos_set) {
+        fmt::format_to(debug_string_buffer, "{}, ", i);
+    }
+    fmt::format_to(debug_string_buffer, ")");
+    return fmt::to_string(debug_string_buffer);
+}
+
 Status VDataStreamRecvr::SenderQueue::add_block(std::unique_ptr<PBlock> 
pblock, int be_number,
                                                 int64_t packet_seq,
                                                 ::google::protobuf::Closure** 
done,
@@ -412,6 +427,18 @@ void VDataStreamRecvr::add_block(Block* block, int 
sender_id, bool use_move) {
     _sender_queues[use_sender_id]->add_block(block, use_move);
 }
 
+std::string VDataStreamRecvr::debug_string() {
+    fmt::memory_buffer debug_string_buffer;
+    fmt::format_to(debug_string_buffer,
+                   "fragment_instance_id: {}, _dest_node_id: {}, _is_merging: 
{}, _is_closed: {}",
+                   print_id(_fragment_instance_id), _dest_node_id, 
_is_merging, _is_closed);
+    for (size_t i = 0; i < _sender_queues.size(); i++) {
+        fmt::format_to(debug_string_buffer, "No. {} queue: {}", i,
+                       _sender_queues[i]->debug_string());
+    }
+    return fmt::to_string(debug_string_buffer);
+}
+
 std::shared_ptr<pipeline::Dependency> 
VDataStreamRecvr::get_local_channel_dependency(
         int sender_id) {
     DCHECK(_sender_to_local_channel_dependency[_is_merging ? sender_id : 0] != 
nullptr);
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h 
b/be/src/vec/runtime/vdata_stream_recvr.h
index 9325f4ada3f..1d752b1bad8 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -91,6 +91,7 @@ public:
                      const int64_t wait_for_worker, const uint64_t 
time_to_find_recvr);
 
     void add_block(Block* block, int sender_id, bool use_move);
+    std::string debug_string();
 
     MOCK_FUNCTION Status get_next(Block* block, bool* eos);
 
@@ -184,6 +185,7 @@ public:
     Status add_block(std::unique_ptr<PBlock> pblock, int be_number, int64_t 
packet_seq,
                      ::google::protobuf::Closure** done, const int64_t 
wait_for_worker,
                      const uint64_t time_to_find_recvr);
+    std::string debug_string();
 
     void add_block(Block* block, bool use_move);
 
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index b31980fe36f..b4af185adb3 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -263,6 +263,19 @@ Status Channel::send_local_block(Block* block, bool eos, 
bool can_be_moved) {
     }
 }
 
+std::string Channel::debug_string() const {
+    fmt::memory_buffer debug_string_buffer;
+    fmt::format_to(debug_string_buffer,
+                   "fragment_instance_id: {}, _dest_node_id: {}, _is_local: 
{}, _receiver_status: "
+                   "{}, _closed: {}, _need_close: {}, _be_number: {}, 
_eos_send: {}",
+                   print_id(_fragment_instance_id), _dest_node_id, _is_local,
+                   _receiver_status.to_string(), _closed, _need_close, 
_be_number, _eos_send);
+    if (_is_local) {
+        fmt::format_to(debug_string_buffer, "_local_recvr: {}", 
_local_recvr->debug_string());
+    }
+    return fmt::to_string(debug_string_buffer);
+}
+
 Status Channel::close(RuntimeState* state) {
     if (_closed) {
         return Status::OK();
diff --git a/be/src/vec/sink/vdata_stream_sender.h 
b/be/src/vec/sink/vdata_stream_sender.h
index f62391d66b1..538f98fea50 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -129,6 +129,7 @@ public:
     // Returns OK if successful, error indication otherwise.
     Status init(RuntimeState* state);
     Status open(RuntimeState* state);
+    std::string debug_string() const;
 
     MOCK_FUNCTION Status send_local_block(Block* block, bool eos, bool 
can_be_moved);
     // Flush buffered rows and close channel. This function don't wait the 
response


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to