This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new bea05da638e [shuffle](fix) Do not use copy assignment for TUniqueId 
(#32969)
bea05da638e is described below

commit bea05da638e7ce21f758123c3569ae10eb6e2223
Author: Gabriel <[email protected]>
AuthorDate: Fri Mar 29 10:35:23 2024 +0800

    [shuffle](fix) Do not use copy assignment for TUniqueId (#32969)
---
 be/src/pipeline/exec/exchange_sink_buffer.cpp | 28 +++++++++++++--------------
 be/src/vec/sink/vdata_stream_sender.h         |  2 +-
 2 files changed, 15 insertions(+), 15 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp 
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 2b97551d8fb..0eba79b25c5 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -186,27 +186,27 @@ Status 
ExchangeSinkBuffer<Parent>::add_block(TransmitInfo<Parent>&& request) {
     if (_is_finishing) {
         return Status::OK();
     }
-    TUniqueId ins_id = request.channel->_fragment_instance_id;
-    if (_is_receiver_eof(ins_id.lo)) {
+    auto ins_id = request.channel->_fragment_instance_id.lo;
+    if (_is_receiver_eof(ins_id)) {
         return Status::EndOfFile("receiver eof");
     }
     bool send_now = false;
     {
-        std::unique_lock<std::mutex> 
lock(*_instance_to_package_queue_mutex[ins_id.lo]);
+        std::unique_lock<std::mutex> 
lock(*_instance_to_package_queue_mutex[ins_id]);
         // Do not have in process rpc, directly send
-        if (_rpc_channel_is_idle[ins_id.lo]) {
+        if (_rpc_channel_is_idle[ins_id]) {
             send_now = true;
-            _rpc_channel_is_idle[ins_id.lo] = false;
+            _rpc_channel_is_idle[ins_id] = false;
             _busy_channels++;
         }
-        _instance_to_package_queue[ins_id.lo].emplace(std::move(request));
+        _instance_to_package_queue[ins_id].emplace(std::move(request));
         _total_queue_size++;
         if (_queue_dependency && _total_queue_size > _queue_capacity) {
             _queue_dependency->block();
         }
     }
     if (send_now) {
-        RETURN_IF_ERROR(_send_rpc(ins_id.lo));
+        RETURN_IF_ERROR(_send_rpc(ins_id));
     }
 
     return Status::OK();
@@ -217,23 +217,23 @@ Status 
ExchangeSinkBuffer<Parent>::add_block(BroadcastTransmitInfo<Parent>&& req
     if (_is_finishing) {
         return Status::OK();
     }
-    TUniqueId ins_id = request.channel->_fragment_instance_id;
-    if (_is_receiver_eof(ins_id.lo)) {
+    auto ins_id = request.channel->_fragment_instance_id.lo;
+    if (_is_receiver_eof(ins_id)) {
         return Status::EndOfFile("receiver eof");
     }
     bool send_now = false;
     {
-        std::unique_lock<std::mutex> 
lock(*_instance_to_package_queue_mutex[ins_id.lo]);
+        std::unique_lock<std::mutex> 
lock(*_instance_to_package_queue_mutex[ins_id]);
         // Do not have in process rpc, directly send
-        if (_rpc_channel_is_idle[ins_id.lo]) {
+        if (_rpc_channel_is_idle[ins_id]) {
             send_now = true;
-            _rpc_channel_is_idle[ins_id.lo] = false;
+            _rpc_channel_is_idle[ins_id] = false;
             _busy_channels++;
         }
-        _instance_to_broadcast_package_queue[ins_id.lo].emplace(request);
+        _instance_to_broadcast_package_queue[ins_id].emplace(request);
     }
     if (send_now) {
-        RETURN_IF_ERROR(_send_rpc(ins_id.lo));
+        RETURN_IF_ERROR(_send_rpc(ins_id));
     }
 
     return Status::OK();
diff --git a/be/src/vec/sink/vdata_stream_sender.h 
b/be/src/vec/sink/vdata_stream_sender.h
index 88a948ed05c..8119b5a35f9 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -380,7 +380,7 @@ protected:
     Parent* _parent = nullptr;
 
     const RowDescriptor& _row_desc;
-    TUniqueId _fragment_instance_id;
+    const TUniqueId _fragment_instance_id;
     PlanNodeId _dest_node_id;
 
     // the number of RowBatch.data bytes sent successfully


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to