Repository: incubator-impala
Updated Branches:
  refs/heads/master e06268e82 -> 0bccb3ea0


IMPALA-6063: stress test: report test duration

At the end of the stress test, report the number of seconds the test ran
like this:

Test Duration: 1234 seconds

Time spent calculating runtime information isn't counted in the test
duration.

There are some additional changes to simplify the run_queries method
because Flake8 complained that it was too complex and this change
was making it even longer.

Testing:
Ran the stress test and verified the status output and Test Duration
output.

Change-Id: Ic1769c6c27cf064a330026d12d50562abfaf656f
Reviewed-on: http://gerrit.cloudera.org:8080/8339
Reviewed-by: Michael Brown <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/57bde365
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/57bde365
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/57bde365

Branch: refs/heads/master
Commit: 57bde365a31805440103b207e443991b127ce873
Parents: e06268e
Author: Matthew Mulder <[email protected]>
Authored: Thu Oct 19 14:35:35 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Fri Oct 20 03:01:07 2017 +0000

----------------------------------------------------------------------
 tests/stress/concurrent_select.py | 143 ++++++++++++++++++---------------
 1 file changed, 79 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/57bde365/tests/stress/concurrent_select.py
----------------------------------------------------------------------
diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py
index bc1a530..ac86deb 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -364,7 +364,7 @@ class StressRunner(object):
       raise Exception("Memory leak check interval must be positive")
 
     # If there is a crash, start looking for errors starting from this time.
