This is an automated email from the ASF dual-hosted git repository. arodoni pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 6601327af6088def7d880940a5712719fe46acb2 Author: poojanilangekar <pooja.nilange...@cloudera.com> AuthorDate: Thu Jan 24 09:58:17 2019 -0800 IMPALA-8096: Add rows produced limit per query This patch limits the number of rows produced by a query by tracking it at the PlanRootSink level. When the NUM_ROWS_PRODUCED_LIMIT is set, it cancels a query when its execution produces more rows than the specified limit. This limit only applies when the results are returned to a client, e.g. for a SELECT query, but not an INSERT query. Testing: Added tests to query-resource-limits.test to verify that the rows produced limit is honored. Manually tested on various combinations of tables, file formats and NUM_ROWS_PRODUCED_LIMIT values. Change-Id: I7b22dbe130a368f4be1f3662a559eb9aae7f0c1d Reviewed-on: http://gerrit.cloudera.org:8080/12328 Reviewed-by: Tim Armstrong <tarmstr...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> --- be/src/exec/plan-root-sink.cc | 15 ++++++++++++++- be/src/exec/plan-root-sink.h | 6 ++++++ be/src/service/query-options-test.cc | 3 ++- be/src/service/query-options.cc | 11 +++++++++++ be/src/service/query-options.h | 4 +++- common/thrift/ImpalaInternalService.thrift | 3 +++ common/thrift/ImpalaService.thrift | 5 +++++ common/thrift/generate_error_codes.py | 4 ++++ .../queries/QueryTest/query-resource-limits.test | 18 ++++++++++++++++++ 9 files changed, 66 insertions(+), 3 deletions(-) diff --git a/be/src/exec/plan-root-sink.cc b/be/src/exec/plan-root-sink.cc index 4bb9693..89e442b 100644 --- a/be/src/exec/plan-root-sink.cc +++ b/be/src/exec/plan-root-sink.cc @@ -34,7 +34,8 @@ namespace impala { PlanRootSink::PlanRootSink( TDataSinkId sink_id, const RowDescriptor* row_desc, RuntimeState* state) - : DataSink(sink_id, row_desc, "PLAN_ROOT_SINK", state) {} + : DataSink(sink_id, row_desc, "PLAN_ROOT_SINK", state), + num_rows_produced_limit_(state->query_options().num_rows_produced_limit) {} namespace { @@ -69,6 +70,18 @@ 
Status PlanRootSink::Send(RuntimeState* state, RowBatch* batch) { ValidateCollectionSlots(*row_desc_, batch); int current_batch_row = 0; + // Check to ensure that the number of rows produced by query execution does not exceed + // num_rows_produced_limit_. Since the PlanRootSink has a single producer, the + // num_rows_produced_ value can be verified without acquiring the lock_. + num_rows_produced_ += batch->num_rows(); + if (num_rows_produced_limit_ > 0 && num_rows_produced_ > num_rows_produced_limit_) { + Status err = Status::Expected(TErrorCode::ROWS_PRODUCED_LIMIT_EXCEEDED, + PrintId(state->query_id()), + PrettyPrinter::Print(num_rows_produced_limit_, TUnit::NONE)); + VLOG_QUERY << err.msg().msg(); + return err; + } + // Don't enter the loop if batch->num_rows() == 0; no point triggering the consumer with // 0 rows to return. Be wary of ever returning 0-row batches to the client; some poorly // written clients may not cope correctly with them. See IMPALA-4335. diff --git a/be/src/exec/plan-root-sink.h b/be/src/exec/plan-root-sink.h index b68fff7..71dec98 100644 --- a/be/src/exec/plan-root-sink.h +++ b/be/src/exec/plan-root-sink.h @@ -118,6 +118,12 @@ class PlanRootSink : public DataSink { /// Set by GetNext() to indicate to Send() how many rows it should write to results_. int num_rows_requested_ = 0; + + /// Updated by Send() to indicate the total number of rows produced by query execution. + int64_t num_rows_produced_ = 0; + + /// Limit on the number of rows produced by this query, initialized by the constructor. 
+ const int64_t num_rows_produced_limit_; }; } diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc index 059b355..b18b58e 100644 --- a/be/src/service/query-options-test.cc +++ b/be/src/service/query-options-test.cc @@ -251,7 +251,8 @@ TEST(QueryOptions, SetBigIntOptions) { TQueryOptions options; // List of pairs of Key and its valid range pair<OptionDef<int64_t>, Range<int64_t>> case_set[] { - {MAKE_OPTIONDEF(cpu_limit_s), {0, I64_MAX}}, + {MAKE_OPTIONDEF(cpu_limit_s), {0, I64_MAX}}, + {MAKE_OPTIONDEF(num_rows_produced_limit), {0, I64_MAX}}, }; for (const auto& test_case : case_set) { const OptionDef<int64_t>& option_def = test_case.first; diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc index 177acac..74c178b 100644 --- a/be/src/service/query-options.cc +++ b/be/src/service/query-options.cc @@ -750,6 +750,17 @@ Status impala::SetQueryOption(const string& key, const string& value, num_remote_executor_candidates); break; } + case TImpalaQueryOptions::NUM_ROWS_PRODUCED_LIMIT: { + StringParser::ParseResult result; + const int64_t num_rows_produced_limit = + StringParser::StringToInt<int64_t>(value.c_str(), value.length(), &result); + if (result != StringParser::PARSE_SUCCESS || num_rows_produced_limit < 0) { + return Status(Substitute("Invalid rows returned limit: '$0'. " + "Only non-negative numbers are allowed.", value)); + } + query_options->__set_num_rows_produced_limit(num_rows_produced_limit); + break; + } default: if (IsRemovedQueryOption(key)) { LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'"; diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h index 53aedd6..7ad1ca0 100644 --- a/be/src/service/query-options.h +++ b/be/src/service/query-options.h @@ -41,7 +41,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type> // the DCHECK. 
#define QUERY_OPTS_TABLE\ DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\ - TImpalaQueryOptions::NUM_REMOTE_EXECUTOR_CANDIDATES + 1);\ + TImpalaQueryOptions::NUM_ROWS_PRODUCED_LIMIT + 1);\ REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\ QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\ REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\ @@ -148,6 +148,8 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type> QUERY_OPT_FN(resource_trace_ratio, RESOURCE_TRACE_RATIO, TQueryOptionLevel::ADVANCED)\ QUERY_OPT_FN(num_remote_executor_candidates, NUM_REMOTE_EXECUTOR_CANDIDATES,\ TQueryOptionLevel::ADVANCED)\ + QUERY_OPT_FN(num_rows_produced_limit, NUM_ROWS_PRODUCED_LIMIT,\ + TQueryOptionLevel::ADVANCED)\ ; /// Enforce practical limits on some query options to avoid undesired query state. diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift index a790105..3d3e997 100644 --- a/common/thrift/ImpalaInternalService.thrift +++ b/common/thrift/ImpalaInternalService.thrift @@ -318,6 +318,9 @@ struct TQueryOptions { // See comment in ImpalaService.thrift. // The default value is set to 3 as this is the default value of HDFS replicas. 76: optional i32 num_remote_executor_candidates = 3; + + // See comment in ImpalaService.thrift. + 77: optional i64 num_rows_produced_limit = 0; } // Impala currently has two types of sessions: Beeswax and HiveServer2 diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift index 0eafd6e..8d5f872 100644 --- a/common/thrift/ImpalaService.thrift +++ b/common/thrift/ImpalaService.thrift @@ -367,6 +367,11 @@ enum TImpalaQueryOptions { // cache). If set to 0, the number of executor candidates is unlimited, and remote // ranges will be scheduled across all executors. NUM_REMOTE_EXECUTOR_CANDIDATES, + + // A limit on the number of rows produced by the query. 
The query will be + // canceled if the query is still executing after this limit is hit. A value + // of 0 means there is no limit on the number of rows produced. + NUM_ROWS_PRODUCED_LIMIT } // The summary of a DML statement. diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py index 19d6f44..93fed7d 100755 --- a/common/thrift/generate_error_codes.py +++ b/common/thrift/generate_error_codes.py @@ -393,6 +393,10 @@ error_codes = ( ("CPU_LIMIT_EXCEEDED", 129, "Query $0 terminated due to CPU limit of $1"), ("SCAN_BYTES_LIMIT_EXCEEDED", 130, "Query $0 terminated due to scan bytes limit of $1"), + + ("ROWS_PRODUCED_LIMIT_EXCEEDED", 131, + "Query $0 terminated due to rows produced limit of $1. " + "Unset or increase NUM_ROWS_PRODUCED_LIMIT query option to produce more rows."), ) import sys diff --git a/testdata/workloads/functional-query/queries/QueryTest/query-resource-limits.test b/testdata/workloads/functional-query/queries/QueryTest/query-resource-limits.test index 65abd10..8702da0 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/query-resource-limits.test +++ b/testdata/workloads/functional-query/queries/QueryTest/query-resource-limits.test @@ -50,3 +50,21 @@ select sleep(10000) ---- CATCH row_regex:.*expired due to execution time limit of 2s000ms.* ==== +---- QUERY +# Query should fail due to exceeding rows produced limit. +set NUM_ROWS_PRODUCED_LIMIT = 10000; +select * from tpch.lineitem; +---- CATCH +row_regex:.*terminated due to rows produced limit of 10000.* +==== +---- QUERY +# The query should succeed when it doesn't exceed the rows produced limit. +set NUM_ROWS_PRODUCED_LIMIT = 3; +select * from functional.tinytable; +---- TYPES +STRING,STRING +---- RESULTS +'aaaaaaa','bbbbbbb' +'ccccc','dddd' +'eeeeeeee','f' +====