IMPALA-5721,IMPALA-6717,IMPALA-6738: improve stress test binary search

IMPALA-5721:
- Save profiles of queries at the end of both the spilling and
  non-spilling binary search. These were not being saved before. Note
  that these profiles won't have an ExecSummary until IMPALA-6640 is
  addressed.
- Save the profile of any query that produces incorrect results during
  the binary search. These were not being saved before, either.
- Use descriptive names, like
  tpch_100_parquet_q12_profile_without_spilling.txt, for the profiles
  mentioned above. We do this by introducing the concept of a
  "logical_query_id" whose values look like "tpch_100_parquet_q12".
- Use the logical_query_id in critical error paths and include the
  logical_query_id in result hash file names.

IMPALA-6717:
- Plumb --common-query-options through to the binary search.

IMPALA-6738:
- Begin a refactoring to reduce the number of parameters used when
  doing the binary search.
- Introduce a notion of "converted args" via a class that does the
  conversion (if needed) via property getters.
- Adjust populate_all_queries() to use converted_args.

Change-Id: I33d036ec93df3016cd4703205078dbdba0168acb
Reviewed-on: http://gerrit.cloudera.org:8080/9770
Reviewed-by: David Knupp <[email protected]>
Tested-by: Impala Public Jenkins

Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/8091b2f4
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/8091b2f4
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/8091b2f4

Branch: refs/heads/master
Commit: 8091b2f469a3678561e47e918ffe8db0eb5d48db
Parents: 4028e9c
Author: Michael Brown <[email protected]>
Authored: Wed Feb 28 11:02:11 2018 -0800
Committer: Impala Public Jenkins <[email protected]>
Committed: Thu Mar 29 03:38:35 2018 +0000

----------------------------------------------------------------------
 tests/stress/concurrent_select.py | 289 ++++++++++++++++++++++++---------
 1 file changed, 213 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
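As an aside on the naming scheme described above: a logical_query_id is
just the database name joined to the query name, and the new profile file
names derive from it. A minimal sketch of the idea (hypothetical helper,
not code from this change; the real logic lives in Query.logical_query_id
and Query.write_runtime_info_profiles in the diff below):

  def runtime_info_profile_names(db_name, query_name):
    # e.g. db_name="tpch_100_parquet", query_name="q12" gives the
    # logical_query_id "tpch_100_parquet_q12"
    logical_query_id = '{0}_{1}'.format(db_name, query_name)
    return [
        logical_query_id + "_profile_with_spilling.txt",
        logical_query_id + "_profile_without_spilling.txt",
    ]

  # runtime_info_profile_names("tpch_100_parquet", "q12") ->
  #   ["tpch_100_parquet_q12_profile_with_spilling.txt",
  #    "tpch_100_parquet_q12_profile_without_spilling.txt"]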
http://git-wip-us.apache.org/repos/asf/impala/blob/8091b2f4/tests/stress/concurrent_select.py
----------------------------------------------------------------------
diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py
index a4bffd9..44d7b34 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -62,17 +62,19 @@ import sys
 import threading
 import traceback
 from Queue import Empty  # Must be before Queue below
+from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser, Namespace
 from collections import defaultdict
 from contextlib import contextmanager
 from datetime import datetime
 from multiprocessing import Lock, Process, Queue, Value
-from random import choice, random, randrange
+from random import choice, random, randrange, shuffle
 from sys import exit, maxint
 from tempfile import gettempdir
 from textwrap import dedent
 from threading import current_thread, Thread
 from time import sleep, time

+import tests.comparison.cli_options as cli_options
 import tests.util.test_file_parser as test_file_parser
 from tests.comparison.cluster import Timeout
 from tests.comparison.db_types import Int, TinyInt, SmallInt, BigInt
@@ -110,6 +112,83 @@ RESULT_HASHES_DIR = "result_hashes"

 RUNTIME_INFO_FILE_VERSION = 3

+
+class StressArgConverter(object):
+  def __init__(self, args):
+    """
+    Convert arguments as returned from argparse parse_args() into internal forms.
+
+    The purpose of this object is to do any conversions needed from the type given by
+    parse_args() into internal forms. For example, if a commandline option takes in a
+    complicated string that needs to be converted into a list or dictionary, this is
+    the place to do it. Access works the same as on the object returned by
+    parse_args(), i.e., object.option_attribute.
+
+    In most cases, simple arguments needn't be converted, because argparse handles the
+    type conversion already, and in most cases, type conversion (e.g., "8" <str> to 8
+    <int>) is all that's needed. If a property getter below doesn't exist, it means the
+    argument value is just passed along unconverted.
+
+    Params:
+      args: argparse.Namespace object (from argparse.ArgumentParser().parse_args())
+    """
+    assert isinstance(args, Namespace), "expected Namespace, got " + str(type(args))
+    self._args = args
+    self._common_query_options = None
+
+  def __getattr__(self, attr):
+    # This "proxies through" all the attributes from the Namespace object that are not
+    # defined in this object via property getters below.
+    return getattr(self._args, attr)
+
+  @property
+  def common_query_options(self):
+    # Memoize this, as the integrity checking of --common-query-options need only
+    # happen once.
+    if self._common_query_options is not None:
+      return self._common_query_options
+    # The stress test sets these, so callers cannot override them.
+    IGNORE_QUERY_OPTIONS = frozenset([
+        'ABORT_ON_ERROR',
+        'MEM_LIMIT',
+    ])
+    common_query_options = {}
+    if self._args.common_query_options is not None:
+      for query_option_and_value in self._args.common_query_options:
+        try:
+          query_option, value = query_option_and_value.split('=')
+        except ValueError:
+          LOG.error(
+              "Could not parse --common-query-options: '{common_query_options}'".format(
+                  common_query_options=self._args.common_query_options))
+          exit(1)
+        query_option = query_option.upper()
+        if query_option in common_query_options:
+          LOG.error(
+              "Query option '{query_option}' already defined in --common-query-options: "
+              "'{common_query_options}'".format(
+                  query_option=query_option,
+                  common_query_options=self._args.common_query_options))
+          exit(1)
+        elif query_option in IGNORE_QUERY_OPTIONS:
+          LOG.warn(
+              "Ignoring '{query_option}' in common query options: '{opt}': "
+              "The stress test algorithm needs control of this option.".format(
+                  query_option=query_option, opt=self._args.common_query_options))
+        else:
+          common_query_options[query_option] = value
+          LOG.debug("Common query option '{query_option}' set to '{value}'".format(
+              query_option=query_option, value=value))
+    self._common_query_options = common_query_options
+    return self._common_query_options
+
+  @property
+  def runtime_info_path(self):
+    runtime_info_path = self._args.runtime_info_path
+    if "{cm_host}" in runtime_info_path:
+      runtime_info_path = runtime_info_path.format(cm_host=self._args.cm_host)
+    return runtime_info_path
+
+
 def create_and_start_daemon_thread(fn, name):
   thread = Thread(target=fn, name=name)
   thread.error = None
@@ -169,7 +248,9 @@ def print_crash_info_if_exists(impala, start_time):
 class QueryReport(object):
   """Holds information about a single query run."""

-  def __init__(self):
+  def __init__(self, query):
+    self.query = query
+
     self.result_hash = None
     self.runtime_secs = None
     self.mem_was_spilled = False
@@ -180,6 +261,32 @@ class QueryReport(object):
     self.profile = None
     self.query_id = None

+  def write_query_profile(self, directory, prefix=None):
+    """
+    Write out the query profile bound to this object to a given directory.
+
+    The file name is generated and will contain the query ID. Use the optional prefix
+    parameter to set a prefix on the filename.
+
+    Example return:
+      tpcds_300_decimal_parquet_q21_00000001_a38c8331_profile.txt
+
+    Parameters:
+      directory (str): Directory to write profile.
+      prefix (str): Prefix for filename.
+    """
+    if not (self.profile and self.query_id):
+      return
+    if prefix is not None:
+      file_name = prefix + '_'
+    else:
+      file_name = ''
+    file_name += self.query.logical_query_id + '_'
+    file_name += self.query_id.replace(":", "_") + "_profile.txt"
+    profile_log_path = os.path.join(directory, file_name)
+    with open(profile_log_path, "w") as profile_log:
+      profile_log.write(self.profile)
+

 class MemBroker(object):
   """Provides memory usage coordination for clients running in different processes.
@@ -632,14 +739,18 @@ class StressRunner(object):
           continue
         increment(self._num_successive_errors)
         increment(self._num_other_errors)
-        self._write_query_profile(report)
-        raise Exception("Query {0} failed: {1}".format(report.query_id, error_msg))
+        self._write_query_profile(report, PROFILES_DIR, prefix='error')
+        raise Exception("Query {query} ID {id} failed: {mesg}".format(
+            query=query.logical_query_id,
+            id=report.query_id,
+            mesg=error_msg))
       if (
           report.mem_limit_exceeded and
           not self._mem_broker.was_overcommitted(reservation_id)
       ):
         increment(self._num_successive_errors)
-        self._write_query_profile(report)
+        self._write_query_profile(
+            report, PROFILES_DIR, prefix='unexpected_mem_exceeded')
         raise Exception("Unexpected mem limit exceeded; mem was not overcommitted. "
                         "Query ID: {0}".format(report.query_id))
       if (
@@ -649,18 +760,19 @@ class StressRunner(object):
       ):
         increment(self._num_successive_errors)
         increment(self._num_result_mismatches)
-        self._write_query_profile(report)
+        self._write_query_profile(report, PROFILES_DIR, prefix='incorrect_results')
         raise Exception(dedent("""\
                                Result hash mismatch; expected {expected}, got {actual}
                                Query ID: {id}
                                Query: {query}""".format(expected=query.result_hash,
                                                         actual=report.result_hash,
                                                         id=report.query_id,
-                                                        query=query.sql)))
+                                                        query=query.logical_query_id)))
       if report.timed_out and not should_cancel:
-        self._write_query_profile(report)
+        self._write_query_profile(report, PROFILES_DIR, prefix='timed_out')
         raise Exception(
-            "Query unexpectedly timed out. Query ID: {0}".format(report.query_id))
+            "Query {query} unexpectedly timed out. Query ID: {id}".format(
+                query=query.logical_query_id, id=report.query_id))
       self._num_successive_errors.value = 0

   def _print_status_header(self):
@@ -706,13 +818,10 @@ class StressRunner(object):
       if report.timed_out:
         increment(self._num_queries_timedout)

-  def _write_query_profile(self, report):
-    if not (report.profile and report.query_id):
-      return
-    file_name = report.query_id.replace(":", "_") + "_profile.txt"
-    profile_log_path = os.path.join(self.results_dir, PROFILES_DIR, file_name)
-    with open(profile_log_path, "w") as profile_log:
-      profile_log.write(report.profile)
+  def _write_query_profile(self, report, subdir, prefix=None):
+    report.write_query_profile(
+        os.path.join(self.results_dir, subdir),
+        prefix)

   def _check_successive_errors(self):
     if (self._num_successive_errors.value >= self.num_successive_errors_needed_to_abort):
@@ -807,6 +916,8 @@ class Query(object):
     self.result_hash = None
     self.required_mem_mb_with_spilling = None
     self.required_mem_mb_without_spilling = None
+    self.solo_runtime_profile_with_spilling = None
+    self.solo_runtime_profile_without_spilling = None
     self.solo_runtime_secs_with_spilling = None
     self.solo_runtime_secs_without_spilling = None
     # Query options to set before running the query.
@@ -818,6 +929,8 @@ class Query(object):
     # UPSERT, DELETE.
     self.query_type = QueryType.SELECT

+    self._logical_query_id = None
+
   def __repr__(self):
     return dedent("""
       <Query
@@ -832,6 +945,29 @@ class Query(object):
       Population order: %(population_order)r>
       """.strip() % self.__dict__)

+  @property
+  def logical_query_id(self):
+    """
+    Return a meaningful, unique str identifier for the query.
+
+    Example: "tpcds_300_decimal_parquet_q21"
+    """
+    if self._logical_query_id is None:
+      self._logical_query_id = '{0}_{1}'.format(self.db_name, self.name)
+    return self._logical_query_id
+
+  def write_runtime_info_profiles(self, directory):
+    """Write profiles for spilling and non-spilling into directory (str)."""
+    profiles_to_write = [
+        (self.logical_query_id + "_profile_with_spilling.txt",
+         self.solo_runtime_profile_with_spilling),
+        (self.logical_query_id + "_profile_without_spilling.txt",
+         self.solo_runtime_profile_without_spilling),
+    ]
+    for filename, profile in profiles_to_write:
+      with open(os.path.join(directory, filename), "w") as fh:
+        fh.write(profile)
+

 class QueryRunner(object):
   """Encapsulates functionality to run a query and provide a runtime report."""
@@ -856,7 +992,7 @@ class QueryRunner(object):
     self.impalad_conn = None

   def run_query(self, query, timeout_secs, mem_limit_mb, run_set_up=False,
-                should_cancel=False):
+                should_cancel=False, retain_profile=False):
     """Run a query and return an execution report. If 'run_set_up' is True, set up sql
     will be executed before the main query. This should be the case during the binary
     search phase of the stress test.
@@ -868,7 +1004,7 @@ class QueryRunner(object):
       raise Exception("connect() must first be called")

     timeout_unix_time = time() + timeout_secs
-    report = QueryReport()
+    report = QueryReport(query)
     try:
       with self.impalad_conn.cursor() as cursor:
         start_time = time()
@@ -914,7 +1050,8 @@ class QueryRunner(object):
         if query.query_type == QueryType.SELECT:
           try:
             report.result_hash = self._hash_result(cursor, timeout_unix_time, query)
-            if query.result_hash and report.result_hash != query.result_hash:
+            if retain_profile or \
+               query.result_hash and report.result_hash != query.result_hash:
               fetch_and_set_profile(cursor, report)
           except QueryTimeout:
             self._cancel(cursor, report)
@@ -1004,7 +1141,7 @@ class QueryRunner(object):
     def hash_result_impl():
       result_log = None
       try:
-        file_name = query_id.replace(":", "_")
+        file_name = '_'.join([query.logical_query_id, query_id.replace(":", "_")])
         if query.result_hash is None:
           file_name += "_initial"
         file_name += "_results.txt"
@@ -1159,7 +1296,8 @@ def populate_runtime_info_for_random_queries(

 def populate_runtime_info(
     query, impala, use_kerberos, results_dir,
-    timeout_secs=maxint, samples=1, max_conflicting_samples=0
+    timeout_secs=maxint, samples=1, max_conflicting_samples=0,
+    common_query_options=None
 ):
   """Runs the given query by itself repeatedly until the minimum memory is determined
   with and without spilling. Potentially all fields in the Query class (except
@@ -1177,6 +1315,7 @@ def populate_runtime_info(
   LOG.info("Collecting runtime info for query %s: \n%s", query.name, query.sql)
   runner = QueryRunner()
   runner.check_if_mem_was_spilled = True
+  runner.common_query_options = common_query_options
   runner.impalad = impala.impalads[0]
   runner.results_dir = results_dir
   runner.use_kerberos = use_kerberos
@@ -1191,6 +1330,8 @@ def populate_runtime_info(
   old_required_mem_mb_without_spilling = query.required_mem_mb_without_spilling
   old_required_mem_mb_with_spilling = query.required_mem_mb_with_spilling

+  profile_error_prefix = query.logical_query_id + "_binsearch_error"
+
   # TODO: This method is complicated enough now that breaking it out into a class may be
   # helpful to understand the structure.

@@ -1203,30 +1344,45 @@ def populate_runtime_info(
     ):
       query.required_mem_mb_with_spilling = required_mem
       query.solo_runtime_secs_with_spilling = report.runtime_secs
+      query.solo_runtime_profile_with_spilling = report.profile
     elif (
         query.required_mem_mb_without_spilling is None or
         required_mem < query.required_mem_mb_without_spilling
     ):
       query.required_mem_mb_without_spilling = required_mem
       query.solo_runtime_secs_without_spilling = report.runtime_secs
+      query.solo_runtime_profile_without_spilling = report.profile

   def get_report(desired_outcome=None):
     reports_by_outcome = defaultdict(list)
     leading_outcome = None
     for remaining_samples in xrange(samples - 1, -1, -1):
-      report = runner.run_query(query, timeout_secs, mem_limit, run_set_up=True)
+      report = runner.run_query(query, timeout_secs, mem_limit,
+                                run_set_up=True, retain_profile=True)
       if report.timed_out:
-        raise QueryTimeout()
+        report.write_query_profile(
+            os.path.join(results_dir, PROFILES_DIR),
+            profile_error_prefix)
+        raise QueryTimeout(
+            "query {0} timed out during binary search".format(query.logical_query_id))
       if report.non_mem_limit_error:
-        raise report.non_mem_limit_error
+        report.write_query_profile(
+            os.path.join(results_dir, PROFILES_DIR),
+            profile_error_prefix)
+        raise Exception(
+            "query {0} errored during binary search: {1}".format(
+                query.logical_query_id, str(report.non_mem_limit_error)))
       LOG.debug("Spilled: %s" % report.mem_was_spilled)
       if not report.mem_limit_exceeded:
         if query.result_hash is None:
           query.result_hash = report.result_hash
         elif query.result_hash != report.result_hash:
+          report.write_query_profile(
+              os.path.join(results_dir, PROFILES_DIR),
+              profile_error_prefix)
           raise Exception(
-              "Result hash mismatch; expected %s, got %s" %
-              (query.result_hash, report.result_hash))
+              "Result hash mismatch for query %s; expected %s, got %s" %
+              (query.logical_query_id, query.result_hash, report.result_hash))

       if report.mem_limit_exceeded:
         outcome = "EXCEEDED"
@@ -1332,8 +1488,16 @@ def populate_runtime_info(
         upper_bound = mem_limit
     if should_break:
       if not query.required_mem_mb_with_spilling:
+        if upper_bound - mem_limit < MEM_LIMIT_EQ_THRESHOLD_MB:
+          # IMPALA-6604: A fair number of queries go down this path.
+          LOG.info(
+              "Unable to find a memory limit with spilling within the threshold of {0} "
+              "MB. Using the same memory limit for both.".format(
+                  MEM_LIMIT_EQ_THRESHOLD_MB))
         query.required_mem_mb_with_spilling = query.required_mem_mb_without_spilling
         query.solo_runtime_secs_with_spilling = query.solo_runtime_secs_without_spilling
+        query.solo_runtime_profile_with_spilling = \
+            query.solo_runtime_profile_without_spilling
       break
   LOG.info("Minimum memory is %s MB" % query.required_mem_mb_with_spilling)
   if (
@@ -1349,6 +1513,7 @@ def populate_runtime_info(
         " the absolute minimum memory.")
     query.required_mem_mb_with_spilling = query.required_mem_mb_without_spilling
     query.solo_runtime_secs_with_spilling = query.solo_runtime_secs_without_spilling
+    query.solo_runtime_profile_with_spilling = query.solo_runtime_profile_without_spilling

   LOG.debug("Query after populating runtime info: %s", query)

@@ -1420,9 +1585,12 @@ def save_runtime_info(path, query, impala):
   class JsonEncoder(json.JSONEncoder):
     def default(self, obj):
       data = dict(obj.__dict__)
-      # Queries are stored by sql, so remove the duplicate data.
-      if "sql" in data:
-        del data["sql"]
+      # Queries are stored by sql, so remove the duplicate data. Also don't store
+      # profiles as JSON values, but instead separately.
+      for k in ("sql", "solo_runtime_profile_with_spilling",
+                "solo_runtime_profile_without_spilling"):
+        if k in data:
+          del data[k]
       return data
   json.dump(
       store, file, cls=JsonEncoder, sort_keys=True, indent=2, separators=(',', ': '))
@@ -1690,9 +1858,12 @@ def reset_databases(cursor):
             " exist in '{1}' database.".format(table_name, cursor.db_name))


-def populate_all_queries(queries, impala, args, runtime_info_path,
-                         queries_with_runtime_info_by_db_sql_and_options):
+def populate_all_queries(
+    queries, impala, converted_args, queries_with_runtime_info_by_db_sql_and_options
+):
   """Populate runtime info for all queries, ordered by the population_order property."""
+  common_query_options = converted_args.common_query_options
+  runtime_info_path = converted_args.runtime_info_path
   result = []
   queries_by_order = {}
   for query in queries:
@@ -1712,9 +1883,13 @@ def populate_all_queries(
             query.db_name][query.sql][str(sorted(query.options.items()))])
       else:
         populate_runtime_info(
-            query, impala, args.use_kerberos, args.results_dir,
-            samples=args.samples, max_conflicting_samples=args.max_conflicting_samples)
+            query, impala, converted_args.use_kerberos, converted_args.results_dir,
+            samples=converted_args.samples,
+            max_conflicting_samples=converted_args.max_conflicting_samples,
+            common_query_options=common_query_options)
         save_runtime_info(runtime_info_path, query, impala)
+        query.write_runtime_info_profiles(
+            os.path.join(converted_args.results_dir, PROFILES_DIR))
         result.append(query)
   return result
@@ -1745,10 +1920,6 @@ def print_version(cluster):


 def main():
-  from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
-  from random import shuffle
-  import tests.comparison.cli_options as cli_options
-
   parser = ArgumentParser(
       epilog=dedent("""
       Before running this script a CM cluster must be setup and any needed data
@@ -1891,6 +2062,9 @@ def main():
       "or set valid values. Example: --common-query-options "
      "DISABLE_CODEGEN=true RUNTIME_FILTER_MODE=1")
   args = parser.parse_args()
+  converted_args = StressArgConverter(args)
+  common_query_options = converted_args.common_query_options
+  runtime_info_path = converted_args.runtime_info_path

   cli_options.configure_logging(
       args.log_level, debug_log_file=args.debug_log_file, log_thread_name=True,
@@ -1906,40 +2080,6 @@ def main():
         "At least one of --tpcds-db, --tpch-db, --tpch-kudu-db,"
         "--tpcds-kudu-db, --tpch-nested-db, --random-db, --query-file-path is required")

-  # The stress test sets these, so callers cannot override them.
-  IGNORE_QUERY_OPTIONS = frozenset([
-      'ABORT_ON_ERROR',
-      'MEM_LIMIT',
-  ])
-
-  common_query_options = {}
-  if args.common_query_options is not None:
-    for query_option_and_value in args.common_query_options:
-      try:
-        query_option, value = query_option_and_value.split('=')
-      except ValueError:
-        LOG.error(
-            "Could not parse --common-query-options: '{common_query_options}'".format(
-                common_query_options=args.common_query_options))
-        exit(1)
-      query_option = query_option.upper()
-      if query_option in common_query_options:
-        LOG.error(
-            "Query option '{query_option}' already defined in --common-query-options: "
-            "'{common_query_options}'".format(
-                query_option=query_option,
-                common_query_options=args.common_query_options))
-        exit(1)
-      elif query_option in IGNORE_QUERY_OPTIONS:
-        LOG.warn(
-            "Ignoring '{query_option}' in common query options: '{opt}': "
-            "The stress test algorithm needs control of this option.".format(
-                query_option=query_option, opt=args.common_query_options))
-      else:
-        common_query_options[query_option] = value
-        LOG.debug("Common query option '{query_option}' set to '{value}'".format(
-            query_option=query_option, value=value))
-
   os.mkdir(os.path.join(args.results_dir, RESULT_HASHES_DIR))
   os.mkdir(os.path.join(args.results_dir, PROFILES_DIR))

@@ -1956,9 +2096,6 @@ def main():
     raise Exception("Queries are currently running on the cluster")
   impala.min_impalad_mem_mb = min(impala.find_impalad_mem_mb_limit())

-  runtime_info_path = args.runtime_info_path
-  if "{cm_host}" in runtime_info_path:
-    runtime_info_path = runtime_info_path.format(cm_host=args.cm_host)
   queries_with_runtime_info_by_db_sql_and_options = load_runtime_info(
       runtime_info_path, impala)

@@ -2026,8 +2163,8 @@ def main():
       with impala.cursor(db_name=database) as cursor:
         reset_databases(cursor)

-  queries = populate_all_queries(queries, impala, args, runtime_info_path,
-                                 queries_with_runtime_info_by_db_sql_and_options)
+  queries = populate_all_queries(
+      queries, impala, converted_args, queries_with_runtime_info_by_db_sql_and_options)

   # A particular random query may either fail (due to a generator or Impala bug) or
   # take a really long time to complete. So the queries need to be validated. Since the
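
The "converted args" pattern introduced above generalizes: unknown
attributes proxy through to the argparse Namespace via __getattr__, while
property getters convert (and memoize) anything that needs it. A
self-contained sketch of the pattern, with hypothetical option names not
taken from this change:

  from argparse import ArgumentParser, Namespace

  class ArgConverter(object):
    """Proxy a Namespace; convert selected options lazily via properties."""

    def __init__(self, args):
      assert isinstance(args, Namespace)
      self._args = args
      self._query_options = None  # memoized conversion result

    def __getattr__(self, attr):
      # Only called when normal lookup fails, so plain Namespace
      # attributes pass straight through.
      return getattr(self._args, attr)

    @property
    def query_options(self):
      # Convert "KEY=VALUE" strings into a dict exactly once.
      if self._query_options is None:
        pairs = self._args.query_options or []
        self._query_options = dict(kv.split('=', 1) for kv in pairs)
      return self._query_options

  parser = ArgumentParser()
  parser.add_argument('--query-options', nargs='*')
  parser.add_argument('--samples', type=int, default=1)
  converted = ArgConverter(parser.parse_args(['--query-options', 'A=1', 'B=2']))
  assert converted.samples == 1  # proxied through unconverted
  assert converted.query_options == {'A': '1', 'B': '2'}  # converted lazily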
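For readers new to the stress test, the "binary search" being improved here
halves a [lower_bound, upper_bound] window on mem_limit until the bounds
converge, recording the smallest limit at which the query still succeeds;
the IMPALA-6604 branch above covers the case where no separate spilling
limit is found before the window closes. A simplified model of that loop
(stand-in run_query and threshold value, not the actual implementation):

  MEM_LIMIT_EQ_THRESHOLD_MB = 50  # illustrative value only

  def find_min_mem_mb(run_query, lower_bound, upper_bound):
    """Return the smallest mem_limit (MB) at which run_query(mem) succeeds.

    run_query(mem) returns True on success and False when the memory
    limit was exceeded; returns None if no limit in range succeeded.
    """
    required_mem = None
    while upper_bound - lower_bound > MEM_LIMIT_EQ_THRESHOLD_MB:
      mem_limit = (lower_bound + upper_bound) // 2
      if run_query(mem_limit):
        required_mem = mem_limit
        upper_bound = mem_limit  # success: try a tighter limit
      else:
        lower_bound = mem_limit  # exceeded: need more memory
    return required_mem

  # e.g. find_min_mem_mb(lambda mem: mem >= 1337, 0, 8192) converges to a
  # value within MEM_LIMIT_EQ_THRESHOLD_MB above 1337.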