Repository: impala
Updated Branches:
  refs/heads/master 07b8aaf6c -> 622e19c5f


IMPALA-7852: Fix some flakiness in test_hash_join_timer.py

test_hash_join_timer.py aims to verify the timers in the
join nodes are functioning correctly. It does so by parsing
the query profile for certain patterns after the query has
finished.

Before IMPALA-4063, each individual fragment instance would
post its profile to the coordinator upon completion. After
IMPALA-4063, the profiles of all fragment instances are sent
together periodically and the final profile is sent once all
fragment instances on a backend are done.

The problem with the existing implementation of the test is
that it doesn't actually fetch results before closing the
query. As a result, the coordinator fragment never gets
a chance to complete as it will block forever when inserting
into the plan root sink. Because the coordinator fragment
never completes, the final profiles of fragment
instances on the coordinator are not sent before the query
is closed. Consequently, the profile of a fragment instance
on the coordinator could be stale if it completes between
two periodic updates, leading to random test failures.

This change fixes the flakiness by always fetching results
before closing the query. Ideally, if we fix IMPALA-539 and
wait for all backends' final profiles before completing query
unregistration, we should get the final profile from the
coordinator fragment too.

Change-Id: I851824dffb78c7731e60793d90f1e57050c54955
Reviewed-on: http://gerrit.cloudera.org:8080/11964
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/d4019be2
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/d4019be2
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/d4019be2

Branch: refs/heads/master
Commit: d4019be2a259b6f5bc14da3261fac36e07f771ad
Parents: 07b8aaf
Author: Michael Ho <[email protected]>
Authored: Mon Nov 19 15:34:09 2018 -0800
Committer: Impala Public Jenkins <[email protected]>
Committed: Thu Nov 22 01:22:37 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/query-state.cc            |  5 +-
 tests/query_test/test_hash_join_timer.py | 93 +++++++++++++--------------
 2 files changed, 47 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/d4019be2/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 12e8cf2..f706796 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -245,7 +245,10 @@ void QueryState::UpdateBackendExecState() {
     }
   }
   // Send one last report if the query has reached the terminal state.
