This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b022236337e [log](minor) Add debug logs (#56339)
b022236337e is described below
commit b022236337ea229c831f9636559fb1dc50adcf55
Author: Gabriel <[email protected]>
AuthorDate: Tue Sep 23 22:02:08 2025 +0800
[log](minor) Add debug logs (#56339)
---
be/src/pipeline/exec/exchange_source_operator.cpp | 11 ++-------
be/src/vec/runtime/vdata_stream_recvr.cpp | 27 +++++++++++++++++++++++
be/src/vec/runtime/vdata_stream_recvr.h | 2 ++
be/src/vec/sink/vdata_stream_sender.cpp | 13 +++++++++++
be/src/vec/sink/vdata_stream_sender.h | 1 +
5 files changed, 45 insertions(+), 9 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp
index 0072a6437bb..b31b193aff2 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -49,15 +49,8 @@ ExchangeLocalState::~ExchangeLocalState() {
std::string ExchangeLocalState::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
- fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level));
- fmt::format_to(debug_string_buffer, ", Queues: (");
- const auto& queues = stream_recvr->sender_queues();
- for (size_t i = 0; i < queues.size(); i++) {
- fmt::format_to(debug_string_buffer,
- "No. {} queue: (_num_remaining_senders = {}, block_queue size = {})", i,
- queues[i]->_num_remaining_senders, queues[i]->_block_queue.size());
- }
- fmt::format_to(debug_string_buffer, ")");
+ fmt::format_to(debug_string_buffer, "{}, recvr: ({})", Base::debug_string(indentation_level),
+ stream_recvr->debug_string());
return fmt::to_string(debug_string_buffer);
}
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 5bcd7dd1ef9..9cd8d4bb8d9 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -143,6 +143,21 @@ void VDataStreamRecvr::SenderQueue::set_source_ready(std::lock_guard<std::mutex>
}
}
+std::string VDataStreamRecvr::SenderQueue::debug_string() {
+ fmt::memory_buffer debug_string_buffer;
+ fmt::format_to(debug_string_buffer,
+ "_num_remaining_senders = {}, block_queue size = {}, _is_cancelled: {}, "
+ "_cancel_status: {}, _sender_eos_set: (",
+ _num_remaining_senders, _block_queue.size(), _is_cancelled,
+ _cancel_status.to_string());
+ std::lock_guard<std::mutex> l(_lock);
+ for (auto& i : _sender_eos_set) {
+ fmt::format_to(debug_string_buffer, "{}, ", i);
+ }
+ fmt::format_to(debug_string_buffer, ")");
+ return fmt::to_string(debug_string_buffer);
+}
+
Status VDataStreamRecvr::SenderQueue::add_block(std::unique_ptr<PBlock> pblock, int be_number,
int64_t packet_seq,
::google::protobuf::Closure** done,
@@ -412,6 +427,18 @@ void VDataStreamRecvr::add_block(Block* block, int sender_id, bool use_move) {
_sender_queues[use_sender_id]->add_block(block, use_move);
}
+std::string VDataStreamRecvr::debug_string() {
+ fmt::memory_buffer debug_string_buffer;
+ fmt::format_to(debug_string_buffer,
+ "fragment_instance_id: {}, _dest_node_id: {}, _is_merging: {}, _is_closed: {}",
+ print_id(_fragment_instance_id), _dest_node_id, _is_merging, _is_closed);
+ for (size_t i = 0; i < _sender_queues.size(); i++) {
+ fmt::format_to(debug_string_buffer, "No. {} queue: {}", i,
+ _sender_queues[i]->debug_string());
+ }
+ return fmt::to_string(debug_string_buffer);
+}
+
std::shared_ptr<pipeline::Dependency> VDataStreamRecvr::get_local_channel_dependency(
int sender_id) {
DCHECK(_sender_to_local_channel_dependency[_is_merging ? sender_id : 0] != nullptr);
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h
index 9325f4ada3f..1d752b1bad8 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -91,6 +91,7 @@ public:
const int64_t wait_for_worker, const uint64_t time_to_find_recvr);
void add_block(Block* block, int sender_id, bool use_move);
+ std::string debug_string();
MOCK_FUNCTION Status get_next(Block* block, bool* eos);
@@ -184,6 +185,7 @@ public:
Status add_block(std::unique_ptr<PBlock> pblock, int be_number, int64_t packet_seq,
::google::protobuf::Closure** done, const int64_t wait_for_worker,
const uint64_t time_to_find_recvr);
+ std::string debug_string();
void add_block(Block* block, bool use_move);
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index b31980fe36f..b4af185adb3 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -263,6 +263,19 @@ Status Channel::send_local_block(Block* block, bool eos, bool can_be_moved) {
}
}
+std::string Channel::debug_string() const {
+ fmt::memory_buffer debug_string_buffer;
+ fmt::format_to(debug_string_buffer,
+ "fragment_instance_id: {}, _dest_node_id: {}, _is_local: {}, _receiver_status: "
+ "{}, _closed: {}, _need_close: {}, _be_number: {}, _eos_send: {}",
+ print_id(_fragment_instance_id), _dest_node_id, _is_local,
+ _receiver_status.to_string(), _closed, _need_close, _be_number, _eos_send);
+ if (_is_local) {
+ fmt::format_to(debug_string_buffer, "_local_recvr: {}", _local_recvr->debug_string());
+ }
+ return fmt::to_string(debug_string_buffer);
+}
+
Status Channel::close(RuntimeState* state) {
if (_closed) {
return Status::OK();
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index f62391d66b1..538f98fea50 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -129,6 +129,7 @@ public:
// Returns OK if successful, error indication otherwise.
Status init(RuntimeState* state);
Status open(RuntimeState* state);
+ std::string debug_string() const;
MOCK_FUNCTION Status send_local_block(Block* block, bool eos, bool can_be_moved);
// Flush buffered rows and close channel. This function don't wait the response
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]