Repository: impala
Updated Branches:
  refs/heads/master c1be4e967 -> 1dcc6c1be


IMPALA-6904: stress test threshold parameters

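Replaces the hard-coded MEM_LIMIT_EQ_THRESHOLD_PC and MEM_LIMIT_EQ_THRESHOLD_MB
constants with the --mem-limit-eq-threshold-percent and
--mem-limit-eq-threshold-mb command-line options, so the thresholds that stop
the minimum-memory-limit binary search are configurable. The defaults preserve
the previous behaviour.

I needed this to generate numbers in test_mem_usage_scaling.py without
a +-50MB error in them.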

Testing:
Ran a local stress test with TPC-H and random queries.

Change-Id: I46cc95cbb078c5ef9886971ab1c0f493ddcf8377
Reviewed-on: http://gerrit.cloudera.org:8080/9769
Reviewed-by: Michael Brown <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/1dcc6c1b
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/1dcc6c1b
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/1dcc6c1b

Branch: refs/heads/master
Commit: 1dcc6c1beb9c36528184aa5a90f5d3f258e32971
Parents: c1be4e9
Author: Tim Armstrong <[email protected]>
Authored: Thu Mar 22 15:11:12 2018 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Tue Apr 24 02:46:08 2018 +0000

----------------------------------------------------------------------
 tests/stress/concurrent_select.py | 106 +++++++++++++++------------------
 1 file changed, 49 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/1dcc6c1b/tests/stress/concurrent_select.py
----------------------------------------------------------------------
diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py
index 44d7b34..4316672 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -94,11 +94,6 @@ EXPECTED_TPCDS_QUERIES_COUNT = 71
 EXPECTED_TPCH_NESTED_QUERIES_COUNT = 22
 EXPECTED_TPCH_QUERIES_COUNT = 22
 
-# Used to short circuit a binary search of the min mem limit. Values will be 
considered
-# equal if they are within this ratio or absolute amount of each other.
-MEM_LIMIT_EQ_THRESHOLD_PC = 0.975
-MEM_LIMIT_EQ_THRESHOLD_MB = 50
-
 # Regex to extract the estimated memory from an explain plan.
 # The unit prefixes can be found in
 # fe/src/main/java/org/apache/impala/common/PrintUtils.java
@@ -1245,8 +1240,7 @@ def load_queries_from_test_file(file_path, db_name=None):
 
 
 def load_random_queries_and_populate_runtime_info(
-    query_generator, model_translator, tables, db_name, impala, use_kerberos, 
query_count,
-    query_timeout_secs, results_dir
+    query_generator, model_translator, tables, impala, converted_args
 ):
   """Returns a list of random queries. Each query will also have its runtime 
info
   populated. The runtime info population also serves to validate the query.
@@ -1259,16 +1253,13 @@ def load_random_queries_and_populate_runtime_info(
       sql = model_translator.write_query(query_model)
       query = Query()
       query.sql = sql
-      query.db_name = db_name
+      query.db_name = converted_args.random_db
       yield query
   return populate_runtime_info_for_random_queries(
-      impala, use_kerberos, generate_candidates(), query_count, 
query_timeout_secs,
-      results_dir)
+      impala, generate_candidates(), converted_args)
 
 
-def populate_runtime_info_for_random_queries(
-    impala, use_kerberos, candidate_queries, query_count, query_timeout_secs, 
results_dir
-):
+def populate_runtime_info_for_random_queries(impala, candidate_queries, 
converted_args):
   """Returns a list of random queries. Each query will also have its runtime 
info
   populated. The runtime info population also serves to validate the query.
   """
@@ -1279,7 +1270,8 @@ def populate_runtime_info_for_random_queries(
   for query in candidate_queries:
     try:
       populate_runtime_info(
-          query, impala, use_kerberos, results_dir, 
timeout_secs=query_timeout_secs)
+          query, impala, converted_args,
+          timeout_secs=converted_args.random_query_timeout_seconds)
       queries.append(query)
     except Exception as e:
       # Ignore any non-fatal errors. These could be query timeouts or bad 
queries (
@@ -1289,36 +1281,37 @@ def populate_runtime_info_for_random_queries(
       LOG.warn(
           "Error running query (the test will continue)\n%s\n%s",
           e, query.sql, exc_info=True)
-    if len(queries) == query_count:
+    if len(queries) == converted_args.random_query_count:
       break
   return queries
 
 
-def populate_runtime_info(
-    query, impala, use_kerberos, results_dir,
-    timeout_secs=maxint, samples=1, max_conflicting_samples=0,
-    common_query_options=None
-):
+def populate_runtime_info(query, impala, converted_args, timeout_secs=maxint):
   """Runs the given query by itself repeatedly until the minimum memory is 
determined
   with and without spilling. Potentially all fields in the Query class (except
   'sql') will be populated by this method. 'required_mem_mb_without_spilling' 
and
   the corresponding runtime field may still be None if the query could not be 
run
   without spilling.
 
-  'samples' and 'max_conflicting_samples' control the reliability of the 
collected
-  information. The problem is that memory spilling or usage may differ (by a 
large
-  amount) from run to run due to races during execution. The parameters 
provide a way
-  to express "X out of Y runs must have resulted in the same outcome". 
Increasing the
-  number of samples and decreasing the tolerance (max conflicts) increases 
confidence
-  but also increases the time to collect the data.
+  converted_args.samples and converted_args.max_conflicting_samples control the
+  reliability of the collected information. The problem is that memory 
spilling or usage
+  may differ (by a large amount) from run to run due to races during 
execution. The
+  parameters provide a way to express "X out of Y runs must have resulted in 
the same
+  outcome". Increasing the number of samples and decreasing the tolerance (max 
conflicts)
+  increases confidence but also increases the time to collect the data.
   """
   LOG.info("Collecting runtime info for query %s: \n%s", query.name, query.sql)
+  samples = converted_args.samples
+  max_conflicting_samples = converted_args.max_conflicting_samples
+  results_dir = converted_args.results_dir
+  mem_limit_eq_threshold_mb = converted_args.mem_limit_eq_threshold_mb
+  mem_limit_eq_threshold_percent = 
converted_args.mem_limit_eq_threshold_percent
   runner = QueryRunner()
   runner.check_if_mem_was_spilled = True
-  runner.common_query_options = common_query_options
+  runner.common_query_options = converted_args.common_query_options
   runner.impalad = impala.impalads[0]
   runner.results_dir = results_dir
-  runner.use_kerberos = use_kerberos
+  runner.use_kerberos = converted_args.use_kerberos
   runner.connect()
   limit_exceeded_mem = 0
   non_spill_mem = None
@@ -1361,14 +1354,12 @@ def populate_runtime_info(
                                 run_set_up=True, retain_profile=True)
       if report.timed_out:
         report.write_query_profile(
-            os.path.join(results_dir, PROFILES_DIR),
-            profile_error_prefix)
+            os.path.join(results_dir, PROFILES_DIR), profile_error_prefix)
         raise QueryTimeout(
             "query {0} timed out during binary 
search".format(query.logical_query_id))
       if report.non_mem_limit_error:
         report.write_query_profile(
-            os.path.join(results_dir, PROFILES_DIR),
-            profile_error_prefix)
+            os.path.join(results_dir, PROFILES_DIR), profile_error_prefix)
         raise Exception(
             "query {0} errored during binary search: {1}".format(
                 query.logical_query_id, str(report.non_mem_limit_error)))
@@ -1378,8 +1369,7 @@ def populate_runtime_info(
           query.result_hash = report.result_hash
         elif query.result_hash != report.result_hash:
           report.write_query_profile(
-              os.path.join(results_dir, PROFILES_DIR),
-              profile_error_prefix)
+              os.path.join(results_dir, PROFILES_DIR), profile_error_prefix)
           raise Exception(
               "Result hash mismatch for query %s; expected %s, got %s" %
               (query.logical_query_id, query.result_hash, report.result_hash))
@@ -1416,6 +1406,7 @@ def populate_runtime_info(
     LOG.info("Finding a starting point for binary search")
     mem_limit = min(mem_estimate, impala.min_impalad_mem_mb) or 
impala.min_impalad_mem_mb
     while True:
+      LOG.info("Next mem_limit: {0}".format(mem_limit))
       report = get_report()
       if not report or report.mem_limit_exceeded:
         if report and report.mem_limit_exceeded:
@@ -1442,8 +1433,9 @@ def populate_runtime_info(
       old_required_mem_mb_without_spilling = None
     else:
       mem_limit = (lower_bound + upper_bound) / 2
-    should_break = mem_limit / float(upper_bound) > MEM_LIMIT_EQ_THRESHOLD_PC 
or \
-        upper_bound - mem_limit < MEM_LIMIT_EQ_THRESHOLD_MB
+    LOG.info("Next mem_limit: {0}".format(mem_limit))
+    should_break = mem_limit / float(upper_bound) > 1 - 
mem_limit_eq_threshold_percent \
+        or upper_bound - mem_limit < mem_limit_eq_threshold_mb
     report = get_report(desired_outcome=("NOT_SPILLED" if spill_mem else None))
     if not report:
       lower_bound = mem_limit
@@ -1478,8 +1470,9 @@ def populate_runtime_info(
       old_required_mem_mb_with_spilling = None
     else:
       mem_limit = (lower_bound + upper_bound) / 2
-    should_break = mem_limit / float(upper_bound) > MEM_LIMIT_EQ_THRESHOLD_PC \
-        or upper_bound - mem_limit < MEM_LIMIT_EQ_THRESHOLD_MB
+    LOG.info("Next mem_limit: {0}".format(mem_limit))
+    should_break = mem_limit / float(upper_bound) > 1 - 
mem_limit_eq_threshold_percent \
+        or upper_bound - mem_limit < mem_limit_eq_threshold_mb
     report = get_report(desired_outcome="SPILLED")
     if not report or report.mem_limit_exceeded:
       lower_bound = mem_limit
@@ -1488,12 +1481,12 @@ def populate_runtime_info(
       upper_bound = mem_limit
     if should_break:
       if not query.required_mem_mb_with_spilling:
-        if upper_bound - mem_limit < MEM_LIMIT_EQ_THRESHOLD_MB:
+        if upper_bound - mem_limit < mem_limit_eq_threshold_mb:
           # IMPALA-6604: A fair amount of queries go down this path.
           LOG.info(
               "Unable to find a memory limit with spilling within the 
threshold of {0} "
               "MB. Using the same memory limit for both.".format(
-                  MEM_LIMIT_EQ_THRESHOLD_MB))
+                  mem_limit_eq_threshold_mb))
         query.required_mem_mb_with_spilling = 
query.required_mem_mb_without_spilling
         query.solo_runtime_secs_with_spilling = 
query.solo_runtime_secs_without_spilling
         query.solo_runtime_profile_with_spilling = \
@@ -1862,8 +1855,6 @@ def populate_all_queries(
     queries, impala, converted_args, 
queries_with_runtime_info_by_db_sql_and_options
 ):
   """Populate runtime info for all queries, ordered by the population_order 
property."""
-  common_query_options = converted_args.common_query_options
-  runtime_info_path = converted_args.runtime_info_path
   result = []
   queries_by_order = {}
   for query in queries:
@@ -1882,12 +1873,8 @@ def populate_all_queries(
         result.append(queries_with_runtime_info_by_db_sql_and_options[
             query.db_name][query.sql][str(sorted(query.options.items()))])
       else:
-        populate_runtime_info(
-            query, impala, converted_args.use_kerberos, 
converted_args.results_dir,
-            samples=converted_args.samples,
-            max_conflicting_samples=converted_args.max_conflicting_samples,
-            common_query_options=common_query_options)
-        save_runtime_info(runtime_info_path, query, impala)
+        populate_runtime_info(query, impala, converted_args)
+        save_runtime_info(converted_args.runtime_info_path, query, impala)
         query.write_runtime_info_profiles(
             os.path.join(converted_args.results_dir, PROFILES_DIR))
         result.append(query)
@@ -1951,6 +1938,16 @@ def main():
       ' max-conflicting-samples=1, then 4/5 queries must not spill at a 
particular mem'
       ' limit.')
   parser.add_argument(
+      "--mem-limit-eq-threshold-percent", default=0.025,
+      type=float, help='Used when collecting "runtime info". If the difference 
between'
+      ' two memory limits is less than this percentage, we consider the two 
limits to'
+      ' be equal and stop the memory binary search.')
+  parser.add_argument(
+      "--mem-limit-eq-threshold-mb", default=50,
+      type=int, help='Used when collecting "runtime info". If the difference 
between'
+      ' two memory limits is less than this value in MB, we consider the two 
limits to'
+      ' be equal and stop the memory binary search.')
+  parser.add_argument(
       "--results-dir", default=gettempdir(),
       help="Directory under which the profiles and result_hashes directories 
are created."
       " Query hash results are written in the result_hashes directory. If 
query results"
@@ -2063,8 +2060,6 @@ def main():
       "DISABLE_CODEGEN=true RUNTIME_FILTER_MODE=1")
   args = parser.parse_args()
   converted_args = StressArgConverter(args)
-  common_query_options = converted_args.common_query_options
-  runtime_info_path = converted_args.runtime_info_path
 
   cli_options.configure_logging(
       args.log_level, debug_log_file=args.debug_log_file, log_thread_name=True,
@@ -2097,7 +2092,7 @@ def main():
   impala.min_impalad_mem_mb = min(impala.find_impalad_mem_mb_limit())
 
   queries_with_runtime_info_by_db_sql_and_options = load_runtime_info(
-      runtime_info_path, impala)
+      converted_args.runtime_info_path, impala)
 
   # Start loading the test queries.
   queries = list()
@@ -2174,17 +2169,14 @@ def main():
     with impala.cursor(db_name=args.random_db) as cursor:
       tables = [cursor.describe_table(t) for t in cursor.list_table_names()]
     queries.extend(load_random_queries_and_populate_runtime_info(
-        query_generator, SqlWriter.create(), tables, args.random_db, impala,
-        args.use_kerberos, args.random_query_count, 
args.random_query_timeout_seconds,
-        args.results_dir))
+        query_generator, SqlWriter.create(), tables, impala, converted_args))
 
   if args.query_file_path:
     file_queries = load_queries_from_test_file(
         args.query_file_path, db_name=args.query_file_db)
     shuffle(file_queries)
     queries.extend(populate_runtime_info_for_random_queries(
-        impala, args.use_kerberos, file_queries, args.random_query_count,
-        args.random_query_timeout_seconds, args.results_dir))
+        impala, file_queries, converted_args))
 
   # Apply tweaks to the query's runtime info as requested by CLI options.
   for idx in xrange(len(queries) - 1, -1, -1):
@@ -2261,7 +2253,7 @@ def main():
   stress_runner.cancel_probability = args.cancel_probability
   stress_runner.spill_probability = args.spill_probability
   stress_runner.leak_check_interval_mins = args.mem_leak_check_interval_mins
-  stress_runner.common_query_options = common_query_options
+  stress_runner.common_query_options = converted_args.common_query_options
   stress_runner.run_queries(
       queries, impala, args.max_queries, args.mem_overcommit_pct,
       should_print_status=not args.no_status,

Reply via email to