-    start_time = datetime.now()
+    self.start_time = datetime.now()
 
     self._mem_broker = MemBroker(
         impala.min_impalad_mem_mb,
@@ -375,10 +375,7 @@ class StressRunner(object):
 
     # Print the status to show the state before starting.
     if should_print_status:
-      self._print_status_header()
-      self._print_status()
-      lines_printed = 1
-      last_report_secs = 0
+      self._print_status(print_header=True)
 
     self._num_queries_to_run = num_queries_to_run
     self._start_polling_mem_usage(impala)
@@ -386,69 +383,14 @@ class StressRunner(object):
     self._start_consuming_queries(impala)
 
     # Wait for everything to finish.
-    sleep_secs = 0.1
-    while (
-        self._query_producer_thread.is_alive() or
-        self._query_consumer_thread.is_alive() or
-        self._query_runners
-    ):
-      if self._query_producer_thread.error or self._query_consumer_thread.error:
-        # This is bad enough to abort early. A failure here probably means there's a
-        # bug in this script. The mem poller could be checked for an error too. It is
-        # not critical so is ignored.
-        LOG.error("Aborting due to error in producer/consumer")
-        sys.exit(1)
-      checked_for_crashes = False
-      for idx, runner in enumerate(self._query_runners):
-        if runner.exitcode is not None:
-          if runner.exitcode != 0:
-            if not checked_for_crashes:
-              LOG.info("Checking for crashes")
-              if print_crash_info_if_exists(impala, start_time):
-                sys.exit(runner.exitcode)
-              LOG.info("No crashes detected")
-              checked_for_crashes = True
-            if (
-                self._num_successive_errors.value >=
-                self.num_successive_errors_needed_to_abort
-            ):
-              print(
-                  "Aborting due to %s successive errors encountered"
-                  % self._num_successive_errors.value, file=sys.stderr)
-              sys.exit(1)
-          del self._query_runners[idx]
-      sleep(sleep_secs)
-      if should_print_status:
-        last_report_secs += sleep_secs
-        if last_report_secs > 5:
-          if (
-              not self._query_producer_thread.is_alive() or
-              not self._query_consumer_thread.is_alive() or
-              not self._query_runners
-          ):
-            LOG.debug("Producer is alive: %s" % self._query_producer_thread.is_alive())
-            LOG.debug("Consumer is alive: %s" % self._query_consumer_thread.is_alive())
-            LOG.debug("Queue size: %s" % self._query_queue.qsize())
-            LOG.debug("Runners: %s" % len(self._query_runners))
-          last_report_secs = 0
-          lines_printed %= 50
-          if lines_printed == 0:
-            self._print_status_header()
-          self._print_status()
-          lines_printed += 1
+    self._wait_for_test_to_finish(impala, should_print_status)
 
     # And print the final state.
     if should_print_status:
       self._print_status()
 
-    if (
-        self._num_other_errors.value > 0 or
-        self._num_result_mismatches.value > 0 or
-        self._num_queries_timedout.value - self._num_queries_cancelled.value > 0
-    ):
-      LOG.error("Failing the stress test due to unexpected errors, incorrect results, or "
-                "timed out queries. See the report line above for details.")
-      sys.exit(1)
+    self._check_for_test_failure()
+    self.print_duration()
 
   def _start_producing_queries(self, queries):
     def enqueue_queries():
@@ -713,7 +655,10 @@ class StressRunner(object):
   def _print_status_header(self):
     print(" | ".join(self._status_headers))
 
-  def _print_status(self):
+  def _print_status(self, print_header=False):
+    if print_header:
+      self._print_status_header()
+
     reported_mem, actual_mem = self._get_mem_usage_values(reset=True)
     status_format = " | ".join(["%%%ss" % len(header) for header in self._status_headers])
     print(status_format % (
@@ -758,6 +703,76 @@ class StressRunner(object):
     with open(profile_log_path, "w") as profile_log:
       profile_log.write(report.profile)
 
+  def _check_successive_errors(self):
+    if (self._num_successive_errors.value >= self.num_successive_errors_needed_to_abort):
+      print(
+          "Aborting due to %s successive errors encountered"
+          % self._num_successive_errors.value, file=sys.stderr)
+      self.print_duration()
+      sys.exit(1)
+
+  def _check_for_test_failure(self):
+    if (
+        self._num_other_errors.value > 0 or
+        self._num_result_mismatches.value > 0 or
+        self._num_queries_timedout.value - self._num_queries_cancelled.value > 0
+    ):
+      LOG.error("Failing the stress test due to unexpected errors, incorrect results, or "
+                "timed out queries. See the report line above for details.")
+      self.print_duration()
+      sys.exit(1)
+
+  def _wait_for_test_to_finish(self, impala, should_print_status):
+    last_report_secs = 0
+    lines_printed = 1
+    sleep_secs = 0.1
+
+    while (
+        self._query_producer_thread.is_alive() or
+        self._query_consumer_thread.is_alive() or
+        self._query_runners
+    ):
+      if self._query_producer_thread.error or self._query_consumer_thread.error:
+        # This is bad enough to abort early. A failure here probably means there's a
+        # bug in this script. The mem poller could be checked for an error too. It is
+        # not critical so is ignored.
+        LOG.error("Aborting due to error in producer/consumer")
+        sys.exit(1)
+      checked_for_crashes = False
+      for idx, runner in enumerate(self._query_runners):
+        if runner.exitcode is not None:
+          if runner.exitcode != 0:
+            if not checked_for_crashes:
+              LOG.info("Checking for crashes")
+              if print_crash_info_if_exists(impala, self.start_time):
+                self.print_duration()
+                sys.exit(runner.exitcode)
+              LOG.info("No crashes detected")
+              checked_for_crashes = True
+            self._check_successive_errors()
+          del self._query_runners[idx]
+      sleep(sleep_secs)
+      if should_print_status:
+        last_report_secs += sleep_secs
+        if last_report_secs > 5:
+          if (
+              not self._query_producer_thread.is_alive() or
+              not self._query_consumer_thread.is_alive() or
+              not self._query_runners
+          ):
+            LOG.debug("Producer is alive: %s" % self._query_producer_thread.is_alive())
+            LOG.debug("Consumer is alive: %s" % self._query_consumer_thread.is_alive())
+            LOG.debug("Queue size: %s" % self._query_queue.qsize())
+            LOG.debug("Runners: %s" % len(self._query_runners))
+          last_report_secs = 0
+          lines_printed %= 50
+          self._print_status(print_header=(lines_printed == 0))
+          lines_printed += 1
+
+  def print_duration(self):
+    duration = datetime.now() - self.start_time
+    LOG.info("Test Duration: {0:.0f} seconds".format(duration.total_seconds()))
+
 
 class QueryTimeout(Exception):
   pass

Reply via email to