This is an automated email from the ASF dual-hosted git repository.

arodoni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 6601327af6088def7d880940a5712719fe46acb2
Author: poojanilangekar <pooja.nilange...@cloudera.com>
AuthorDate: Thu Jan 24 09:58:17 2019 -0800

    IMPALA-8096: Add rows produced limit per query
    
    This patch limits the number of rows produced by a query by
    tracking it at the PlanRootSink level. When NUM_ROWS_PRODUCED_LIMIT
    is set to a positive value, a query is terminated once its execution
    produces more rows than the specified limit (0 means no limit). This
    limit only applies when the results are returned to a client, e.g.
    for a SELECT query, but not for an INSERT query.
    
    Testing:
    Added tests to query-resource-limits.test to verify that the rows
    produced limit is honored.
    Manually tested on various combinations of tables, file formats
    and NUM_ROWS_PRODUCED_LIMIT values.
    
    Change-Id: I7b22dbe130a368f4be1f3662a559eb9aae7f0c1d
    Reviewed-on: http://gerrit.cloudera.org:8080/12328
    Reviewed-by: Tim Armstrong <tarmstr...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 be/src/exec/plan-root-sink.cc                          | 15 ++++++++++++++-
 be/src/exec/plan-root-sink.h                           |  6 ++++++
 be/src/service/query-options-test.cc                   |  3 ++-
 be/src/service/query-options.cc                        | 11 +++++++++++
 be/src/service/query-options.h                         |  4 +++-
 common/thrift/ImpalaInternalService.thrift             |  3 +++
 common/thrift/ImpalaService.thrift                     |  5 +++++
 common/thrift/generate_error_codes.py                  |  4 ++++
 .../queries/QueryTest/query-resource-limits.test       | 18 ++++++++++++++++++
 9 files changed, 66 insertions(+), 3 deletions(-)

