This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 17b8e5a933c073505356ffc4f843f488ad06a9f9
Author: Csaba Ringhofer <[email protected]>
AuthorDate: Fri Dec 20 15:07:45 2024 +0100

    IMPALA-13630: Avoid holding global lock when closing KrpcDataStreamRecvr
    
    KrpcDataStreamRecvr::CancelStream() may take some time as it:
    - may need to wait for SenderQueue's lock
    - needs to respond to all deferred RPCs
    
    While responding to RPCs is asynchronous, it still involves serialization
    of the response and a potential system call to schedule the response on
    the KRPC reactor thread.
    
    The change is safe as all KrpcDataStreamRecvr::SenderQueue functions
    check for cancellation and return early if the stream is already
    cancelled. Calling CancelStream() twice (once in DeregisterRecvr(), once
    in Cancel()) is also harmless.
    
    Change-Id: Ie110722f144400132de1e23813e260dab1de77e7
    Reviewed-on: http://gerrit.cloudera.org:8080/22251
    Reviewed-by: Michael Smith <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/runtime/krpc-data-stream-mgr.cc | 98 ++++++++++++++++++++--------------
 1 file changed, 57 insertions(+), 41 deletions(-)

diff --git a/be/src/runtime/krpc-data-stream-mgr.cc b/be/src/runtime/krpc-data-stream-mgr.cc
index 42258f31d..cdfbb01c5 100644
--- a/be/src/runtime/krpc-data-stream-mgr.cc
+++ b/be/src/runtime/krpc-data-stream-mgr.cc
@@ -307,55 +307,71 @@ Status KrpcDataStreamMgr::DeregisterRecvr(
     const TUniqueId& finst_id, PlanNodeId dest_node_id) {
   VLOG_QUERY << "DeregisterRecvr(): fragment_instance_id=" << PrintId(finst_id)
              << ", node=" << dest_node_id;
+  shared_ptr<KrpcDataStreamRecvr> recvr_to_cancel;
   uint32_t hash_value = GetHashValue(finst_id, dest_node_id);
-  lock_guard<mutex> l(lock_);
-  pair<RecvrMap::iterator, RecvrMap::iterator> range =
-      receiver_map_.equal_range(hash_value);
-  while (range.first != range.second) {
-    const shared_ptr<KrpcDataStreamRecvr>& recvr = range.first->second;
-    if (recvr->fragment_instance_id() == finst_id &&
-        recvr->dest_node_id() == dest_node_id) {
-      // Notify concurrent AddData() requests that the stream has been 
terminated.
-      recvr->CancelStream();
-      RecvrId recvr_id =
-          make_pair(recvr->fragment_instance_id(), recvr->dest_node_id());
-      fragment_recvr_set_.erase(recvr_id);
-      receiver_map_.erase(range.first);
-      closed_stream_expirations_.insert(
-          make_pair(MonotonicMillis() + STREAM_EXPIRATION_TIME_MS, recvr_id));
-      closed_stream_cache_.insert(recvr_id);
-      return Status::OK();
+  {
+    lock_guard<mutex> l(lock_);
+    pair<RecvrMap::iterator, RecvrMap::iterator> range =
+        receiver_map_.equal_range(hash_value);
+    while (range.first != range.second) {
+      const shared_ptr<KrpcDataStreamRecvr>& recvr = range.first->second;
+      if (recvr->fragment_instance_id() == finst_id &&
+          recvr->dest_node_id() == dest_node_id) {
+        recvr_to_cancel = recvr;
+        // Notify concurrent AddData() requests that the stream has been 
terminated.
+        RecvrId recvr_id =
+            make_pair(recvr->fragment_instance_id(), recvr->dest_node_id());
+        fragment_recvr_set_.erase(recvr_id);
+        receiver_map_.erase(range.first);
+        closed_stream_expirations_.insert(
+            make_pair(MonotonicMillis() + STREAM_EXPIRATION_TIME_MS, 
recvr_id));
+        closed_stream_cache_.insert(recvr_id);
+        break;
+      }
+      ++range.first;
     }
-    ++range.first;
   }
-
-  const string msg = Substitute(
-      "Unknown row receiver id: fragment_instance_id=$0, dest_node_id=$1",
-      PrintId(finst_id), dest_node_id);
-  return Status(msg);
+  if (recvr_to_cancel == nullptr) {
+    const string msg = Substitute(
+        "Unknown row receiver id: fragment_instance_id=$0, dest_node_id=$1",
+        PrintId(finst_id), dest_node_id);
+    return Status(msg);
+  }
+  // Call CancelStream() after releasing lock_.
+  recvr_to_cancel->CancelStream();
+  return Status::OK();
 }
 
 void KrpcDataStreamMgr::Cancel(const TUniqueId& query_id) {
   VLOG_QUERY << "cancelling active streams for query_id=" << PrintId(query_id);
-  lock_guard<mutex> l(lock_);
-  // Fragment instance IDs are the query ID with the lower bits set to the 
instance
-  // index. Therefore all finstances for a query are clustered together, 
starting
-  // after the position in the map where the query_id would be.
-  FragmentRecvrSet::iterator iter =
-      fragment_recvr_set_.lower_bound(make_pair(query_id, 0));
-  while (iter != fragment_recvr_set_.end() &&
-         GetQueryId(iter->first) == query_id) {
-    bool unused;
-    shared_ptr<KrpcDataStreamRecvr> recvr = FindRecvr(iter->first, 
iter->second, &unused);
-    if (recvr != nullptr) {
-      recvr->CancelStream();
-    } else {
-      // keep going but at least log it
-      LOG(ERROR) << Substitute(
-          "Cancel(): missing in stream_map: fragment_instance_id=$0 node=$1",
-              PrintId(iter->first), iter->second);
+  vector<shared_ptr<KrpcDataStreamRecvr>> receivers_to_cancel;
+  {
+    lock_guard<mutex> l(lock_);
+    // Fragment instance IDs are the query ID with the lower bits set to the 
instance
+    // index. Therefore all finstances for a query are clustered together, 
starting
+    // after the position in the map where the query_id would be.
+    FragmentRecvrSet::iterator iter =
+        fragment_recvr_set_.lower_bound(make_pair(query_id, 0));
+    while (iter != fragment_recvr_set_.end() &&
+          GetQueryId(iter->first) == query_id) {
+      bool unused;
+      shared_ptr<KrpcDataStreamRecvr> recvr =
+          FindRecvr(iter->first, iter->second, &unused);
+      if (recvr != nullptr) {
+        receivers_to_cancel.push_back(recvr);
+      } else {
+        // keep going but at least log it
+        LOG(ERROR) << Substitute(
+            "Cancel(): missing in stream_map: fragment_instance_id=$0 node=$1",
+                PrintId(iter->first), iter->second);
+      }
+      ++iter;
     }
-    ++iter;
+  }
+  // Call CancelStream() after releasing lock_.
+  for (shared_ptr<KrpcDataStreamRecvr>& recvr: receivers_to_cancel) {
+    DCHECK(recvr != nullptr);
+    recvr->CancelStream();
   }
 }
 

Reply via email to