This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 17b8e5a933c073505356ffc4f843f488ad06a9f9 Author: Csaba Ringhofer <[email protected]> AuthorDate: Fri Dec 20 15:07:45 2024 +0100 IMPALA-13630: Avoid holding global lock when closing KrpcDataStreamRecvr KrpcDataStreamRecvr::CancelStream() may take some time as it: - may need to wait for SenderQueue's lock - needs to respond to all deferred RPCs While responding to RPCs is asynchronous, it still involves serialization of the response and a potential system call to schedule the response to the KRPC reactor thread. The change is safe as all KrpcDataStreamRecvr::SenderQueue functions check for cancellation and return early if the stream is already cancelled. Calling CancelStream() twice (once in DeregisterRecvr(), once in Cancel()) is also harmless. Change-Id: Ie110722f144400132de1e23813e260dab1de77e7 Reviewed-on: http://gerrit.cloudera.org:8080/22251 Reviewed-by: Michael Smith <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/runtime/krpc-data-stream-mgr.cc | 98 ++++++++++++++++++++-------------- 1 file changed, 57 insertions(+), 41 deletions(-) diff --git a/be/src/runtime/krpc-data-stream-mgr.cc b/be/src/runtime/krpc-data-stream-mgr.cc index 42258f31d..cdfbb01c5 100644 --- a/be/src/runtime/krpc-data-stream-mgr.cc +++ b/be/src/runtime/krpc-data-stream-mgr.cc @@ -307,55 +307,71 @@ Status KrpcDataStreamMgr::DeregisterRecvr( const TUniqueId& finst_id, PlanNodeId dest_node_id) { VLOG_QUERY << "DeregisterRecvr(): fragment_instance_id=" << PrintId(finst_id) << ", node=" << dest_node_id; + shared_ptr<KrpcDataStreamRecvr> recvr_to_cancel; uint32_t hash_value = GetHashValue(finst_id, dest_node_id); - lock_guard<mutex> l(lock_); - pair<RecvrMap::iterator, RecvrMap::iterator> range = - receiver_map_.equal_range(hash_value); - while (range.first != range.second) { - const shared_ptr<KrpcDataStreamRecvr>& recvr = range.first->second; - if (recvr->fragment_instance_id() == finst_id && - recvr->dest_node_id() == dest_node_id) { - // Notify concurrent AddData() 
requests that the stream has been terminated. - recvr->CancelStream(); - RecvrId recvr_id = - make_pair(recvr->fragment_instance_id(), recvr->dest_node_id()); - fragment_recvr_set_.erase(recvr_id); - receiver_map_.erase(range.first); - closed_stream_expirations_.insert( - make_pair(MonotonicMillis() + STREAM_EXPIRATION_TIME_MS, recvr_id)); - closed_stream_cache_.insert(recvr_id); - return Status::OK(); + { + lock_guard<mutex> l(lock_); + pair<RecvrMap::iterator, RecvrMap::iterator> range = + receiver_map_.equal_range(hash_value); + while (range.first != range.second) { + const shared_ptr<KrpcDataStreamRecvr>& recvr = range.first->second; + if (recvr->fragment_instance_id() == finst_id && + recvr->dest_node_id() == dest_node_id) { + recvr_to_cancel = recvr; + // Notify concurrent AddData() requests that the stream has been terminated. + RecvrId recvr_id = + make_pair(recvr->fragment_instance_id(), recvr->dest_node_id()); + fragment_recvr_set_.erase(recvr_id); + receiver_map_.erase(range.first); + closed_stream_expirations_.insert( + make_pair(MonotonicMillis() + STREAM_EXPIRATION_TIME_MS, recvr_id)); + closed_stream_cache_.insert(recvr_id); + break; + } + ++range.first; } - ++range.first; } - - const string msg = Substitute( - "Unknown row receiver id: fragment_instance_id=$0, dest_node_id=$1", - PrintId(finst_id), dest_node_id); - return Status(msg); + if (recvr_to_cancel == nullptr) { + const string msg = Substitute( + "Unknown row receiver id: fragment_instance_id=$0, dest_node_id=$1", + PrintId(finst_id), dest_node_id); + return Status(msg); + } + // Call CancelStream() after releasing lock_. + recvr_to_cancel->CancelStream(); + return Status::OK(); } void KrpcDataStreamMgr::Cancel(const TUniqueId& query_id) { VLOG_QUERY << "cancelling active streams for query_id=" << PrintId(query_id); - lock_guard<mutex> l(lock_); - // Fragment instance IDs are the query ID with the lower bits set to the instance - // index. 
Therefore all finstances for a query are clustered together, starting - // after the position in the map where the query_id would be. - FragmentRecvrSet::iterator iter = - fragment_recvr_set_.lower_bound(make_pair(query_id, 0)); - while (iter != fragment_recvr_set_.end() && - GetQueryId(iter->first) == query_id) { - bool unused; - shared_ptr<KrpcDataStreamRecvr> recvr = FindRecvr(iter->first, iter->second, &unused); - if (recvr != nullptr) { - recvr->CancelStream(); - } else { - // keep going but at least log it - LOG(ERROR) << Substitute( - "Cancel(): missing in stream_map: fragment_instance_id=$0 node=$1", - PrintId(iter->first), iter->second); + vector<shared_ptr<KrpcDataStreamRecvr>> receivers_to_cancel; + { + lock_guard<mutex> l(lock_); + // Fragment instance IDs are the query ID with the lower bits set to the instance + // index. Therefore all finstances for a query are clustered together, starting + // after the position in the map where the query_id would be. + FragmentRecvrSet::iterator iter = + fragment_recvr_set_.lower_bound(make_pair(query_id, 0)); + while (iter != fragment_recvr_set_.end() && + GetQueryId(iter->first) == query_id) { + bool unused; + shared_ptr<KrpcDataStreamRecvr> recvr = + FindRecvr(iter->first, iter->second, &unused); + if (recvr != nullptr) { + receivers_to_cancel.push_back(recvr); + } else { + // keep going but at least log it + LOG(ERROR) << Substitute( + "Cancel(): missing in stream_map: fragment_instance_id=$0 node=$1", + PrintId(iter->first), iter->second); + } + ++iter; } - ++iter; + } + // Call CancelStream() after releasing lock_. + for (shared_ptr<KrpcDataStreamRecvr>& recvr: receivers_to_cancel) { + DCHECK(recvr != nullptr); + recvr->CancelStream(); } }