diff --git a/be/src/exec/plan-root-sink.cc b/be/src/exec/plan-root-sink.cc
index 4bb9693..89e442b 100644
--- a/be/src/exec/plan-root-sink.cc
+++ b/be/src/exec/plan-root-sink.cc
@@ -34,7 +34,8 @@ namespace impala {
 
 PlanRootSink::PlanRootSink(
     TDataSinkId sink_id, const RowDescriptor* row_desc, RuntimeState* state)
-  : DataSink(sink_id, row_desc, "PLAN_ROOT_SINK", state) {}
+  : DataSink(sink_id, row_desc, "PLAN_ROOT_SINK", state),
+    num_rows_produced_limit_(state->query_options().num_rows_produced_limit) {}
 
 namespace {
 
@@ -69,6 +70,18 @@ Status PlanRootSink::Send(RuntimeState* state, RowBatch* batch) {
   ValidateCollectionSlots(*row_desc_, batch);
   int current_batch_row = 0;
 
+  // Fail if query execution has produced more than num_rows_produced_limit_ rows (a
+  // limit of 0 disables the check). Since the PlanRootSink has a single producer,
+  // num_rows_produced_ can be read and updated without acquiring the lock_.
+  num_rows_produced_ += batch->num_rows();
+  if (num_rows_produced_limit_ > 0 && num_rows_produced_ > num_rows_produced_limit_) {
+    Status err = Status::Expected(TErrorCode::ROWS_PRODUCED_LIMIT_EXCEEDED,
+        PrintId(state->query_id()),
+        PrettyPrinter::Print(num_rows_produced_limit_, TUnit::NONE));
+    VLOG_QUERY << err.msg().msg();
+    return err;
+  }
+
   // Don't enter the loop if batch->num_rows() == 0; no point triggering the consumer with
   // 0 rows to return. Be wary of ever returning 0-row batches to the client; some poorly
   // written clients may not cope correctly with them. See IMPALA-4335.
diff --git a/be/src/exec/plan-root-sink.h b/be/src/exec/plan-root-sink.h
index b68fff7..71dec98 100644
--- a/be/src/exec/plan-root-sink.h
+++ b/be/src/exec/plan-root-sink.h
@@ -118,6 +118,12 @@ class PlanRootSink : public DataSink {
 
   /// Set by GetNext() to indicate to Send() how many rows it should write to results_.
   int num_rows_requested_ = 0;
+
+  /// Updated by Send() to indicate the total number of rows produced by query execution.
+  int64_t num_rows_produced_ = 0;
+
+  /// Limit on the number of rows produced by this query (0 means no limit).
+  const int64_t num_rows_produced_limit_;
 };
 }
 
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index 059b355..b18b58e 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -251,7 +251,8 @@ TEST(QueryOptions, SetBigIntOptions) {
   TQueryOptions options;
   // List of pairs of Key and its valid range
   pair<OptionDef<int64_t>, Range<int64_t>> case_set[] {
-      {MAKE_OPTIONDEF(cpu_limit_s),  {0, I64_MAX}},
+      {MAKE_OPTIONDEF(cpu_limit_s), {0, I64_MAX}},
+      {MAKE_OPTIONDEF(num_rows_produced_limit), {0, I64_MAX}},
   };
   for (const auto& test_case : case_set) {
     const OptionDef<int64_t>& option_def = test_case.first;
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 177acac..74c178b 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -750,6 +750,17 @@ Status impala::SetQueryOption(const string& key, const string& value,
             num_remote_executor_candidates);
         break;
       }
+      case TImpalaQueryOptions::NUM_ROWS_PRODUCED_LIMIT: {
+        StringParser::ParseResult result;
+        const int64_t num_rows_produced_limit =
+            StringParser::StringToInt<int64_t>(value.c_str(), value.length(), &result);
+        if (result != StringParser::PARSE_SUCCESS || num_rows_produced_limit < 0) {
+          return Status(Substitute("Invalid rows produced limit: '$0'. "
+                                   "Only non-negative numbers are allowed.", value));
+        }
+        query_options->__set_num_rows_produced_limit(num_rows_produced_limit);
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 53aedd6..7ad1ca0 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -41,7 +41,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // the DCHECK.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::NUM_REMOTE_EXECUTOR_CANDIDATES + 1);\
+      TImpalaQueryOptions::NUM_ROWS_PRODUCED_LIMIT + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -148,6 +148,8 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(resource_trace_ratio, RESOURCE_TRACE_RATIO, TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(num_remote_executor_candidates, NUM_REMOTE_EXECUTOR_CANDIDATES,\
       TQueryOptionLevel::ADVANCED)\
+  QUERY_OPT_FN(num_rows_produced_limit, NUM_ROWS_PRODUCED_LIMIT,\
+      TQueryOptionLevel::ADVANCED)\
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index a790105..3d3e997 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -318,6 +318,9 @@ struct TQueryOptions {
   // See comment in ImpalaService.thrift.
   // The default value is set to 3 as this is the default value of HDFS replicas.
   76: optional i32 num_remote_executor_candidates = 3;
+
+  // See comment in ImpalaService.thrift.
+  77: optional i64 num_rows_produced_limit = 0;
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 0eafd6e..8d5f872 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -367,6 +367,11 @@ enum TImpalaQueryOptions {
   // cache). If set to 0, the number of executor candidates is unlimited, and remote
   // ranges will be scheduled across all executors.
   NUM_REMOTE_EXECUTOR_CANDIDATES,
+
+  // A limit on the number of rows produced by the query. The query will be
+  // canceled if its execution produces more rows than this limit. A value
+  // of 0 means there is no limit on the number of rows produced.
+  NUM_ROWS_PRODUCED_LIMIT
 }
 
 // The summary of a DML statement.
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 19d6f44..93fed7d 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -393,6 +393,10 @@ error_codes = (
   ("CPU_LIMIT_EXCEEDED", 129, "Query $0 terminated due to CPU limit of $1"),
 
   ("SCAN_BYTES_LIMIT_EXCEEDED", 130, "Query $0 terminated due to scan bytes limit of $1"),
+
+  ("ROWS_PRODUCED_LIMIT_EXCEEDED", 131,
+   "Query $0 terminated due to rows produced limit of $1. "
+   "Unset or increase NUM_ROWS_PRODUCED_LIMIT query option to produce more rows."),
 )
 
 import sys
diff --git a/testdata/workloads/functional-query/queries/QueryTest/query-resource-limits.test b/testdata/workloads/functional-query/queries/QueryTest/query-resource-limits.test
index 65abd10..8702da0 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/query-resource-limits.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/query-resource-limits.test
@@ -50,3 +50,21 @@ select sleep(10000)
 ---- CATCH
 row_regex:.*expired due to execution time limit of 2s000ms.*
 ====
+---- QUERY
+# Query should fail due to exceeding rows produced limit.
+set NUM_ROWS_PRODUCED_LIMIT = 10000;
+select * from tpch.lineitem;
+---- CATCH
+row_regex:.*terminated due to rows produced limit of 10000.*
+====
+---- QUERY
+# The query should succeed when it doesn't exceed the rows produced limit.
+set NUM_ROWS_PRODUCED_LIMIT = 3;
+select * from functional.tinytable;
+---- TYPES
+STRING,STRING
+---- RESULTS
+'aaaaaaa','bbbbbbb'
+'ccccc','dddd'
+'eeeeeeee','f'
+====

Reply via email to