-  if (IsTerminalState()) ReportExecStatus();
+  if (IsTerminalState()) {
+    VLOG_QUERY << "UpdateBackendExecState(): last report for " << 
PrintId(query_id());
+    ReportExecStatus();
+  }
 }
 
 FragmentInstanceState* QueryState::GetFInstanceState(const TUniqueId& 
instance_id) {

http://git-wip-us.apache.org/repos/asf/impala/blob/d4019be2/tests/query_test/test_hash_join_timer.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_hash_join_timer.py 
b/tests/query_test/test_hash_join_timer.py
index c1b0f0f..59dd0a6 100644
--- a/tests/query_test/test_hash_join_timer.py
+++ b/tests/query_test/test_hash_join_timer.py
@@ -70,6 +70,8 @@ class TestHashJoinTimer(ImpalaTestSuite):
   # IMPALA-2973: Temporary workaround: when timers are using Linux COARSE 
clockid_t, very
   # short times may be measured as zero.
   HASH_JOIN_LOWER_BOUND_MS = 0
+  # Constant of nanoseconds per millisecond
+  NANOS_PER_MILLI = 1000000
 
   @classmethod
   def get_workload(self):
@@ -84,10 +86,10 @@ class TestHashJoinTimer(ImpalaTestSuite):
 
   @classmethod
   def __is_valid_test_vector(cls, vector):
-    return vector.get_value('table_format').file_format == 'text' and\
-        vector.get_value('table_format').compression_codec == 'none' and\
-        vector.get_value('exec_option')['batch_size'] == 0 and\
-        vector.get_value('exec_option')['disable_codegen'] == False and\
+    return vector.get_value('table_format').file_format == 'text' and \
+        vector.get_value('table_format').compression_codec == 'none' and \
+        vector.get_value('exec_option')['batch_size'] == 0 and \
+        vector.get_value('exec_option')['disable_codegen'] == False and \
         vector.get_value('exec_option')['num_nodes'] == 0
 
   @pytest.mark.execute_serially
@@ -105,60 +107,51 @@ class TestHashJoinTimer(ImpalaTestSuite):
       verifier = MetricVerifier(impalad.service)
       verifier.wait_for_metric("impala-server.num-fragments-in-flight", 0)
 
-    # Execute async to get a handle. Wait until the query has completed.
-    handle = self.execute_query_async(query, vector.get_value('exec_option'))
-    self.impalad_test_service.wait_for_query_state(self.client, handle,
-        self.client.QUERY_STATES['FINISHED'], timeout=40)
-    self.close_query(handle)
+    # Execute the query. The query summary and profile are stored in 'result'.
+    result = self.execute_query(query, vector.get_value('exec_option'))
 
-    # Parse the query profile
-    # The hash join node is "id=3".
-    # In the ExecSummary, search for "03:HASH JOIN" line, column 3 (avg) and 4 
(max).
-    # In the fragment (including average), search for "HASH_JOIN_NODE (id=2)" 
and the
-    # non-child time.
-    # Also verify that the build side is in a different thread by searching 
for:
-    #     "Join Build-Side Prepared Asynchronously"
-    profile = self.client.get_runtime_profile(handle)
+    # Parse the query summary; The join node is "id=3".
+    # In the ExecSummary, search for the join operator's summary and verify the
+    # avg and max times are within acceptable limits.
+    exec_summary = result.exec_summary
     check_execsummary_count = 0
+    join_node_name = "03:%s" % (join_type)
+    for line in exec_summary:
+      if line['operator'] == join_node_name:
+        avg_time_ms = line['avg_time'] / self.NANOS_PER_MILLI
+        self.__verify_join_time(avg_time_ms, "ExecSummary Avg")
+        max_time_ms = line['max_time'] / self.NANOS_PER_MILLI
+        self.__verify_join_time(max_time_ms, "ExecSummary Max")
+        check_execsummary_count += 1
+    assert (check_execsummary_count == 1), \
+        "Unable to verify ExecSummary: {0}".format(exec_summary)
+
+    # Parse the query profile; The join node is "id=3".
+    # In the profiles, search for lines containing "(id=3)" and parse for the 
avg and
+    # non-child times to verify that they are within acceptable limits. Also 
verify
+    # that the build side is built in a different thread by searching for the 
string:
+    # "Join Build-Side Prepared Asynchronously"
+    profile = result.runtime_profile
     check_fragment_count = 0
     asyn_build = False
-
     for line in profile.split("\n"):
-        # Matching for ExecSummary
-        if ("03:%s  " % (join_type) in line):
-            # Sample line:
-            # 03:HASH JOIN           3    11.89ms   12.543ms  6.57K  ...
-            # Split using "JOIN +", then split the right side with space. This 
becomes:
-            #   "3","11.89ms","12.543ms",...
-            # The second column is the average, and the 3rd column is the max
-            rhs = re.split("JOIN +", line)[1]
-            columns = re.split(" +", rhs)
-            self.__verify_join_time(columns[1], "ExecSummary Avg")
-            self.__verify_join_time(columns[2], "ExecSummary Max")
-            check_execsummary_count = 1
-        # Matching for Fragment (including Average
-        if ("(id=3)" in line):
-            # Sample line:
-            # HASH_JOIN_NODE (id=3):(Total: 3s580ms, non-child: 11.89ms, % 
non-child: 0.31%)
-            strip1 = re.split("non-child: ", line)[1]
-            non_child_time = re.split(", ", strip1)[0]
-            self.__verify_join_time(non_child_time, "Fragment non-child")
-            check_fragment_count = check_fragment_count + 1
-        # Search for "Join Build-Side Prepared Asynchronously"
-        if ("Join Build-Side Prepared Asynchronously" in line):
-            asyn_build = True;
-
+      if ("(id=3)" in line):
+        # Sample line:
+        # HASH_JOIN_NODE (id=3):(Total: 3s580ms, non-child: 11.89ms, % 
non-child: 0.31%)
+        strip1 = re.split("non-child: ", line)[1]
+        non_child_time = re.split(", ", strip1)[0]
+        non_child_time_ms = parse_duration_string_ms(non_child_time)
+        self.__verify_join_time(non_child_time_ms, "Fragment non-child")
+        check_fragment_count += 1
+      # Search for "Join Build-Side Prepared Asynchronously"
+      if ("Join Build-Side Prepared Asynchronously" in line):
+        asyn_build = True
     assert (asyn_build), "Join is not prepared asynchronously: 
{0}".format(profile)
     assert (check_fragment_count > 1), \
         "Unable to verify Fragment or Average Fragment: {0}".format(profile)
-    assert (check_execsummary_count == 1), \
-        "Unable to verify ExecSummary: {0}".format(profile)
 
-  def __verify_join_time(self, duration, comment):
-    duration_ms = parse_duration_string_ms(duration)
+  def __verify_join_time(self, duration_ms, comment):
     if (duration_ms > self.HASH_JOIN_UPPER_BOUND_MS):
-      assert False, "Hash join timing too high for %s: %s %s" % (
-        comment, duration, duration_ms)
+      assert False, "Hash join timing too high for %s: %s" % (comment, 
duration_ms)
     if (duration_ms < self.HASH_JOIN_LOWER_BOUND_MS):
-      assert False, "Hash join timing too low for %s: %s %s" % (
-        comment, duration, duration_ms)
+      assert False, "Hash join timing too low for %s: %s" % (comment, 
duration_ms)

Reply via email to