This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit e73e2d40da115ed3804ffaecc0850c853b0e6330 Author: Riza Suminto <[email protected]> AuthorDate: Sat Mar 8 13:28:52 2025 -0800 IMPALA-13864: Implement ImpylaHS2ResultSet.exec_summary This patch implements building the exec summary table for ImpylaHS2Connection. It adds a fetch_exec_summary argument in ImpalaConnection.execute(). If this argument is True, an exec summary table will be added into the returned result object. fetch_exec_summary is also implemented for BeeswaxConnection. Thus, BeeswaxConnection will not fetch the exec summary by default all the time. Tests that validate the exec summary table are updated to set fetch_exec_summary=True and migrated to test against the hs2 protocol. Change TestExecutorGroup._set_query_options() to do query option setting through the hs2_client configuration instead of a SET query. Some flake8 issues are addressed as well. Move build_exec_summary_table to a separate exec_summary.py file. Tweak it a bit to return early if the given TExecSummary is empty. Fixed a bug in ImpalaBeeswaxClient.fetch_results() where the fetch will not happen at all if the discard_results argument is True. Testing: - Run and pass affected tests locally. 
Change-Id: I7d88f78e58eeda29ce21e7828884c7a129d7efe6 Reviewed-on: http://gerrit.cloudera.org:8080/22626 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- shell/exec_summary.py | 176 +++++++++++++++++++++++++++ shell/impala_client.py | 153 +---------------------- shell/packaging/make_python_package.sh | 1 + tests/beeswax/impala_beeswax.py | 17 +-- tests/common/impala_connection.py | 88 ++++++++++---- tests/custom_cluster/test_coordinators.py | 3 +- tests/custom_cluster/test_executor_groups.py | 61 ++++++---- tests/query_test/test_hash_join_timer.py | 22 ++-- tests/query_test/test_insert.py | 11 +- tests/query_test/test_observability.py | 56 +++++---- 10 files changed, 344 insertions(+), 244 deletions(-) diff --git a/shell/exec_summary.py b/shell/exec_summary.py new file mode 100755 index 000000000..a7897315c --- /dev/null +++ b/shell/exec_summary.py @@ -0,0 +1,176 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from ExecStats.ttypes import TExecStats + + +def build_exec_summary_table(summary, idx, indent_level, new_indent_level, output, + is_prettyprint=True, separate_prefix_column=False): + """Direct translation of Coordinator::PrintExecSummary() to recursively build a list + of rows of summary statistics, one per exec node + + summary: the TExecSummary object that contains all the summary data + + idx: the index of the node to print + + indent_level: the number of spaces to print before writing the node's label, to give + the appearance of a tree. The 0th child of a node has the same indent_level as its + parent. All other children have an indent_level of one greater than their parent. + + new_indent_level: If true, this indent level is different from the previous row's. + + output: the list of rows into which to append the rows produced for this node and its + children. + + is_prettyprint: Optional. If True, print time, units, and bytes columns in pretty + printed format. + + separate_prefix_column: Optional. If True, the prefix and operator name will be + returned as separate column. Otherwise, prefix and operater name will be concatenated + into single column. + + Returns the index of the next exec node in summary.exec_nodes that should be + processed, used internally to this method only. + """ + if not summary.nodes: + # Summary nodes is empty or None. Nothing to build. + return + assert idx < len(summary.nodes), ( + "Index ({0}) must be less than exec summary count ({1})").format( + idx, len(summary.nodes)) + + attrs = ["latency_ns", "cpu_time_ns", "cardinality", "memory_used"] + + # Initialise aggregate and maximum stats + agg_stats, max_stats = TExecStats(), TExecStats() + for attr in attrs: + setattr(agg_stats, attr, 0) + setattr(max_stats, attr, 0) + + node = summary.nodes[idx] + instances = 0 + if node.exec_stats: + # exec_stats is not None or an empty list. 
+ instances = len(node.exec_stats) + for stats in node.exec_stats: + for attr in attrs: + val = getattr(stats, attr) + if val is not None: + setattr(agg_stats, attr, getattr(agg_stats, attr) + val) + setattr(max_stats, attr, max(getattr(max_stats, attr), val)) + avg_time = agg_stats.latency_ns / instances + else: + avg_time = 0 + + is_sink = node.node_id == -1 + # If the node is a broadcast-receiving exchange node, the cardinality of rows produced + # is the max over all instances (which should all have received the same number of + # rows). Otherwise, the cardinality is the sum over all instances which process + # disjoint partitions. + if is_sink: + cardinality = -1 + elif node.is_broadcast: + cardinality = max_stats.cardinality + else: + cardinality = agg_stats.cardinality + + est_stats = node.estimated_stats + label_prefix = "" + if indent_level > 0: + label_prefix = "|" + label_prefix += " |" * (indent_level - 1) + if new_indent_level: + label_prefix += "--" + else: + label_prefix += " " + + def prettyprint(val, units, divisor): + for unit in units: + if val < divisor: + if unit == units[0]: + return "%d%s" % (val, unit) + else: + return "%3.2f%s" % (val, unit) + val /= divisor + + def prettyprint_bytes(byte_val): + return prettyprint(byte_val, [' B', ' KB', ' MB', ' GB', ' TB'], 1024.0) + + def prettyprint_units(unit_val): + return prettyprint(unit_val, ["", "K", "M", "B"], 1000.0) + + def prettyprint_time(time_val): + return prettyprint(time_val, ["ns", "us", "ms", "s"], 1000.0) + + latency = max_stats.latency_ns + cardinality_est = est_stats.cardinality + memory_used = max_stats.memory_used + memory_est = est_stats.memory_used + if (is_prettyprint): + avg_time = prettyprint_time(avg_time) + latency = prettyprint_time(latency) + cardinality = "" if is_sink else prettyprint_units(cardinality) + cardinality_est = "" if is_sink else prettyprint_units(cardinality_est) + memory_used = prettyprint_bytes(memory_used) + memory_est = prettyprint_bytes(memory_est) + + 
row = list() + if separate_prefix_column: + row.append(label_prefix) + row.append(node.label) + else: + row.append(label_prefix + node.label) + + row.extend([ + node.num_hosts, + instances, + avg_time, + latency, + cardinality, + cardinality_est, + memory_used, + memory_est, + node.label_detail]) + + output.append(row) + try: + sender_idx = summary.exch_to_sender_map[idx] + # This is an exchange node or a join node with a separate builder, so the source + # is a fragment root, and should be printed next. + sender_indent_level = indent_level + node.num_children + sender_new_indent_level = node.num_children > 0 + build_exec_summary_table(summary, sender_idx, sender_indent_level, + sender_new_indent_level, output, is_prettyprint, + separate_prefix_column) + except (KeyError, TypeError): + # Fall through if idx not in map, or if exch_to_sender_map itself is not set + pass + + idx += 1 + if node.num_children > 0: + first_child_output = [] + idx = build_exec_summary_table(summary, idx, indent_level, False, first_child_output, + is_prettyprint, separate_prefix_column) + for _ in range(1, node.num_children): + # All other children are indented + idx = build_exec_summary_table(summary, idx, indent_level + 1, True, output, + is_prettyprint, separate_prefix_column) + output += first_child_output + return idx diff --git a/shell/impala_client.py b/shell/impala_client.py index 6ed2f5f87..3fc1891e1 100755 --- a/shell/impala_client.py +++ b/shell/impala_client.py @@ -35,7 +35,6 @@ import uuid from beeswaxd import BeeswaxService from beeswaxd.BeeswaxService import QueryState -from ExecStats.ttypes import TExecStats from ImpalaService import ImpalaService, ImpalaHiveServer2Service from ImpalaService.ImpalaHiveServer2Service import (TGetRuntimeProfileReq, TGetExecSummaryReq, TPingImpalaHS2ServiceReq, TCloseImpalaOperationReq) @@ -46,6 +45,7 @@ from TCLIService.TCLIService import (TExecuteStatementReq, TOpenSessionReq, TOperationState, TFetchResultsReq, TFetchOrientation, TGetLogReq, 
TGetResultSetMetadataReq, TTypeId, TCancelOperationReq, TCloseOperationReq) from ImpalaHttpClient import ImpalaHttpClient +from exec_summary import build_exec_summary_table from kerberos_util import get_kerb_host_from_kerberos_host_fqdn from thrift.protocol import TBinaryProtocol from thrift_sasl import TSaslClientTransport @@ -110,157 +110,6 @@ RPC_EXCEPTION_TAPPLICATION = "TAPPLICATION_EXCEPTION" RPC_EXCEPTION_SERVER = "SERVER_ERROR" -def build_exec_summary_table(summary, idx, indent_level, new_indent_level, output, - is_prettyprint=True, separate_prefix_column=False): - """Direct translation of Coordinator::PrintExecSummary() to recursively build a list - of rows of summary statistics, one per exec node - - summary: the TExecSummary object that contains all the summary data - - idx: the index of the node to print - - indent_level: the number of spaces to print before writing the node's label, to give - the appearance of a tree. The 0th child of a node has the same indent_level as its - parent. All other children have an indent_level of one greater than their parent. - - new_indent_level: If true, this indent level is different from the previous row's. - - output: the list of rows into which to append the rows produced for this node and its - children. - - is_prettyprint: Optional. If True, print time, units, and bytes columns in pretty - printed format. - - separate_prefix_column: Optional. If True, the prefix and operator name will be - returned as separate column. Otherwise, prefix and operater name will be concatenated - into single column. - - Returns the index of the next exec node in summary.exec_nodes that should be - processed, used internally to this method only. 
- """ - attrs = ["latency_ns", "cpu_time_ns", "cardinality", "memory_used"] - - # Initialise aggregate and maximum stats - agg_stats, max_stats = TExecStats(), TExecStats() - for attr in attrs: - setattr(agg_stats, attr, 0) - setattr(max_stats, attr, 0) - - node = summary.nodes[idx] - if node.exec_stats is not None: - for stats in node.exec_stats: - for attr in attrs: - val = getattr(stats, attr) - if val is not None: - setattr(agg_stats, attr, getattr(agg_stats, attr) + val) - setattr(max_stats, attr, max(getattr(max_stats, attr), val)) - - if node.exec_stats is not None and node.exec_stats: - avg_time = agg_stats.latency_ns / len(node.exec_stats) - else: - avg_time = 0 - - is_sink = node.node_id == -1 - # If the node is a broadcast-receiving exchange node, the cardinality of rows produced - # is the max over all instances (which should all have received the same number of - # rows). Otherwise, the cardinality is the sum over all instances which process - # disjoint partitions. - if is_sink: - cardinality = -1 - elif node.is_broadcast: - cardinality = max_stats.cardinality - else: - cardinality = agg_stats.cardinality - - est_stats = node.estimated_stats - label_prefix = "" - if indent_level > 0: - label_prefix = "|" - label_prefix += " |" * (indent_level - 1) - if new_indent_level: - label_prefix += "--" - else: - label_prefix += " " - - def prettyprint(val, units, divisor): - for unit in units: - if val < divisor: - if unit == units[0]: - return "%d%s" % (val, unit) - else: - return "%3.2f%s" % (val, unit) - val /= divisor - - def prettyprint_bytes(byte_val): - return prettyprint(byte_val, [' B', ' KB', ' MB', ' GB', ' TB'], 1024.0) - - def prettyprint_units(unit_val): - return prettyprint(unit_val, ["", "K", "M", "B"], 1000.0) - - def prettyprint_time(time_val): - return prettyprint(time_val, ["ns", "us", "ms", "s"], 1000.0) - - instances = 0 - if node.exec_stats is not None: - instances = len(node.exec_stats) - latency = max_stats.latency_ns - cardinality_est 
= est_stats.cardinality - memory_used = max_stats.memory_used - memory_est = est_stats.memory_used - if (is_prettyprint): - avg_time = prettyprint_time(avg_time) - latency = prettyprint_time(latency) - cardinality = "" if is_sink else prettyprint_units(cardinality) - cardinality_est = "" if is_sink else prettyprint_units(cardinality_est) - memory_used = prettyprint_bytes(memory_used) - memory_est = prettyprint_bytes(memory_est) - - row = list() - if separate_prefix_column: - row.append(label_prefix) - row.append(node.label) - else: - row.append(label_prefix + node.label) - - row.extend([ - node.num_hosts, - instances, - avg_time, - latency, - cardinality, - cardinality_est, - memory_used, - memory_est, - node.label_detail]) - - output.append(row) - try: - sender_idx = summary.exch_to_sender_map[idx] - # This is an exchange node or a join node with a separate builder, so the source - # is a fragment root, and should be printed next. - sender_indent_level = indent_level + node.num_children - sender_new_indent_level = node.num_children > 0 - build_exec_summary_table(summary, sender_idx, sender_indent_level, - sender_new_indent_level, output, is_prettyprint, - separate_prefix_column) - except (KeyError, TypeError): - # Fall through if idx not in map, or if exch_to_sender_map itself is not set - pass - - idx += 1 - if node.num_children > 0: - first_child_output = [] - idx = build_exec_summary_table(summary, idx, indent_level, False, first_child_output, - is_prettyprint, separate_prefix_column) - for child_idx in xrange(1, node.num_children): - # All other children are indented (we only have 0, 1 or 2 children for every exec - # node at the moment) - idx = build_exec_summary_table(summary, idx, indent_level + 1, True, output, - is_prettyprint, separate_prefix_column) - output += first_child_output - return idx - - class QueryOptionLevels: """These are the levels used when displaying query options. 
The values correspond to the ones in TQueryOptionLevel""" diff --git a/shell/packaging/make_python_package.sh b/shell/packaging/make_python_package.sh index 5ec12047a..a1fc5479b 100755 --- a/shell/packaging/make_python_package.sh +++ b/shell/packaging/make_python_package.sh @@ -63,6 +63,7 @@ assemble_package_files() { cp "${SHELL_HOME}/kerberos_util.py" "${MODULE_LIB_DIR}" cp "${SHELL_HOME}/value_converter.py" "${MODULE_LIB_DIR}" cp "${SHELL_HOME}/thrift_printer.py" "${MODULE_LIB_DIR}" + cp "${SHELL_HOME}/exec_summary.py" "${MODULE_LIB_DIR}" cp "${SHELL_HOME}/packaging/README.md" "${PACKAGE_DIR}" cp "${SHELL_HOME}/packaging/MANIFEST.in" "${PACKAGE_DIR}" diff --git a/tests/beeswax/impala_beeswax.py b/tests/beeswax/impala_beeswax.py index fb74146f2..cd1f29bd5 100644 --- a/tests/beeswax/impala_beeswax.py +++ b/tests/beeswax/impala_beeswax.py @@ -184,7 +184,8 @@ class ImpalaBeeswaxClient(object): service='impala', transport_type=trans_type, user=self.user, password=self.password, use_ssl=self.use_ssl) - def execute(self, query_string, user=None, fetch_profile_after_close=False): + def execute(self, query_string, user=None, fetch_profile_after_close=False, + fetch_exec_summary=False): """Re-directs the query to its appropriate handler, returns ImpalaBeeswaxResult""" # Take care of leading/trailing whitespaces. query_string = query_string.strip() @@ -200,7 +201,8 @@ class ImpalaBeeswaxClient(object): # profile and log will be available so fetch them first. 
runtime_profile = self.get_runtime_profile(handle) - exec_summary = self.get_exec_summary_and_parse(handle) + exec_summary = (self.get_exec_summary_and_parse(handle) if fetch_exec_summary + else None) log = self.get_log(handle.log_context) result = self.fetch_results(query_string, handle) @@ -218,7 +220,8 @@ class ImpalaBeeswaxClient(object): result = self.fetch_results(query_string, handle) result.time_taken = time.time() - start result.start_time = start_time - result.exec_summary = self.get_exec_summary_and_parse(handle) + result.exec_summary = (self.get_exec_summary_and_parse(handle) if fetch_exec_summary + else None) result.log = self.get_log(handle.log_context) if not fetch_profile_after_close: @@ -254,7 +257,7 @@ class ImpalaBeeswaxClient(object): return output def __build_summary_table(self, summary, output): - from shell.impala_client import build_exec_summary_table + from shell.exec_summary import build_exec_summary_table result = list() build_exec_summary_table(summary, 0, 0, False, result, is_prettyprint=False, separate_prefix_column=True) @@ -385,14 +388,14 @@ class ImpalaBeeswaxClient(object): success=True, data=[]) # Result fetching for insert is different from other queries. - exec_result = None - if discard_results: - return exec_result if query_type == 'insert': exec_result = self.__fetch_insert_results(query_handle) else: exec_result = self.__fetch_results(query_handle, max_rows) exec_result.query = query_string + # Check for discard_results arg only after all rows has been fetched. 
+ if discard_results: + return None return exec_result def __fetch_results(self, handle, max_rows=-1): diff --git a/tests/common/impala_connection.py b/tests/common/impala_connection.py index caafe7d42..18e882c32 100644 --- a/tests/common/impala_connection.py +++ b/tests/common/impala_connection.py @@ -111,6 +111,21 @@ def format_sql_for_logging(sql_stmt): u"\n-- [...]\n").format(len(sql_stmt), truncated_sql) +def build_summary_table_from_thrift(thrift_exec_summary): + from shell.exec_summary import build_exec_summary_table + result = list() + build_exec_summary_table(thrift_exec_summary, 0, 0, False, result, + is_prettyprint=False, separate_prefix_column=True) + keys = ['prefix', 'operator', 'num_hosts', 'num_instances', 'avg_time', 'max_time', + 'num_rows', 'est_num_rows', 'peak_mem', 'est_peak_mem', 'detail'] + output = list() + for row in result: + assert len(keys) == len(row) + summ_map = dict(zip(keys, row)) + output.append(summ_map) + return output + + def collect_default_query_options(options, name, val, kind): if kind == 'REMOVED': return @@ -274,7 +289,9 @@ class ImpalaConnection(with_metaclass(abc.ABCMeta, object)): """Cancels an in-flight operation""" pass - def execute(self, sql_stmt): + def execute(self, sql_stmt, user=None, fetch_profile_after_close=False, # noqa: U100 + fetch_exec_summary=False, # noqa: U100 + profile_format=TRuntimeProfileFormat.STRING): # noqa: U100 """Executes a query and fetches the results""" pass @@ -288,6 +305,7 @@ class ImpalaConnection(with_metaclass(abc.ABCMeta, object)): @abc.abstractmethod def fetch(self, sql_stmt, operation_handle, max_rows=-1, discard_results=False): """Fetches query results up to max_rows given a handle and sql statement. + Caller must ensure that query has passed PENDING state before calling fetch. If max_rows < 0, all rows are fetched. If max_rows > 0 but the number of rows returned is less than max_rows, all the rows have been fetched. Return None if discard_results is True. 
@@ -372,6 +390,17 @@ class ImpalaConnection(with_metaclass(abc.ABCMeta, object)): profile""" pass + @abc.abstractmethod + def get_exec_summary(self, operation_handle): # noqa: U100 + pass + + def get_exec_summary_table(self, operation_handle): + summary_table = list() + summary = self.get_exec_summary(operation_handle) + if summary: + summary_table = build_summary_table_from_thrift(summary) + return summary_table + # Represents a connection to Impala using the Beeswax API. class BeeswaxConnection(ImpalaConnection): @@ -455,11 +484,15 @@ class BeeswaxConnection(ImpalaConnection): self.log_handle(operation_handle, 'closing DML query') self.__beeswax_client.close_dml(operation_handle.get_handle()) - def execute(self, sql_stmt, user=None, fetch_profile_after_close=False): + def execute(self, sql_stmt, user=None, fetch_profile_after_close=False, + fetch_exec_summary=False, profile_format=TRuntimeProfileFormat.STRING): + assert profile_format == TRuntimeProfileFormat.STRING, ( + "Beeswax client only supports getting runtime profile in STRING format.") self.log_client(u"executing against {0}\n{1}".format( self.__host_port, format_sql_for_logging(sql_stmt))) return self.__beeswax_client.execute(sql_stmt, user=user, - fetch_profile_after_close=fetch_profile_after_close) + fetch_profile_after_close=fetch_profile_after_close, + fetch_exec_summary=fetch_exec_summary) def execute_async(self, sql_stmt, user=None): self.log_client(u"executing async {0}\n{1}".format( @@ -485,7 +518,7 @@ class BeeswaxConnection(ImpalaConnection): def get_runtime_profile(self, operation_handle, profile_format=TRuntimeProfileFormat.STRING): assert profile_format == TRuntimeProfileFormat.STRING, ( - "Beeswax client only support getting runtime profile in STRING format.") + "Beeswax client only supports getting runtime profile in STRING format.") self.log_handle(operation_handle, 'getting runtime profile operation') return self.__beeswax_client.get_runtime_profile(operation_handle.get_handle()) @@ 
-690,8 +723,8 @@ class ImpylaHS2Connection(ImpalaConnection): format_sql_for_logging(sql_stmt)) ) - def execute(self, sql_stmt, user=None, profile_format=TRuntimeProfileFormat.STRING, - fetch_profile_after_close=False): + def execute(self, sql_stmt, user=None, fetch_profile_after_close=False, + fetch_exec_summary=False, profile_format=TRuntimeProfileFormat.STRING): cursor = self.__cursor result = None try: @@ -704,8 +737,8 @@ class ImpylaHS2Connection(ImpalaConnection): self.log_handle(handle, "started query in session {0}".format( self.__get_session_id(cursor))) result = self.__fetch_results_and_profile( - handle, profile_format=profile_format, - fetch_profile_after_close=fetch_profile_after_close) + handle, fetch_profile_after_close=fetch_profile_after_close, + fetch_exec_summary=fetch_exec_summary, profile_format=profile_format) finally: cursor.close_operation() if cursor != self.__cursor: @@ -713,11 +746,12 @@ class ImpylaHS2Connection(ImpalaConnection): return result def __fetch_results_and_profile( - self, operation_handle, profile_format=TRuntimeProfileFormat.STRING, - fetch_profile_after_close=False): + self, operation_handle, fetch_profile_after_close=False, + fetch_exec_summary=False, profile_format=TRuntimeProfileFormat.STRING): r = None try: - r = self.__fetch_results(operation_handle, profile_format=profile_format) + r = self.__fetch_results(operation_handle, fetch_exec_summary=fetch_exec_summary, + profile_format=profile_format) finally: if r is None: # Try to close the query handle but ignore any exceptions not to replace the @@ -860,6 +894,7 @@ class ImpylaHS2Connection(ImpalaConnection): def __fetch_results(self, handle, max_rows=-1, discard_results=False, + fetch_exec_summary=False, profile_format=TRuntimeProfileFormat.STRING): """Implementation of result fetching from handle.""" cursor = handle.get_handle() @@ -877,20 +912,25 @@ class ImpylaHS2Connection(ImpalaConnection): else: result_tuples = cursor.fetchmany(max_rows) - if not self._is_hive 
and self._collect_profile_and_log: - log = self.get_log(handle) - profile = self.get_runtime_profile(handle, profile_format=profile_format) - else: - log = None - profile = None result = None - if discard_results: return result + + log = None + profile = None + exec_summary = None + if not self._is_hive: + if fetch_exec_summary: + exec_summary = self.get_exec_summary_table(handle) + if self._collect_profile_and_log: + log = self.get_log(handle) + profile = self.get_runtime_profile(handle, profile_format=profile_format) + result = ImpylaHS2ResultSet(success=True, result_tuples=result_tuples, column_labels=column_labels, column_types=column_types, query=handle.sql_stmt(), log=log, profile=profile, - query_id=self.get_query_id(handle)) + query_id=self.get_query_id(handle), + exec_summary=exec_summary) return result @@ -898,7 +938,7 @@ class ImpylaHS2ResultSet(object): """This emulates the interface of ImpalaBeeswaxResult so that it can be used in place of it. TODO: when we deprecate/remove Beeswax, clean this up.""" def __init__(self, success, result_tuples, column_labels, column_types, query, log, - profile, query_id): + profile, query_id, exec_summary): self.success = success self.column_labels = column_labels self.column_types = column_types @@ -913,6 +953,7 @@ class ImpylaHS2ResultSet(object): self.data = None if result_tuples is not None: self.data = [self.__convert_result_row(tuple) for tuple in result_tuples] + self.exec_summary = exec_summary def tuples(self): """Return the raw HS2 result set, which is a list of tuples.""" @@ -995,7 +1036,9 @@ class MinimalHS2Connection(ImpalaConnection): finally: self.__conn.close() - def execute(self, sql_stmt): # noqa: U100 + def execute(self, sql_stmt, user=None, fetch_profile_after_close=False, # noqa: U100 + fetch_exec_summary=False, # noqa: U100 + profile_format=TRuntimeProfileFormat.STRING): # noqa: U100 raise NotImplementedError() def execute_async(self, sql_stmt): @@ -1085,3 +1128,6 @@ class 
MinimalHS2Connection(ImpalaConnection): def wait_for_admission_control(self, operation_handle, timeout_s=60): # noqa: U100 raise NotImplementedError() + + def get_exec_summary(self, operation_handle): # noqa: U100 + raise NotImplementedError() diff --git a/tests/custom_cluster/test_coordinators.py b/tests/custom_cluster/test_coordinators.py index f9b882919..1f3c4b053 100644 --- a/tests/custom_cluster/test_coordinators.py +++ b/tests/custom_cluster/test_coordinators.py @@ -110,7 +110,8 @@ class TestCoordinators(CustomClusterTestSuite): client = coordinator.service.create_beeswax_client() assert client is not None query = "select count(*) from functional.alltypesagg" - result = self.execute_query_expect_success(client, query) + result = client.execute(query, fetch_exec_summary=True) + assert result.success # Verify that SCAN and AGG are executed on the expected number of # executor nodes for rows in result.exec_summary: diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py index af08f4890..48df14ad4 100644 --- a/tests/custom_cluster/test_executor_groups.py +++ b/tests/custom_cluster/test_executor_groups.py @@ -987,18 +987,17 @@ class TestExecutorGroups(CustomClusterTestSuite): exec_group_set_prefix="root.large") == 1 def _set_query_options(self, query_options): - """Set query options by running it as an SQL statement. - To mimic impala-shell behavior, use self.client.set_configuration() instead. 
- """ + """Set query options by setting client configuration.""" for k, v in query_options.items(): - self.execute_query_expect_success(self.client, "SET {}='{}'".format(k, v)) + self.hs2_client.set_configuration_option(k, v) - def _run_query_and_verify_profile(self, query, - expected_strings_in_profile, not_expected_in_profile=[]): + def _run_query_and_verify_profile(self, query, expected_strings_in_profile, + not_expected_in_profile=[], + fetch_exec_summary=False): """Run 'query' and assert existence of 'expected_strings_in_profile' and nonexistence of 'not_expected_in_profile' in query profile. - Caller is reponsible to close self.client at the end of test.""" - result = self.execute_query_expect_success(self.client, query) + Caller is reponsible to close self.hs2_client at the end of test.""" + result = self.hs2_client.execute(query, fetch_exec_summary=fetch_exec_summary) profile = str(result.runtime_profile) for expected_profile in expected_strings_in_profile: assert expected_profile in profile, ( @@ -1050,7 +1049,6 @@ class TestExecutorGroups(CustomClusterTestSuite): def test_query_cpu_count_divisor_default(self, unique_database): coordinator_test_args = "-gen_experimental_profile=true" self._setup_three_exec_group_cluster(coordinator_test_args) - self.client.clear_configuration() # The default query options for this test. # Some test case will change these options along the test, but should eventually @@ -1091,7 +1089,7 @@ class TestExecutorGroups(CustomClusterTestSuite): # Create small table based on tpcds_parquet.store_sales that will be used later # for COMPUTE STATS test. Forcing large parallelism to speed up CTAS. # Otherwise, query will go to tiny pool. 
- self.client.set_configuration({ + self._set_query_options({ 'REQUEST_POOL': 'root.large', 'PROCESSING_COST_MIN_THREADS': '4'}) self._run_query_and_verify_profile( @@ -1102,7 +1100,7 @@ class TestExecutorGroups(CustomClusterTestSuite): ["Executor Group: root.large", "ExecutorGroupsConsidered: 1", "Verdict: query option REQUEST_POOL=root.large is set. " "Memory and cpu limit checking is skipped."]) - self.client.set_configuration({ + self._set_query_options({ 'REQUEST_POOL': '', 'PROCESSING_COST_MIN_THREADS': ''}) @@ -1128,14 +1126,13 @@ class TestExecutorGroups(CustomClusterTestSuite): # Test that child queries follow REQUEST_POOL that is set through client # configuration. Two child queries should all run in root.small. - self.client.set_configuration({'REQUEST_POOL': 'root.small'}) + self._set_query_options({'REQUEST_POOL': 'root.small'}) self._run_query_and_verify_profile(compute_stats_query, ["Query Options (set by configuration): REQUEST_POOL=root.small", "ExecutorGroupsConsidered: 1", "Verdict: Assign to first group because query is not auto-scalable"], ["EffectiveParallelism:", "CpuAsk:", "AvgAdmissionSlotsPerExecutor:"]) self._verify_total_admitted_queries("root.small", 2) - self.client.clear_configuration() # Test that child queries follow REQUEST_POOL that is set through SQL statement. # Two child queries should all run in root.large. 
@@ -1239,7 +1236,7 @@ class TestExecutorGroups(CustomClusterTestSuite): "ExecutorGroupsConsidered: 1", "AvgAdmissionSlotsPerExecutor: 2", "CpuAsk: 2", "CpuAskBounded: 2"]) self._set_query_options({'MAX_FRAGMENT_INSTANCES_PER_NODE': '1'}) - result = self.execute_query_expect_failure(self.client, CPU_TEST_QUERY) + result = self.execute_query_expect_failure(self.hs2_client, CPU_TEST_QUERY) status = (r"PROCESSING_COST_MIN_THREADS \(2\) can not be larger than " r"MAX_FRAGMENT_INSTANCES_PER_NODE \(1\).") assert re.search(status, str(result)) @@ -1387,7 +1384,6 @@ class TestExecutorGroups(CustomClusterTestSuite): def test_query_cpu_count_on_insert(self, unique_database): coordinator_test_args = "-gen_experimental_profile=true" self._setup_three_exec_group_cluster(coordinator_test_args) - self.client.clear_configuration() # The default query options for this test. # Some test case will change these options along the test, but should eventually @@ -1404,7 +1400,8 @@ class TestExecutorGroups(CustomClusterTestSuite): "select id, year from functional_parquet.alltypes" ).format(unique_database, "test_ctas1"), ["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1", - "Verdict: Match", "CpuAsk: 1", "AvgAdmissionSlotsPerExecutor: 1"]) + "Verdict: Match", "CpuAsk: 1", "AvgAdmissionSlotsPerExecutor: 1"], + fetch_exec_summary=True) self.__verify_fs_writers(result, 1, [0, 1]) # Test unpartitioned insert, small scan, no MAX_FS_WRITER, with limit. @@ -1414,7 +1411,8 @@ class TestExecutorGroups(CustomClusterTestSuite): "select id, year from functional_parquet.alltypes limit 100000" ).format(unique_database, "test_ctas2"), ["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1", - "Verdict: Match", "CpuAsk: 2", "AvgAdmissionSlotsPerExecutor: 2"]) + "Verdict: Match", "CpuAsk: 2", "AvgAdmissionSlotsPerExecutor: 2"], + fetch_exec_summary=True) self.__verify_fs_writers(result, 1, [0, 2]) # Test partitioned insert, small scan, no MAX_FS_WRITER. 
@@ -1424,7 +1422,8 @@ class TestExecutorGroups(CustomClusterTestSuite): "select id, year from functional_parquet.alltypes" ).format(unique_database, "test_ctas3"), ["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1", - "Verdict: Match", "CpuAsk: 1", "AvgAdmissionSlotsPerExecutor: 1"]) + "Verdict: Match", "CpuAsk: 1", "AvgAdmissionSlotsPerExecutor: 1"], + fetch_exec_summary=True) self.__verify_fs_writers(result, 1, [0, 1]) store_sales_no_part_col = ( @@ -1445,7 +1444,8 @@ class TestExecutorGroups(CustomClusterTestSuite): ("create table {0}.{1} as {2}").format( unique_database, "test_ctas4", big_select), ["Executor Group: root.small-group", "ExecutorGroupsConsidered: 2", - "Verdict: Match", "CpuAsk: 9", "CpuAskBounded: 2"]) + "Verdict: Match", "CpuAsk: 9", "CpuAskBounded: 2"], + fetch_exec_summary=True) self.__verify_fs_writers(result, 2, [0, 1, 1]) # Test partitioned insert, large scan, no MAX_FS_WRITER. @@ -1453,7 +1453,8 @@ class TestExecutorGroups(CustomClusterTestSuite): ("create table {0}.{1} partitioned by (ss_store_sk) as {2}").format( unique_database, "test_ctas5", big_select), ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3", - "Verdict: Match", "CpuAsk: 9", "CpuAskBounded: 9"]) + "Verdict: Match", "CpuAsk: 9", "CpuAskBounded: 9"], + fetch_exec_summary=True) self.__verify_fs_writers(result, 3, [0, 3, 3, 3]) # Test partitioned insert, large scan, high MAX_FS_WRITER. @@ -1462,7 +1463,8 @@ class TestExecutorGroups(CustomClusterTestSuite): ("create table {0}.{1} partitioned by (ss_store_sk) as {2}").format( unique_database, "test_ctas6", big_select), ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3", - "Verdict: Match", "CpuAsk: 9", "CpuAskBounded: 9"]) + "Verdict: Match", "CpuAsk: 9", "CpuAskBounded: 9"], + fetch_exec_summary=True) self.__verify_fs_writers(result, 3, [0, 3, 3, 3]) # Test partitioned insert, large scan, low MAX_FS_WRITER. 
@@ -1471,7 +1473,8 @@ class TestExecutorGroups(CustomClusterTestSuite): ("create table {0}.{1} partitioned by (ss_store_sk) as {2}").format( unique_database, "test_ctas7", big_select), ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3", - "Verdict: Match", "CpuAsk: 9", "CpuAskBounded: 8"]) + "Verdict: Match", "CpuAsk: 9", "CpuAskBounded: 8"], + fetch_exec_summary=True) self.__verify_fs_writers(result, 2, [0, 2, 3, 3]) # Test unpartitioned insert overwrite. MAX_FS_WRITER=2. @@ -1479,7 +1482,8 @@ class TestExecutorGroups(CustomClusterTestSuite): ("insert overwrite {0}.{1} {2}").format( unique_database, "test_ctas4", big_select), ["Executor Group: root.small-group", "ExecutorGroupsConsidered: 2", - "Verdict: Match", "CpuAsk: 9", "CpuAskBounded: 2"]) + "Verdict: Match", "CpuAsk: 9", "CpuAskBounded: 2"], + fetch_exec_summary=True) self.__verify_fs_writers(result, 2, [0, 1, 1]) # Test partitioned insert overwrite. MAX_FS_WRITER=2. @@ -1487,7 +1491,8 @@ class TestExecutorGroups(CustomClusterTestSuite): ("insert overwrite {0}.{1} ({2}) partition (ss_store_sk) {3}").format( unique_database, "test_ctas7", store_sales_no_part_col, big_select), ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3", - "Verdict: Match", "CpuAsk: 8", "CpuAskBounded: 8"]) + "Verdict: Match", "CpuAsk: 8", "CpuAskBounded: 8"], + fetch_exec_summary=True) self.__verify_fs_writers(result, 2, [0, 2, 3, 3]) # Test unpartitioned insert overwrite. MAX_FS_WRITER=1. @@ -1496,7 +1501,8 @@ class TestExecutorGroups(CustomClusterTestSuite): ("insert overwrite {0}.{1} {2}").format( unique_database, "test_ctas4", big_select), ["Executor Group: root.small-group", "ExecutorGroupsConsidered: 2", - "Verdict: Match", "CpuAsk: 7", "CpuAskBounded: 7"]) + "Verdict: Match", "CpuAsk: 7", "CpuAskBounded: 7"], + fetch_exec_summary=True) self.__verify_fs_writers(result, 1, [0, 3, 4]) # Unset MAX_FS_WRITERS. 
@@ -1510,7 +1516,8 @@ class TestExecutorGroups(CustomClusterTestSuite): "where ss_store_sk=1").format( unique_database, "test_ctas7", store_sales_no_part_col, store_sales_columns), ["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1", - "Verdict: Match", "CpuAsk: 1", "CpuAskBounded: 1", "| partitions=6"]) + "Verdict: Match", "CpuAsk: 1", "CpuAskBounded: 1", "| partitions=6"], + fetch_exec_summary=True) self.__verify_fs_writers(result, 1, [0, 1]) # END testing insert + MAX_FS_WRITER @@ -1628,7 +1635,7 @@ class TestExecutorGroups(CustomClusterTestSuite): self._set_query_options({ 'COMPUTE_PROCESSING_COST': 'true', 'SLOT_COUNT_STRATEGY': 'PLANNER_CPU_ASK'}) - result = self.execute_query_expect_failure(self.client, CPU_TEST_QUERY) + result = self.execute_query_expect_failure(self.hs2_client, CPU_TEST_QUERY) assert ("AnalysisException: The query does not fit largest executor group sets. " "Reason: not enough cpu cores (require=300, max=192).") in str(result) diff --git a/tests/query_test/test_hash_join_timer.py b/tests/query_test/test_hash_join_timer.py index 5a340d0a9..7deddd5e4 100644 --- a/tests/query_test/test_hash_join_timer.py +++ b/tests/query_test/test_hash_join_timer.py @@ -21,6 +21,7 @@ import re from tests.common.impala_cluster import ImpalaCluster from tests.common.impala_test_suite import ImpalaTestSuite +from tests.common.test_dimensions import hs2_client_protocol_dimension from tests.common.test_vector import ImpalaTestDimension from tests.util.parse_util import parse_duration_string_ms from tests.verifiers.metric_verifier import MetricVerifier @@ -45,7 +46,9 @@ class TestHashJoinTimer(ImpalaTestSuite): # Fully hint the queries so that the plan will not change. # Each test case contain a query, the join type. 
- TEST_CASES = [["select /*+straight_join*/ count(*) from" + TEST_CASES = \ + [ + ["select /*+straight_join*/ count(*) from" " (select distinct * from functional.alltypes where int_col >= sleep(5)) a" " join /* +SHUFFLE */ functional.alltypes b on (a.id=b.id)", "HASH JOIN"], @@ -64,7 +67,7 @@ class TestHashJoinTimer(ImpalaTestSuite): " (select distinct * from functional.alltypes where int_col >= sleep(5)) b" " where a.id>b.id and a.id=99", "NESTED LOOP JOIN"] - ] + ] # IMPALA-2973: For non-code-coverage builds, 1000 milliseconds are sufficient, but more # time is needed in code-coverage builds. HASH_JOIN_UPPER_BOUND_MS = 2000 @@ -83,15 +86,16 @@ class TestHashJoinTimer(ImpalaTestSuite): super(TestHashJoinTimer, cls).add_test_dimensions() cls.ImpalaTestMatrix.add_dimension( ImpalaTestDimension('test cases', *cls.TEST_CASES)) + cls.ImpalaTestMatrix.add_dimension(hs2_client_protocol_dimension()) cls.ImpalaTestMatrix.add_constraint(lambda v: cls.__is_valid_test_vector(v)) @classmethod def __is_valid_test_vector(cls, vector): - return vector.get_value('table_format').file_format == 'text' and \ - vector.get_value('table_format').compression_codec == 'none' and \ - vector.get_value('exec_option')['batch_size'] == 0 and \ - vector.get_value('exec_option')['disable_codegen'] == False and \ - vector.get_value('exec_option')['num_nodes'] == 0 + return (vector.get_value('table_format').file_format == 'text' + and vector.get_value('table_format').compression_codec == 'none' + and vector.get_value('exec_option')['batch_size'] == 0 + and not vector.get_value('exec_option')['disable_codegen'] + and vector.get_value('exec_option')['num_nodes'] == 0) @pytest.mark.execute_serially def test_hash_join_timer(self, vector): @@ -109,7 +113,9 @@ class TestHashJoinTimer(ImpalaTestSuite): verifier.wait_for_metric("impala-server.num-fragments-in-flight", 0) # Execute the query. The query summary and profile are stored in 'result'. 
- result = self.execute_query(query, vector.get_value('exec_option')) + result = None + with self.create_impala_client_from_vector(vector) as client: + result = client.execute(query, fetch_exec_summary=True) # Parse the query summary; The join node is "id=3". # In the ExecSummary, search for the join operator's summary and verify the diff --git a/tests/query_test/test_insert.py b/tests/query_test/test_insert.py index db24c0b53..4db607d8d 100644 --- a/tests/query_test/test_insert.py +++ b/tests/query_test/test_insert.py @@ -37,6 +37,7 @@ from tests.common.test_dimensions import ( from tests.common.test_result_verifier import ( QueryTestResult, parse_result_rows) +from tests.common.test_vector import HS2 from tests.verifiers.metric_verifier import MetricVerifier @@ -237,8 +238,8 @@ class TestInsertWideTable(ImpalaTestSuite): # Don't run on core. This test is very slow (IMPALA-864) and we are unlikely to # regress here. - if cls.exploration_strategy() == 'core': - cls.ImpalaTestMatrix.add_constraint(lambda v: False) + if cls.exploration_strategy() != 'exhaustive': + pytest.skip("Test only run in exhaustive exploration.") @SkipIfLocal.parquet_file_size def test_insert_wide_table(self, vector, unique_database): @@ -437,6 +438,10 @@ class TestInsertHdfsWriterLimit(ImpalaTestSuite): def get_workload(self): return 'functional-query' + @classmethod + def default_test_protocol(cls): + return HS2 + @classmethod def add_test_dimensions(cls): super(TestInsertHdfsWriterLimit, cls).add_test_dimensions() @@ -544,7 +549,7 @@ class TestInsertHdfsWriterLimit(ImpalaTestSuite): # and running. 
self.impalad_test_service.wait_for_metric_value("cluster-membership.backends.total", 3) - result = self.client.execute(query) + result = self.client.execute(query, fetch_exec_summary=True) assert 'HDFS WRITER' in result.exec_summary[0]['operator'], result.runtime_profile if (max_fs_writers > 0): num_writers = int(result.exec_summary[0]['num_instances']) diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py index 7a4ea2561..34b2d1505 100644 --- a/tests/query_test/test_observability.py +++ b/tests/query_test/test_observability.py @@ -22,7 +22,8 @@ from tests.beeswax.impala_beeswax import ImpalaBeeswaxException from tests.common.impala_cluster import ImpalaCluster from tests.common.impala_test_suite import ImpalaTestSuite from tests.common.skip import SkipIfFS, SkipIfLocal, SkipIfNotHdfsMinicluster -from tests.util.filesystem_utils import IS_EC, WAREHOUSE +from tests.common.test_vector import HS2 +from tests.util.filesystem_utils import WAREHOUSE from tests.util.parse_util import get_duration_us_from_str from time import sleep, time from RuntimeProfile.ttypes import TRuntimeProfileFormat @@ -35,6 +36,10 @@ class TestObservability(ImpalaTestSuite): def get_workload(self): return 'functional-query' + @classmethod + def default_test_protocol(cls): + return HS2 + def test_merge_exchange_num_rows(self): """Regression test for IMPALA-1473 - checks that the exec summary for a merging exchange with a limit reports the number of rows returned as equal to the limit, @@ -42,7 +47,7 @@ class TestObservability(ImpalaTestSuite): of rows returned correctly.""" query = """select tinyint_col, count(*) from functional.alltypes group by tinyint_col order by tinyint_col limit 5""" - result = self.execute_query(query) + result = self.client.execute(query, fetch_exec_summary=True) exchange = result.exec_summary[1] assert exchange['operator'] == '05:MERGING-EXCHANGE' assert exchange['num_rows'] == 5 @@ -61,7 +66,7 @@ class 
TestObservability(ImpalaTestSuite): query = """select distinct a.int_col, a.string_col from functional.alltypes a inner join functional.alltypessmall b on (a.id = b.id) where a.year = 2009 and b.month = 2""" - result = self.execute_query(query) + result = self.client.execute(query, fetch_exec_summary=True) exchange = result.exec_summary[8] assert exchange['operator'] == '04:EXCHANGE' assert exchange['num_rows'] == 25 @@ -98,21 +103,21 @@ class TestObservability(ImpalaTestSuite): """IMPALA-4499: Checks that the exec summary for scans show the table name.""" # HDFS table query = "select count(*) from functional.alltypestiny" - result = self.execute_query(query) + result = self.client.execute(query, fetch_exec_summary=True) scan_idx = len(result.exec_summary) - 1 assert result.exec_summary[scan_idx]['operator'] == '00:SCAN HDFS' assert result.exec_summary[scan_idx]['detail'] == 'functional.alltypestiny' # KUDU table query = "select count(*) from functional_kudu.alltypestiny" - result = self.execute_query(query) + result = self.client.execute(query, fetch_exec_summary=True) scan_idx = len(result.exec_summary) - 1 assert result.exec_summary[scan_idx]['operator'] == '00:SCAN KUDU' assert result.exec_summary[scan_idx]['detail'] == 'functional_kudu.alltypestiny' # HBASE table query = "select count(*) from functional_hbase.alltypestiny" - result = self.execute_query(query) + result = self.client.execute(query, fetch_exec_summary=True) scan_idx = len(result.exec_summary) - 1 assert result.exec_summary[scan_idx]['operator'] == '00:SCAN HBASE' assert result.exec_summary[scan_idx]['detail'] == 'functional_hbase.alltypestiny' @@ -121,7 +126,7 @@ class TestObservability(ImpalaTestSuite): """IMPALA-1048: Checks that the exec summary contains sinks.""" # SELECT query. query = "select count(*) from functional.alltypes" - result = self.execute_query(query) + result = self.client.execute(query, fetch_exec_summary=True) # Sanity-check the root sink. 
root_sink = result.exec_summary[0] assert root_sink['operator'] == 'F01:ROOT' @@ -147,7 +152,7 @@ class TestObservability(ImpalaTestSuite): # INSERT query. query = "create table {0}.tmp as select count(*) from functional.alltypes".format( unique_database) - result = self.execute_query(query) + result = self.client.execute(query, fetch_exec_summary=True) # Sanity-check the HDFS writer sink. assert result.exec_summary[0]['operator'] == 'F01:HDFS WRITER' assert result.exec_summary[0]['max_time'] >= 0 @@ -633,7 +638,7 @@ class TestObservability(ImpalaTestSuite): "create table %s as select * from functional.alltypestiny" % table_name) results = self.execute_query("compute stats %s" % table_name) # Search for all query ids (max length 33) in the profile. - matches = re.findall("Query \(id=.{,33}\)", results.runtime_profile) + matches = re.findall(r"Query \(id=.{,33}\)", results.runtime_profile) query_ids = [] for query_id in matches: if query_id not in query_ids: @@ -673,7 +678,7 @@ class TestObservability(ImpalaTestSuite): for key in keys: if key in line: # Match byte count within parentheses - m = re.search("\(([0-9]+)\)", line) + m = re.search(r"\(([0-9]+)\)", line) # If a match was not found, then the value of the key should be 0 if not m: @@ -686,8 +691,8 @@ class TestObservability(ImpalaTestSuite): # All counters have values assert all(counters[key] > 0 for key in keys) - assert counters["TotalBytesSent"] == (counters["TotalScanBytesSent"] + - counters["TotalInnerBytesSent"]) + assert counters["TotalBytesSent"] == (counters["TotalScanBytesSent"] + + counters["TotalInnerBytesSent"]) def test_query_profile_contains_host_resource_usage(self): """Tests that the profile contains a sub-profile with per node resource usage.""" @@ -914,12 +919,12 @@ class TestObservability(ImpalaTestSuite): assert "Travel:" in runtime_profile assert "HashCollisions:" in runtime_profile assert "Resizes:" in runtime_profile - nprobes = re.search('Probes:.*\((\d+)\)', runtime_profile) + 
nprobes = re.search(r'Probes:.*\((\d+)\)', runtime_profile) # Probes and travel can be 0. The number can be an integer or float with K. # The number extracted is the number inside parenthesis, which is always # an integer. assert nprobes and len(nprobes.groups()) == 1 and int(nprobes.group(1)) >= 0 - ntravel = re.search('Travel:.*\((\d+)\)', runtime_profile) + ntravel = re.search(r'Travel:.*\((\d+)\)', runtime_profile) assert ntravel and len(ntravel.groups()) == 1 and int(ntravel.group(1)) >= 0 def test_query_profle_hashtable(self): @@ -955,7 +960,7 @@ class TestObservability(ImpalaTestSuite): assert results.success "When the skew summary is seen, look for the details" - skews_found = 'skew\(s\) found at:.*HASH_JOIN.*HASH_JOIN.*HDFS_SCAN_NODE' + skews_found = r'skew\(s\) found at:.*HASH_JOIN.*HASH_JOIN.*HDFS_SCAN_NODE' if len(re.findall(skews_found, results.runtime_profile, re.M)) == 1: "Expect to see skew details twice at the hash join nodes." @@ -981,20 +986,21 @@ class TestObservability(ImpalaTestSuite): @SkipIfNotHdfsMinicluster.plans def test_reduced_cardinality_by_filter(self): """IMPALA-12702: Check that ExecSummary shows the reduced cardinality estimation.""" - query_opts = {'compute_processing_cost': True} query = """select STRAIGHT_JOIN count(*) from (select l_orderkey from tpch_parquet.lineitem) a join (select o_orderkey, o_custkey from tpch_parquet.orders) l1 on a.l_orderkey = l1.o_orderkey where l1.o_custkey < 1000""" - result = self.execute_query(query, query_opts) - scan = result.exec_summary[10] - assert '00:SCAN' in scan['operator'] - assert scan['num_rows'] == 39563 - assert scan['est_num_rows'] == 575771 - assert scan['detail'] == 'tpch_parquet.lineitem' - runtime_profile = result.runtime_profile - assert "cardinality=575.77K(filtered from 6.00M)" in runtime_profile + with self.create_impala_client() as client: + client.set_configuration_option('compute_processing_cost', 1) + result = client.execute(query, fetch_exec_summary=True) + scan = 
result.exec_summary[10] + assert '00:SCAN' in scan['operator'] + assert scan['num_rows'] == 39563 + assert scan['est_num_rows'] == 575771 + assert scan['detail'] == 'tpch_parquet.lineitem' + runtime_profile = result.runtime_profile + assert "cardinality=575.77K(filtered from 6.00M)" in runtime_profile def test_query_profile_contains_get_inflight_profile_counter(self): """Test that counter for getting inflight profiles appears in the profile""" @@ -1080,4 +1086,4 @@ class TestQueryStates(ImpalaTestSuite): def __is_line_in_profile(self, line, profile): """Returns true if the given 'line' is in the given 'profile'. A single line of the profile must exactly match the given 'line' (excluding whitespaces).""" - return re.search("^\s*{0}\s*$".format(line), profile, re.M) + return re.search(r"^\s*{0}\s*$".format(line), profile, re.M)
