This is an automated email from the ASF dual-hosted git repository.
tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 868a01d IMPALA-6101: call DataStreamMgr::Cancel() once per query
868a01d is described below
commit 868a01dca9071978f482aec2a9c3e18aca957914
Author: Tim Armstrong <[email protected]>
AuthorDate: Wed Dec 23 10:45:54 2020 -0800
IMPALA-6101: call DataStreamMgr::Cancel() once per query
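This is a bit of cleanup left over from the KRPC work that can avoid
some lock contention for queries with large numbers of fragment instances.
The change cancels data stream receivers once per query (from
QueryState::Cancel()) instead of once per fragment instance (from
FragmentInstanceState::Cancel()), so KrpcDataStreamMgr's lock is taken
once per query rather than once per fragment instance.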
Change-Id: I7677d21f0aaddc3d4b56f72c0470ea850e34611e
Reviewed-on: http://gerrit.cloudera.org:8080/16901
Reviewed-by: Thomas Tauber-Marshall <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/runtime/data-stream-test.cc | 4 ++--
be/src/runtime/fragment-instance-state.cc | 2 --
be/src/runtime/krpc-data-stream-mgr.cc | 13 ++++++++-----
be/src/runtime/krpc-data-stream-mgr.h | 17 ++++++++++-------
be/src/runtime/query-state.cc | 3 +++
5 files changed, 23 insertions(+), 16 deletions(-)
diff --git a/be/src/runtime/data-stream-test.cc
b/be/src/runtime/data-stream-test.cc
index ec61d5e..01928aa 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -708,9 +708,9 @@ TEST_F(DataStreamTest, UnknownSenderLargeResult) {
TEST_F(DataStreamTest, Cancel) {
TUniqueId instance_id;
StartReceiver(TPartitionType::UNPARTITIONED, 1, 1, 1024, false,
&instance_id);
- stream_mgr_->Cancel(instance_id);
+ stream_mgr_->Cancel(GetQueryId(instance_id));
StartReceiver(TPartitionType::UNPARTITIONED, 1, 1, 1024, true, &instance_id);
- stream_mgr_->Cancel(instance_id);
+ stream_mgr_->Cancel(GetQueryId(instance_id));
JoinReceivers();
EXPECT_TRUE(receiver_info_[0]->status.IsCancelled());
EXPECT_TRUE(receiver_info_[1]->status.IsCancelled());
diff --git a/be/src/runtime/fragment-instance-state.cc
b/be/src/runtime/fragment-instance-state.cc
index 1de34d6..4f306b9 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -38,7 +38,6 @@
#include "runtime/client-cache.h"
#include "runtime/exec-env.h"
#include "runtime/fragment-state.h"
-#include "runtime/krpc-data-stream-mgr.h"
#include "runtime/krpc-data-stream-sender.h"
#include "runtime/mem-tracker.h"
#include "runtime/query-state.h"
@@ -137,7 +136,6 @@ void FragmentInstanceState::Cancel() {
runtime_state_->Cancel();
PlanRootSink* root_sink = GetRootSink();
if (root_sink != nullptr) root_sink->Cancel(runtime_state_);
-
ExecEnv::GetInstance()->stream_mgr()->Cancel(runtime_state_->fragment_instance_id());
}
Status FragmentInstanceState::Prepare() {
diff --git a/be/src/runtime/krpc-data-stream-mgr.cc
b/be/src/runtime/krpc-data-stream-mgr.cc
index 7e47fe5..96d6b62 100644
--- a/be/src/runtime/krpc-data-stream-mgr.cc
+++ b/be/src/runtime/krpc-data-stream-mgr.cc
@@ -335,13 +335,16 @@ Status KrpcDataStreamMgr::DeregisterRecvr(
return Status(msg);
}
-void KrpcDataStreamMgr::Cancel(const TUniqueId& finst_id) {
- VLOG_QUERY << "cancelling active streams for fragment_instance_id="
- << PrintId(finst_id);
+void KrpcDataStreamMgr::Cancel(const TUniqueId& query_id) {
+ VLOG_QUERY << "cancelling active streams for query_id=" << PrintId(query_id);
lock_guard<mutex> l(lock_);
+ // Fragment instance IDs are the query ID with the lower bits set to the
instance
+ // index. Therefore all finstances for a query are clustered together,
starting
+ // after the position in the map where the query_id would be.
FragmentRecvrSet::iterator iter =
- fragment_recvr_set_.lower_bound(make_pair(finst_id, 0));
- while (iter != fragment_recvr_set_.end() && iter->first == finst_id) {
+ fragment_recvr_set_.lower_bound(make_pair(query_id, 0));
+ while (iter != fragment_recvr_set_.end() &&
+ GetQueryId(iter->first) == query_id) {
bool unused;
shared_ptr<KrpcDataStreamRecvr> recvr = FindRecvr(iter->first,
iter->second, &unused);
if (recvr != nullptr) {
diff --git a/be/src/runtime/krpc-data-stream-mgr.h
b/be/src/runtime/krpc-data-stream-mgr.h
index af7ff2c..48a8144 100644
--- a/be/src/runtime/krpc-data-stream-mgr.h
+++ b/be/src/runtime/krpc-data-stream-mgr.h
@@ -213,7 +213,7 @@ struct EndDataStreamCtx {
//
/// DataStreamMgr also allows asynchronous cancellation of streams via Cancel()
/// which unblocks all KrpcDataStreamRecvr::GetBatch() calls that are made on
behalf
-/// of the cancelled fragment id.
+/// of the cancelled query id.
///
/// Exposes three metrics:
/// 'senders-blocked-on-recvr-creation' - currently blocked senders.
@@ -280,10 +280,10 @@ class KrpcDataStreamMgr : public CacheLineAligned {
void CloseSender(const EndDataStreamRequestPB* request,
EndDataStreamResponsePB* response, kudu::rpc::RpcContext* context);
- /// Cancels all receivers registered for fragment_instance_id immediately.
The
- /// receivers will not accept any row batches after being cancelled. Any
buffered
- /// row batches will not be freed until Close() is called on the receivers.
- void Cancel(const TUniqueId& fragment_instance_id);
+ /// Cancels all receivers registered for 'query_id' immediately. The
receivers will not
+ /// accept any row batches after being cancelled. Any buffered row batches
will not be
+ /// freed until Close() is called on the receivers.
+ void Cancel(const TUniqueId& query_id);
/// Waits for maintenance thread and sender response thread pool to finish.
~KrpcDataStreamMgr();
@@ -354,6 +354,9 @@ class KrpcDataStreamMgr : public CacheLineAligned {
typedef std::pair<impala::TUniqueId, PlanNodeId> RecvrId;
/// Less-than ordering for RecvrIds.
+ /// This ordering clusters all receivers for the same query together, because
+ /// the fragment instance ID is the query ID with the lower bits set to the
+ /// index of the fragment instance within the query.
struct ComparisonOp {
bool operator()(const RecvrId& a, const RecvrId& b) {
if (a.first.hi < b.first.hi) {
@@ -370,8 +373,8 @@ class KrpcDataStreamMgr : public CacheLineAligned {
};
/// An ordered set of receiver IDs so that we can easily find all receiver
IDs belonging
- /// to a fragment instance (by calling std::set::lower_bound(finst_id, 0) to
find the
- /// first entry and iterating until the entry's finst_id doesn't match).
+ /// to a query (by calling std::set::lower_bound(query_id, 0) to find the
+ /// first entry and iterating until the entry's finst_id doesn't belong to
the query).
///
/// There is one entry in fragment_recvr_set_ for every entry in
receiver_map_.
typedef std::set<RecvrId, ComparisonOp> FragmentRecvrSet;
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index fae3c1c..9d3a07f 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -37,6 +37,7 @@
#include "runtime/fragment-instance-state.h"
#include "runtime/fragment-state.h"
#include "runtime/initial-reservations.h"
+#include "runtime/krpc-data-stream-mgr.h"
#include "runtime/mem-tracker.h"
#include "runtime/query-exec-mgr.h"
#include "runtime/runtime-filter-bank.h"
@@ -973,6 +974,8 @@ void QueryState::Cancel() {
if (!is_cancelled_.CompareAndSwap(0, 1)) return;
if (filter_bank_ != nullptr) filter_bank_->Cancel();
for (auto entry: fis_map_) entry.second->Cancel();
+ // Cancel data streams for all fragment instances.
+ ExecEnv::GetInstance()->stream_mgr()->Cancel(query_id());
}
void QueryState::PublishFilter(const PublishFilterParamsPB& params,
RpcContext* context) {