This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new bea05da638e [shuffle](fix) Do not use copy assignment for TUniqueId (#32969)
bea05da638e is described below
commit bea05da638e7ce21f758123c3569ae10eb6e2223
Author: Gabriel <[email protected]>
AuthorDate: Fri Mar 29 10:35:23 2024 +0800
[shuffle](fix) Do not use copy assignment for TUniqueId (#32969)
---
be/src/pipeline/exec/exchange_sink_buffer.cpp | 28 +++++++++++++--------------
be/src/vec/sink/vdata_stream_sender.h | 2 +-
2 files changed, 15 insertions(+), 15 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 2b97551d8fb..0eba79b25c5 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -186,27 +186,27 @@ Status ExchangeSinkBuffer<Parent>::add_block(TransmitInfo<Parent>&& request) {
if (_is_finishing) {
return Status::OK();
}
- TUniqueId ins_id = request.channel->_fragment_instance_id;
- if (_is_receiver_eof(ins_id.lo)) {
+ auto ins_id = request.channel->_fragment_instance_id.lo;
+ if (_is_receiver_eof(ins_id)) {
return Status::EndOfFile("receiver eof");
}
bool send_now = false;
{
- std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[ins_id.lo]);
+ std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[ins_id]);
// Do not have in process rpc, directly send
- if (_rpc_channel_is_idle[ins_id.lo]) {
+ if (_rpc_channel_is_idle[ins_id]) {
send_now = true;
- _rpc_channel_is_idle[ins_id.lo] = false;
+ _rpc_channel_is_idle[ins_id] = false;
_busy_channels++;
}
- _instance_to_package_queue[ins_id.lo].emplace(std::move(request));
+ _instance_to_package_queue[ins_id].emplace(std::move(request));
_total_queue_size++;
if (_queue_dependency && _total_queue_size > _queue_capacity) {
_queue_dependency->block();
}
}
if (send_now) {
- RETURN_IF_ERROR(_send_rpc(ins_id.lo));
+ RETURN_IF_ERROR(_send_rpc(ins_id));
}
return Status::OK();
@@ -217,23 +217,23 @@ Status ExchangeSinkBuffer<Parent>::add_block(BroadcastTransmitInfo<Parent>&& req
if (_is_finishing) {
return Status::OK();
}
- TUniqueId ins_id = request.channel->_fragment_instance_id;
- if (_is_receiver_eof(ins_id.lo)) {
+ auto ins_id = request.channel->_fragment_instance_id.lo;
+ if (_is_receiver_eof(ins_id)) {
return Status::EndOfFile("receiver eof");
}
bool send_now = false;
{
- std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[ins_id.lo]);
+ std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[ins_id]);
// Do not have in process rpc, directly send
- if (_rpc_channel_is_idle[ins_id.lo]) {
+ if (_rpc_channel_is_idle[ins_id]) {
send_now = true;
- _rpc_channel_is_idle[ins_id.lo] = false;
+ _rpc_channel_is_idle[ins_id] = false;
_busy_channels++;
}
- _instance_to_broadcast_package_queue[ins_id.lo].emplace(request);
+ _instance_to_broadcast_package_queue[ins_id].emplace(request);
}
if (send_now) {
- RETURN_IF_ERROR(_send_rpc(ins_id.lo));
+ RETURN_IF_ERROR(_send_rpc(ins_id));
}
return Status::OK();
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index 88a948ed05c..8119b5a35f9 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -380,7 +380,7 @@ protected:
Parent* _parent = nullptr;
const RowDescriptor& _row_desc;
- TUniqueId _fragment_instance_id;
+ const TUniqueId _fragment_instance_id;
PlanNodeId _dest_node_id;
// the number of RowBatch.data bytes sent successfully
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]