Repository: impala Updated Branches: refs/heads/master 07b8aaf6c -> 622e19c5f
IMPALA-7852: Fix some flakiness in test_hash_join_timer.py

test_hash_join_timer.py aims to verify that the timers in the join nodes are functioning correctly. It does so by parsing the query profile for certain patterns after the query has finished. Before IMPALA-4063, each individual fragment instance would post its profile to the coordinator upon completion. After IMPALA-4063, the profiles of all fragment instances are sent together periodically, and the final profile is sent once all fragment instances on a backend are done. The problem with the existing implementation of the test is that it doesn't actually fetch results before closing the query. As a result, the coordinator fragment never gets a chance to complete, as it blocks forever when inserting into the plan root sink. Because the coordinator fragment never completes, the final profiles of the fragment instances on the coordinator are not sent before the query is closed. Consequently, the profile of a fragment instance on the coordinator could be stale if it completes between two periodic updates, leading to random test failures. This change fixes the flakiness by always fetching results before closing the query. Ideally, if we fix IMPALA-539 and wait for all backends' final profiles before completing query unregistration, we should get the final profile from the coordinator fragment too.
Change-Id: I851824dffb78c7731e60793d90f1e57050c54955 Reviewed-on: http://gerrit.cloudera.org:8080/11964 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/d4019be2 Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/d4019be2 Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/d4019be2 Branch: refs/heads/master Commit: d4019be2a259b6f5bc14da3261fac36e07f771ad Parents: 07b8aaf Author: Michael Ho <[email protected]> Authored: Mon Nov 19 15:34:09 2018 -0800 Committer: Impala Public Jenkins <[email protected]> Committed: Thu Nov 22 01:22:37 2018 +0000 ---------------------------------------------------------------------- be/src/runtime/query-state.cc | 5 +- tests/query_test/test_hash_join_timer.py | 93 +++++++++++++-------------- 2 files changed, 47 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/d4019be2/be/src/runtime/query-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc index 12e8cf2..f706796 100644 --- a/be/src/runtime/query-state.cc +++ b/be/src/runtime/query-state.cc @@ -245,7 +245,10 @@ void QueryState::UpdateBackendExecState() { } } // Send one last report if the query has reached the terminal state. 
- if (IsTerminalState()) ReportExecStatus(); + if (IsTerminalState()) { + VLOG_QUERY << "UpdateBackendExecState(): last report for " << PrintId(query_id()); + ReportExecStatus(); + } } FragmentInstanceState* QueryState::GetFInstanceState(const TUniqueId& instance_id) { http://git-wip-us.apache.org/repos/asf/impala/blob/d4019be2/tests/query_test/test_hash_join_timer.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_hash_join_timer.py b/tests/query_test/test_hash_join_timer.py index c1b0f0f..59dd0a6 100644 --- a/tests/query_test/test_hash_join_timer.py +++ b/tests/query_test/test_hash_join_timer.py @@ -70,6 +70,8 @@ class TestHashJoinTimer(ImpalaTestSuite): # IMPALA-2973: Temporary workaround: when timers are using Linux COARSE clockid_t, very # short times may be measured as zero. HASH_JOIN_LOWER_BOUND_MS = 0 + # Constant of nanoseconds per millisecond + NANOS_PER_MILLI = 1000000 @classmethod def get_workload(self): @@ -84,10 +86,10 @@ class TestHashJoinTimer(ImpalaTestSuite): @classmethod def __is_valid_test_vector(cls, vector): - return vector.get_value('table_format').file_format == 'text' and\ - vector.get_value('table_format').compression_codec == 'none' and\ - vector.get_value('exec_option')['batch_size'] == 0 and\ - vector.get_value('exec_option')['disable_codegen'] == False and\ + return vector.get_value('table_format').file_format == 'text' and \ + vector.get_value('table_format').compression_codec == 'none' and \ + vector.get_value('exec_option')['batch_size'] == 0 and \ + vector.get_value('exec_option')['disable_codegen'] == False and \ vector.get_value('exec_option')['num_nodes'] == 0 @pytest.mark.execute_serially @@ -105,60 +107,51 @@ class TestHashJoinTimer(ImpalaTestSuite): verifier = MetricVerifier(impalad.service) verifier.wait_for_metric("impala-server.num-fragments-in-flight", 0) - # Execute async to get a handle. Wait until the query has completed. 
- handle = self.execute_query_async(query, vector.get_value('exec_option')) - self.impalad_test_service.wait_for_query_state(self.client, handle, - self.client.QUERY_STATES['FINISHED'], timeout=40) - self.close_query(handle) + # Execute the query. The query summary and profile are stored in 'result'. + result = self.execute_query(query, vector.get_value('exec_option')) - # Parse the query profile - # The hash join node is "id=3". - # In the ExecSummary, search for "03:HASH JOIN" line, column 3 (avg) and 4 (max). - # In the fragment (including average), search for "HASH_JOIN_NODE (id=2)" and the - # non-child time. - # Also verify that the build side is in a different thread by searching for: - # "Join Build-Side Prepared Asynchronously" - profile = self.client.get_runtime_profile(handle) + # Parse the query summary; The join node is "id=3". + # In the ExecSummary, search for the join operator's summary and verify the + # avg and max times are within acceptable limits. + exec_summary = result.exec_summary check_execsummary_count = 0 + join_node_name = "03:%s" % (join_type) + for line in exec_summary: + if line['operator'] == join_node_name: + avg_time_ms = line['avg_time'] / self.NANOS_PER_MILLI + self.__verify_join_time(avg_time_ms, "ExecSummary Avg") + max_time_ms = line['max_time'] / self.NANOS_PER_MILLI + self.__verify_join_time(max_time_ms, "ExecSummary Max") + check_execsummary_count += 1 + assert (check_execsummary_count == 1), \ + "Unable to verify ExecSummary: {0}".format(exec_summary) + + # Parse the query profile; The join node is "id=3". + # In the profiles, search for lines containing "(id=3)" and parse for the avg and + # non-child times to verify that they are within acceptable limits. 
Also verify + # that the build side is built in a different thread by searching for the string: + # "Join Build-Side Prepared Asynchronously" + profile = result.runtime_profile check_fragment_count = 0 asyn_build = False - for line in profile.split("\n"): - # Matching for ExecSummary - if ("03:%s " % (join_type) in line): - # Sample line: - # 03:HASH JOIN 3 11.89ms 12.543ms 6.57K ... - # Split using "JOIN +", then split the right side with space. This becomes: - # "3","11.89ms","12.543ms",... - # The second column is the average, and the 3rd column is the max - rhs = re.split("JOIN +", line)[1] - columns = re.split(" +", rhs) - self.__verify_join_time(columns[1], "ExecSummary Avg") - self.__verify_join_time(columns[2], "ExecSummary Max") - check_execsummary_count = 1 - # Matching for Fragment (including Average - if ("(id=3)" in line): - # Sample line: - # HASH_JOIN_NODE (id=3):(Total: 3s580ms, non-child: 11.89ms, % non-child: 0.31%) - strip1 = re.split("non-child: ", line)[1] - non_child_time = re.split(", ", strip1)[0] - self.__verify_join_time(non_child_time, "Fragment non-child") - check_fragment_count = check_fragment_count + 1 - # Search for "Join Build-Side Prepared Asynchronously" - if ("Join Build-Side Prepared Asynchronously" in line): - asyn_build = True; - + if ("(id=3)" in line): + # Sample line: + # HASH_JOIN_NODE (id=3):(Total: 3s580ms, non-child: 11.89ms, % non-child: 0.31%) + strip1 = re.split("non-child: ", line)[1] + non_child_time = re.split(", ", strip1)[0] + non_child_time_ms = parse_duration_string_ms(non_child_time) + self.__verify_join_time(non_child_time_ms, "Fragment non-child") + check_fragment_count += 1 + # Search for "Join Build-Side Prepared Asynchronously" + if ("Join Build-Side Prepared Asynchronously" in line): + asyn_build = True assert (asyn_build), "Join is not prepared asynchronously: {0}".format(profile) assert (check_fragment_count > 1), \ "Unable to verify Fragment or Average Fragment: {0}".format(profile) - assert 
(check_execsummary_count == 1), \ - "Unable to verify ExecSummary: {0}".format(profile) - def __verify_join_time(self, duration, comment): - duration_ms = parse_duration_string_ms(duration) + def __verify_join_time(self, duration_ms, comment): if (duration_ms > self.HASH_JOIN_UPPER_BOUND_MS): - assert False, "Hash join timing too high for %s: %s %s" % ( - comment, duration, duration_ms) + assert False, "Hash join timing too high for %s: %s" % (comment, duration_ms) if (duration_ms < self.HASH_JOIN_LOWER_BOUND_MS): - assert False, "Hash join timing too low for %s: %s %s" % ( - comment, duration, duration_ms) + assert False, "Hash join timing too low for %s: %s" % (comment, duration_ms)
