This is an automated email from the ASF dual-hosted git repository.
zhaoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 885019a Make DataStreamRecvr cancel_stream out of lock (#2281)
885019a is described below
commit 885019a75b18e9264bac7abf37f6f3aa2cd5640c
Author: kangkaisen <[email protected]>
AuthorDate: Sat Nov 23 16:52:49 2019 +0800
Make DataStreamRecvr cancel_stream out of lock (#2281)
---
be/src/runtime/data_stream_mgr.cpp | 93 ++++++++++++++++++++++----------------
1 file changed, 54 insertions(+), 39 deletions(-)
diff --git a/be/src/runtime/data_stream_mgr.cpp
b/be/src/runtime/data_stream_mgr.cpp
index 8f76156..0af9c99 100644
--- a/be/src/runtime/data_stream_mgr.cpp
+++ b/be/src/runtime/data_stream_mgr.cpp
@@ -54,11 +54,11 @@ shared_ptr<DataStreamRecvr>
DataStreamMgr::create_recvr(RuntimeState* state,
bool is_merging, std::shared_ptr<QueryStatisticsRecvr>
sub_plan_query_statistics_recvr) {
DCHECK(profile != NULL);
VLOG_FILE << "creating receiver for fragment="
- << fragment_instance_id << ", node=" << dest_node_id;
+ << fragment_instance_id << ", node=" << dest_node_id;
shared_ptr<DataStreamRecvr> recvr(
- new DataStreamRecvr(this, state->instance_mem_tracker(), row_desc,
- fragment_instance_id, dest_node_id, num_senders, is_merging,
buffer_size,
- profile, sub_plan_query_statistics_recvr));
+ new DataStreamRecvr(this, state->instance_mem_tracker(), row_desc,
+ fragment_instance_id, dest_node_id, num_senders,
is_merging, buffer_size,
+ profile, sub_plan_query_statistics_recvr));
uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id);
lock_guard<mutex> l(_lock);
_fragment_stream_set.insert(std::make_pair(fragment_instance_id,
dest_node_id));
@@ -99,7 +99,6 @@ Status DataStreamMgr::transmit_data(const
PTransmitDataParams* request, ::google
t_finst_id.hi = finst_id.hi();
t_finst_id.lo = finst_id.lo();
shared_ptr<DataStreamRecvr> recvr = find_recvr(t_finst_id,
request->node_id());
-
if (recvr == nullptr) {
// The receiver may remove itself from the receiver map via
deregister_recvr()
// at any time without considering the remaining number of senders.
@@ -111,8 +110,8 @@ Status DataStreamMgr::transmit_data(const
PTransmitDataParams* request, ::google
return Status::OK();
}
- // request can only be used before calling recvr's add_batch or when
request
- // is the last for the sender, because request maybe released after it's
batch
+ // request can only be used before calling recvr's add_batch or when
request
+ // is the last for the sender, because request may be released after its
batch
// is consumed by ExchangeNode.
if (request->has_query_statistics()) {
recvr->add_sub_plan_statistics(request->query_statistics(),
request->sender_id());
@@ -120,36 +119,44 @@ Status DataStreamMgr::transmit_data(const
PTransmitDataParams* request, ::google
bool eos = request->eos();
if (request->has_row_batch()) {
- recvr->add_batch(request->row_batch(), request->sender_id(),
- request->be_number(), request->packet_seq(), eos ? nullptr :
done);
+ recvr->add_batch(request->row_batch(), request->sender_id(),
+ request->be_number(), request->packet_seq(), eos ?
nullptr : done);
}
if (eos) {
- recvr->remove_sender(request->sender_id(), request->be_number());
- }
+ recvr->remove_sender(request->sender_id(), request->be_number());
+ }
return Status::OK();
}
Status DataStreamMgr::deregister_recvr(
const TUniqueId& fragment_instance_id, PlanNodeId node_id) {
+ shared_ptr<DataStreamRecvr> target_recvr; // unlinked under _lock, cancelled after _lock is released
VLOG_QUERY << "deregister_recvr(): fragment_instance_id=" <<
fragment_instance_id
- << ", node=" << node_id;
+ << ", node=" << node_id;
size_t hash_value = get_hash_value(fragment_instance_id, node_id);
- lock_guard<mutex> l(_lock);
- std::pair<StreamMap::iterator, StreamMap::iterator> range =
- _receiver_map.equal_range(hash_value);
- while (range.first != range.second) {
- const shared_ptr<DataStreamRecvr>& recvr = range.first->second;
- if (recvr->fragment_instance_id() == fragment_instance_id
+ {
+ lock_guard<mutex> l(_lock);
+ std::pair<StreamMap::iterator, StreamMap::iterator> range =
+ _receiver_map.equal_range(hash_value);
+ while (range.first != range.second) {
+ const shared_ptr<DataStreamRecvr>& recvr = range.first->second;
+ if (recvr->fragment_instance_id() == fragment_instance_id
&& recvr->dest_node_id() == node_id) {
- // Notify concurrent add_data() requests that the stream has been
terminated.
- recvr->cancel_stream();
-
_fragment_stream_set.erase(std::make_pair(recvr->fragment_instance_id(),
- recvr->dest_node_id()));
- _receiver_map.erase(range.first);
- return Status::OK();
+ target_recvr = recvr; // owning copy: `recvr` dangles once the map entry is erased
+ _fragment_stream_set.erase(std::make_pair(
+ recvr->fragment_instance_id(), recvr->dest_node_id()));
+ _receiver_map.erase(range.first);
+ break; // do not return here, or the cancel_stream() call below is skipped
+ }
+ ++range.first;
}
- ++range.first;
+ }
+ if (target_recvr != nullptr) {
+ // Notify concurrent add_data() requests that the stream has been terminated.
+ // cancel_stream() may take a long time, so call it after releasing _lock.
+ target_recvr->cancel_stream();
+ return Status::OK();
}
std::stringstream err;
@@ -161,21 +168,29 @@ Status DataStreamMgr::deregister_recvr(
void DataStreamMgr::cancel(const TUniqueId& fragment_instance_id) {
VLOG_QUERY << "cancelling all streams for fragment=" <<
fragment_instance_id;
- lock_guard<mutex> l(_lock);
- FragmentStreamSet::iterator i =
- _fragment_stream_set.lower_bound(std::make_pair(fragment_instance_id,
0));
- while (i != _fragment_stream_set.end() && i->first ==
fragment_instance_id) {
- shared_ptr<DataStreamRecvr> recvr = find_recvr(i->first, i->second,
false);
- if (recvr == NULL) {
- // keep going but at least log it
- std::stringstream err;
- err << "cancel(): missing in stream_map: fragment=" << i->first
- << " node=" << i->second;
- LOG(ERROR) << err.str();
- } else {
- recvr->cancel_stream();
+ std::vector<shared_ptr<DataStreamRecvr>> recvrs; // collected under _lock, cancelled after _lock is released
+ {
+ lock_guard<mutex> l(_lock);
+ FragmentStreamSet::iterator i =
+
_fragment_stream_set.lower_bound(std::make_pair(fragment_instance_id, 0));
+ while (i != _fragment_stream_set.end() && i->first ==
fragment_instance_id) {
+ shared_ptr<DataStreamRecvr> recvr = find_recvr(i->first,
i->second, false);
+ if (recvr == nullptr) {
+ // keep going but at least log it
+ std::stringstream err;
+ err << "cancel(): missing in stream_map: fragment=" << i->first
+ << " node=" << i->second;
+ LOG(ERROR) << err.str();
+ } else {
+ recvrs.push_back(recvr);
+ }
+ ++i;
}
- ++i;
+ }
+
+ // cancel_stream() may take a long time, so call it after releasing _lock.
+ for (const auto& recvr : recvrs) {
+ recvr->cancel_stream();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]