This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 5e39afc2d90a606ce05de376af5cd54035f7f99a Author: Mihaly Szjatinya <[email protected]> AuthorDate: Fri Apr 25 12:18:09 2025 +0200 IMPALA-10268: Validate the debug actions when they are set This patch aims to move the existing verification of the DEBUG_ACTION query option format into the pre-planner stage, SetQueryOption(), in order to prevent failures at the execution stage. Also, it localizes the verification code for the two existing types of debug actions. There are two types of debug actions: global debug actions, e.g. 'RECVR_ADD_BATCH:FAIL', and ExecNode debug actions, e.g. '0:GETNEXT:FAIL'. The two types are implemented independently in the source code, both having verification code intertwined with execution. In addition, global debug actions subdivide into C++ and Java, though the two are more or less synchronized. In the case of global debug actions, most of the code inside the existing DebugActionImpl() consists of verification, therefore it makes sense to make a wrapper around it for separating out the execution code. Things are worse for ExecNode debug actions, where the verification code consists of two parts, one in the DebugOptions() constructor and another one in ExecNode::ExecDebugActionImpl(). Additionally, some verification in the constructor produces warnings, while the ExecDebugActionImpl() verification either fails on DCHECK() or (in a single case) returns an error. For this case, a reasonable solution seems to be simply calling the constructor for a temporary object and extracting the verification code from ExecNode::ExecDebugActionImpl(). This has the drawback of the same warning being produced twice. Finally, having extracted the verification code for both types, the logic in impala::SetQueryOption() combines the two verification mechanisms. Note: In the long run, it would be better to write a single verification routine for both global and ExecNode debug actions, ideally as part of a general unification of the two existing debug_action mechanisms. 
With this in mind, the current patch intends to preserve current behavior, while avoiding complex refactoring. Change-Id: I53816aba2c79b556688d3b916883fee7476fdbb5 Reviewed-on: http://gerrit.cloudera.org:8080/22734 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/exec/exec-node.cc | 28 ++++++---- be/src/exec/exec-node.h | 3 ++ be/src/runtime/debug-options.cc | 10 +++- be/src/runtime/debug-options.h | 1 + be/src/service/query-options.cc | 20 ++++++++ be/src/util/debug-util.cc | 75 ++++++++++++++++----------- be/src/util/debug-util.h | 13 +++-- tests/query_test/test_debug_action.py | 97 +++++++++++++++++++++++++++++++++++ 8 files changed, 203 insertions(+), 44 deletions(-) diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc index 8f58bdd3c..78b8ff555 100644 --- a/be/src/exec/exec-node.cc +++ b/be/src/exec/exec-node.cc @@ -463,22 +463,30 @@ Status ExecNode::ExecDebugActionImpl(TExecNodePhase::type phase, RuntimeState* s } else { DCHECK_EQ(debug_options_.action, TDebugAction::DELAY); int64_t sleep_duration_ms = 100; - if (!debug_options_.action_param.empty()) { - const string& param = debug_options_.action_param; - StringParser::ParseResult result; - sleep_duration_ms = - StringParser::StringToInt<int64_t>(param.c_str(), param.length(), &result); - if (result != StringParser::PARSE_SUCCESS || sleep_duration_ms < 0) { - return Status(Substitute("Invalid sleep duration: '$0'. 
" - "Only non-negative numbers are allowed.", param)); - } - } + RETURN_IF_ERROR( + ParseAndValidateSleepDuration(debug_options_.action_param, &sleep_duration_ms)); VLOG(1) << "DEBUG_ACTION: Sleeping for " << sleep_duration_ms << "ms"; SleepForMs(sleep_duration_ms); } return Status::OK(); } +Status ExecNode::ParseAndValidateSleepDuration( + const string& param, int64_t* sleep_duration_ms) { + if (!param.empty()) { + DCHECK(sleep_duration_ms != nullptr); + StringParser::ParseResult result; + *sleep_duration_ms = + StringParser::StringToInt<int64_t>(param.c_str(), param.length(), &result); + if (result != StringParser::PARSE_SUCCESS || *sleep_duration_ms < 0) { + return Status(Substitute("Invalid sleep duration: '$0'. " + "Only non-negative numbers are allowed.", + param)); + } + } + return Status::OK(); +} + bool ExecNode::CheckLimitAndTruncateRowBatchIfNeeded(RowBatch* row_batch, bool* eos) { DCHECK(limit_ != 0); const int row_batch_size = row_batch->num_rows(); diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h index 5f1f2c577..ac3d7bcc7 100644 --- a/be/src/exec/exec-node.h +++ b/be/src/exec/exec-node.h @@ -361,6 +361,9 @@ class ExecNode { /// Names of counters shared by all exec nodes static const std::string ROW_THROUGHPUT_COUNTER; + static Status ParseAndValidateSleepDuration( + const string& param, int64_t* sleep_duration_ms); + protected: friend class DataSink; friend class ScopedGetNextEventAdder; diff --git a/be/src/runtime/debug-options.cc b/be/src/runtime/debug-options.cc index b8b6b3596..f135c4f1f 100644 --- a/be/src/runtime/debug-options.cc +++ b/be/src/runtime/debug-options.cc @@ -29,15 +29,21 @@ using namespace std; using namespace boost; DebugOptions::DebugOptions(const TQueryOptions& query_options) + : DebugOptions(query_options.__isset.debug_action ? 
query_options.debug_action : "") { +} + +DebugOptions::DebugOptions(const string& debug_action) : instance_idx_(-1), node_id_(-1), action_(TDebugAction::WAIT), phase_(TExecNodePhase::INVALID) { - if (!query_options.__isset.debug_action || query_options.debug_action.empty()) { + + if (debug_action.empty()) { // signal not set return; } - const DebugActionTokens& actions = TokenizeDebugActions(query_options.debug_action); + + const DebugActionTokens& actions = TokenizeDebugActions(debug_action); for (const vector<string>& components : actions) { // This will filter out global debug actions which only have two components. if (components.size() < 3 || components.size() > 4) continue; diff --git a/be/src/runtime/debug-options.h b/be/src/runtime/debug-options.h index 4b723b309..0e565b264 100644 --- a/be/src/runtime/debug-options.h +++ b/be/src/runtime/debug-options.h @@ -33,6 +33,7 @@ namespace impala { class DebugOptions { public: DebugOptions(const TQueryOptions& query_options); + DebugOptions(const string& debug_action); TDebugOptions ToThrift() const; /// query-wide fragment instance index; -1 if not set diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc index ec1e09c27..72a00b994 100644 --- a/be/src/service/query-options.cc +++ b/be/src/service/query-options.cc @@ -28,9 +28,11 @@ #include <gutil/strings/strip.h> #include <gutil/strings/substitute.h> +#include "exec/exec-node.h" #include "exprs/timezone_db.h" #include "gen-cpp/ImpalaInternalService_types.h" #include "gen-cpp/Query_constants.h" +#include "runtime/debug-options.h" #include "runtime/runtime-filter.h" #include "service/query-option-parser.h" #include "thirdparty/datasketches/MurmurHash3.h" @@ -230,6 +232,15 @@ static bool IsTrue(const string& value) { return iequals(value, "true") || iequals(value, "1"); } +static Status VerifyExecNodeDebugAction(const TDebugOptions& debug_options) { + if (debug_options.action == TDebugAction::DELAY) { + int64_t sleep_duration_ms; + 
RETURN_IF_ERROR(ExecNode::ParseAndValidateSleepDuration( + debug_options.action_param, &sleep_duration_ms)); + } + return Status::OK(); +} + // Note that we allow numerical values for boolean and enum options. This is because // TQueryOptionsToMap() will output the numerical values, and we need to parse its output // configuration. @@ -302,6 +313,15 @@ Status impala::SetQueryOption(TImpalaQueryOptions::type option, const string& va break; }; case TImpalaQueryOptions::DEBUG_ACTION: { + // Verify General DebugAction's first. In case it's valid proceed to + // ExecNode verification. 'invalid command' error status might indicate ExecNode + // DebugAction, so in that case also proceed to ExecNode verification. + Status status = DebugActionVerifyOnly(value); + if (!status.ok() && string::npos == status.msg().msg().find("invalid command")){ + return status; + } + DebugOptions debug_options(value); + RETURN_IF_ERROR(VerifyExecNodeDebugAction(debug_options.ToThrift())); query_options->__set_debug_action(value); break; }; diff --git a/be/src/util/debug-util.cc b/be/src/util/debug-util.cc index 3c61a1d1b..c7f629548 100644 --- a/be/src/util/debug-util.cc +++ b/be/src/util/debug-util.cc @@ -357,25 +357,35 @@ static bool ParseProbability(const string& prob_str, bool* should_execute) { /// actions in the Java code. See DebugUtils.java for more details. Any changes to the /// implementation logic here like adding a new type of action, should make changes in /// the DebugUtils.java too. 
-Status DebugActionImpl( - const string& debug_action, const char* label, const std::vector<string>& args) { +Status DebugActionImpl(const string& debug_action, const char* label, + const std::vector<string>& args, bool verify_only) { const DebugActionTokens& action_list = TokenizeDebugActions(debug_action); static const char ERROR_MSG[] = "Invalid debug_action $0:$1 ($2)"; for (const vector<string>& components : action_list) { - // 'components' should be of the form {label, arg_0, ..., arg_n, action} - if (components.size() != 2 + args.size() || !iequals(components[0], label)) { - continue; + string action_str; + if (!verify_only) { + // 'components' should be of the form {label, arg_0, ..., arg_n, action} + if (components.size() != 2 + args.size() || !iequals(components[0], label)) { + continue; + } + // Check if the arguments match. + bool matches = true; + for (int i = 0; i < args.size(); ++i) { + if (!iequals(components[i + 1], args[i])) { + matches = false; + break; + } + } + if (!matches) continue; + action_str = components[args.size() + 1]; } - // Check if the arguments match. - bool matches = true; - for (int i = 0; i < args.size(); ++i) { - if (!iequals(components[i + 1], args[i])) { - matches = false; - break; + else { + if (components.size() < 2) { + continue; } + action_str = components.back(); } - if (!matches) continue; - const string& action_str = components[args.size() + 1]; + // 'tokens' becomes {command, param0, param1, ... } vector<string> tokens = TokenizeDebugActionParams(action_str); DCHECK_GE(tokens.size(), 1); @@ -418,33 +428,40 @@ Status DebugActionImpl( } if (!should_execute) continue; } - string error_msg = tokens.size() == 3 ? - tokens[2] : - Substitute("Debug Action: $0:$1", components[0], action_str); + if (!verify_only) { + string error_msg = tokens.size() == 3 ? 
+ tokens[2] : + Substitute("Debug Action: $0:$1", components[0], action_str); - if (ImpaladMetrics::DEBUG_ACTION_NUM_FAIL != nullptr) { - ImpaladMetrics::DEBUG_ACTION_NUM_FAIL->Increment(1l); + if (ImpaladMetrics::DEBUG_ACTION_NUM_FAIL != nullptr) { + ImpaladMetrics::DEBUG_ACTION_NUM_FAIL->Increment(1l); + } + return Status(TErrorCode::INTERNAL_ERROR, error_msg); } - return Status(TErrorCode::INTERNAL_ERROR, error_msg); } else if (iequals(cmd, "EXCEPTION")) { //EXCEPTION@<exception_type> - if (tokens.size() != 2) { + // Java debug_actions also support "EXCEPTION@<exception_type>@<error message>" + if (tokens.size() != 2 && tokens.size() != 3) { return Status(Substitute(ERROR_MSG, components[0], action_str, "expected EXCEPTION@<exception_type>")); } - static const auto end = EXCEPTION_STR_MAP.end(); - auto it = EXCEPTION_STR_MAP.find(tokens[1]); - if (it != end) { - it->second(); - } else { - return Status( - Substitute(ERROR_MSG, components[0], action_str, "Invalid exception type")); + if (!verify_only) { + static const auto end = EXCEPTION_STR_MAP.end(); + auto it = EXCEPTION_STR_MAP.find(tokens[1]); + if (it != end) { + it->second(); + } else { + return Status( + Substitute(ERROR_MSG, components[0], action_str, "Invalid exception type")); + } } } else { - DCHECK(false) << "Invalid debug action"; + if (!verify_only) { + DCHECK(false) << "Invalid debug action"; + } return Status(Substitute(ERROR_MSG, components[0], action_str, "invalid command")); } - if (sleep_millis > 0) { + if (!verify_only && sleep_millis > 0) { VLOG(1) << Substitute("Debug Action: $0:$1 sleeping for $2 ms", components[0], action_str, sleep_millis); SleepForMs(sleep_millis); diff --git a/be/src/util/debug-util.h b/be/src/util/debug-util.h index 9ac13aa0d..396fa2083 100644 --- a/be/src/util/debug-util.h +++ b/be/src/util/debug-util.h @@ -145,8 +145,9 @@ DebugActionTokens TokenizeDebugActions(const string& debug_actions); std::vector<std::string> TokenizeDebugActionParams(const string& action); 
/// Slow path implementing DebugAction() for the case where 'debug_action' is non-empty. +/// In case of 'verify_only' the debug action is only verified and not executed. Status DebugActionImpl(const string& debug_action, const char* label, - const std::vector<string>& args) WARN_UNUSED_RESULT; + const std::vector<string>& args, bool verify_only) WARN_UNUSED_RESULT; /// If debug_action query option has a "global action" (i.e. not exec-node specific) /// and matches the given 'label' and 'args', apply the the action. See @@ -154,9 +155,10 @@ Status DebugActionImpl(const string& debug_action, const char* label, /// ExecNode code, use ExecNode::ExecDebugAction() instead. Will return OK unless either /// an invalid debug action is specified or the FAIL action is executed. WARN_UNUSED_RESULT static inline Status DebugAction(const string& debug_action, - const char* label, const std::vector<string>& args = std::vector<string>()) { + const char* label, const std::vector<string>& args = std::vector<string>(), + bool verify_only = false) { if (LIKELY(debug_action.empty())) return Status::OK(); - return DebugActionImpl(debug_action, label, args); + return DebugActionImpl(debug_action, label, args, verify_only); } WARN_UNUSED_RESULT static inline Status DebugAction( @@ -179,6 +181,11 @@ static inline void DebugActionNoFail( DebugActionNoFail(query_options.debug_action, label); } +WARN_UNUSED_RESULT static inline Status DebugActionVerifyOnly(const string& debug_action){ + if (LIKELY(debug_action.empty())) return Status::OK(); + return DebugAction(debug_action, "", std::vector<string>(), true); +} + /// Map of exception string to the exception throwing function which is used when /// executing the EXCEPTION debug action. 
static const std::unordered_map<std::string,std::function<void()>> EXCEPTION_STR_MAP { diff --git a/tests/query_test/test_debug_action.py b/tests/query_test/test_debug_action.py new file mode 100644 index 000000000..21b71a056 --- /dev/null +++ b/tests/query_test/test_debug_action.py @@ -0,0 +1,97 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import absolute_import, division, print_function + +from tests.common.impala_test_suite import ImpalaTestSuite +from tests.common.test_vector import ImpalaTestDimension +from tests.common.test_dimensions import create_parquet_dimension + + +class TestDebugAction(ImpalaTestSuite): + """Test different verification scenarios for DEBUG_ACTION query option.""" + + _debug_actions = { + # Debug action and its corresponding expected error. 
+ # Correct debug actions: + 'RECVR_ADD_BATCH:[email protected]': + 'Debug Action: RECVR_ADD_BATCH:[email protected]', + '0:GETNEXT:FAIL': + 'Debug Action: FAIL', + + # Invalid global debug actions: + 'RECVR_ADD_BATCH:SLEEP': + 'Invalid debug_action RECVR_ADD_BATCH:SLEEP (expected SLEEP@<ms>)', + 'RECVR_ADD_BATCH:JITTER@8@8': + 'Invalid debug_action RECVR_ADD_BATCH:JITTER@8@8 (invalid probability)', + 'RECVR_ADD_BATCH:FAIL@8': + 'Invalid debug_action RECVR_ADD_BATCH:FAIL@8 (invalid probability)', + 'RECVR_ADD_BATCH:EXCEPTION@Unknown': + 'Invalid debug_action RECVR_ADD_BATCH:EXCEPTION@Unknown (Invalid exception type)', + + # Invalid ExecNode debug actions: + '0:GETNEXT:DELAY@aa': + 'Invalid sleep duration: \'aa\'. Only non-negative numbers are allowed.', + + # Both global and ExecNode debug actions are valid + 'RECVR_ADD_BATCH:[email protected]|0:GETNEXT:FAIL': + 'Debug Action: FAIL', + + # Both global and ExecNode debug actions are valid + '0:GETNEXT:FAIL|RECVR_ADD_BATCH:[email protected]': + 'Debug Action: FAIL', + + # Global debug action is invalid + 'RECVR_ADD_BATCH:FAIL@8|0:GETNEXT:FAIL': + 'Invalid debug_action RECVR_ADD_BATCH:FAIL@8 (invalid probability)', + + # Global debug action is invalid + '0:GETNEXT:FAIL|RECVR_ADD_BATCH:FAIL@8': + 'Invalid debug_action RECVR_ADD_BATCH:FAIL@8 (invalid probability)', + + # ExecNode debug action is invalid + 'RECVR_ADD_BATCH:[email protected]|0:GETNEXT:DELAY@aa': + 'Invalid sleep duration: \'aa\'. Only non-negative numbers are allowed.', + + # ExecNode debug action is invalid + '0:GETNEXT:DELAY@aa|RECVR_ADD_BATCH:[email protected]': + 'Invalid sleep duration: \'aa\'. 
Only non-negative numbers are allowed.', + + # Both global and ExecNode debug actions are invalid, global prevails + 'RECVR_ADD_BATCH:FAIL@8|0:GETNEXT:DELAY@aa': + 'Invalid debug_action RECVR_ADD_BATCH:FAIL@8 (invalid probability)', + + # Both ExecNode and global debug actions are invalid, ExecNode prevails + '0:GETNEXT:DELAY@aa|RECVR_ADD_BATCH:FAIL@8': + 'Invalid sleep duration: \'aa\'. Only non-negative numbers are allowed.', + } + + _query = "select * from functional.alltypes" + + @classmethod + def add_test_dimensions(cls): + super(TestDebugAction, cls).add_test_dimensions() + # Pass only the keys (debug actions) to add_dimension() + cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension( + 'debug_action', *cls._debug_actions.keys())) + cls.ImpalaTestMatrix.add_dimension(create_parquet_dimension(cls.get_workload())) + + def test_failpoints(self, vector): + vector.get_value('exec_option')['debug_action'] = vector.get_value('debug_action') + result = self.execute_query_expect_failure( + self.client, self._query, vector.get_value('exec_option')) + assert self._debug_actions[vector.get_value('debug_action')] in str(result)
