IMPALA-3969: stress test: add option to set common query options

It can be useful for debugging purposes to run the stress test with
custom query options, for example with codegen disabled. This patch adds
a command line option to the stress test entry point that allows a
caller to set query options.

To avoid having to add explicit support each time a new query option is
needed, the new option accepts freeform, space-delimited option=value
arguments as its value, like:

 --common-query-options option1=value1 ... optionN=valueN

This means we do little validation that these options and values are
well-formed. Callers must take care to type the correct options and
values.

Testing: I ran concurrent_select.py by hand against an Impala cluster
with codegen both enabled and disabled, à la

 --common-query-options DISABLE_CODEGEN=true

Both the log written by concurrent_select.py and the Impala logs on the
cluster indicated that DISABLE_CODEGEN was being set as I directed. I
also did negative testing with a few bad --common-query-options values.
In each case, either concurrent_select.py caught the error, or the error
was reported when the first query ran.

Change-Id: Iada041aace60c218a12178d8f1b9a68ff29de72e
Reviewed-on: http://gerrit.cloudera.org:8080/3887
Reviewed-by: David Knupp <[email protected]>
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Tim Armstrong <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/9162d5d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/9162d5d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/9162d5d0

Branch: refs/heads/master
Commit: 9162d5d0544bf080da0a86331e9f77fc56803678
Parents: 5afd9f7
Author: Michael Brown <[email protected]>
Authored: Wed Aug 10 08:47:25 2016 -0700
Committer: Tim Armstrong <[email protected]>
Committed: Thu Aug 11 22:04:03 2016 +0000

----------------------------------------------------------------------
 tests/stress/concurrent_select.py | 50 +++++++++++++++++++++++++++++++++-
 1 file changed, 49 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9162d5d0/tests/stress/concurrent_select.py
----------------------------------------------------------------------
diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py
index e7ed96b..dc06503 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -66,7 +66,7 @@ from contextlib import contextmanager
 from datetime import datetime
 from multiprocessing import Lock, Process, Queue, Value
 from random import choice, random, randrange
-from sys import maxint
+from sys import exit, maxint
 from tempfile import gettempdir
 from textwrap import dedent
 from threading import current_thread, Thread
@@ -93,6 +93,7 @@ MEM_ESTIMATE_PATTERN = re.compile(r"Estimated.*Memory=(\d+.?\d*)(T|G|M|K)?B")
 # The version of the file format containing the collected query runtime info.
 RUNTIME_INFO_FILE_VERSION = 2
 
+
 def create_and_start_daemon_thread(fn, name):
   thread = Thread(target=fn, name=name)
   thread.error = None
@@ -267,6 +268,7 @@ class StressRunner(object):
 
   def __init__(self):
     self.use_kerberos = False
+    self.common_query_options = {}
     self._mem_broker = None
 
     # Synchronized blocking work queue for producer/consumers.
@@ -539,6 +541,7 @@ class StressRunner(object):
     runner.impalad = impalad
     runner.result_hash_log_dir = self.result_hash_log_dir
     runner.use_kerberos = self.use_kerberos
+    runner.common_query_options = self.common_query_options
     runner.connect()
 
     while not self._query_queue.empty():
@@ -674,6 +677,7 @@ class Query(object):
     self.required_mem_mb_without_spilling = None
     self.solo_runtime_secs_with_spilling = None
     self.solo_runtime_secs_without_spilling = None
+    self.common_query_options = {}
 
   def __repr__(self):
     return dedent("""
@@ -717,6 +721,9 @@ class QueryRunner(object):
     try:
       with self.impalad_conn.cursor() as cursor:
         start_time = time()
+        for query_option, value in self.common_query_options.iteritems():
+          cursor.execute(
+              "SET {query_option}={value}".format(query_option=query_option, 
value=value))
         cursor.execute("SET ABORT_ON_ERROR=1")
         LOG.debug("Setting mem limit to %s MB", mem_limit_mb)
         cursor.execute("SET MEM_LIMIT=%sM" % mem_limit_mb)
@@ -1355,6 +1362,12 @@ def main():
   parser.add_argument("--nlj-filter", choices=("in", "out", None),
       help="'in' means only nested-loop queries will be used, 'out' means no 
NLJ queries"
       " will be used. The default is to not filter either way.")
+  parser.add_argument(
+      "--common-query-options", default=None, nargs="*",
+      help="Space-delimited string of query options and values. This is a 
freeform "
+      "string with little regard to whether you've spelled the query options 
correctly "
+      "or set valid values. Example: --common-query-options "
+      "DISABLE_CODEGEN=true RUNTIME_FILTER_MODE=1")
   args = parser.parse_args()
 
   cli_options.configure_logging(args.log_level, 
debug_log_file=args.debug_log_file,
@@ -1366,6 +1379,40 @@ def main():
     raise Exception("At least one of --tpcds-db, --tpch-db,"
         "--tpch-nested-db, --random-db, --query-file-path is required")
 
+  # The stress test sets these, so callers cannot override them.
+  IGNORE_QUERY_OPTIONS = frozenset([
+      'ABORT_ON_ERROR',
+      'MEM_LIMIT',
+  ])
+
+  common_query_options = {}
+  if args.common_query_options is not None:
+    for query_option_and_value in args.common_query_options:
+      try:
+        query_option, value = query_option_and_value.split('=')
+      except ValueError:
+        LOG.error(
+            "Could not parse --common-query-options: 
'{common_query_options}'".format(
+                common_query_options=args.common_query_options))
+        exit(1)
+      query_option = query_option.upper()
+      if query_option in common_query_options:
+        LOG.error(
+            "Query option '{query_option}' already defined in 
--common-query-options: "
+            "'{common_query_options}'".format(
+                query_option=query_option,
+                common_query_options=args.common_query_options))
+        exit(1)
+      elif query_option in IGNORE_QUERY_OPTIONS:
+        LOG.warn(
+            "Ignoring '{query_option}' in common query options: '{opt}': "
+            "The stress test algorithm needs control of this option.".format(
+                query_option=query_option, opt=args.common_query_options))
+      else:
+        common_query_options[query_option] = value
+        LOG.debug("Common query option '{query_option}' set to 
'{value}'".format(
+            query_option=query_option, value=value))
+
   cluster = cli_options.create_cluster(args)
   cluster.is_kerberized = args.use_kerberos
   impala = cluster.impala
@@ -1491,6 +1538,7 @@ def main():
   stress_runner.cancel_probability = args.cancel_probability
   stress_runner.spill_probability = args.spill_probability
   stress_runner.leak_check_interval_mins = args.mem_leak_check_interval_mins
+  stress_runner.common_query_options = common_query_options
   stress_runner.run_queries(queries, impala, args.max_queries, 
args.mem_overcommit_pct,
       not args.no_status)   # This is the value of 'should_print_status'.
 

Reply via email to