This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit be31b8dc61f49a33be975a41d70811652c60c37d
Author: HappenLee <[email protected]>
AuthorDate: Mon Feb 5 16:07:33 2024 +0800

    [Refactor](exchange) remove useless code in exchange and optimize some code (#30813)
---
 be/src/pipeline/pipeline.cpp              |  7 ++--
 be/src/service/internal_service.cpp       | 20 ++++--------
 be/src/vec/runtime/vdata_stream_mgr.cpp   |  2 +-
 be/src/vec/runtime/vdata_stream_recvr.cpp | 54 +++++++++++++------------------
 be/src/vec/runtime/vdata_stream_recvr.h   |  2 --
 5 files changed, 32 insertions(+), 53 deletions(-)

diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index 7990f84df49..9a30cee5ab9 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -18,7 +18,6 @@
 #include "pipeline.h"
 
 #include <ostream>
-#include <typeinfo>
 #include <utility>
 
 #include "pipeline/exec/operator.h"
@@ -26,10 +25,8 @@
 namespace doris::pipeline {
 
 void Pipeline::_init_profile() {
-    std::stringstream ss;
-    ss << "Pipeline"
-       << " (pipeline id=" << _pipeline_id << ")";
-    _pipeline_profile.reset(new RuntimeProfile(ss.str()));
+    auto s = fmt::format("Pipeline (pipeline id={})", _pipeline_id);
+    _pipeline_profile.reset(new RuntimeProfile(std::move(s)));
 }
 
 Status Pipeline::build_operators() {
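
The pipeline.cpp hunk above replaces a std::stringstream round-trip with a single fmt::format call when naming the pipeline profile. A minimal standalone sketch of that pattern, assuming the {fmt} library is available (the diff calls fmt::format directly, so it is treated as already linked) and using a plain std::string in place of RuntimeProfile; the uint32_t pipeline id is an illustrative choice:

    #include <fmt/core.h>

    #include <cstdint>
    #include <iostream>
    #include <string>

    // Builds the profile name in one call instead of streaming pieces into a
    // std::stringstream and extracting the string afterwards.
    std::string make_pipeline_profile_name(uint32_t pipeline_id) {
        return fmt::format("Pipeline (pipeline id={})", pipeline_id);
    }

    int main() {
        std::cout << make_pipeline_profile_name(42) << '\n';  // Pipeline (pipeline id=42)
        return 0;
    }
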
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 36e3b743d0a..d68451ac6d0 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -1319,24 +1319,18 @@ void PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcControlle
     }
 }
 
-void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* controller,
-                                           const PTransmitDataParams* request,
-                                           PTransmitDataResult* response,
-                                           google::protobuf::Closure* done,
-                                           const Status& extract_st) {
-    std::string query_id;
-    TUniqueId finst_id;
+void PInternalService::_transmit_block(google::protobuf::RpcController* controller,
+                                       const PTransmitDataParams* request,
+                                       PTransmitDataResult* response,
+                                       google::protobuf::Closure* done, const Status& extract_st) {
     if (request->has_query_id()) {
-        query_id = print_id(request->query_id());
-        finst_id.__set_hi(request->finst_id().hi());
-        finst_id.__set_lo(request->finst_id().lo());
+        VLOG_ROW << "transmit block: fragment_instance_id=" << print_id(request->finst_id())
+                 << " query_id=" << print_id(request->query_id()) << " node=" << request->node_id();
     }
-    VLOG_ROW << "transmit block: fragment_instance_id=" << print_id(request->finst_id())
-             << " query_id=" << query_id << " node=" << request->node_id();
+
     // The response is accessed when done->Run is called in transmit_block(),
     // give response a default value to avoid null pointers in high concurrency.
     Status st;
-    st.to_protobuf(response->mutable_status());
     if (extract_st.ok()) {
         st = _exec_env->vstream_mgr()->transmit_block(request, &done);
         if (!st.ok() && !st.is<END_OF_FILE>()) {
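
The internal_service.cpp hunk moves the id formatting inside the has_query_id() check, so the strings are only built (and the log line only emitted) when the request actually carries a query id, dropping the intermediate query_id and finst_id locals. A rough sketch of that shape, where Request, print_id, and the std::cout line are made-up stand-ins for the protobuf message, the Doris helper, and VLOG_ROW, not the real API:

    #include <cstdint>
    #include <iostream>
    #include <string>

    // Hypothetical stand-in for the PTransmitDataParams message.
    struct Request {
        bool has_query_id = false;
        uint64_t query_id_hi = 0, query_id_lo = 0;
        uint64_t finst_hi = 0, finst_lo = 0;
        int node_id = 0;
    };

    // Stand-in for the print_id() helper on a (hi, lo) unique id.
    static std::string print_id(uint64_t hi, uint64_t lo) {
        return std::to_string(hi) + "-" + std::to_string(lo);
    }

    // The id formatting happens only when a query id is present, mirroring the hunk.
    void log_transmit(const Request& req) {
        if (req.has_query_id) {
            std::cout << "transmit block: fragment_instance_id=" << print_id(req.finst_hi, req.finst_lo)
                      << " query_id=" << print_id(req.query_id_hi, req.query_id_lo)
                      << " node=" << req.node_id << '\n';
        }
    }

    int main() {
        Request req{true, 1, 2, 3, 4, 7};
        log_transmit(req);  // transmit block: fragment_instance_id=3-4 query_id=1-2 node=7
        return 0;
    }
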
diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp b/be/src/vec/runtime/vdata_stream_mgr.cpp
index 1210cd811d1..37cf47f4a36 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.cpp
+++ b/be/src/vec/runtime/vdata_stream_mgr.cpp
@@ -92,7 +92,7 @@ std::shared_ptr<VDataStreamRecvr> VDataStreamMgr::find_recvr(const TUniqueId& fr
         }
         ++range.first;
     }
-    return std::shared_ptr<VDataStreamRecvr>();
+    return nullptr;
 }
 
 Status VDataStreamMgr::transmit_block(const PTransmitDataParams* request,
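
In find_recvr, returning nullptr and returning a default-constructed std::shared_ptr<VDataStreamRecvr> are equivalent: both yield an empty shared_ptr (via the std::nullptr_t constructor), so the change is purely about brevity. A small self-contained illustration, where Recvr and the int-keyed map are made-up stand-ins for the receiver bookkeeping in VDataStreamMgr:

    #include <cassert>
    #include <map>
    #include <memory>

    // Made-up receiver type keyed by a plain int id.
    struct Recvr {
        int id = 0;
    };

    std::shared_ptr<Recvr> find_recvr(const std::map<int, std::shared_ptr<Recvr>>& recvrs, int id) {
        auto it = recvrs.find(id);
        if (it != recvrs.end()) {
            return it->second;
        }
        return nullptr;  // empty shared_ptr, same as `return std::shared_ptr<Recvr>();`
    }

    int main() {
        std::map<int, std::shared_ptr<Recvr>> recvrs;
        recvrs.emplace(1, std::make_shared<Recvr>(Recvr{1}));
        assert(find_recvr(recvrs, 1) != nullptr);
        assert(find_recvr(recvrs, 2) == nullptr);
        return 0;
    }
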
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index dfc574591b3..6c26128bf5a 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -136,6 +136,9 @@ void VDataStreamRecvr::SenderQueue::try_set_dep_ready_without_lock() {
 Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_number,
                                                 int64_t packet_seq,
                                                 ::google::protobuf::Closure** done) {
+    const auto pblock_byte_size = pblock.ByteSizeLong();
+    COUNTER_UPDATE(_recvr->_bytes_received_counter, pblock_byte_size);
+
     {
         std::lock_guard<std::mutex> l(_lock);
         if (_is_cancelled) {
@@ -153,8 +156,6 @@ Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_num
         } else {
             _packet_seq_map.emplace(be_number, packet_seq);
         }
-        auto pblock_byte_size = pblock.ByteSizeLong();
-        COUNTER_UPDATE(_recvr->_bytes_received_counter, pblock_byte_size);
 
         DCHECK(_num_remaining_senders >= 0);
         if (_num_remaining_senders == 0) {
@@ -171,8 +172,12 @@ Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_num
         RETURN_IF_ERROR(block->deserialize(pblock));
     }
 
+    const auto rows = block->rows();
+    if (rows == 0) {
+        return Status::OK();
+    }
     auto block_byte_size = block->allocated_bytes();
-    VLOG_ROW << "added #rows=" << block->rows() << " batch_size=" << block_byte_size << "\n";
+    VLOG_ROW << "added #rows=" << rows << " batch_size=" << block_byte_size << "\n";
 
     std::lock_guard<std::mutex> l(_lock);
     if (_is_cancelled) {
@@ -182,16 +187,13 @@ Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_num
     COUNTER_UPDATE(_recvr->_deserialize_row_batch_timer, deserialize_time);
     COUNTER_UPDATE(_recvr->_decompress_timer, block->get_decompress_time());
     COUNTER_UPDATE(_recvr->_decompress_bytes, block->get_decompressed_bytes());
-    COUNTER_UPDATE(_recvr->_rows_produced_counter, block->rows());
+    COUNTER_UPDATE(_recvr->_rows_produced_counter, rows);
     COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1);
 
-    bool empty = !block->rows();
+    _block_queue.emplace_back(std::move(block), block_byte_size);
+    _record_debug_info();
+    try_set_dep_ready_without_lock();
 
-    if (!empty) {
-        _block_queue.emplace_back(std::move(block), block_byte_size);
-        _record_debug_info();
-        try_set_dep_ready_without_lock();
-    }
     // if done is nullptr, this function can't delay this response
     if (done != nullptr && _recvr->exceeds_limit(block_byte_size)) {
         MonotonicStopWatch monotonicStopWatch;
@@ -201,16 +203,15 @@ Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_num
         *done = nullptr;
     }
     _recvr->update_blocks_memory_usage(block_byte_size);
-    if (!empty) {
-        _data_arrival_cv.notify_one();
-    }
+    _data_arrival_cv.notify_one();
     return Status::OK();
 }
 
 void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
+    const auto rows = block->rows();
     {
         std::unique_lock<std::mutex> l(_lock);
-        if (_is_cancelled || !block->rows()) {
+        if (_is_cancelled || rows == 0) {
             return;
         }
     }
@@ -223,7 +224,6 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
     if (use_move) {
         block->clear();
     } else {
-        auto rows = block->rows();
         for (int i = 0; i < nblock->columns(); ++i) {
             nblock->get_by_position(i).column =
                     nblock->get_by_position(i).column->clone_resized(rows);
@@ -237,17 +237,13 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
         return;
     }
     COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_bytes_received);
-    COUNTER_UPDATE(_recvr->_rows_produced_counter, block->rows());
+    COUNTER_UPDATE(_recvr->_rows_produced_counter, rows);
     COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1);
 
-    bool empty = !nblock->rows();
-
-    if (!empty) {
-        _block_queue.emplace_back(std::move(nblock), block_mem_size);
-        _record_debug_info();
-        try_set_dep_ready_without_lock();
-        _data_arrival_cv.notify_one();
-    }
+    _block_queue.emplace_back(std::move(nblock), block_mem_size);
+    _record_debug_info();
+    try_set_dep_ready_without_lock();
+    _data_arrival_cv.notify_one();
 
     // Careful: Accessing members of _recvr that are allocated by Object pool
     // should be done before the following logic, because the _lock will be released
@@ -353,8 +349,7 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* sta
           _is_closed(false),
           _profile(profile),
           _peak_memory_usage_counter(nullptr),
-          _enable_pipeline(state->enable_pipeline_exec()),
-          _mem_available(std::make_shared<bool>(true)) {
+          _enable_pipeline(state->enable_pipeline_exec()) {
     // DataStreamRecvr may be destructed after the instance execution thread ends.
     _mem_tracker =
             std::make_unique<MemTracker>("VDataStreamRecvr:" + print_id(_fragment_instance_id));
@@ -492,12 +487,7 @@ void VDataStreamRecvr::cancel_stream(Status exec_status) {
 
 void VDataStreamRecvr::update_blocks_memory_usage(int64_t size) {
     _blocks_memory_usage->add(size);
-    auto val = _blocks_memory_usage_current_value.fetch_add(size);
-    if (val + size > config::exchg_node_buffer_size_bytes) {
-        *_mem_available = false;
-    } else {
-        *_mem_available = true;
-    }
+    _blocks_memory_usage_current_value.fetch_add(size);
 }
 
 void VDataStreamRecvr::close() {
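
The main reshaping in add_block is an early return once the deserialized block turns out to be empty, which removes the `empty` flag and the `if (!empty)` branches around the queue push and the notify. A stripped-down sketch of that control flow, with a toy Block and a std::deque standing in for vectorized::Block and _block_queue (locking and the profile counters are omitted):

    #include <cstddef>
    #include <deque>
    #include <utility>
    #include <vector>

    // Toy stand-in for vectorized::Block.
    struct Block {
        std::vector<int> data;
        std::size_t rows() const { return data.size(); }
    };

    // Bail out on an empty block so the rest of the function can assume it has
    // rows: no `empty` flag, no repeated checks before push and notify.
    void add_block(std::deque<Block>& queue, Block block) {
        const auto rows = block.rows();
        if (rows == 0) {
            return;  // nothing to enqueue; skip accounting, push and notification
        }
        queue.emplace_back(std::move(block));
        // ... counter updates and the condition-variable notify would follow here.
    }

    int main() {
        std::deque<Block> queue;
        add_block(queue, Block{});           // dropped: 0 rows
        add_block(queue, Block{{1, 2, 3}});  // enqueued: 3 rows
        return queue.size() == 1 ? 0 : 1;
    }
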
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h
index 2da6f9f920b..30ac4088f88 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -174,8 +174,6 @@ private:
     bool _enable_pipeline;
     std::vector<std::shared_ptr<pipeline::LocalExchangeChannelDependency>>
             _sender_to_local_channel_dependency;
-
-    std::shared_ptr<bool> _mem_available;
 };
 
 class ThreadClosure : public google::protobuf::Closure {

