This is an automated email from the ASF dual-hosted git repository.

zhaoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 885019a  Make DataStreamRecvr cancel_stream out of lock (#2281)
885019a is described below

commit 885019a75b18e9264bac7abf37f6f3aa2cd5640c
Author: kangkaisen <[email protected]>
AuthorDate: Sat Nov 23 16:52:49 2019 +0800

    Make DataStreamRecvr cancel_stream out of lock (#2281)
---
 be/src/runtime/data_stream_mgr.cpp | 93 ++++++++++++++++++++++----------------
 1 file changed, 54 insertions(+), 39 deletions(-)

diff --git a/be/src/runtime/data_stream_mgr.cpp b/be/src/runtime/data_stream_mgr.cpp
index 8f76156..0af9c99 100644
--- a/be/src/runtime/data_stream_mgr.cpp
+++ b/be/src/runtime/data_stream_mgr.cpp
@@ -54,11 +54,11 @@ shared_ptr<DataStreamRecvr> DataStreamMgr::create_recvr(RuntimeState* state,
         bool is_merging, std::shared_ptr<QueryStatisticsRecvr> 
sub_plan_query_statistics_recvr) {
     DCHECK(profile != NULL);
     VLOG_FILE << "creating receiver for fragment="
-            << fragment_instance_id << ", node=" << dest_node_id;
+              << fragment_instance_id << ", node=" << dest_node_id;
     shared_ptr<DataStreamRecvr> recvr(
-            new DataStreamRecvr(this, state->instance_mem_tracker(), row_desc,
-                fragment_instance_id, dest_node_id, num_senders, is_merging, 
buffer_size,
-                profile, sub_plan_query_statistics_recvr));
+        new DataStreamRecvr(this, state->instance_mem_tracker(), row_desc,
+                            fragment_instance_id, dest_node_id, num_senders, 
is_merging, buffer_size,
+                            profile, sub_plan_query_statistics_recvr));
     uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id);
     lock_guard<mutex> l(_lock);
     _fragment_stream_set.insert(std::make_pair(fragment_instance_id, 
dest_node_id));
@@ -99,7 +99,6 @@ Status DataStreamMgr::transmit_data(const PTransmitDataParams* request, ::google
     t_finst_id.hi = finst_id.hi();
     t_finst_id.lo = finst_id.lo();
     shared_ptr<DataStreamRecvr> recvr = find_recvr(t_finst_id, 
request->node_id());
-
     if (recvr == nullptr) {
         // The receiver may remove itself from the receiver map via 
deregister_recvr()
         // at any time without considering the remaining number of senders.
@@ -111,8 +110,8 @@ Status DataStreamMgr::transmit_data(const PTransmitDataParams* request, ::google
         return Status::OK();
     }
 
-    // request can only be used before calling recvr's add_batch or when 
request 
-    // is the last for the sender, because request maybe released after it's 
batch 
+    // request can only be used before calling recvr's add_batch or when 
request
+    // is the last for the sender, because request maybe released after it's 
batch
     // is consumed by ExchangeNode.
     if (request->has_query_statistics()) {
         recvr->add_sub_plan_statistics(request->query_statistics(), 
request->sender_id());
@@ -120,36 +119,44 @@ Status DataStreamMgr::transmit_data(const PTransmitDataParams* request, ::google
 
     bool eos = request->eos();
     if (request->has_row_batch()) {
-        recvr->add_batch(request->row_batch(), request->sender_id(), 
-                request->be_number(), request->packet_seq(), eos ? nullptr : 
done);
+        recvr->add_batch(request->row_batch(), request->sender_id(),
+                         request->be_number(), request->packet_seq(), eos ? 
nullptr : done);
     }
 
     if (eos) {
-        recvr->remove_sender(request->sender_id(), request->be_number());      
      
-    } 
+        recvr->remove_sender(request->sender_id(), request->be_number());
+    }
     return Status::OK();
 }
 
 Status DataStreamMgr::deregister_recvr(
         const TUniqueId& fragment_instance_id, PlanNodeId node_id) {
+    std::vector<boost::shared_ptr<DataStreamRecvr>> recvrs;
     VLOG_QUERY << "deregister_recvr(): fragment_instance_id=" << 
fragment_instance_id
-        << ", node=" << node_id;
+               << ", node=" << node_id;
     size_t hash_value = get_hash_value(fragment_instance_id, node_id);
-    lock_guard<mutex> l(_lock);
-    std::pair<StreamMap::iterator, StreamMap::iterator> range =
-        _receiver_map.equal_range(hash_value);
-    while (range.first != range.second) {
-        const shared_ptr<DataStreamRecvr>& recvr = range.first->second;
-        if (recvr->fragment_instance_id() == fragment_instance_id
+    {
+        lock_guard<mutex> l(_lock);
+        std::pair<StreamMap::iterator, StreamMap::iterator> range =
+            _receiver_map.equal_range(hash_value);
+        while (range.first != range.second) {
+            const shared_ptr<DataStreamRecvr>& recvr = range.first->second;
+            if (recvr->fragment_instance_id() == fragment_instance_id
                 && recvr->dest_node_id() == node_id) {
-            // Notify concurrent add_data() requests that the stream has been 
terminated.
-            recvr->cancel_stream();
-            
_fragment_stream_set.erase(std::make_pair(recvr->fragment_instance_id(),
-                        recvr->dest_node_id()));
-            _receiver_map.erase(range.first);
-            return Status::OK();
+                recvrs.push_back(recvr);
+                _fragment_stream_set.erase(std::make_pair(
+                    recvr->fragment_instance_id(), recvr->dest_node_id()));
+                _receiver_map.erase(range.first);
+                return Status::OK();
+            }
+            ++range.first;
         }
-        ++range.first;
+    }
+
+    // Notify concurrent add_data() requests that the stream has been 
terminated.
+    // cancel_stream maybe take a long time, so we handle it out of lock.
+    for(auto& it: recvrs) {
+        it->cancel_stream();
     }
 
     std::stringstream err;
@@ -161,21 +168,29 @@ Status DataStreamMgr::deregister_recvr(
 
 void DataStreamMgr::cancel(const TUniqueId& fragment_instance_id) {
     VLOG_QUERY << "cancelling all streams for fragment=" << 
fragment_instance_id;
-    lock_guard<mutex> l(_lock);
-    FragmentStreamSet::iterator i =
-        _fragment_stream_set.lower_bound(std::make_pair(fragment_instance_id, 
0));
-    while (i != _fragment_stream_set.end() && i->first == 
fragment_instance_id) {
-        shared_ptr<DataStreamRecvr> recvr = find_recvr(i->first, i->second, 
false);
-        if (recvr == NULL) {
-            // keep going but at least log it
-            std::stringstream err;
-            err << "cancel(): missing in stream_map: fragment=" << i->first
-                << " node=" << i->second;
-            LOG(ERROR) << err.str();
-        } else {
-            recvr->cancel_stream();
+    std::vector<boost::shared_ptr<DataStreamRecvr>> recvrs;
+    {
+        lock_guard<mutex> l(_lock);
+        FragmentStreamSet::iterator i =
+            
_fragment_stream_set.lower_bound(std::make_pair(fragment_instance_id, 0));
+        while (i != _fragment_stream_set.end() && i->first == 
fragment_instance_id) {
+            shared_ptr<DataStreamRecvr> recvr = find_recvr(i->first, 
i->second, false);
+            if (recvr == NULL) {
+                // keep going but at least log it
+                std::stringstream err;
+                err << "cancel(): missing in stream_map: fragment=" << i->first
+                    << " node=" << i->second;
+                LOG(ERROR) << err.str();
+            } else {
+                recvrs.push_back(recvr);
+            }
+            ++i;
         }
-        ++i;
+    }
+
+    // cancel_stream maybe take a long time, so we handle it out of lock.
+    for(auto& it: recvrs) {
+        it->cancel_stream();
     }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to