Repository: impala Updated Branches: refs/heads/master c1be4e967 -> 1dcc6c1be
IMPALA-6904: stress test threshold parameters I needed this to generate numbers in test_mem_usage_scaling.py without a +-50MB error in them. Testing: Ran a local stress test with TPC-H and random queries. Change-Id: I46cc95cbb078c5ef9886971ab1c0f493ddcf8377 Reviewed-on: http://gerrit.cloudera.org:8080/9769 Reviewed-by: Michael Brown <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/1dcc6c1b Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/1dcc6c1b Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/1dcc6c1b Branch: refs/heads/master Commit: 1dcc6c1beb9c36528184aa5a90f5d3f258e32971 Parents: c1be4e9 Author: Tim Armstrong <[email protected]> Authored: Thu Mar 22 15:11:12 2018 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Tue Apr 24 02:46:08 2018 +0000 ---------------------------------------------------------------------- tests/stress/concurrent_select.py | 106 +++++++++++++++------------------ 1 file changed, 49 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/1dcc6c1b/tests/stress/concurrent_select.py ---------------------------------------------------------------------- diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py index 44d7b34..4316672 100755 --- a/tests/stress/concurrent_select.py +++ b/tests/stress/concurrent_select.py @@ -94,11 +94,6 @@ EXPECTED_TPCDS_QUERIES_COUNT = 71 EXPECTED_TPCH_NESTED_QUERIES_COUNT = 22 EXPECTED_TPCH_QUERIES_COUNT = 22 -# Used to short circuit a binary search of the min mem limit. Values will be considered -# equal if they are within this ratio or absolute amount of each other. -MEM_LIMIT_EQ_THRESHOLD_PC = 0.975 -MEM_LIMIT_EQ_THRESHOLD_MB = 50 - # Regex to extract the estimated memory from an explain plan. # The unit prefixes can be found in # fe/src/main/java/org/apache/impala/common/PrintUtils.java @@ -1245,8 +1240,7 @@ def load_queries_from_test_file(file_path, db_name=None): def load_random_queries_and_populate_runtime_info( - query_generator, model_translator, tables, db_name, impala, use_kerberos, query_count, - query_timeout_secs, results_dir + query_generator, model_translator, tables, impala, converted_args ): """Returns a list of random queries. Each query will also have its runtime info populated. The runtime info population also serves to validate the query. @@ -1259,16 +1253,13 @@ def load_random_queries_and_populate_runtime_info( sql = model_translator.write_query(query_model) query = Query() query.sql = sql - query.db_name = db_name + query.db_name = converted_args.random_db yield query return populate_runtime_info_for_random_queries( - impala, use_kerberos, generate_candidates(), query_count, query_timeout_secs, - results_dir) + impala, generate_candidates(), converted_args) -def populate_runtime_info_for_random_queries( - impala, use_kerberos, candidate_queries, query_count, query_timeout_secs, results_dir -): +def populate_runtime_info_for_random_queries(impala, candidate_queries, converted_args): """Returns a list of random queries. Each query will also have its runtime info populated. The runtime info population also serves to validate the query. """ @@ -1279,7 +1270,8 @@ def populate_runtime_info_for_random_queries( for query in candidate_queries: try: populate_runtime_info( - query, impala, use_kerberos, results_dir, timeout_secs=query_timeout_secs) + query, impala, converted_args, + timeout_secs=converted_args.random_query_timeout_seconds) queries.append(query) except Exception as e: # Ignore any non-fatal errors. These could be query timeouts or bad queries ( @@ -1289,36 +1281,37 @@ def populate_runtime_info_for_random_queries( LOG.warn( "Error running query (the test will continue)\n%s\n%s", e, query.sql, exc_info=True) - if len(queries) == query_count: + if len(queries) == converted_args.random_query_count: break return queries -def populate_runtime_info( - query, impala, use_kerberos, results_dir, - timeout_secs=maxint, samples=1, max_conflicting_samples=0, - common_query_options=None -): +def populate_runtime_info(query, impala, converted_args, timeout_secs=maxint): """Runs the given query by itself repeatedly until the minimum memory is determined with and without spilling. Potentially all fields in the Query class (except 'sql') will be populated by this method. 'required_mem_mb_without_spilling' and the corresponding runtime field may still be None if the query could not be run without spilling. - 'samples' and 'max_conflicting_samples' control the reliability of the collected - information. The problem is that memory spilling or usage may differ (by a large - amount) from run to run due to races during execution. The parameters provide a way - to express "X out of Y runs must have resulted in the same outcome". Increasing the - number of samples and decreasing the tolerance (max conflicts) increases confidence - but also increases the time to collect the data. + converted_args.samples and converted_args.max_conflicting_samples control the + reliability of the collected information. The problem is that memory spilling or usage + may differ (by a large amount) from run to run due to races during execution. The + parameters provide a way to express "X out of Y runs must have resulted in the same + outcome". Increasing the number of samples and decreasing the tolerance (max conflicts) + increases confidence but also increases the time to collect the data. """ LOG.info("Collecting runtime info for query %s: \n%s", query.name, query.sql) + samples = converted_args.samples + max_conflicting_samples = converted_args.max_conflicting_samples + results_dir = converted_args.results_dir + mem_limit_eq_threshold_mb = converted_args.mem_limit_eq_threshold_mb + mem_limit_eq_threshold_percent = converted_args.mem_limit_eq_threshold_percent runner = QueryRunner() runner.check_if_mem_was_spilled = True - runner.common_query_options = common_query_options + runner.common_query_options = converted_args.common_query_options runner.impalad = impala.impalads[0] runner.results_dir = results_dir - runner.use_kerberos = use_kerberos + runner.use_kerberos = converted_args.use_kerberos runner.connect() limit_exceeded_mem = 0 non_spill_mem = None @@ -1361,14 +1354,12 @@ def populate_runtime_info( run_set_up=True, retain_profile=True) if report.timed_out: report.write_query_profile( - os.path.join(results_dir, PROFILES_DIR), - profile_error_prefix) + os.path.join(results_dir, PROFILES_DIR), profile_error_prefix) raise QueryTimeout( "query {0} timed out during binary search".format(query.logical_query_id)) if report.non_mem_limit_error: report.write_query_profile( - os.path.join(results_dir, PROFILES_DIR), - profile_error_prefix) + os.path.join(results_dir, PROFILES_DIR), profile_error_prefix) raise Exception( "query {0} errored during binary search: {1}".format( query.logical_query_id, str(report.non_mem_limit_error))) @@ -1378,8 +1369,7 @@ def populate_runtime_info( query.result_hash = report.result_hash elif query.result_hash != report.result_hash: report.write_query_profile( - os.path.join(results_dir, PROFILES_DIR), - profile_error_prefix) + os.path.join(results_dir, PROFILES_DIR), profile_error_prefix) raise Exception( "Result hash mismatch for query %s; expected %s, got %s" % (query.logical_query_id, query.result_hash, report.result_hash)) @@ -1416,6 +1406,7 @@ def populate_runtime_info( LOG.info("Finding a starting point for binary search") mem_limit = min(mem_estimate, impala.min_impalad_mem_mb) or impala.min_impalad_mem_mb while True: + LOG.info("Next mem_limit: {0}".format(mem_limit)) report = get_report() if not report or report.mem_limit_exceeded: if report and report.mem_limit_exceeded: @@ -1442,8 +1433,9 @@ def populate_runtime_info( old_required_mem_mb_without_spilling = None else: mem_limit = (lower_bound + upper_bound) / 2 - should_break = mem_limit / float(upper_bound) > MEM_LIMIT_EQ_THRESHOLD_PC or \ - upper_bound - mem_limit < MEM_LIMIT_EQ_THRESHOLD_MB + LOG.info("Next mem_limit: {0}".format(mem_limit)) + should_break = mem_limit / float(upper_bound) > 1 - mem_limit_eq_threshold_percent \ + or upper_bound - mem_limit < mem_limit_eq_threshold_mb report = get_report(desired_outcome=("NOT_SPILLED" if spill_mem else None)) if not report: lower_bound = mem_limit @@ -1478,8 +1470,9 @@ def populate_runtime_info( old_required_mem_mb_with_spilling = None else: mem_limit = (lower_bound + upper_bound) / 2 - should_break = mem_limit / float(upper_bound) > MEM_LIMIT_EQ_THRESHOLD_PC \ - or upper_bound - mem_limit < MEM_LIMIT_EQ_THRESHOLD_MB + LOG.info("Next mem_limit: {0}".format(mem_limit)) + should_break = mem_limit / float(upper_bound) > 1 - mem_limit_eq_threshold_percent \ + or upper_bound - mem_limit < mem_limit_eq_threshold_mb report = get_report(desired_outcome="SPILLED") if not report or report.mem_limit_exceeded: lower_bound = mem_limit @@ -1488,12 +1481,12 @@ def populate_runtime_info( upper_bound = mem_limit if should_break: if not query.required_mem_mb_with_spilling: - if upper_bound - mem_limit < MEM_LIMIT_EQ_THRESHOLD_MB: + if upper_bound - mem_limit < mem_limit_eq_threshold_mb: # IMPALA-6604: A fair amount of queries go down this path. LOG.info( "Unable to find a memory limit with spilling within the threshold of {0} " "MB. Using the same memory limit for both.".format( - MEM_LIMIT_EQ_THRESHOLD_MB)) + mem_limit_eq_threshold_mb)) query.required_mem_mb_with_spilling = query.required_mem_mb_without_spilling query.solo_runtime_secs_with_spilling = query.solo_runtime_secs_without_spilling query.solo_runtime_profile_with_spilling = \ @@ -1862,8 +1855,6 @@ def populate_all_queries( queries, impala, converted_args, queries_with_runtime_info_by_db_sql_and_options ): """Populate runtime info for all queries, ordered by the population_order property.""" - common_query_options = converted_args.common_query_options - runtime_info_path = converted_args.runtime_info_path result = [] queries_by_order = {} for query in queries: @@ -1882,12 +1873,8 @@ def populate_all_queries( result.append(queries_with_runtime_info_by_db_sql_and_options[ query.db_name][query.sql][str(sorted(query.options.items()))]) else: - populate_runtime_info( - query, impala, converted_args.use_kerberos, converted_args.results_dir, - samples=converted_args.samples, - max_conflicting_samples=converted_args.max_conflicting_samples, - common_query_options=common_query_options) - save_runtime_info(runtime_info_path, query, impala) + populate_runtime_info(query, impala, converted_args) + save_runtime_info(converted_args.runtime_info_path, query, impala) query.write_runtime_info_profiles( os.path.join(converted_args.results_dir, PROFILES_DIR)) result.append(query) @@ -1951,6 +1938,16 @@ def main(): ' max-conflicting-samples=1, then 4/5 queries must not spill at a particular mem' ' limit.') parser.add_argument( + "--mem-limit-eq-threshold-percent", default=0.025, + type=float, help='Used when collecting "runtime info". If the difference between' + ' two memory limits is less than this percentage, we consider the two limits to' + ' be equal and stop the memory binary search.') + parser.add_argument( + "--mem-limit-eq-threshold-mb", default=50, + type=int, help='Used when collecting "runtime info". If the difference between' + ' two memory limits is less than this value in MB, we consider the two limits to' + ' be equal and stop the memory binary search.') + parser.add_argument( "--results-dir", default=gettempdir(), help="Directory under which the profiles and result_hashes directories are created." " Query hash results are written in the result_hashes directory. If query results" @@ -2063,8 +2060,6 @@ def main(): "DISABLE_CODEGEN=true RUNTIME_FILTER_MODE=1") args = parser.parse_args() converted_args = StressArgConverter(args) - common_query_options = converted_args.common_query_options - runtime_info_path = converted_args.runtime_info_path cli_options.configure_logging( args.log_level, debug_log_file=args.debug_log_file, log_thread_name=True, @@ -2097,7 +2092,7 @@ def main(): impala.min_impalad_mem_mb = min(impala.find_impalad_mem_mb_limit()) queries_with_runtime_info_by_db_sql_and_options = load_runtime_info( - runtime_info_path, impala) + converted_args.runtime_info_path, impala) # Start loading the test queries. queries = list() @@ -2174,17 +2169,14 @@ def main(): with impala.cursor(db_name=args.random_db) as cursor: tables = [cursor.describe_table(t) for t in cursor.list_table_names()] queries.extend(load_random_queries_and_populate_runtime_info( - query_generator, SqlWriter.create(), tables, args.random_db, impala, - args.use_kerberos, args.random_query_count, args.random_query_timeout_seconds, - args.results_dir)) + query_generator, SqlWriter.create(), tables, impala, converted_args)) if args.query_file_path: file_queries = load_queries_from_test_file( args.query_file_path, db_name=args.query_file_db) shuffle(file_queries) queries.extend(populate_runtime_info_for_random_queries( - impala, args.use_kerberos, file_queries, args.random_query_count, - args.random_query_timeout_seconds, args.results_dir)) + impala, file_queries, converted_args)) # Apply tweaks to the query's runtime info as requested by CLI options. for idx in xrange(len(queries) - 1, -1, -1): @@ -2261,7 +2253,7 @@ def main(): stress_runner.cancel_probability = args.cancel_probability stress_runner.spill_probability = args.spill_probability stress_runner.leak_check_interval_mins = args.mem_leak_check_interval_mins - stress_runner.common_query_options = common_query_options + stress_runner.common_query_options = converted_args.common_query_options stress_runner.run_queries( queries, impala, args.max_queries, args.mem_overcommit_pct, should_print_status=not args.no_status,
