Repository: incubator-impala
Updated Branches:
refs/heads/master d03e7d6ce -> fb735df13
IMPALA-5625: stress test: write profile when queries fail
This change writes query profiles as text files for all of the major
query failure reasons in the concurrent_select stress test.
1) Change the --result-hash-log-dir command-line option to --results-dir
and update the help text.
2) Introduce two new directories under the directory given by the
--results-dir command-line argument:
profiles
result_hashes
3) Move results into the result_hashes directory.
4) Write the query profile to the profiles directory when a query times
out or gets an error or incorrect results.
5) Remove the query profile from the log output for unexpected mem
limit exceeded exceptions. Instead, write those to the profiles
directory as well.
Testing:
Ran the stress test with a driver that changes the hashes of some of the
query results in the runtime info json file to inject incorrect result
failures. Set tight bounds on the mem limit and timeout to ensure there
would be timeouts and exceeded memory limit failures. Restarted the
NameNode mid-run to induce a query failure. That covers the 4 cases
for which an exception is thrown and a profile is written for query
failures. Verified that the profiles were written for each kind of
query failure.
Change-Id: I1dbdf5fcf97d6c5681c9fc8fb9eb448bc459b3b0
Reviewed-on: http://gerrit.cloudera.org:8080/7376
Reviewed-by: Michael Brown <[email protected]>
Tested-by: Michael Brown <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/fb735df1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/fb735df1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/fb735df1
Branch: refs/heads/master
Commit: fb735df134652b4131f77e1807684af0cc424647
Parents: d03e7d6
Author: Matthew Mulder <[email protected]>
Authored: Fri Jun 30 16:18:12 2017 -0700
Committer: Michael Brown <[email protected]>
Committed: Thu Aug 24 19:41:33 2017 +0000
----------------------------------------------------------------------
tests/stress/concurrent_select.py | 152 ++++++++++++++++++++-------------
1 file changed, 92 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/fb735df1/tests/stress/concurrent_select.py
----------------------------------------------------------------------
diff --git a/tests/stress/concurrent_select.py
b/tests/stress/concurrent_select.py
index d77e306..6626f58 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -92,6 +92,9 @@ MEM_LIMIT_EQ_THRESHOLD_MB = 50
# Regex to extract the estimated memory from an explain plan.
MEM_ESTIMATE_PATTERN = re.compile(r"Estimated.*Memory=(\d+.?\d*)(T|G|M|K)?B")
+PROFILES_DIR = "profiles"
+RESULT_HASHES_DIR = "result_hashes"
+
# The version of the file format containing the collected query runtime info.
RUNTIME_INFO_FILE_VERSION = 3
@@ -164,6 +167,7 @@ class QueryReport(object):
self.timed_out = False
self.was_cancelled = False
self.profile = None
+ self.query_id = None
class MemBroker(object):
@@ -312,7 +316,7 @@ class StressRunner(object):
self.startup_queries_per_sec = 1.0
self.num_successive_errors_needed_to_abort = 1
self._num_successive_errors = Value("i", 0)
- self.result_hash_log_dir = gettempdir()
+ self.results_dir = gettempdir()
self._status_headers = [
"Done", "Running", "Mem Lmt Ex", "Time Out", "Cancel",
@@ -594,7 +598,7 @@ class StressRunner(object):
LOG.debug("New query runner started")
runner = QueryRunner()
runner.impalad = impalad
- runner.result_hash_log_dir = self.result_hash_log_dir
+ runner.results_dir = self.results_dir
runner.use_kerberos = self.use_kerberos
runner.common_query_options = self.common_query_options
runner.connect()
@@ -641,7 +645,7 @@ class StressRunner(object):
else:
timeout = solo_runtime * max(
10, self._num_queries_started.value -
self._num_queries_finished.value)
- report = runner.run_query(query, timeout, mem_limit)
+ report = runner.run_query(query, timeout, mem_limit,
should_cancel=should_cancel)
LOG.debug("Got execution report for query")
if report.timed_out and should_cancel:
report.was_cancelled = True
@@ -675,15 +679,16 @@ class StressRunner(object):
continue
increment(self._num_successive_errors)
increment(self._num_other_errors)
- raise Exception("Query failed: %s" % str(report.non_mem_limit_error))
+ self._write_query_profile(report)
+ raise Exception("Query {0} failed: {1}".format(report.query_id,
error_msg))
if (
report.mem_limit_exceeded and
not self._mem_broker.was_overcommitted(reservation_id)
):
increment(self._num_successive_errors)
- raise Exception(
- "Unexpected mem limit exceeded; mem was not overcommitted\n"
- "Profile: %s" % report.profile)
+ self._write_query_profile(report)
+ raise Exception("Unexpected mem limit exceeded; mem was not
overcommitted. "
+ "Query ID: {0}".format(report.query_id))
if (
not report.mem_limit_exceeded and
not report.timed_out and
@@ -691,9 +696,18 @@ class StressRunner(object):
):
increment(self._num_successive_errors)
increment(self._num_result_mismatches)
+ self._write_query_profile(report)
+ raise Exception(dedent("""\
+ Result hash mismatch; expected {expected},
got {actual}
+ Query ID: {id}
+ Query:
{query}""".format(expected=query.result_hash,
+
actual=report.result_hash,
+ id=report.query_id,
+ query=query.sql)))
+ if report.timed_out and not should_cancel:
+ self._write_query_profile(report)
raise Exception(
- "Result hash mismatch; expected %s, got %s\nQuery: %s"
- % (query.result_hash, report.result_hash, query.sql))
+ "Query unexpectedly timed out. Query ID:
{0}".format(report.query_id))
self._num_successive_errors.value = 0
def _print_status_header(self):
@@ -736,6 +750,14 @@ class StressRunner(object):
if report.timed_out:
increment(self._num_queries_timedout)
+ def _write_query_profile(self, report):
+ if not (report.profile and report.query_id):
+ return
+ file_name = report.query_id.replace(":", "_") + "_profile.txt"
+ profile_log_path = os.path.join(self.results_dir, PROFILES_DIR, file_name)
+ with open(profile_log_path, "w") as profile_log:
+ profile_log.write(report.profile)
+
class QueryTimeout(Exception):
pass
@@ -795,7 +817,7 @@ class QueryRunner(object):
self.impalad = None
self.impalad_conn = None
self.use_kerberos = False
- self.result_hash_log_dir = gettempdir()
+ self.results_dir = gettempdir()
self.check_if_mem_was_spilled = False
self.common_query_options = {}
@@ -807,10 +829,14 @@ class QueryRunner(object):
self.impalad_conn.close()
self.impalad_conn = None
- def run_query(self, query, timeout_secs, mem_limit_mb, run_set_up=False):
+ def run_query(self, query, timeout_secs, mem_limit_mb, run_set_up=False,
+ should_cancel=False):
"""Run a query and return an execution report. If 'run_set_up' is True,
set up sql
will be executed before the main query. This should be the case during the
binary
search phase of the stress test.
+ If 'should_cancel' is True, don't get the query profile for timed out
queries because
+ the query was purposely cancelled by setting the query timeout too short
to complete,
+ rather than having some problem that needs to be investigated.
"""
if not self.impalad_conn:
raise Exception("connect() must first be called")
@@ -843,13 +869,15 @@ class QueryRunner(object):
cursor.execute_async(
"/* Mem: %s MB. Coordinator: %s. */\n"
% (mem_limit_mb, self.impalad.host_name) + query.sql)
- LOG.debug(
- "Query id is %s",
op_handle_to_query_id(cursor._last_operation.handle if
- cursor._last_operation
else None))
+ report.query_id =
op_handle_to_query_id(cursor._last_operation.handle if
+ cursor._last_operation else
None)
+ LOG.debug("Query id is %s", report.query_id)
sleep_secs = 0.1
secs_since_log = 0
while cursor.is_executing():
if time() > timeout_unix_time:
+ if not should_cancel:
+ fetch_and_set_profile(cursor, report)
self._cancel(cursor, report)
return report
if secs_since_log > 5:
@@ -860,6 +888,8 @@ class QueryRunner(object):
if query.query_type == QueryType.SELECT:
try:
report.result_hash = self._hash_result(cursor,
timeout_unix_time, query)
+ if query.result_hash and report.result_hash != query.result_hash:
+ fetch_and_set_profile(cursor, report)
except QueryTimeout:
self._cancel(cursor, report)
return report
@@ -867,18 +897,15 @@ class QueryRunner(object):
# If query is in error state, this will raise an exception
cursor._wait_to_finish()
except Exception as error:
- LOG.debug(
- "Error running query with id %s: %s",
- op_handle_to_query_id(cursor._last_operation.handle if
- cursor._last_operation else None), error)
+ report.query_id =
op_handle_to_query_id(cursor._last_operation.handle if
+ cursor._last_operation else
None)
+ LOG.debug("Error running query with id %s: %s", report.query_id,
error)
self._check_for_mem_limit_exceeded(report, cursor, error)
if report.non_mem_limit_error or report.mem_limit_exceeded:
return report
report.runtime_secs = time() - start_time
if cursor.execution_failed() or self.check_if_mem_was_spilled:
- # Producing a query profile can be somewhat expensive. A v-tune
profile of
- # impalad showed 10% of cpu time spent generating query profiles.
- report.profile = cursor.get_profile()
+ fetch_and_set_profile(cursor, report)
report.mem_was_spilled = any([
pattern.search(report.profile) is not None
for pattern in QueryRunner.SPILLED_PATTERNS])
@@ -891,35 +918,27 @@ class QueryRunner(object):
def _cancel(self, cursor, report):
report.timed_out = True
- # Copy the operation handle in case another thread causes the handle to be
reset.
- operation_handle = cursor._last_operation.handle if cursor._last_operation
else None
- if not operation_handle:
+ if not report.query_id:
return
- query_id = op_handle_to_query_id(operation_handle)
try:
- LOG.debug("Attempting cancellation of query with id %s", query_id)
+ LOG.debug("Attempting cancellation of query with id %s", report.query_id)
cursor.cancel_operation()
- LOG.debug("Sent cancellation request for query with id %s", query_id)
+ LOG.debug("Sent cancellation request for query with id %s",
report.query_id)
except Exception as e:
- LOG.debug("Error cancelling query with id %s: %s", query_id, e)
+ LOG.debug("Error cancelling query with id %s: %s", report.query_id, e)
try:
LOG.debug("Attempting to cancel query through the web server.")
- self.impalad.cancel_query(query_id)
+ self.impalad.cancel_query(report.query_id)
except Exception as e:
- LOG.debug("Error cancelling query %s through the web server: %s",
query_id, e)
+ LOG.debug("Error cancelling query %s through the web server: %s",
+ report.query_id, e)
def _check_for_mem_limit_exceeded(self, report, cursor, caught_exception):
"""To be called after a query failure to check for signs of failed due to a
mem limit. The report will be updated accordingly.
"""
- if cursor._last_operation:
- try:
- report.profile = cursor.get_profile()
- except Exception as e:
- LOG.debug(
- "Error getting profile for query with id %s: %s",
- op_handle_to_query_id(cursor._last_operation.handle), e)
+ fetch_and_set_profile(cursor, report)
caught_msg = str(caught_exception).lower().strip()
# Exceeding a mem limit may result in the message "cancelled". See
IMPALA-2234
@@ -941,12 +960,8 @@ class QueryRunner(object):
report.mem_limit_exceeded = True
return
- LOG.debug(
- "Non-mem limit error for query with id %s: %s",
- op_handle_to_query_id(
- cursor._last_operation.handle if cursor._last_operation else None),
- caught_exception,
- exc_info=True)
+ LOG.debug("Non-mem limit error for query with id %s: %s", report.query_id,
+ caught_exception, exc_info=True)
report.non_mem_limit_error = caught_exception
def _hash_result(self, cursor, timeout_unix_time, query):
@@ -967,7 +982,8 @@ class QueryRunner(object):
if query.result_hash is None:
file_name += "_initial"
file_name += "_results.txt"
- result_log = open(os.path.join(self.result_hash_log_dir, file_name),
"w")
+ result_log = open(os.path.join(self.results_dir, RESULT_HASHES_DIR,
file_name),
+ "w")
result_log.write(query.sql)
result_log.write("\n")
current_thread().result = 1
@@ -1068,7 +1084,7 @@ def load_queries_from_test_file(file_path, db_name=None):
def load_random_queries_and_populate_runtime_info(
query_generator, model_translator, tables, db_name, impala, use_kerberos,
query_count,
- query_timeout_secs, result_hash_log_dir
+ query_timeout_secs, results_dir
):
"""Returns a list of random queries. Each query will also have its runtime
info
populated. The runtime info population also serves to validate the query.
@@ -1085,12 +1101,11 @@ def load_random_queries_and_populate_runtime_info(
yield query
return populate_runtime_info_for_random_queries(
impala, use_kerberos, generate_candidates(), query_count,
query_timeout_secs,
- result_hash_log_dir)
+ results_dir)
def populate_runtime_info_for_random_queries(
- impala, use_kerberos, candidate_queries,
- query_count, query_timeout_secs, result_hash_log_dir
+ impala, use_kerberos, candidate_queries, query_count, query_timeout_secs,
results_dir
):
"""Returns a list of random queries. Each query will also have its runtime
info
populated. The runtime info population also serves to validate the query.
@@ -1102,8 +1117,7 @@ def populate_runtime_info_for_random_queries(
for query in candidate_queries:
try:
populate_runtime_info(
- query, impala, use_kerberos, result_hash_log_dir,
- timeout_secs=query_timeout_secs)
+ query, impala, use_kerberos, results_dir,
timeout_secs=query_timeout_secs)
queries.append(query)
except Exception as e:
# Ignore any non-fatal errors. These could be query timeouts or bad
queries (
@@ -1119,7 +1133,7 @@ def populate_runtime_info_for_random_queries(
def populate_runtime_info(
- query, impala, use_kerberos, result_hash_log_dir,
+ query, impala, use_kerberos, results_dir,
timeout_secs=maxint, samples=1, max_conflicting_samples=0
):
"""Runs the given query by itself repeatedly until the minimum memory is
determined
@@ -1139,7 +1153,7 @@ def populate_runtime_info(
runner = QueryRunner()
runner.check_if_mem_was_spilled = True
runner.impalad = impala.impalads[0]
- runner.result_hash_log_dir = result_hash_log_dir
+ runner.results_dir = results_dir
runner.use_kerberos = use_kerberos
runner.connect()
limit_exceeded_mem = 0
@@ -1647,13 +1661,25 @@ def populate_all_queries(queries, impala, args,
runtime_info_path,
query.db_name][query.sql][str(sorted(query.options.items()))])
else:
populate_runtime_info(
- query, impala, args.use_kerberos, args.result_hash_log_dir,
+ query, impala, args.use_kerberos, args.results_dir,
samples=args.samples,
max_conflicting_samples=args.max_conflicting_samples)
save_runtime_info(runtime_info_path, query, impala)
result.append(query)
return result
+def fetch_and_set_profile(cursor, report):
+ """Set the report's query profile using the given cursor.
+ Producing a query profile can be somewhat expensive. A v-tune profile of
+ impalad showed 10% of cpu time spent generating query profiles.
+ """
+ if not report.profile and cursor._last_operation:
+ try:
+ report.profile = cursor.get_profile()
+ except Exception as e:
+ LOG.debug("Error getting profile for query with id %s: %s",
report.query_id, e)
+
+
def print_version(cluster):
"""
Print the cluster impalad version info to the console sorted by hostname.
@@ -1703,10 +1729,13 @@ def main():
' max-conflicting-samples=1, then 4/5 queries must not spill at a
particular mem'
' limit.')
parser.add_argument(
- "--result-hash-log-dir", default=gettempdir(),
- help="If query results do not match, a log file will be left in this
dir. The log"
- " file is also created during the first run when runtime info is
collected for"
- " each query.")
+ "--results-dir", default=gettempdir(),
+ help="Directory under which the profiles and result_hashes directories
are created."
+ " Query hash results are written in the result_hashes directory. If
query results"
+ " do not match, a log file will be left in that dir. The log file is
also created"
+ " during the first run when runtime info is collected for each query.
Unexpected"
+ " query timeouts, exceeded memory, failures or result mismatches will
result in a"
+ " profile written in the profiles directory.")
parser.add_argument(
"--no-status", action="store_true", help="Do not print the status
table.")
parser.add_argument(
@@ -1860,6 +1889,9 @@ def main():
LOG.debug("Common query option '{query_option}' set to
'{value}'".format(
query_option=query_option, value=value))
+ os.mkdir(os.path.join(args.results_dir, RESULT_HASHES_DIR))
+ os.mkdir(os.path.join(args.results_dir, PROFILES_DIR))
+
cluster = cli_options.create_cluster(args)
impala = cluster.impala
if impala.find_stopped_impalads():
@@ -1951,7 +1983,7 @@ def main():
queries.extend(load_random_queries_and_populate_runtime_info(
query_generator, SqlWriter.create(), tables, args.random_db, impala,
args.use_kerberos, args.random_query_count,
args.random_query_timeout_seconds,
- args.result_hash_log_dir))
+ args.results_dir))
if args.query_file_path:
file_queries = load_queries_from_test_file(
@@ -1959,7 +1991,7 @@ def main():
shuffle(file_queries)
queries.extend(populate_runtime_info_for_random_queries(
impala, args.use_kerberos, file_queries, args.random_query_count,
- args.random_query_timeout_seconds, args.result_hash_log_dir))
+ args.random_query_timeout_seconds, args.results_dir))
# Apply tweaks to the query's runtime info as requested by CLI options.
for idx in xrange(len(queries) - 1, -1, -1):
@@ -2022,7 +2054,7 @@ def main():
LOG.info("Number of queries in the list: {0}".format(len(queries)))
stress_runner = StressRunner()
- stress_runner.result_hash_log_dir = args.result_hash_log_dir
+ stress_runner.results_dir = args.results_dir
stress_runner.startup_queries_per_sec = args.startup_queries_per_second
stress_runner.num_successive_errors_needed_to_abort =
args.fail_upon_successive_errors
stress_runner.use_kerberos = args.use_kerberos