IMPALA-5199: prevent hang on empty row batch exchange

The error path where delivery of "eos" fails now behaves the same as if
delivery of a row batch fails.

Testing:
Added a timeout test where the leaf fragments return 0 rows. Before the
change this reproduced the hang.

Change-Id: Ib370ebe44e3bb34d3f0fb9f05aa6386eb91c8645
Reviewed-on: http://gerrit.cloudera.org:8080/8005
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Impala Public Jenkins

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/5119ced5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/5119ced5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/5119ced5

Branch: refs/heads/master
Commit: 5119ced50c0e0c4001621c9d4da598c187bdb580
Parents: 491822f
Author: Tim Armstrong <[email protected]>
Authored: Thu Sep 7 16:28:46 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Sat Sep 16 00:50:07 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/data-stream-mgr.cc                  | 17 +++++++++++++----
 .../queries/QueryTest/exchange-delays.test         | 10 ++++++++++
 2 files changed, 23 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5119ced5/be/src/runtime/data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr.cc b/be/src/runtime/data-stream-mgr.cc
index a161ad3..9af8384 100644
--- a/be/src/runtime/data-stream-mgr.cc
+++ b/be/src/runtime/data-stream-mgr.cc
@@ -197,10 +197,19 @@ Status DataStreamMgr::CloseSender(const TUniqueId& fragment_instance_id,
     PlanNodeId dest_node_id, int sender_id) {
   VLOG_FILE << "CloseSender(): fragment_instance_id=" << fragment_instance_id
             << ", node=" << dest_node_id;
-  bool unused;
+  Status status;
+  bool already_unregistered;
   shared_ptr<DataStreamRecvr> recvr = FindRecvrOrWait(fragment_instance_id, dest_node_id,
-      &unused);
-  if (recvr.get() != NULL) recvr->RemoveSender(sender_id);
+      &already_unregistered);
+  if (recvr == nullptr) {
+    // Was not able to notify the receiver that this was the end of stream. Notify the
+    // sender that this failed so that they can take appropriate action (i.e. failing
+    // the query).
+    status = already_unregistered ? Status::OK() :
+        Status(TErrorCode::DATASTREAM_SENDER_TIMEOUT, PrintId(fragment_instance_id));
+  } else {
+    recvr->RemoveSender(sender_id);
+  }
 
   {
     // Remove any closed streams that have been in the cache for more than
@@ -221,7 +230,7 @@ Status DataStreamMgr::CloseSender(const TUniqueId& fragment_instance_id,
             << PrettyPrinter::Print(MonotonicMillis() - now, TUnit::TIME_MS);
     }
   }
-  return Status::OK();
+  return status;
 }
 
 Status DataStreamMgr::DeregisterRecvr(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5119ced5/testdata/workloads/functional-query/queries/QueryTest/exchange-delays.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/exchange-delays.test b/testdata/workloads/functional-query/queries/QueryTest/exchange-delays.test
index 0dac1d9..b1f6f75 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/exchange-delays.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/exchange-delays.test
@@ -8,3 +8,13 @@ from tpch.lineitem
 ---- CATCH
 Sender timed out waiting for receiver fragment instance
 ====
+---- QUERY
+# IMPALA-5199: Query with zero rows sent over exchange.
+select l_orderkey, count(*)
+from tpch.lineitem
+where l_linenumber = -1
+group by l_orderkey
+---- RESULTS
+---- CATCH
+Sender timed out waiting for receiver fragment instance
+====
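
Note: the three outcomes that CloseSender() now distinguishes can be sketched in isolation. The snippet below is a minimal standalone model, not Impala code. `Code`, `Status`, `Recvr`, and `CloseSenderSketch` are hypothetical stand-ins for TErrorCode, Status, DataStreamRecvr, and DataStreamMgr::CloseSender, and the receiver lookup (FindRecvrOrWait in the patch) is assumed to have already happened, with its result passed in as arguments.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for Impala's TErrorCode; only the two codes used here.
enum class Code { OK, SENDER_TIMEOUT };

// Hypothetical stand-in for Impala's Status class.
struct Status {
  Code code;
  std::string msg;
  Status() : code(Code::OK) {}
  Status(Code c, std::string m) : code(c), msg(std::move(m)) {}
  bool ok() const { return code == Code::OK; }
};

// Hypothetical stand-in for DataStreamRecvr: tracks how many senders are open.
struct Recvr {
  int senders_remaining;
  void RemoveSender() { --senders_remaining; }
};

// Models the decision made after the receiver lookup:
//  - receiver found: deliver end-of-stream by removing the sender, return OK;
//  - receiver missing but already unregistered: nothing to notify, return OK;
//  - receiver missing and never seen: report a timeout so the sender can fail
//    the query instead of treating the close as successful.
Status CloseSenderSketch(const std::shared_ptr<Recvr>& recvr,
                         bool already_unregistered,
                         const std::string& instance_id) {
  if (recvr == nullptr) {
    if (already_unregistered) return Status();
    return Status(Code::SENDER_TIMEOUT,
        "Sender timed out waiting for receiver fragment instance: " + instance_id);
  }
  recvr->RemoveSender();
  return Status();
}
```

Before this change the third case also returned OK, which is why a sender with zero rows to deliver could not surface the failure.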
