This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 868a01d  IMPALA-6101: call DataStreamMgr::Cancel() once per query
868a01d is described below

commit 868a01dca9071978f482aec2a9c3e18aca957914
Author: Tim Armstrong <[email protected]>
AuthorDate: Wed Dec 23 10:45:54 2020 -0800

    IMPALA-6101: call DataStreamMgr::Cancel() once per query
    
    This is a bit of cleanup left over from the KRPC work that could avoid
    some lock contention for queries with large numbers of fragments.
    
    The change cancels receivers once per query instead of once per
    fragment instance.
    
    Change-Id: I7677d21f0aaddc3d4b56f72c0470ea850e34611e
    Reviewed-on: http://gerrit.cloudera.org:8080/16901
    Reviewed-by: Thomas Tauber-Marshall <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/runtime/data-stream-test.cc        |  4 ++--
 be/src/runtime/fragment-instance-state.cc |  2 --
 be/src/runtime/krpc-data-stream-mgr.cc    | 13 ++++++++-----
 be/src/runtime/krpc-data-stream-mgr.h     | 17 ++++++++++-------
 be/src/runtime/query-state.cc             |  3 +++
 5 files changed, 23 insertions(+), 16 deletions(-)

diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index ec61d5e..01928aa 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -708,9 +708,9 @@ TEST_F(DataStreamTest, UnknownSenderLargeResult) {
 TEST_F(DataStreamTest, Cancel) {
   TUniqueId instance_id;
   StartReceiver(TPartitionType::UNPARTITIONED, 1, 1, 1024, false, &instance_id);
-  stream_mgr_->Cancel(instance_id);
+  stream_mgr_->Cancel(GetQueryId(instance_id));
   StartReceiver(TPartitionType::UNPARTITIONED, 1, 1, 1024, true, &instance_id);
-  stream_mgr_->Cancel(instance_id);
+  stream_mgr_->Cancel(GetQueryId(instance_id));
   JoinReceivers();
   EXPECT_TRUE(receiver_info_[0]->status.IsCancelled());
   EXPECT_TRUE(receiver_info_[1]->status.IsCancelled());
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index 1de34d6..4f306b9 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -38,7 +38,6 @@
 #include "runtime/client-cache.h"
 #include "runtime/exec-env.h"
 #include "runtime/fragment-state.h"
-#include "runtime/krpc-data-stream-mgr.h"
 #include "runtime/krpc-data-stream-sender.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/query-state.h"
@@ -137,7 +136,6 @@ void FragmentInstanceState::Cancel() {
   runtime_state_->Cancel();
   PlanRootSink* root_sink = GetRootSink();
   if (root_sink != nullptr) root_sink->Cancel(runtime_state_);
-  ExecEnv::GetInstance()->stream_mgr()->Cancel(runtime_state_->fragment_instance_id());
 }
 
 Status FragmentInstanceState::Prepare() {
diff --git a/be/src/runtime/krpc-data-stream-mgr.cc b/be/src/runtime/krpc-data-stream-mgr.cc
index 7e47fe5..96d6b62 100644
--- a/be/src/runtime/krpc-data-stream-mgr.cc
+++ b/be/src/runtime/krpc-data-stream-mgr.cc
@@ -335,13 +335,16 @@ Status KrpcDataStreamMgr::DeregisterRecvr(
   return Status(msg);
 }
 
-void KrpcDataStreamMgr::Cancel(const TUniqueId& finst_id) {
-  VLOG_QUERY << "cancelling active streams for fragment_instance_id="
-             << PrintId(finst_id);
+void KrpcDataStreamMgr::Cancel(const TUniqueId& query_id) {
+  VLOG_QUERY << "cancelling active streams for query_id=" << PrintId(query_id);
   lock_guard<mutex> l(lock_);
+  // Fragment instance IDs are the query ID with the lower bits set to the instance
+  // index. Therefore all finstances for a query are clustered together, starting
+  // after the position in the map where the query_id would be.
   FragmentRecvrSet::iterator iter =
-      fragment_recvr_set_.lower_bound(make_pair(finst_id, 0));
-  while (iter != fragment_recvr_set_.end() && iter->first == finst_id) {
+      fragment_recvr_set_.lower_bound(make_pair(query_id, 0));
+  while (iter != fragment_recvr_set_.end() &&
+         GetQueryId(iter->first) == query_id) {
     bool unused;
     shared_ptr<KrpcDataStreamRecvr> recvr = FindRecvr(iter->first, iter->second, &unused);
     if (recvr != nullptr) {
diff --git a/be/src/runtime/krpc-data-stream-mgr.h b/be/src/runtime/krpc-data-stream-mgr.h
index af7ff2c..48a8144 100644
--- a/be/src/runtime/krpc-data-stream-mgr.h
+++ b/be/src/runtime/krpc-data-stream-mgr.h
@@ -213,7 +213,7 @@ struct EndDataStreamCtx {
 //
 /// DataStreamMgr also allows asynchronous cancellation of streams via Cancel()
 /// which unblocks all KrpcDataStreamRecvr::GetBatch() calls that are made on behalf
-/// of the cancelled fragment id.
+/// of the cancelled query id.
 ///
 /// Exposes three metrics:
 ///  'senders-blocked-on-recvr-creation' - currently blocked senders.
@@ -280,10 +280,10 @@ class KrpcDataStreamMgr : public CacheLineAligned {
   void CloseSender(const EndDataStreamRequestPB* request,
       EndDataStreamResponsePB* response, kudu::rpc::RpcContext* context);
 
-  /// Cancels all receivers registered for fragment_instance_id immediately. The
-  /// receivers will not accept any row batches after being cancelled. Any buffered
-  /// row batches will not be freed until Close() is called on the receivers.
-  void Cancel(const TUniqueId& fragment_instance_id);
+  /// Cancels all receivers registered for 'query_id' immediately. The receivers will not
+  /// accept any row batches after being cancelled. Any buffered row batches will not be
+  /// freed until Close() is called on the receivers.
+  void Cancel(const TUniqueId& query_id);
 
   /// Waits for maintenance thread and sender response thread pool to finish.
   ~KrpcDataStreamMgr();
@@ -354,6 +354,9 @@ class KrpcDataStreamMgr : public CacheLineAligned {
   typedef std::pair<impala::TUniqueId, PlanNodeId> RecvrId;
 
   /// Less-than ordering for RecvrIds.
+  /// This ordering clusters all receivers for the same query together, because
+  /// the fragment instance ID is the query ID with the lower bits set to the
+  /// index of the fragment instance within the query.
   struct ComparisonOp {
     bool operator()(const RecvrId& a, const RecvrId& b) {
       if (a.first.hi < b.first.hi) {
@@ -370,8 +373,8 @@ class KrpcDataStreamMgr : public CacheLineAligned {
   };
 
   /// An ordered set of receiver IDs so that we can easily find all receiver IDs belonging
-  /// to a fragment instance (by calling std::set::lower_bound(finst_id, 0) to find the
-  /// first entry and iterating until the entry's finst_id doesn't match).
+  /// to a query (by calling std::set::lower_bound(query_id, 0) to find the
+  /// first entry and iterating until the entry's finst_id doesn't belong to the query).
   ///
   /// There is one entry in fragment_recvr_set_ for every entry in receiver_map_.
   typedef std::set<RecvrId, ComparisonOp> FragmentRecvrSet;
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index fae3c1c..9d3a07f 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -37,6 +37,7 @@
 #include "runtime/fragment-instance-state.h"
 #include "runtime/fragment-state.h"
 #include "runtime/initial-reservations.h"
+#include "runtime/krpc-data-stream-mgr.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/query-exec-mgr.h"
 #include "runtime/runtime-filter-bank.h"
@@ -973,6 +974,8 @@ void QueryState::Cancel() {
   if (!is_cancelled_.CompareAndSwap(0, 1)) return;
   if (filter_bank_ != nullptr) filter_bank_->Cancel();
   for (auto entry: fis_map_) entry.second->Cancel();
+  // Cancel data streams for all fragment instances.
+  ExecEnv::GetInstance()->stream_mgr()->Cancel(query_id());
 }
 
 void QueryState::PublishFilter(const PublishFilterParamsPB& params, RpcContext* context) {

Reply via email to