IMPALA-6144: PublishFilter() continues to run after query failure/cancellation
The Coordinator::BackendState::PublishFilter() function does not check for query failure/cancellation. So if runtime filters are being published during/after a failure, they are not cancelled and are still sent out, which may take a while depending on the size of the cluster. Also, these functions could potentially hold very large amounts of untracked memory. This patch fixes it by checking for cancellation/failure in PublishFilter(). Change-Id: I400456ad85adb9c23d2d432d772311fa4dcff2ed Reviewed-on: http://gerrit.cloudera.org:8080/8455 Reviewed-by: Bharath Vissapragada <[email protected]> Reviewed-by: Tim Armstrong <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/e6c3a01b Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/e6c3a01b Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/e6c3a01b Branch: refs/heads/master Commit: e6c3a01b948777affaee3dd004a526f5e4dcd5a0 Parents: ca680b6 Author: Sailesh Mukil <[email protected]> Authored: Thu Nov 2 21:54:05 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Sat Nov 4 02:08:57 2017 +0000 ---------------------------------------------------------------------- be/src/runtime/coordinator-backend-state.cc | 7 +++++++ be/src/runtime/coordinator.cc | 2 ++ 2 files changed, 9 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e6c3a01b/be/src/runtime/coordinator-backend-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc index a62d8cc..12689e0 100644 --- a/be/src/runtime/coordinator-backend-state.cc +++ b/be/src/runtime/coordinator-backend-state.cc @@ -391,6 +391,13 @@ bool Coordinator::BackendState::Cancel() { void 
Coordinator::BackendState::PublishFilter(const TPublishFilterParams& rpc_params) { DCHECK_EQ(rpc_params.dst_query_id, query_id_); + { + // If the backend is already done, it's not waiting for this filter, so we skip + // sending it in this case. + lock_guard<mutex> l(lock_); + if (IsDone()) return; + } + if (fragments_.count(rpc_params.dst_fragment_idx) == 0) return; Status status; ImpalaBackendConnection backend_client( http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e6c3a01b/be/src/runtime/coordinator.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc index 4a131b9..d18d658 100644 --- a/be/src/runtime/coordinator.cc +++ b/be/src/runtime/coordinator.cc @@ -1125,6 +1125,8 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) { // Assign outgoing bloom filter. TBloomFilter& aggregated_filter = state->bloom_filter(); filter_mem_tracker_->Release(aggregated_filter.directory.size()); + + // TODO: Track memory used by 'rpc_params'. swap(rpc_params.bloom_filter, aggregated_filter); DCHECK(rpc_params.bloom_filter.always_false || rpc_params.bloom_filter.always_true || !rpc_params.bloom_filter.directory.empty());
