IMPALA-3969: stress test: add option to set common query options It can be useful for debugging purposes to run the stress test with custom query options, for example with codegen disabled. This patch adds a command line option to the stress test entry point that allows a caller to set query options.
To reduce the maintenance burden as new query options are added, we accept freeform, space-delimited option=value arguments as the option's value, like: --common-query-options option1=value1 ... optionN=valueN. This means we don't do much validation that these options and values are well-formed. Callers must take care to type the correct options and values. Testing: I ran concurrent_select.py by hand against an Impala cluster with codegen both enabled and disabled, e.g. --common-query-options DISABLE_CODEGEN=true. Both the log written by concurrent_select.py and the Impala logs on the cluster indicated DISABLE_CODEGEN was being set as I directed. I also did negative testing for a few bad --common-query-options values. Either concurrent_select.py catches the error itself, or the error is reported when the first query runs. Change-Id: Iada041aace60c218a12178d8f1b9a68ff29de72e Reviewed-on: http://gerrit.cloudera.org:8080/3887 Reviewed-by: David Knupp <[email protected]> Reviewed-by: Tim Armstrong <[email protected]> Tested-by: Tim Armstrong <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/9162d5d0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/9162d5d0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/9162d5d0 Branch: refs/heads/master Commit: 9162d5d0544bf080da0a86331e9f77fc56803678 Parents: 5afd9f7 Author: Michael Brown <[email protected]> Authored: Wed Aug 10 08:47:25 2016 -0700 Committer: Tim Armstrong <[email protected]> Committed: Thu Aug 11 22:04:03 2016 +0000 ---------------------------------------------------------------------- tests/stress/concurrent_select.py | 50 +++++++++++++++++++++++++++++++++- 1 file changed, 49 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9162d5d0/tests/stress/concurrent_select.py 
---------------------------------------------------------------------- diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py index e7ed96b..dc06503 100755 --- a/tests/stress/concurrent_select.py +++ b/tests/stress/concurrent_select.py @@ -66,7 +66,7 @@ from contextlib import contextmanager from datetime import datetime from multiprocessing import Lock, Process, Queue, Value from random import choice, random, randrange -from sys import maxint +from sys import exit, maxint from tempfile import gettempdir from textwrap import dedent from threading import current_thread, Thread @@ -93,6 +93,7 @@ MEM_ESTIMATE_PATTERN = re.compile(r"Estimated.*Memory=(\d+.?\d*)(T|G|M|K)?B") # The version of the file format containing the collected query runtime info. RUNTIME_INFO_FILE_VERSION = 2 + def create_and_start_daemon_thread(fn, name): thread = Thread(target=fn, name=name) thread.error = None @@ -267,6 +268,7 @@ class StressRunner(object): def __init__(self): self.use_kerberos = False + self.common_query_options = {} self._mem_broker = None # Synchronized blocking work queue for producer/consumers. 
@@ -539,6 +541,7 @@ class StressRunner(object): runner.impalad = impalad runner.result_hash_log_dir = self.result_hash_log_dir runner.use_kerberos = self.use_kerberos + runner.common_query_options = self.common_query_options runner.connect() while not self._query_queue.empty(): @@ -674,6 +677,7 @@ class Query(object): self.required_mem_mb_without_spilling = None self.solo_runtime_secs_with_spilling = None self.solo_runtime_secs_without_spilling = None + self.common_query_options = {} def __repr__(self): return dedent(""" @@ -717,6 +721,9 @@ class QueryRunner(object): try: with self.impalad_conn.cursor() as cursor: start_time = time() + for query_option, value in self.common_query_options.iteritems(): + cursor.execute( + "SET {query_option}={value}".format(query_option=query_option, value=value)) cursor.execute("SET ABORT_ON_ERROR=1") LOG.debug("Setting mem limit to %s MB", mem_limit_mb) cursor.execute("SET MEM_LIMIT=%sM" % mem_limit_mb) @@ -1355,6 +1362,12 @@ def main(): parser.add_argument("--nlj-filter", choices=("in", "out", None), help="'in' means only nested-loop queries will be used, 'out' means no NLJ queries" " will be used. The default is to not filter either way.") + parser.add_argument( + "--common-query-options", default=None, nargs="*", + help="Space-delimited string of query options and values. This is a freeform " + "string with little regard to whether you've spelled the query options correctly " + "or set valid values. Example: --common-query-options " + "DISABLE_CODEGEN=true RUNTIME_FILTER_MODE=1") args = parser.parse_args() cli_options.configure_logging(args.log_level, debug_log_file=args.debug_log_file, @@ -1366,6 +1379,40 @@ def main(): raise Exception("At least one of --tpcds-db, --tpch-db," "--tpch-nested-db, --random-db, --query-file-path is required") + # The stress test sets these, so callers cannot override them. 
+ IGNORE_QUERY_OPTIONS = frozenset([ + 'ABORT_ON_ERROR', + 'MEM_LIMIT', + ]) + + common_query_options = {} + if args.common_query_options is not None: + for query_option_and_value in args.common_query_options: + try: + query_option, value = query_option_and_value.split('=') + except ValueError: + LOG.error( + "Could not parse --common-query-options: '{common_query_options}'".format( + common_query_options=args.common_query_options)) + exit(1) + query_option = query_option.upper() + if query_option in common_query_options: + LOG.error( + "Query option '{query_option}' already defined in --common-query-options: " + "'{common_query_options}'".format( + query_option=query_option, + common_query_options=args.common_query_options)) + exit(1) + elif query_option in IGNORE_QUERY_OPTIONS: + LOG.warn( + "Ignoring '{query_option}' in common query options: '{opt}': " + "The stress test algorithm needs control of this option.".format( + query_option=query_option, opt=args.common_query_options)) + else: + common_query_options[query_option] = value + LOG.debug("Common query option '{query_option}' set to '{value}'".format( + query_option=query_option, value=value)) + cluster = cli_options.create_cluster(args) cluster.is_kerberized = args.use_kerberos impala = cluster.impala @@ -1491,6 +1538,7 @@ def main(): stress_runner.cancel_probability = args.cancel_probability stress_runner.spill_probability = args.spill_probability stress_runner.leak_check_interval_mins = args.mem_leak_check_interval_mins + stress_runner.common_query_options = common_query_options stress_runner.run_queries(queries, impala, args.max_queries, args.mem_overcommit_pct, not args.no_status) # This is the value of 'should_print_status'.
