This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 4099a60 IMPALA-10317: Add query option that limits huge joins at
runtime
4099a60 is described below
commit 4099a606892c377b9e8c9c6df2a45a7d42afcaea
Author: Fucun Chu <[email protected]>
AuthorDate: Tue Nov 10 16:29:59 2020 +0800
IMPALA-10317: Add query option that limits huge joins at runtime
This patch adds support for limiting the rows produced by a join node
such that runaway join queries can be prevented.
The limit is specified by a query option. Queries exceeding that limit
get terminated. The checking runs periodically, so the actual rows
produced may go somewhat over the limit.
JOIN_ROWS_PRODUCED_LIMIT is exposed as an advanced query option.
The query profile is updated to include query-wide and per-backend
metrics for rows produced (RowsReturned). Example from "
set JOIN_ROWS_PRODUCED_LIMIT = 10000000;
select count(*) from tpch_parquet.lineitem l1 cross join
(select * from tpch_parquet.lineitem l2 limit 5) l3;":
NESTED_LOOP_JOIN_NODE (id=2):
- InactiveTotalTime: 107.534ms
- PeakMemoryUsage: 16.00 KB (16384)
- ProbeRows: 1.02K (1024)
- ProbeTime: 0.000ns
- RowsReturned: 10.00M (10002025)
- RowsReturnedRate: 749.58 K/sec
- TotalTime: 13s337ms
Testing:
Added tests for JOIN_ROWS_PRODUCED_LIMIT
Change-Id: Idbca7e053b61b4e31b066edcfb3b0398fa859d02
Reviewed-on: http://gerrit.cloudera.org:8080/16706
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/runtime/coordinator-backend-state.cc | 4 ++
be/src/runtime/coordinator.h | 15 +++++
be/src/runtime/fragment-instance-state.cc | 13 +++-
be/src/runtime/fragment-instance-state.h | 7 ++
be/src/runtime/query-state.cc | 5 ++
be/src/service/impala-server.cc | 24 ++++++-
be/src/service/query-options-test.cc | 1 +
be/src/service/query-options.cc | 11 ++++
be/src/service/query-options.h | 4 +-
common/protobuf/control_service.proto | 3 +
common/thrift/ImpalaInternalService.thrift | 2 +
common/thrift/ImpalaService.thrift | 4 ++
common/thrift/generate_error_codes.py | 5 ++
.../queries/QueryTest/query-resource-limits.test | 74 ++++++++++++++++++++++
14 files changed, 167 insertions(+), 5 deletions(-)
diff --git a/be/src/runtime/coordinator-backend-state.cc
b/be/src/runtime/coordinator-backend-state.cc
index df75f35..58fd87c 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -500,6 +500,10 @@ bool Coordinator::BackendState::ApplyExecStatusReport(
backend_utilization_.bytes_read = backend_exec_status.bytes_read();
backend_utilization_.exchange_bytes_sent =
backend_exec_status.exchange_bytes_sent();
backend_utilization_.scan_bytes_sent = backend_exec_status.scan_bytes_sent();
+ std::map<int32_t, int64_t> per_join_rows_produced(
+ backend_exec_status.per_join_rows_produced().begin(),
+ backend_exec_status.per_join_rows_produced().end());
+ backend_utilization_.per_join_rows_produced = per_join_rows_produced;
// Only merge fragment stats if they are newer than that last report
received.
if (backend_exec_status.backend_report_seq_no() <=
last_backend_report_seq_no_) {
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index b4a4216..36168c2 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -37,6 +37,7 @@
#include "util/promise.h"
#include "util/runtime-profile-counters.h"
#include "util/spinlock.h"
+#include "util/container-util.h"
namespace kudu {
namespace rpc {
@@ -226,6 +227,9 @@ class Coordinator { // NOLINT: The member variables could
be re-ordered to save
/// Total system cpu consumed.
int64_t cpu_sys_ns = 0;
+ /// Total num rows produced by each join node. The key is join node id.
+ std::map<int32_t, int64_t> per_join_rows_produced;
+
/// Merge utilization from 'other' into this.
void Merge(const ResourceUtilization& other) {
peak_per_host_mem_consumption =
@@ -235,6 +239,17 @@ class Coordinator { // NOLINT: The member variables could
be re-ordered to save
scan_bytes_sent += other.scan_bytes_sent;
cpu_user_ns += other.cpu_user_ns;
cpu_sys_ns += other.cpu_sys_ns;
+ MergeMapValues(other.per_join_rows_produced, &per_join_rows_produced);
+ }
+
+ /// Max join rows produced across join nodes
+ const std::pair<int32_t, int64_t> MaxJoinNodeRowsProduced() const {
+ std::pair<int32_t, int64_t> entryWithMaxValue = std::make_pair(0, 0);
+ for (const auto& entry : per_join_rows_produced) {
+ if (entry.second > entryWithMaxValue.second)
+ entryWithMaxValue = std::make_pair(entry.first, entry.second);
+ }
+ return entryWithMaxValue;
}
};
diff --git a/be/src/runtime/fragment-instance-state.cc
b/be/src/runtime/fragment-instance-state.cc
index 61cd7cc..1de34d6 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -305,6 +305,7 @@ void
FragmentInstanceState::GetStatusReport(FragmentInstanceExecStatusPB* instan
int64_t bytes_read = 0;
int64_t scan_ranges_complete = 0;
int64_t total_bytes_sent = 0;
+ std::map<int32_t, int64_t> per_join_rows_produced;
for (RuntimeProfileBase* node : nodes) {
RuntimeProfile::Counter* c = node->GetCounter(PROFILE_BytesRead.name());
if (c != nullptr) bytes_read += c->value();
@@ -325,7 +326,16 @@ void
FragmentInstanceState::GetStatusReport(FragmentInstanceExecStatusPB* instan
}
RuntimeProfile::Counter* rows_counter = node->GetCounter("RowsReturned");
RuntimeProfile::Counter* mem_counter =
node->GetCounter("PeakMemoryUsage");
- if (rows_counter != nullptr)
summary_data->set_rows_returned(rows_counter->value());
+ if (rows_counter != nullptr) {
+ summary_data->set_rows_returned(rows_counter->value());
+ // row count stats for a join node
+ string hash_type = PrintThriftEnum(TPlanNodeType::HASH_JOIN_NODE);
+ string nested_loop_type =
PrintThriftEnum(TPlanNodeType::NESTED_LOOP_JOIN_NODE);
+ if (node->name().rfind(hash_type, 0) == 0
+ || node->name().rfind(nested_loop_type, 0) == 0) {
+ per_join_rows_produced[node->metadata().plan_node_id] =
rows_counter->value();
+ }
+ }
if (mem_counter != nullptr)
summary_data->set_peak_mem_usage(mem_counter->value());
summary_data->set_local_time_ns(node->local_time());
}
@@ -333,6 +343,7 @@ void
FragmentInstanceState::GetStatusReport(FragmentInstanceExecStatusPB* instan
bytes_read_ = bytes_read;
scan_ranges_complete_ = scan_ranges_complete;
total_bytes_sent_ = total_bytes_sent;
+ per_join_rows_produced_ = per_join_rows_produced;
// Send the DML stats if this is the final report.
if (done) {
diff --git a/be/src/runtime/fragment-instance-state.h
b/be/src/runtime/fragment-instance-state.h
index a4cf413..58b5a4b 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -149,6 +149,9 @@ class FragmentInstanceState {
int64_t cpu_sys_ns() const { return cpu_sys_ns_; }
int64_t bytes_read() const { return bytes_read_; }
int64_t total_bytes_sent() const { return total_bytes_sent_; }
+ const std::map<int32_t, int64_t>& per_join_rows_produced() const {
+ return per_join_rows_produced_;
+ }
/// Returns true if the current thread is a thread executing the whole or
part of
/// a fragment instance.
@@ -213,6 +216,10 @@ class FragmentInstanceState {
/// Total bytes sent on exchanges in this backend. Set in GetStatusReport().
int64_t total_bytes_sent_ = 0;
+ /// For each join node, sum of RowsReturned counters on this backend.
+ /// Set in GetStatusReport().
+ std::map<int32_t, int64_t> per_join_rows_produced_;
+
/// Profile for timings for each stage of the plan fragment instance's
lifecycle.
/// Lives in obj_pool().
RuntimeProfile* timings_profile_ = nullptr;
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index aff7310..fae3c1c 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -540,6 +540,7 @@ void QueryState::ConstructReport(bool instances_started,
int64_t scan_ranges_complete = 0;
int64_t exchange_bytes_sent = 0;
int64_t scan_bytes_sent = 0;
+ std::map<int32_t, int64_t> per_join_rows_produced;
for (const auto& entry : fis_map_) {
FragmentInstanceState* fis = entry.second;
@@ -580,6 +581,7 @@ void QueryState::ConstructReport(bool instances_started,
} else {
exchange_bytes_sent += fis->total_bytes_sent();
}
+ MergeMapValues(fis->per_join_rows_produced(), &per_join_rows_produced);
}
// Construct the per-fragment status reports, including runtime profiles.
@@ -604,6 +606,9 @@ void QueryState::ConstructReport(bool instances_started,
report->set_scan_ranges_complete(scan_ranges_complete);
report->set_exchange_bytes_sent(exchange_bytes_sent);
report->set_scan_bytes_sent(scan_bytes_sent);
+ for (const auto& entry : per_join_rows_produced) {
+ (*report->mutable_per_join_rows_produced())[entry.first] = entry.second;
+ }
}
}
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 8b1ca2f..347c9ba 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1274,8 +1274,10 @@ Status ImpalaServer::SetQueryInflight(
int32_t exec_time_limit_s = query_handle->query_options().exec_time_limit_s;
int64_t cpu_limit_s = query_handle->query_options().cpu_limit_s;
int64_t scan_bytes_limit = query_handle->query_options().scan_bytes_limit;
- if (idle_timeout_s > 0 || exec_time_limit_s > 0 ||
- cpu_limit_s > 0 || scan_bytes_limit > 0) {
+ int64_t join_rows_produced_limit =
+ query_handle->query_options().join_rows_produced_limit;
+ if (idle_timeout_s > 0 || exec_time_limit_s > 0 || cpu_limit_s > 0
+ || scan_bytes_limit > 0 || join_rows_produced_limit > 0) {
lock_guard<mutex> l2(query_expiration_lock_);
int64_t now = UnixMillis();
if (idle_timeout_s > 0) {
@@ -1290,7 +1292,7 @@ Status ImpalaServer::SetQueryInflight(
queries_by_timestamp_.emplace(ExpirationEvent{
now + (1000L * exec_time_limit_s), query_id,
ExpirationKind::EXEC_TIME_LIMIT});
}
- if (cpu_limit_s > 0 || scan_bytes_limit > 0) {
+ if (cpu_limit_s > 0 || scan_bytes_limit > 0 || join_rows_produced_limit >
0) {
if (cpu_limit_s > 0) {
VLOG_QUERY << "Query " << PrintId(query_id) << " has CPU limit of "
<< PrettyPrinter::Print(cpu_limit_s, TUnit::TIME_S);
@@ -1299,6 +1301,10 @@ Status ImpalaServer::SetQueryInflight(
VLOG_QUERY << "Query " << PrintId(query_id) << " has scan bytes limit
of "
<< PrettyPrinter::Print(scan_bytes_limit, TUnit::BYTES);
}
+ if (join_rows_produced_limit > 0) {
+ VLOG_QUERY << "Query " << PrintId(query_id) << " has join rows
produced limit of "
+ << PrettyPrinter::Print(join_rows_produced_limit,
TUnit::UNIT);
+ }
queries_by_timestamp_.emplace(ExpirationEvent{
now + EXPIRATION_CHECK_INTERVAL_MS, query_id,
ExpirationKind::RESOURCE_LIMIT});
}
@@ -2673,6 +2679,18 @@ Status
ImpalaServer::CheckResourceLimits(ClientRequestState* crs) {
VLOG_QUERY << err.msg().msg();
return err;
}
+
+ auto& max_join_node_entry = utilization.MaxJoinNodeRowsProduced();
+ int32_t join_node_id = max_join_node_entry.first;
+ int64_t join_rows_produced = max_join_node_entry.second;
+ int64_t join_rows_produced_limit =
crs->query_options().join_rows_produced_limit;
+ if (join_rows_produced_limit > 0 && join_rows_produced >
join_rows_produced_limit) {
+ Status err =
Status::Expected(TErrorCode::JOIN_ROWS_PRODUCED_LIMIT_EXCEEDED,
+ PrintId(crs->query_id()),
+ PrettyPrinter::Print(join_rows_produced_limit, TUnit::UNIT),
join_node_id);
+ VLOG_QUERY << err.msg().msg();
+ return err;
+ }
// Query is within the resource limits, check again later.
return Status::OK();
}
diff --git a/be/src/service/query-options-test.cc
b/be/src/service/query-options-test.cc
index c1b51fd..979f4d5 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -279,6 +279,7 @@ TEST(QueryOptions, SetBigIntOptions) {
pair<OptionDef<int64_t>, Range<int64_t>> case_set[] {
{MAKE_OPTIONDEF(cpu_limit_s), {0, I64_MAX}},
{MAKE_OPTIONDEF(num_rows_produced_limit), {0, I64_MAX}},
+ {MAKE_OPTIONDEF(join_rows_produced_limit), {0, I64_MAX}},
};
for (const auto& test_case : case_set) {
const OptionDef<int64_t>& option_def = test_case.first;
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index c986292..96c8809 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -1006,6 +1006,17 @@ Status impala::SetQueryOption(const string& key, const
string& value,
query_options->__set_broadcast_to_partition_factor(val);
break;
}
+ case TImpalaQueryOptions::JOIN_ROWS_PRODUCED_LIMIT: {
+ StringParser::ParseResult result;
+ const int64_t join_rows_produced_limit =
+ StringParser::StringToInt<int64_t>(value.c_str(), value.length(),
&result);
+ if (result != StringParser::PARSE_SUCCESS || join_rows_produced_limit
< 0) {
+ return Status(Substitute("Invalid join rows produced limit: '$0'. "
+ "Only non-negative numbers are allowed.",
value));
+ }
+
query_options->__set_join_rows_produced_limit(join_rows_produced_limit);
+ break;
+ }
default:
if (IsRemovedQueryOption(key)) {
LOG(WARNING) << "Ignoring attempt to set removed query option '" <<
key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 9abd042..e23805c 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -47,7 +47,7 @@ typedef std::unordered_map<string,
beeswax::TQueryOptionLevel::type>
// time we add or remove a query option to/from the enum TImpalaQueryOptions.
#define QUERY_OPTS_TABLE\
DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
- TImpalaQueryOptions::BROADCAST_TO_PARTITION_FACTOR + 1);\
+ TImpalaQueryOptions::JOIN_ROWS_PRODUCED_LIMIT + 1);\
REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded,
ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -229,6 +229,8 @@ typedef std::unordered_map<string,
beeswax::TQueryOptionLevel::type>
TQueryOptionLevel::ADVANCED)\
QUERY_OPT_FN(broadcast_to_partition_factor, BROADCAST_TO_PARTITION_FACTOR,\
TQueryOptionLevel::ADVANCED)\
+ QUERY_OPT_FN(join_rows_produced_limit, JOIN_ROWS_PRODUCED_LIMIT,\
+ TQueryOptionLevel::ADVANCED)\
;
/// Enforce practical limits on some query options to avoid undesired query
state.
diff --git a/common/protobuf/control_service.proto
b/common/protobuf/control_service.proto
index 13e6008..14beea3 100644
--- a/common/protobuf/control_service.proto
+++ b/common/protobuf/control_service.proto
@@ -256,6 +256,9 @@ message ReportExecStatusRequestPB {
// disregarded if the sequence number of this report is less than or equal to
// the previous report received.
optional int64 backend_report_seq_no = 15;
+
+ // For each join node, sum of RowsReturned counters on this backend.
+ map<int32, int64> per_join_rows_produced = 16;
}
message ReportExecStatusResponsePB {
diff --git a/common/thrift/ImpalaInternalService.thrift
b/common/thrift/ImpalaInternalService.thrift
index d61b9eb..6b63b97 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -475,6 +475,8 @@ struct TQueryOptions {
// See comment in ImpalaService.thrift
119: optional double broadcast_to_partition_factor = 1.0;
+ // See comment in ImpalaService.thrift
+ 120: optional i64 join_rows_produced_limit = 0;
}
// Impala currently has two types of sessions: Beeswax and HiveServer2
diff --git a/common/thrift/ImpalaService.thrift
b/common/thrift/ImpalaService.thrift
index f553d76..0561ae5 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -613,6 +613,10 @@ enum TImpalaQueryOptions {
// favor partition distribution.
BROADCAST_TO_PARTITION_FACTOR = 118
+ // A limit on the number of join rows produced by the query. The query will
be
+ // canceled if the query is still executing after this limit is hit. A value
+ // of 0 means there is no limit on the number of join rows produced.
+ JOIN_ROWS_PRODUCED_LIMIT = 119
}
// The summary of a DML statement.
diff --git a/common/thrift/generate_error_codes.py
b/common/thrift/generate_error_codes.py
index f0f9505..5c38a40 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -463,6 +463,11 @@ error_codes = (
# Note: impala_shell uses a regex to search for this specific error message,
so
# changing it may break older shell version.
("INVALID_QUERY_HANDLE", 150, "Invalid or unknown query handle: $0."),
+
+ ("JOIN_ROWS_PRODUCED_LIMIT_EXCEEDED", 151,
+ "Query $0 terminated due to join rows produced exceeds the limit of $1 "
+ "at node with id $2. Unset or increase JOIN_ROWS_PRODUCED_LIMIT query
option "
+ "to produce more rows."),
)
import sys
diff --git
a/testdata/workloads/functional-query/queries/QueryTest/query-resource-limits.test
b/testdata/workloads/functional-query/queries/QueryTest/query-resource-limits.test
index 8702da0..fa7554a 100644
---
a/testdata/workloads/functional-query/queries/QueryTest/query-resource-limits.test
+++
b/testdata/workloads/functional-query/queries/QueryTest/query-resource-limits.test
@@ -68,3 +68,77 @@ STRING,STRING
'ccccc','dddd'
'eeeeeeee','f'
====
+---- QUERY
+# Query should fail due to exceeding hash join rows produced limit.
+# Added a sleep to make sure it runs long enough to get a non-zero update from
the
+# fragment instances on the num of join rows produced.
+set JOIN_ROWS_PRODUCED_LIMIT = 1000;
+select sleep(10000) union select count(*) from tpch.lineitem l1,tpch.lineitem
l2,
+tpch.lineitem l3 where l1.l_suppkey = l2.l_linenumber and l1.l_orderkey =
l2.l_orderkey
+and l1.l_orderkey = l3.l_orderkey group by l1.l_comment, l2.l_comment having
count(*) = 99
+---- CATCH
+row_regex:.*terminated due to join rows produced exceeds the limit of 1.00K.*
+====
+---- QUERY
+# Query should fail due to exceeding nested loop join rows produced limit.
+# Added a sleep to make sure it runs long enough to get a non-zero update from
the
+# fragment instances on the num of join rows produced.
+set JOIN_ROWS_PRODUCED_LIMIT = 500;
+select sleep(10000) union select count(*) from functional.alltypestiny t1 left
outer join
+functional.alltypessmall t2 on t1.int_col < t2.int_col;
+---- CATCH
+row_regex:.*terminated due to join rows produced exceeds the limit of 500.*
+====
+---- QUERY
+# If one of the mixed joins exceeds the row produced limit, the query should
fail.
+# Added a sleep to make sure it runs long enough to get a non-zero update from
the
+# fragment instances on the num of join rows produced.
+set JOIN_ROWS_PRODUCED_LIMIT = 10000;
+select sleep(10000) union select count(*) from (
+select
+ *
+from (
+ select
+ ps_partkey,
+ sum(ps_supplycost * ps_availqty) as value
+ from
+ tpch_parquet.partsupp,
+ tpch_parquet.supplier,
+ tpch_parquet.nation
+ where
+ ps_suppkey = s_suppkey
+ and s_nationkey = n_nationkey
+ and n_name = 'GERMANY'
+ group by
+ ps_partkey
+) as inner_query
+where
+ value > (
+ select
+ sum(ps_supplycost * ps_availqty) * 0.0001
+ from
+ tpch_parquet.partsupp,
+ tpch_parquet.supplier,
+ tpch_parquet.nation
+ where
+ ps_suppkey = s_suppkey
+ and s_nationkey = n_nationkey
+ and n_name = 'GERMANY'
+ )
+order by
+ value desc
+) t;
+---- CATCH
+row_regex:.*terminated due to join rows produced exceeds the limit of 10.00K.*
+====
+---- QUERY
+# The query should succeed when it doesn't exceed the join rows produced limit.
+set JOIN_ROWS_PRODUCED_LIMIT = 10000;
+select count(*) from alltypessmall a inner join alltypessmall b
+on (a.timestamp_col = b.timestamp_col)
+where a.year=2009 and a.month=1 and b.year=2009 and b.month=1;
+---- TYPES
+BIGINT
+---- RESULTS
+25
+====