IMPALA-4925: Cancel finstance if query has finished

This patch is a partial fix for the issue where an finst would not detect that it should cancel if the query limit had been hit. It changes the UpdateExecStatus() RPC to return a cancelled status to an finst if the query has finished because it hit a limit.
For certain queries, this allows them to finish much more quickly than they otherwise would. However, there's still a few-second delay for the finst to pick up the cancellation signal, because the UpdateExecStatus() RPC is only called every few seconds. A complete fix would also call CancelInternal() when returned_all_results_ was set to true. That would be a much larger change. The improvement here is to bound the delay between query completion and fragment teardown to a few seconds. Change-Id: I59f45e64978c9ab9914b5c33e86009960b4a88c4 Reviewed-on: http://gerrit.cloudera.org:8080/5987 Tested-by: Impala Public Jenkins Reviewed-by: Henry Robinson <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/5bb48ed7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/5bb48ed7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/5bb48ed7 Branch: refs/heads/master Commit: 5bb48ed71dc8272fdabac45a33b515cdd0d5f12d Parents: db5103d Author: Henry Robinson <[email protected]> Authored: Mon Feb 13 15:01:44 2017 -0800 Committer: Henry Robinson <[email protected]> Committed: Wed Jul 19 17:01:01 2017 +0000 ---------------------------------------------------------------------- be/src/runtime/coordinator.cc | 4 ++++ tests/query_test/test_lifecycle.py | 27 ++++++++++++++++++++++++++- 2 files changed, 30 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5bb48ed7/be/src/runtime/coordinator.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc index 2bfe1b5..f904c48 100644 --- a/be/src/runtime/coordinator.cc +++ b/be/src/runtime/coordinator.cc @@ -977,6 +977,10 @@ Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& param return 
Status::OK(); } + // If all results have been returned, return a cancelled status to force the fragment + // instance to stop executing. + if (!done && returned_all_results_) return Status::CANCELLED; + if (done) { lock_guard<mutex> l(lock_); DCHECK_GT(num_remaining_backends_, 0); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5bb48ed7/tests/query_test/test_lifecycle.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_lifecycle.py b/tests/query_test/test_lifecycle.py index 2bd4476..7ed8f90 100644 --- a/tests/query_test/test_lifecycle.py +++ b/tests/query_test/test_lifecycle.py @@ -16,12 +16,15 @@ # under the License. import pytest +import time from tests.beeswax.impala_beeswax import ImpalaBeeswaxException from tests.common.impala_test_suite import ImpalaTestSuite from tests.common.impala_cluster import ImpalaCluster from tests.verifiers.metric_verifier import MetricVerifier -class TestFragmentLifecycle(ImpalaTestSuite): +# TODO: Debug actions leak into other tests in the same suite (if not explicitly +# unset). Ensure they get unset between tests. +class TestFragmentLifecycleWithDebugActions(ImpalaTestSuite): """Using the debug action interface, check that failed queries correctly clean up *all* fragments""" @@ -67,3 +70,25 @@ class TestFragmentLifecycle(ImpalaTestSuite): # # TODO: Fix when we have cancellable RPCs. v.wait_for_metric(self.IN_FLIGHT_FRAGMENTS, 0, timeout=125) + +class TestFragmentLifecycle(ImpalaTestSuite): + def test_finst_cancel_when_query_complete(self): + """Regression test for IMPALA-4925: if a query returns all its rows before all its + finsts have completed, it should cancel the finsts and complete promptly.""" + now = time.time() + + # Query designed to produce 1024 (the limit) rows very quickly from the first union + # child, but the second one takes a very long time to complete. Without fix for + # IMPALA-4925, the whole query waits for the second child to complete.
+ + # Due to IMPALA-5671, the limit must be a multiple of the row batch size - if it's + # reached during production of a row batch, processing moves to the second child, and + # the query will take a long time to complete. + self.client.execute("with l as (select 1 from functional.alltypes), r as" + " (select count(*) from tpch_parquet.lineitem a cross join tpch_parquet.lineitem b)" + "select * from l union all (select * from r) LIMIT 1024") + end = time.time() + + # Query typically completes in < 2s, but if cross join is fully evaluated, will take > + # 10 minutes. Pick 2 minutes as a reasonable midpoint to avoid false negatives. + assert end - now < 120, "Query took too long to complete: " + str(end - now) + "s"
