This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit be31b8dc61f49a33be975a41d70811652c60c37d Author: HappenLee <[email protected]> AuthorDate: Mon Feb 5 16:07:33 2024 +0800 [Refactor](exchange) remove unless code in exchange and opt some code (#30813) --- be/src/pipeline/pipeline.cpp | 7 ++-- be/src/service/internal_service.cpp | 20 ++++-------- be/src/vec/runtime/vdata_stream_mgr.cpp | 2 +- be/src/vec/runtime/vdata_stream_recvr.cpp | 54 +++++++++++++------------------ be/src/vec/runtime/vdata_stream_recvr.h | 2 -- 5 files changed, 32 insertions(+), 53 deletions(-) diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp index 7990f84df49..9a30cee5ab9 100644 --- a/be/src/pipeline/pipeline.cpp +++ b/be/src/pipeline/pipeline.cpp @@ -18,7 +18,6 @@ #include "pipeline.h" #include <ostream> -#include <typeinfo> #include <utility> #include "pipeline/exec/operator.h" @@ -26,10 +25,8 @@ namespace doris::pipeline { void Pipeline::_init_profile() { - std::stringstream ss; - ss << "Pipeline" - << " (pipeline id=" << _pipeline_id << ")"; - _pipeline_profile.reset(new RuntimeProfile(ss.str())); + auto s = fmt::format("Pipeline (pipeline id={})", _pipeline_id); + _pipeline_profile.reset(new RuntimeProfile(std::move(s))); } Status Pipeline::build_operators() { diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 36e3b743d0a..d68451ac6d0 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -1319,24 +1319,18 @@ void PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcControlle } } -void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* controller, - const PTransmitDataParams* request, - PTransmitDataResult* response, - google::protobuf::Closure* done, - const Status& extract_st) { - std::string query_id; - TUniqueId finst_id; +void PInternalService::_transmit_block(google::protobuf::RpcController* controller, + const PTransmitDataParams* request, + PTransmitDataResult* response, + google::protobuf::Closure* done, const Status& extract_st) { if (request->has_query_id()) { - query_id = print_id(request->query_id()); - finst_id.__set_hi(request->finst_id().hi()); - finst_id.__set_lo(request->finst_id().lo()); + VLOG_ROW << "transmit block: fragment_instance_id=" << print_id(request->finst_id()) + << " query_id=" << print_id(request->query_id()) << " node=" << request->node_id(); } - VLOG_ROW << "transmit block: fragment_instance_id=" << print_id(request->finst_id()) - << " query_id=" << query_id << " node=" << request->node_id(); + // The response is accessed when done->Run is called in transmit_block(), // give response a default value to avoid null pointers in high concurrency. Status st; - st.to_protobuf(response->mutable_status()); if (extract_st.ok()) { st = _exec_env->vstream_mgr()->transmit_block(request, &done); if (!st.ok() && !st.is<END_OF_FILE>()) { diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp b/be/src/vec/runtime/vdata_stream_mgr.cpp index 1210cd811d1..37cf47f4a36 100644 --- a/be/src/vec/runtime/vdata_stream_mgr.cpp +++ b/be/src/vec/runtime/vdata_stream_mgr.cpp @@ -92,7 +92,7 @@ std::shared_ptr<VDataStreamRecvr> VDataStreamMgr::find_recvr(const TUniqueId& fr } ++range.first; } - return std::shared_ptr<VDataStreamRecvr>(); + return nullptr; } Status VDataStreamMgr::transmit_block(const PTransmitDataParams* request, diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index dfc574591b3..6c26128bf5a 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -136,6 +136,9 @@ void VDataStreamRecvr::SenderQueue::try_set_dep_ready_without_lock() { Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_number, int64_t packet_seq, ::google::protobuf::Closure** done) { + const auto pblock_byte_size = pblock.ByteSizeLong(); + COUNTER_UPDATE(_recvr->_bytes_received_counter, pblock_byte_size); + { std::lock_guard<std::mutex> l(_lock); if (_is_cancelled) { @@ -153,8 +156,6 @@ Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_num } else { _packet_seq_map.emplace(be_number, packet_seq); } - auto pblock_byte_size = pblock.ByteSizeLong(); - COUNTER_UPDATE(_recvr->_bytes_received_counter, pblock_byte_size); DCHECK(_num_remaining_senders >= 0); if (_num_remaining_senders == 0) { @@ -171,8 +172,12 @@ Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_num RETURN_IF_ERROR(block->deserialize(pblock)); } + const auto rows = block->rows(); + if (rows == 0) { + return Status::OK(); + } auto block_byte_size = block->allocated_bytes(); - VLOG_ROW << "added #rows=" << block->rows() << " batch_size=" << block_byte_size << "\n"; + VLOG_ROW << "added #rows=" << rows << " batch_size=" << block_byte_size << "\n"; std::lock_guard<std::mutex> l(_lock); if (_is_cancelled) { @@ -182,16 +187,13 @@ Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_num COUNTER_UPDATE(_recvr->_deserialize_row_batch_timer, deserialize_time); COUNTER_UPDATE(_recvr->_decompress_timer, block->get_decompress_time()); COUNTER_UPDATE(_recvr->_decompress_bytes, block->get_decompressed_bytes()); - COUNTER_UPDATE(_recvr->_rows_produced_counter, block->rows()); + COUNTER_UPDATE(_recvr->_rows_produced_counter, rows); COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1); - bool empty = !block->rows(); + _block_queue.emplace_back(std::move(block), block_byte_size); + _record_debug_info(); + try_set_dep_ready_without_lock(); - if (!empty) { - _block_queue.emplace_back(std::move(block), block_byte_size); - _record_debug_info(); - try_set_dep_ready_without_lock(); - } // if done is nullptr, this function can't delay this response if (done != nullptr && _recvr->exceeds_limit(block_byte_size)) { MonotonicStopWatch monotonicStopWatch; @@ -201,16 +203,15 @@ Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_num *done = nullptr; } _recvr->update_blocks_memory_usage(block_byte_size); - if (!empty) { - _data_arrival_cv.notify_one(); - } + _data_arrival_cv.notify_one(); return Status::OK(); } void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) { + const auto rows = block->rows(); { std::unique_lock<std::mutex> l(_lock); - if (_is_cancelled || !block->rows()) { + if (_is_cancelled || rows == 0) { return; } } @@ -223,7 +224,6 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) { if (use_move) { block->clear(); } else { - auto rows = block->rows(); for (int i = 0; i < nblock->columns(); ++i) { nblock->get_by_position(i).column = nblock->get_by_position(i).column->clone_resized(rows); @@ -237,17 +237,13 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) { return; } COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_bytes_received); - COUNTER_UPDATE(_recvr->_rows_produced_counter, block->rows()); + COUNTER_UPDATE(_recvr->_rows_produced_counter, rows); COUNTER_UPDATE(_recvr->_blocks_produced_counter, 1); - bool empty = !nblock->rows(); - - if (!empty) { - _block_queue.emplace_back(std::move(nblock), block_mem_size); - _record_debug_info(); - try_set_dep_ready_without_lock(); - _data_arrival_cv.notify_one(); - } + _block_queue.emplace_back(std::move(nblock), block_mem_size); + _record_debug_info(); + try_set_dep_ready_without_lock(); + _data_arrival_cv.notify_one(); // Careful: Accessing members of _recvr that are allocated by Object pool // should be done before the following logic, because the _lock will be released @@ -353,8 +349,7 @@ VDataStreamRecvr::VDataStreamRecvr(VDataStreamMgr* stream_mgr, RuntimeState* sta _is_closed(false), _profile(profile), _peak_memory_usage_counter(nullptr), - _enable_pipeline(state->enable_pipeline_exec()), - _mem_available(std::make_shared<bool>(true)) { + _enable_pipeline(state->enable_pipeline_exec()) { // DataStreamRecvr may be destructed after the instance execution thread ends. _mem_tracker = std::make_unique<MemTracker>("VDataStreamRecvr:" + print_id(_fragment_instance_id)); @@ -492,12 +487,7 @@ void VDataStreamRecvr::cancel_stream(Status exec_status) { void VDataStreamRecvr::update_blocks_memory_usage(int64_t size) { _blocks_memory_usage->add(size); - auto val = _blocks_memory_usage_current_value.fetch_add(size); - if (val + size > config::exchg_node_buffer_size_bytes) { - *_mem_available = false; - } else { - *_mem_available = true; - } + _blocks_memory_usage_current_value.fetch_add(size); } void VDataStreamRecvr::close() { diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h index 2da6f9f920b..30ac4088f88 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.h +++ b/be/src/vec/runtime/vdata_stream_recvr.h @@ -174,8 +174,6 @@ private: bool _enable_pipeline; std::vector<std::shared_ptr<pipeline::LocalExchangeChannelDependency>> _sender_to_local_channel_dependency; - - std::shared_ptr<bool> _mem_available; }; class ThreadClosure : public google::protobuf::Closure { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
