This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 4099a60  IMPALA-10317: Add query option that limits huge joins at 
runtime
4099a60 is described below

commit 4099a606892c377b9e8c9c6df2a45a7d42afcaea
Author: Fucun Chu <[email protected]>
AuthorDate: Tue Nov 10 16:29:59 2020 +0800

    IMPALA-10317: Add query option that limits huge joins at runtime
    
    This patch adds support for limiting the rows produced by a join node
    such that runaway join queries can be prevented.
    
    The limit is specified by a query option. Queries exceeding that limit
    get terminated. The checking runs periodically, so the actual rows
    produced may go somewhat over the limit.
    
    JOIN_ROWS_PRODUCED_LIMIT is exposed as an advanced query option.
    
    The query profile is updated to include query-wide and per-backend
    metrics for rows produced (RowsReturned). Example output after
    running:
    "set JOIN_ROWS_PRODUCED_LIMIT = 10000000;
    select count(*) from tpch_parquet.lineitem l1 cross join
    (select * from tpch_parquet.lineitem l2 limit 5) l3;":
    
    NESTED_LOOP_JOIN_NODE (id=2):
       - InactiveTotalTime: 107.534ms
       - PeakMemoryUsage: 16.00 KB (16384)
       - ProbeRows: 1.02K (1024)
       - ProbeTime: 0.000ns
       - RowsReturned: 10.00M (10002025)
       - RowsReturnedRate: 749.58 K/sec
       - TotalTime: 13s337ms
    
    Testing:
     Added tests for JOIN_ROWS_PRODUCED_LIMIT
    
    Change-Id: Idbca7e053b61b4e31b066edcfb3b0398fa859d02
    Reviewed-on: http://gerrit.cloudera.org:8080/16706
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/runtime/coordinator-backend-state.cc        |  4 ++
 be/src/runtime/coordinator.h                       | 15 +++++
 be/src/runtime/fragment-instance-state.cc          | 13 +++-
 be/src/runtime/fragment-instance-state.h           |  7 ++
 be/src/runtime/query-state.cc                      |  5 ++
 be/src/service/impala-server.cc                    | 24 ++++++-
 be/src/service/query-options-test.cc               |  1 +
 be/src/service/query-options.cc                    | 11 ++++
 be/src/service/query-options.h                     |  4 +-
 common/protobuf/control_service.proto              |  3 +
 common/thrift/ImpalaInternalService.thrift         |  2 +
 common/thrift/ImpalaService.thrift                 |  4 ++
 common/thrift/generate_error_codes.py              |  5 ++
 .../queries/QueryTest/query-resource-limits.test   | 74 ++++++++++++++++++++++
 14 files changed, 167 insertions(+), 5 deletions(-)

