Repository: incubator-impala
Updated Branches:
  refs/heads/master e06268e82 -> 0bccb3ea0
IMPALA-6063: stress test: report test duration

At the end of the stress test, report the number of seconds the test ran like this:

  Test Duration: 1234 seconds

Time spent calculating runtime information isn't counted in the test duration.

There are some additional changes to simplify the run_queries method because Flake8 complained that it was too complex and this change was making it even longer.

Testing: Ran the stress test and verified the status output and Test Duration output.

Change-Id: Ic1769c6c27cf064a330026d12d50562abfaf656f
Reviewed-on: http://gerrit.cloudera.org:8080/8339
Reviewed-by: Michael Brown <[email protected]>
Tested-by: Impala Public Jenkins

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/57bde365
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/57bde365
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/57bde365

Branch: refs/heads/master
Commit: 57bde365a31805440103b207e443991b127ce873
Parents: e06268e
Author: Matthew Mulder <[email protected]>
Authored: Thu Oct 19 14:35:35 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Fri Oct 20 03:01:07 2017 +0000

----------------------------------------------------------------------
 tests/stress/concurrent_select.py | 143 ++++++++++++++++++---------------
 1 file changed, 79 insertions(+), 64 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/57bde365/tests/stress/concurrent_select.py
----------------------------------------------------------------------
diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py
index bc1a530..ac86deb 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -364,7 +364,7 @@ class StressRunner(object): raise Exception("Memory leak check interval must be positive") # If there 
is a crash, start looking for errors starting from this time. - start_time = datetime.now() + self.start_time = datetime.now() self._mem_broker = MemBroker( impala.min_impalad_mem_mb, @@ -375,10 +375,7 @@ class StressRunner(object): # Print the status to show the state before starting. if should_print_status: - self._print_status_header() - self._print_status() - lines_printed = 1 - last_report_secs = 0 + self._print_status(print_header=True) self._num_queries_to_run = num_queries_to_run self._start_polling_mem_usage(impala) @@ -386,69 +383,14 @@ class StressRunner(object): self._start_consuming_queries(impala) # Wait for everything to finish. - sleep_secs = 0.1 - while ( - self._query_producer_thread.is_alive() or - self._query_consumer_thread.is_alive() or - self._query_runners - ): - if self._query_producer_thread.error or self._query_consumer_thread.error: - # This is bad enough to abort early. A failure here probably means there's a - # bug in this script. The mem poller could be checked for an error too. It is - # not critical so is ignored. 
- LOG.error("Aborting due to error in producer/consumer") - sys.exit(1) - checked_for_crashes = False - for idx, runner in enumerate(self._query_runners): - if runner.exitcode is not None: - if runner.exitcode != 0: - if not checked_for_crashes: - LOG.info("Checking for crashes") - if print_crash_info_if_exists(impala, start_time): - sys.exit(runner.exitcode) - LOG.info("No crashes detected") - checked_for_crashes = True - if ( - self._num_successive_errors.value >= - self.num_successive_errors_needed_to_abort - ): - print( - "Aborting due to %s successive errors encountered" - % self._num_successive_errors.value, file=sys.stderr) - sys.exit(1) - del self._query_runners[idx] - sleep(sleep_secs) - if should_print_status: - last_report_secs += sleep_secs - if last_report_secs > 5: - if ( - not self._query_producer_thread.is_alive() or - not self._query_consumer_thread.is_alive() or - not self._query_runners - ): - LOG.debug("Producer is alive: %s" % self._query_producer_thread.is_alive()) - LOG.debug("Consumer is alive: %s" % self._query_consumer_thread.is_alive()) - LOG.debug("Queue size: %s" % self._query_queue.qsize()) - LOG.debug("Runners: %s" % len(self._query_runners)) - last_report_secs = 0 - lines_printed %= 50 - if lines_printed == 0: - self._print_status_header() - self._print_status() - lines_printed += 1 + self._wait_for_test_to_finish(impala, should_print_status) # And print the final state. if should_print_status: self._print_status() - if ( - self._num_other_errors.value > 0 or - self._num_result_mismatches.value > 0 or - self._num_queries_timedout.value - self._num_queries_cancelled.value > 0 - ): - LOG.error("Failing the stress test due to unexpected errors, incorrect results, or " - "timed out queries. 
See the report line above for details.") - sys.exit(1) + self._check_for_test_failure() + self.print_duration() def _start_producing_queries(self, queries): def enqueue_queries(): @@ -713,7 +655,10 @@ class StressRunner(object): def _print_status_header(self): print(" | ".join(self._status_headers)) - def _print_status(self): + def _print_status(self, print_header=False): + if print_header: + self._print_status_header() + reported_mem, actual_mem = self._get_mem_usage_values(reset=True) status_format = " | ".join(["%%%ss" % len(header) for header in self._status_headers]) print(status_format % ( @@ -758,6 +703,76 @@ class StressRunner(object): with open(profile_log_path, "w") as profile_log: profile_log.write(report.profile) + def _check_successive_errors(self): + if (self._num_successive_errors.value >= self.num_successive_errors_needed_to_abort): + print( + "Aborting due to %s successive errors encountered" + % self._num_successive_errors.value, file=sys.stderr) + self.print_duration() + sys.exit(1) + + def _check_for_test_failure(self): + if ( + self._num_other_errors.value > 0 or + self._num_result_mismatches.value > 0 or + self._num_queries_timedout.value - self._num_queries_cancelled.value > 0 + ): + LOG.error("Failing the stress test due to unexpected errors, incorrect results, or " + "timed out queries. See the report line above for details.") + self.print_duration() + sys.exit(1) + + def _wait_for_test_to_finish(self, impala, should_print_status): + last_report_secs = 0 + lines_printed = 1 + sleep_secs = 0.1 + + while ( + self._query_producer_thread.is_alive() or + self._query_consumer_thread.is_alive() or + self._query_runners + ): + if self._query_producer_thread.error or self._query_consumer_thread.error: + # This is bad enough to abort early. A failure here probably means there's a + # bug in this script. The mem poller could be checked for an error too. It is + # not critical so is ignored. 
+ LOG.error("Aborting due to error in producer/consumer") + sys.exit(1) + checked_for_crashes = False + for idx, runner in enumerate(self._query_runners): + if runner.exitcode is not None: + if runner.exitcode != 0: + if not checked_for_crashes: + LOG.info("Checking for crashes") + if print_crash_info_if_exists(impala, self.start_time): + self.print_duration() + sys.exit(runner.exitcode) + LOG.info("No crashes detected") + checked_for_crashes = True + self._check_successive_errors() + del self._query_runners[idx] + sleep(sleep_secs) + if should_print_status: + last_report_secs += sleep_secs + if last_report_secs > 5: + if ( + not self._query_producer_thread.is_alive() or + not self._query_consumer_thread.is_alive() or + not self._query_runners + ): + LOG.debug("Producer is alive: %s" % self._query_producer_thread.is_alive()) + LOG.debug("Consumer is alive: %s" % self._query_consumer_thread.is_alive()) + LOG.debug("Queue size: %s" % self._query_queue.qsize()) + LOG.debug("Runners: %s" % len(self._query_runners)) + last_report_secs = 0 + lines_printed %= 50 + self._print_status(print_header=(lines_printed == 0)) + lines_printed += 1 + + def print_duration(self): + duration = datetime.now() - self.start_time + LOG.info("Test Duration: {0:.0f} seconds".format(duration.total_seconds())) + class QueryTimeout(Exception): pass