diff --git a/be/src/runtime/coordinator-backend-state.cc 
b/be/src/runtime/coordinator-backend-state.cc
index df75f35..58fd87c 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -500,6 +500,10 @@ bool Coordinator::BackendState::ApplyExecStatusReport(
   backend_utilization_.bytes_read = backend_exec_status.bytes_read();
   backend_utilization_.exchange_bytes_sent = 
backend_exec_status.exchange_bytes_sent();
   backend_utilization_.scan_bytes_sent = backend_exec_status.scan_bytes_sent();
+  std::map<int32_t, int64_t> per_join_rows_produced(
+      backend_exec_status.per_join_rows_produced().begin(),
+      backend_exec_status.per_join_rows_produced().end());
+  backend_utilization_.per_join_rows_produced = per_join_rows_produced;
 
   // Only merge fragment stats if they are newer than that last report 
received.
   if (backend_exec_status.backend_report_seq_no() <= 
last_backend_report_seq_no_) {
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index b4a4216..36168c2 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -37,6 +37,7 @@
 #include "util/promise.h"
 #include "util/runtime-profile-counters.h"
 #include "util/spinlock.h"
+#include "util/container-util.h"
 
 namespace kudu {
 namespace rpc {
@@ -226,6 +227,9 @@ class Coordinator { // NOLINT: The member variables could 
be re-ordered to save
     /// Total system cpu consumed.
     int64_t cpu_sys_ns = 0;
 
+    /// Total num rows produced by each join node. The key is join node id.
+    std::map<int32_t, int64_t> per_join_rows_produced;
+
     /// Merge utilization from 'other' into this.
     void Merge(const ResourceUtilization& other) {
       peak_per_host_mem_consumption =
@@ -235,6 +239,17 @@ class Coordinator { // NOLINT: The member variables could 
be re-ordered to save
       scan_bytes_sent += other.scan_bytes_sent;
       cpu_user_ns += other.cpu_user_ns;
       cpu_sys_ns += other.cpu_sys_ns;
+      MergeMapValues(other.per_join_rows_produced, &per_join_rows_produced);
+    }
+
+    /// Max join rows produced across join nodes
+    const std::pair<int32_t, int64_t> MaxJoinNodeRowsProduced() const {
+      std::pair<int32_t, int64_t> entryWithMaxValue = std::make_pair(0, 0);
+      for (const auto& entry : per_join_rows_produced) {
+        if (entry.second > entryWithMaxValue.second)
+          entryWithMaxValue = std::make_pair(entry.first, entry.second);
+      }
+      return entryWithMaxValue;
     }
   };
 
diff --git a/be/src/runtime/fragment-instance-state.cc 
b/be/src/runtime/fragment-instance-state.cc
index 61cd7cc..1de34d6 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -305,6 +305,7 @@ void 
FragmentInstanceState::GetStatusReport(FragmentInstanceExecStatusPB* instan
   int64_t bytes_read = 0;
   int64_t scan_ranges_complete = 0;
   int64_t total_bytes_sent = 0;
+  std::map<int32_t, int64_t> per_join_rows_produced;
   for (RuntimeProfileBase* node : nodes) {
     RuntimeProfile::Counter* c = node->GetCounter(PROFILE_BytesRead.name());
     if (c != nullptr) bytes_read += c->value();
@@ -325,7 +326,16 @@ void 
FragmentInstanceState::GetStatusReport(FragmentInstanceExecStatusPB* instan
       }
       RuntimeProfile::Counter* rows_counter = node->GetCounter("RowsReturned");
       RuntimeProfile::Counter* mem_counter = 
node->GetCounter("PeakMemoryUsage");
-      if (rows_counter != nullptr) 
summary_data->set_rows_returned(rows_counter->value());
+      if (rows_counter != nullptr) {
+        summary_data->set_rows_returned(rows_counter->value());
+        // row count stats for a join node
+        string hash_type = PrintThriftEnum(TPlanNodeType::HASH_JOIN_NODE);
+        string nested_loop_type = 
PrintThriftEnum(TPlanNodeType::NESTED_LOOP_JOIN_NODE);
+        if (node->name().rfind(hash_type, 0) == 0
+            || node->name().rfind(nested_loop_type, 0) == 0) {
+          per_join_rows_produced[node->metadata().plan_node_id] = 
rows_counter->value();
+        }
+      }
       if (mem_counter != nullptr) 
summary_data->set_peak_mem_usage(mem_counter->value());
       summary_data->set_local_time_ns(node->local_time());
     }
@@ -333,6 +343,7 @@ void 
FragmentInstanceState::GetStatusReport(FragmentInstanceExecStatusPB* instan
   bytes_read_ = bytes_read;
   scan_ranges_complete_ = scan_ranges_complete;
   total_bytes_sent_  = total_bytes_sent;
+  per_join_rows_produced_ = per_join_rows_produced;
 
   // Send the DML stats if this is the final report.
   if (done) {
diff --git a/be/src/runtime/fragment-instance-state.h 
b/be/src/runtime/fragment-instance-state.h
index a4cf413..58b5a4b 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -149,6 +149,9 @@ class FragmentInstanceState {
   int64_t cpu_sys_ns() const { return cpu_sys_ns_; }
   int64_t bytes_read() const { return bytes_read_; }
   int64_t total_bytes_sent() const { return total_bytes_sent_; }
+  const std::map<int32_t, int64_t>& per_join_rows_produced() const {
+    return per_join_rows_produced_;
+  }
 
   /// Returns true if the current thread is a thread executing the whole or 
part of
   /// a fragment instance.
@@ -213,6 +216,10 @@ class FragmentInstanceState {
   /// Total bytes sent on exchanges in this backend. Set in GetStatusReport().
   int64_t total_bytes_sent_ = 0;
 
+  /// For each join node, sum of RowsReturned counters on this backend.
+  /// Set in GetStatusReport().
+  std::map<int32_t, int64_t> per_join_rows_produced_;
+
   /// Profile for timings for each stage of the plan fragment instance's 
lifecycle.
   /// Lives in obj_pool().
   RuntimeProfile* timings_profile_ = nullptr;
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index aff7310..fae3c1c 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -540,6 +540,7 @@ void QueryState::ConstructReport(bool instances_started,
     int64_t scan_ranges_complete = 0;
     int64_t exchange_bytes_sent = 0;
     int64_t scan_bytes_sent = 0;
+    std::map<int32_t, int64_t> per_join_rows_produced;
 
     for (const auto& entry : fis_map_) {
       FragmentInstanceState* fis = entry.second;
@@ -580,6 +581,7 @@ void QueryState::ConstructReport(bool instances_started,
       } else {
         exchange_bytes_sent += fis->total_bytes_sent();
       }
+      MergeMapValues(fis->per_join_rows_produced(), &per_join_rows_produced);
     }
 
     // Construct the per-fragment status reports, including runtime profiles.
@@ -604,6 +606,9 @@ void QueryState::ConstructReport(bool instances_started,
     report->set_scan_ranges_complete(scan_ranges_complete);
     report->set_exchange_bytes_sent(exchange_bytes_sent);
     report->set_scan_bytes_sent(scan_bytes_sent);
+    for (const auto& entry : per_join_rows_produced) {
+      (*report->mutable_per_join_rows_produced())[entry.first] = entry.second;
+    }
   }
 }
 
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 8b1ca2f..347c9ba 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1274,8 +1274,10 @@ Status ImpalaServer::SetQueryInflight(
   int32_t exec_time_limit_s = query_handle->query_options().exec_time_limit_s;
   int64_t cpu_limit_s = query_handle->query_options().cpu_limit_s;
   int64_t scan_bytes_limit = query_handle->query_options().scan_bytes_limit;
-  if (idle_timeout_s > 0 || exec_time_limit_s > 0 ||
-        cpu_limit_s > 0 || scan_bytes_limit > 0) {
+  int64_t join_rows_produced_limit =
+      query_handle->query_options().join_rows_produced_limit;
+  if (idle_timeout_s > 0 || exec_time_limit_s > 0 || cpu_limit_s > 0
+      || scan_bytes_limit > 0 || join_rows_produced_limit > 0) {
     lock_guard<mutex> l2(query_expiration_lock_);
     int64_t now = UnixMillis();
     if (idle_timeout_s > 0) {
@@ -1290,7 +1292,7 @@ Status ImpalaServer::SetQueryInflight(
       queries_by_timestamp_.emplace(ExpirationEvent{
           now + (1000L * exec_time_limit_s), query_id, 
ExpirationKind::EXEC_TIME_LIMIT});
     }
-    if (cpu_limit_s > 0 || scan_bytes_limit > 0) {
+    if (cpu_limit_s > 0 || scan_bytes_limit > 0 || join_rows_produced_limit > 
0) {
       if (cpu_limit_s > 0) {
         VLOG_QUERY << "Query " << PrintId(query_id) << " has CPU limit of "
                    << PrettyPrinter::Print(cpu_limit_s, TUnit::TIME_S);
@@ -1299,6 +1301,10 @@ Status ImpalaServer::SetQueryInflight(
         VLOG_QUERY << "Query " << PrintId(query_id) << " has scan bytes limit 
of "
                    << PrettyPrinter::Print(scan_bytes_limit, TUnit::BYTES);
       }
+      if (join_rows_produced_limit > 0) {
+        VLOG_QUERY << "Query " << PrintId(query_id) << " has join rows 
produced limit of "
+                   << PrettyPrinter::Print(join_rows_produced_limit, 
TUnit::UNIT);
+      }
       queries_by_timestamp_.emplace(ExpirationEvent{
           now + EXPIRATION_CHECK_INTERVAL_MS, query_id, 
ExpirationKind::RESOURCE_LIMIT});
     }
@@ -2673,6 +2679,18 @@ Status 
ImpalaServer::CheckResourceLimits(ClientRequestState* crs) {
     VLOG_QUERY << err.msg().msg();
     return err;
   }
+
+  auto& max_join_node_entry = utilization.MaxJoinNodeRowsProduced();
+  int32_t join_node_id = max_join_node_entry.first;
+  int64_t join_rows_produced = max_join_node_entry.second;
+  int64_t join_rows_produced_limit = 
crs->query_options().join_rows_produced_limit;
+  if (join_rows_produced_limit > 0 && join_rows_produced > 
join_rows_produced_limit) {
+    Status err = 
Status::Expected(TErrorCode::JOIN_ROWS_PRODUCED_LIMIT_EXCEEDED,
+        PrintId(crs->query_id()),
+        PrettyPrinter::Print(join_rows_produced_limit, TUnit::UNIT), 
join_node_id);
+    VLOG_QUERY << err.msg().msg();
+    return err;
+  }
   // Query is within the resource limits, check again later.
   return Status::OK();
 }
diff --git a/be/src/service/query-options-test.cc 
b/be/src/service/query-options-test.cc
index c1b51fd..979f4d5 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -279,6 +279,7 @@ TEST(QueryOptions, SetBigIntOptions) {
   pair<OptionDef<int64_t>, Range<int64_t>> case_set[] {
       {MAKE_OPTIONDEF(cpu_limit_s), {0, I64_MAX}},
       {MAKE_OPTIONDEF(num_rows_produced_limit), {0, I64_MAX}},
+      {MAKE_OPTIONDEF(join_rows_produced_limit), {0, I64_MAX}},
   };
   for (const auto& test_case : case_set) {
     const OptionDef<int64_t>& option_def = test_case.first;
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index c986292..96c8809 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -1006,6 +1006,17 @@ Status impala::SetQueryOption(const string& key, const 
string& value,
         query_options->__set_broadcast_to_partition_factor(val);
         break;
       }
+      case TImpalaQueryOptions::JOIN_ROWS_PRODUCED_LIMIT: {
+        StringParser::ParseResult result;
+        const int64_t join_rows_produced_limit =
+            StringParser::StringToInt<int64_t>(value.c_str(), value.length(), 
&result);
+        if (result != StringParser::PARSE_SUCCESS || join_rows_produced_limit 
< 0) {
+          return Status(Substitute("Invalid join rows produced limit: '$0'. "
+                                   "Only non-negative numbers are allowed.", 
value));
+        }
+        
query_options->__set_join_rows_produced_limit(join_rows_produced_limit);
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << 
key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 9abd042..e23805c 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -47,7 +47,7 @@ typedef std::unordered_map<string, 
beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::BROADCAST_TO_PARTITION_FACTOR + 1);\
+      TImpalaQueryOptions::JOIN_ROWS_PRODUCED_LIMIT + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, 
ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -229,6 +229,8 @@ typedef std::unordered_map<string, 
beeswax::TQueryOptionLevel::type>
       TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(broadcast_to_partition_factor, BROADCAST_TO_PARTITION_FACTOR,\
       TQueryOptionLevel::ADVANCED)\
+  QUERY_OPT_FN(join_rows_produced_limit, JOIN_ROWS_PRODUCED_LIMIT,\
+      TQueryOptionLevel::ADVANCED)\
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query 
state.
diff --git a/common/protobuf/control_service.proto 
b/common/protobuf/control_service.proto
index 13e6008..14beea3 100644
--- a/common/protobuf/control_service.proto
+++ b/common/protobuf/control_service.proto
@@ -256,6 +256,9 @@ message ReportExecStatusRequestPB {
   // disregarded if the sequence number of this report is less than or equal to
   // the previous report received.
   optional int64 backend_report_seq_no = 15;
+
+  // For each join node, sum of RowsReturned counters on this backend.
+  map<int32, int64> per_join_rows_produced = 16;
 }
 
 message ReportExecStatusResponsePB {
diff --git a/common/thrift/ImpalaInternalService.thrift 
b/common/thrift/ImpalaInternalService.thrift
index d61b9eb..6b63b97 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -475,6 +475,8 @@ struct TQueryOptions {
   // See comment in ImpalaService.thrift
   119: optional double broadcast_to_partition_factor = 1.0;
 
+  // See comment in ImpalaService.thrift
+  120: optional i64 join_rows_produced_limit = 0;
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2
diff --git a/common/thrift/ImpalaService.thrift 
b/common/thrift/ImpalaService.thrift
index f553d76..0561ae5 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -613,6 +613,10 @@ enum TImpalaQueryOptions {
   // favor partition distribution.
   BROADCAST_TO_PARTITION_FACTOR = 118
 
+  // A limit on the number of join rows produced by the query. The query will 
be
+  // canceled if the query is still executing after this limit is hit. A value
+  // of 0 means there is no limit on the number of join rows produced.
+  JOIN_ROWS_PRODUCED_LIMIT = 119
 }
 
 // The summary of a DML statement.
diff --git a/common/thrift/generate_error_codes.py 
b/common/thrift/generate_error_codes.py
index f0f9505..5c38a40 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -463,6 +463,11 @@ error_codes = (
   # Note: impala_shell uses a regex to search for this specific error message, 
so
   # changing it may break older shell version.
   ("INVALID_QUERY_HANDLE", 150, "Invalid or unknown query handle: $0."),
+
+  ("JOIN_ROWS_PRODUCED_LIMIT_EXCEEDED", 151,
+   "Query $0 terminated due to join rows produced exceeds the limit of $1 "
+   "at node with id $2. Unset or increase JOIN_ROWS_PRODUCED_LIMIT query 
option "
+   "to produce more rows."),
 )
 
 import sys
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/query-resource-limits.test
 
b/testdata/workloads/functional-query/queries/QueryTest/query-resource-limits.test
index 8702da0..fa7554a 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/query-resource-limits.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/query-resource-limits.test
@@ -68,3 +68,77 @@ STRING,STRING
 'ccccc','dddd'
 'eeeeeeee','f'
 ====
+---- QUERY
+# Query should fail due to exceeding hash join rows produced limit.
+# Added a sleep to make sure it runs long enough to get a non-zero update from 
the
+# fragment instances on the num of join rows produced.
+set JOIN_ROWS_PRODUCED_LIMIT = 1000;
+select sleep(10000) union select count(*) from tpch.lineitem l1,tpch.lineitem 
l2,
+tpch.lineitem l3 where l1.l_suppkey = l2.l_linenumber and l1.l_orderkey = 
l2.l_orderkey
+and l1.l_orderkey = l3.l_orderkey group by l1.l_comment, l2.l_comment having 
count(*) = 99
+---- CATCH
+row_regex:.*terminated due to join rows produced exceeds the limit of 1.00K.*
+====
+---- QUERY
+# Query should fail due to exceeding nested loop join rows produced limit.
+# Added a sleep to make sure it runs long enough to get a non-zero update from 
the
+# fragment instances on the num of join rows produced.
+set JOIN_ROWS_PRODUCED_LIMIT = 500;
+select sleep(10000) union select count(*) from functional.alltypestiny t1 left 
outer join
+functional.alltypessmall t2 on t1.int_col < t2.int_col;
+---- CATCH
+row_regex:.*terminated due to join rows produced exceeds the limit of 500.*
+====
+---- QUERY
+# If one of the mixed joins exceeds the row produced limit, the query should 
fail.
+# Added a sleep to make sure it runs long enough to get a non-zero update from 
the
+# fragment instances on the num of join rows produced.
+set JOIN_ROWS_PRODUCED_LIMIT = 10000;
+select sleep(10000) union select count(*) from (
+select
+  *
+from (
+  select
+    ps_partkey,
+    sum(ps_supplycost * ps_availqty) as value
+  from
+    tpch_parquet.partsupp,
+    tpch_parquet.supplier,
+    tpch_parquet.nation
+  where
+    ps_suppkey = s_suppkey
+    and s_nationkey = n_nationkey
+    and n_name = 'GERMANY'
+  group by
+    ps_partkey
+) as inner_query
+where
+  value > (
+    select
+      sum(ps_supplycost * ps_availqty) * 0.0001
+    from
+      tpch_parquet.partsupp,
+      tpch_parquet.supplier,
+      tpch_parquet.nation
+    where
+      ps_suppkey = s_suppkey
+      and s_nationkey = n_nationkey
+      and n_name = 'GERMANY'
+  )
+order by
+  value desc
+) t;
+---- CATCH
+row_regex:.*terminated due to join rows produced exceeds the limit of 10.00K.*
+====
+---- QUERY
+# The query should succeed when it doesn't exceed the join rows produced limit.
+set JOIN_ROWS_PRODUCED_LIMIT = 10000;
+select count(*) from alltypessmall a inner join alltypessmall b
+on (a.timestamp_col = b.timestamp_col)
+where a.year=2009 and a.month=1 and b.year=2009 and b.month=1;
+---- TYPES
+BIGINT
+---- RESULTS
+25
+====

Reply via email to