This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit e73e2d40da115ed3804ffaecc0850c853b0e6330
Author: Riza Suminto <[email protected]>
AuthorDate: Sat Mar 8 13:28:52 2025 -0800

    IMPALA-13864: Implement ImpylaHS2ResultSet.exec_summary
    
    This patch implements building the exec summary table for
    ImpylaHS2Connection. It adds a fetch_exec_summary argument to
    ImpalaConnection.execute(). If this argument is True, an exec summary
    table is added to the returned result object.
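
    As an illustrative sketch (hypothetical test snippet, assuming a
    connected hs2 test client such as self.hs2_client in a test suite):

      result = self.hs2_client.execute(
          "select count(*) from functional.alltypes",
          fetch_exec_summary=True)
      # Each exec summary row is a dict keyed by 'prefix', 'operator',
      # 'num_hosts', 'num_instances', 'avg_time', 'max_time', 'num_rows',
      # 'est_num_rows', 'peak_mem', 'est_peak_mem', and 'detail'.
      assert result.exec_summary[0]['operator'] == 'F01:ROOT'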
    
    fetch_exec_summary is also implemented for BeeswaxConnection, so
    BeeswaxConnection no longer fetches the exec summary on every query by
    default.
    
    Tests that validate the exec summary table are updated to set
    fetch_exec_summary=True and migrated to test against the hs2 protocol.
    TestExecutorGroups._set_query_options() is changed to set query options
    through hs2_client set_configuration_option() instead of a SET query.
    Some flake8 issues are addressed as well.
    
    Move build_exec_summary_table to a separate exec_summary.py file and
    tweak it to return early if the given TExecSummary is empty.
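
    For illustration, raw (non-prettyprinted) summary rows can be built
    from a TExecSummary roughly as follows, a sketch mirroring the new
    build_summary_table_from_thrift() helper in impala_connection.py
    (thrift_exec_summary stands in for a fetched TExecSummary object):

      from exec_summary import build_exec_summary_table

      rows = []
      build_exec_summary_table(thrift_exec_summary, 0, 0, False, rows,
                               is_prettyprint=False,
                               separate_prefix_column=True)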
    
    Fix a bug in ImpalaBeeswaxClient.fetch_results() where the fetch would
    not happen at all if the discard_results argument is True.
    
    Testing:
    - Run and pass affected tests locally.
    
    Change-Id: I7d88f78e58eeda29ce21e7828884c7a129d7efe6
    Reviewed-on: http://gerrit.cloudera.org:8080/22626
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 shell/exec_summary.py                        | 176 +++++++++++++++++++++++++++
 shell/impala_client.py                       | 153 +----------------------
 shell/packaging/make_python_package.sh       |   1 +
 tests/beeswax/impala_beeswax.py              |  17 +--
 tests/common/impala_connection.py            |  88 ++++++++++----
 tests/custom_cluster/test_coordinators.py    |   3 +-
 tests/custom_cluster/test_executor_groups.py |  61 ++++++----
 tests/query_test/test_hash_join_timer.py     |  22 ++--
 tests/query_test/test_insert.py              |  11 +-
 tests/query_test/test_observability.py       |  56 +++++----
 10 files changed, 344 insertions(+), 244 deletions(-)

diff --git a/shell/exec_summary.py b/shell/exec_summary.py
new file mode 100755
index 000000000..a7897315c
--- /dev/null
+++ b/shell/exec_summary.py
@@ -0,0 +1,176 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from ExecStats.ttypes import TExecStats
+
+
+def build_exec_summary_table(summary, idx, indent_level, new_indent_level, output,
+                             is_prettyprint=True, separate_prefix_column=False):
+  """Direct translation of Coordinator::PrintExecSummary() to recursively 
build a list
+  of rows of summary statistics, one per exec node
+
+  summary: the TExecSummary object that contains all the summary data
+
+  idx: the index of the node to print
+
+  indent_level: the number of spaces to print before writing the node's label, to give
+  the appearance of a tree. The 0th child of a node has the same indent_level as its
+  parent. All other children have an indent_level of one greater than their parent.
+
+  new_indent_level: If true, this indent level is different from the previous row's.
+
+  output: the list of rows into which to append the rows produced for this node and its
+  children.
+
+  is_prettyprint: Optional. If True, print time, units, and bytes columns in pretty
+  printed format.
+
+  separate_prefix_column: Optional. If True, the prefix and operator name will be
+  returned as separate columns. Otherwise, prefix and operator name will be
+  concatenated into a single column.
+
+  Returns the index of the next exec node in summary.exec_nodes that should be
+  processed, used internally to this method only.
+  """
+  if not summary.nodes:
+    # Summary nodes is empty or None. Nothing to build.
+    return
+  assert idx < len(summary.nodes), (
+      "Index ({0}) must be less than exec summary count ({1})").format(
+          idx, len(summary.nodes))
+
+  attrs = ["latency_ns", "cpu_time_ns", "cardinality", "memory_used"]
+
+  # Initialise aggregate and maximum stats
+  agg_stats, max_stats = TExecStats(), TExecStats()
+  for attr in attrs:
+    setattr(agg_stats, attr, 0)
+    setattr(max_stats, attr, 0)
+
+  node = summary.nodes[idx]
+  instances = 0
+  if node.exec_stats:
+    # exec_stats is not None or an empty list.
+    instances = len(node.exec_stats)
+    for stats in node.exec_stats:
+      for attr in attrs:
+        val = getattr(stats, attr)
+        if val is not None:
+          setattr(agg_stats, attr, getattr(agg_stats, attr) + val)
+          setattr(max_stats, attr, max(getattr(max_stats, attr), val))
+    avg_time = agg_stats.latency_ns / instances
+  else:
+    avg_time = 0
+
+  is_sink = node.node_id == -1
+  # If the node is a broadcast-receiving exchange node, the cardinality of rows produced
+  # is the max over all instances (which should all have received the same number of
+  # rows). Otherwise, the cardinality is the sum over all instances which process
+  # disjoint partitions.
+  if is_sink:
+    cardinality = -1
+  elif node.is_broadcast:
+    cardinality = max_stats.cardinality
+  else:
+    cardinality = agg_stats.cardinality
+
+  est_stats = node.estimated_stats
+  label_prefix = ""
+  if indent_level > 0:
+    label_prefix = "|"
+    label_prefix += "  |" * (indent_level - 1)
+    if new_indent_level:
+      label_prefix += "--"
+    else:
+      label_prefix += "  "
+
+  def prettyprint(val, units, divisor):
+    for unit in units:
+      if val < divisor:
+        if unit == units[0]:
+          return "%d%s" % (val, unit)
+        else:
+          return "%3.2f%s" % (val, unit)
+      val /= divisor
+
+  def prettyprint_bytes(byte_val):
+    return prettyprint(byte_val, [' B', ' KB', ' MB', ' GB', ' TB'], 1024.0)
+
+  def prettyprint_units(unit_val):
+    return prettyprint(unit_val, ["", "K", "M", "B"], 1000.0)
+
+  def prettyprint_time(time_val):
+    return prettyprint(time_val, ["ns", "us", "ms", "s"], 1000.0)
+
+  latency = max_stats.latency_ns
+  cardinality_est = est_stats.cardinality
+  memory_used = max_stats.memory_used
+  memory_est = est_stats.memory_used
+  if (is_prettyprint):
+    avg_time = prettyprint_time(avg_time)
+    latency = prettyprint_time(latency)
+    cardinality = "" if is_sink else prettyprint_units(cardinality)
+    cardinality_est = "" if is_sink else prettyprint_units(cardinality_est)
+    memory_used = prettyprint_bytes(memory_used)
+    memory_est = prettyprint_bytes(memory_est)
+
+  row = list()
+  if separate_prefix_column:
+    row.append(label_prefix)
+    row.append(node.label)
+  else:
+    row.append(label_prefix + node.label)
+
+  row.extend([
+    node.num_hosts,
+    instances,
+    avg_time,
+    latency,
+    cardinality,
+    cardinality_est,
+    memory_used,
+    memory_est,
+    node.label_detail])
+
+  output.append(row)
+  try:
+    sender_idx = summary.exch_to_sender_map[idx]
+    # This is an exchange node or a join node with a separate builder, so the source
+    # is a fragment root, and should be printed next.
+    sender_indent_level = indent_level + node.num_children
+    sender_new_indent_level = node.num_children > 0
+    build_exec_summary_table(summary, sender_idx, sender_indent_level,
+                             sender_new_indent_level, output, is_prettyprint,
+                             separate_prefix_column)
+  except (KeyError, TypeError):
+    # Fall through if idx not in map, or if exch_to_sender_map itself is not set
+    pass
+
+  idx += 1
+  if node.num_children > 0:
+    first_child_output = []
+    idx = build_exec_summary_table(summary, idx, indent_level, False, first_child_output,
+                                   is_prettyprint, separate_prefix_column)
+    for _ in range(1, node.num_children):
+      # All other children are indented
+      idx = build_exec_summary_table(summary, idx, indent_level + 1, True, output,
+                                     is_prettyprint, separate_prefix_column)
+    output += first_child_output
+  return idx
diff --git a/shell/impala_client.py b/shell/impala_client.py
index 6ed2f5f87..3fc1891e1 100755
--- a/shell/impala_client.py
+++ b/shell/impala_client.py
@@ -35,7 +35,6 @@ import uuid
 
 from beeswaxd import BeeswaxService
 from beeswaxd.BeeswaxService import QueryState
-from ExecStats.ttypes import TExecStats
 from ImpalaService import ImpalaService, ImpalaHiveServer2Service
 from ImpalaService.ImpalaHiveServer2Service import (TGetRuntimeProfileReq,
     TGetExecSummaryReq, TPingImpalaHS2ServiceReq, TCloseImpalaOperationReq)
@@ -46,6 +45,7 @@ from TCLIService.TCLIService import (TExecuteStatementReq, TOpenSessionReq,
     TOperationState, TFetchResultsReq, TFetchOrientation, TGetLogReq,
     TGetResultSetMetadataReq, TTypeId, TCancelOperationReq, TCloseOperationReq)
 from ImpalaHttpClient import ImpalaHttpClient
+from exec_summary import build_exec_summary_table
 from kerberos_util import get_kerb_host_from_kerberos_host_fqdn
 from thrift.protocol import TBinaryProtocol
 from thrift_sasl import TSaslClientTransport
@@ -110,157 +110,6 @@ RPC_EXCEPTION_TAPPLICATION = "TAPPLICATION_EXCEPTION"
 RPC_EXCEPTION_SERVER = "SERVER_ERROR"
 
 
-def build_exec_summary_table(summary, idx, indent_level, new_indent_level, output,
-                             is_prettyprint=True, separate_prefix_column=False):
-  """Direct translation of Coordinator::PrintExecSummary() to recursively 
build a list
-  of rows of summary statistics, one per exec node
-
-  summary: the TExecSummary object that contains all the summary data
-
-  idx: the index of the node to print
-
-  indent_level: the number of spaces to print before writing the node's label, to give
-  the appearance of a tree. The 0th child of a node has the same indent_level as its
-  parent. All other children have an indent_level of one greater than their parent.
-
-  new_indent_level: If true, this indent level is different from the previous row's.
-
-  output: the list of rows into which to append the rows produced for this node and its
-  children.
-
-  is_prettyprint: Optional. If True, print time, units, and bytes columns in pretty
-  printed format.
-
-  separate_prefix_column: Optional. If True, the prefix and operator name will be
-  returned as separate column. Otherwise, prefix and operater name will be concatenated
-  into single column.
-
-  Returns the index of the next exec node in summary.exec_nodes that should be
-  processed, used internally to this method only.
-  """
-  attrs = ["latency_ns", "cpu_time_ns", "cardinality", "memory_used"]
-
-  # Initialise aggregate and maximum stats
-  agg_stats, max_stats = TExecStats(), TExecStats()
-  for attr in attrs:
-    setattr(agg_stats, attr, 0)
-    setattr(max_stats, attr, 0)
-
-  node = summary.nodes[idx]
-  if node.exec_stats is not None:
-    for stats in node.exec_stats:
-      for attr in attrs:
-        val = getattr(stats, attr)
-        if val is not None:
-          setattr(agg_stats, attr, getattr(agg_stats, attr) + val)
-          setattr(max_stats, attr, max(getattr(max_stats, attr), val))
-
-  if node.exec_stats is not None and node.exec_stats:
-    avg_time = agg_stats.latency_ns / len(node.exec_stats)
-  else:
-    avg_time = 0
-
-  is_sink = node.node_id == -1
-  # If the node is a broadcast-receiving exchange node, the cardinality of rows produced
-  # is the max over all instances (which should all have received the same number of
-  # rows). Otherwise, the cardinality is the sum over all instances which process
-  # disjoint partitions.
-  if is_sink:
-    cardinality = -1
-  elif node.is_broadcast:
-    cardinality = max_stats.cardinality
-  else:
-    cardinality = agg_stats.cardinality
-
-  est_stats = node.estimated_stats
-  label_prefix = ""
-  if indent_level > 0:
-    label_prefix = "|"
-    label_prefix += "  |" * (indent_level - 1)
-    if new_indent_level:
-      label_prefix += "--"
-    else:
-      label_prefix += "  "
-
-  def prettyprint(val, units, divisor):
-    for unit in units:
-      if val < divisor:
-        if unit == units[0]:
-          return "%d%s" % (val, unit)
-        else:
-          return "%3.2f%s" % (val, unit)
-      val /= divisor
-
-  def prettyprint_bytes(byte_val):
-    return prettyprint(byte_val, [' B', ' KB', ' MB', ' GB', ' TB'], 1024.0)
-
-  def prettyprint_units(unit_val):
-    return prettyprint(unit_val, ["", "K", "M", "B"], 1000.0)
-
-  def prettyprint_time(time_val):
-    return prettyprint(time_val, ["ns", "us", "ms", "s"], 1000.0)
-
-  instances = 0
-  if node.exec_stats is not None:
-    instances = len(node.exec_stats)
-  latency = max_stats.latency_ns
-  cardinality_est = est_stats.cardinality
-  memory_used = max_stats.memory_used
-  memory_est = est_stats.memory_used
-  if (is_prettyprint):
-    avg_time = prettyprint_time(avg_time)
-    latency = prettyprint_time(latency)
-    cardinality = "" if is_sink else prettyprint_units(cardinality)
-    cardinality_est = "" if is_sink else prettyprint_units(cardinality_est)
-    memory_used = prettyprint_bytes(memory_used)
-    memory_est = prettyprint_bytes(memory_est)
-
-  row = list()
-  if separate_prefix_column:
-    row.append(label_prefix)
-    row.append(node.label)
-  else:
-    row.append(label_prefix + node.label)
-
-  row.extend([
-    node.num_hosts,
-    instances,
-    avg_time,
-    latency,
-    cardinality,
-    cardinality_est,
-    memory_used,
-    memory_est,
-    node.label_detail])
-
-  output.append(row)
-  try:
-    sender_idx = summary.exch_to_sender_map[idx]
-    # This is an exchange node or a join node with a separate builder, so the source
-    # is a fragment root, and should be printed next.
-    sender_indent_level = indent_level + node.num_children
-    sender_new_indent_level = node.num_children > 0
-    build_exec_summary_table(summary, sender_idx, sender_indent_level,
-                             sender_new_indent_level, output, is_prettyprint,
-                             separate_prefix_column)
-  except (KeyError, TypeError):
-    # Fall through if idx not in map, or if exch_to_sender_map itself is not set
-    pass
-
-  idx += 1
-  if node.num_children > 0:
-    first_child_output = []
-    idx = build_exec_summary_table(summary, idx, indent_level, False, first_child_output,
-                                   is_prettyprint, separate_prefix_column)
-    for child_idx in xrange(1, node.num_children):
-      # All other children are indented (we only have 0, 1 or 2 children for every exec
-      # node at the moment)
-      idx = build_exec_summary_table(summary, idx, indent_level + 1, True, output,
-                                     is_prettyprint, separate_prefix_column)
-    output += first_child_output
-  return idx
-
-
 class QueryOptionLevels:
   """These are the levels used when displaying query options.
   The values correspond to the ones in TQueryOptionLevel"""
diff --git a/shell/packaging/make_python_package.sh b/shell/packaging/make_python_package.sh
index 5ec12047a..a1fc5479b 100755
--- a/shell/packaging/make_python_package.sh
+++ b/shell/packaging/make_python_package.sh
@@ -63,6 +63,7 @@ assemble_package_files() {
   cp "${SHELL_HOME}/kerberos_util.py" "${MODULE_LIB_DIR}"
   cp "${SHELL_HOME}/value_converter.py" "${MODULE_LIB_DIR}"
   cp "${SHELL_HOME}/thrift_printer.py" "${MODULE_LIB_DIR}"
+  cp "${SHELL_HOME}/exec_summary.py" "${MODULE_LIB_DIR}"
 
   cp "${SHELL_HOME}/packaging/README.md" "${PACKAGE_DIR}"
   cp "${SHELL_HOME}/packaging/MANIFEST.in" "${PACKAGE_DIR}"
diff --git a/tests/beeswax/impala_beeswax.py b/tests/beeswax/impala_beeswax.py
index fb74146f2..cd1f29bd5 100644
--- a/tests/beeswax/impala_beeswax.py
+++ b/tests/beeswax/impala_beeswax.py
@@ -184,7 +184,8 @@ class ImpalaBeeswaxClient(object):
                            service='impala', transport_type=trans_type, user=self.user,
                             password=self.password, use_ssl=self.use_ssl)
 
-  def execute(self, query_string, user=None, fetch_profile_after_close=False):
+  def execute(self, query_string, user=None, fetch_profile_after_close=False,
+              fetch_exec_summary=False):
     """Re-directs the query to its appropriate handler, returns 
ImpalaBeeswaxResult"""
     # Take care of leading/trailing whitespaces.
     query_string = query_string.strip()
@@ -200,7 +201,8 @@ class ImpalaBeeswaxClient(object):
         # profile and log will be available so fetch them first.
         runtime_profile = self.get_runtime_profile(handle)
 
-      exec_summary = self.get_exec_summary_and_parse(handle)
+      exec_summary = (self.get_exec_summary_and_parse(handle) if fetch_exec_summary
+                      else None)
       log = self.get_log(handle.log_context)
 
       result = self.fetch_results(query_string, handle)
@@ -218,7 +220,8 @@ class ImpalaBeeswaxClient(object):
       result = self.fetch_results(query_string, handle)
       result.time_taken = time.time() - start
       result.start_time = start_time
-      result.exec_summary = self.get_exec_summary_and_parse(handle)
+      result.exec_summary = (self.get_exec_summary_and_parse(handle) if fetch_exec_summary
+                             else None)
       result.log = self.get_log(handle.log_context)
 
       if not fetch_profile_after_close:
@@ -254,7 +257,7 @@ class ImpalaBeeswaxClient(object):
     return output
 
   def __build_summary_table(self, summary, output):
-    from shell.impala_client import build_exec_summary_table
+    from shell.exec_summary import build_exec_summary_table
     result = list()
     build_exec_summary_table(summary, 0, 0, False, result, is_prettyprint=False,
                              separate_prefix_column=True)
@@ -385,14 +388,14 @@ class ImpalaBeeswaxClient(object):
                                  success=True, data=[])
 
     # Result fetching for insert is different from other queries.
-    exec_result = None
-    if discard_results:
-      return exec_result
     if query_type == 'insert':
       exec_result = self.__fetch_insert_results(query_handle)
     else:
       exec_result = self.__fetch_results(query_handle, max_rows)
     exec_result.query = query_string
+    # Check for discard_results arg only after all rows have been fetched.
+    if discard_results:
+      return None
     return exec_result
 
   def __fetch_results(self, handle, max_rows=-1):
diff --git a/tests/common/impala_connection.py b/tests/common/impala_connection.py
index caafe7d42..18e882c32 100644
--- a/tests/common/impala_connection.py
+++ b/tests/common/impala_connection.py
@@ -111,6 +111,21 @@ def format_sql_for_logging(sql_stmt):
             u"\n-- [...]\n").format(len(sql_stmt), truncated_sql)
 
 
+def build_summary_table_from_thrift(thrift_exec_summary):
+  from shell.exec_summary import build_exec_summary_table
+  result = list()
+  build_exec_summary_table(thrift_exec_summary, 0, 0, False, result,
+                           is_prettyprint=False, separate_prefix_column=True)
+  keys = ['prefix', 'operator', 'num_hosts', 'num_instances', 'avg_time', 'max_time',
+          'num_rows', 'est_num_rows', 'peak_mem', 'est_peak_mem', 'detail']
+  output = list()
+  for row in result:
+    assert len(keys) == len(row)
+    summ_map = dict(zip(keys, row))
+    output.append(summ_map)
+  return output
+
+
 def collect_default_query_options(options, name, val, kind):
   if kind == 'REMOVED':
     return
@@ -274,7 +289,9 @@ class ImpalaConnection(with_metaclass(abc.ABCMeta, object)):
     """Cancels an in-flight operation"""
     pass
 
-  def execute(self, sql_stmt):
+  def execute(self, sql_stmt, user=None, fetch_profile_after_close=False,  # noqa: U100
+              fetch_exec_summary=False,  # noqa: U100
+              profile_format=TRuntimeProfileFormat.STRING):  # noqa: U100
     """Executes a query and fetches the results"""
     pass
 
@@ -288,6 +305,7 @@ class ImpalaConnection(with_metaclass(abc.ABCMeta, object)):
   @abc.abstractmethod
  def fetch(self, sql_stmt, operation_handle, max_rows=-1, discard_results=False):
     """Fetches query results up to max_rows given a handle and sql statement.
+    Caller must ensure that the query has passed the PENDING state before calling fetch.
     If max_rows < 0, all rows are fetched. If max_rows > 0 but the number of
     rows returned is less than max_rows, all the rows have been fetched.
     Return None if discard_results is True.
@@ -372,6 +390,17 @@ class ImpalaConnection(with_metaclass(abc.ABCMeta, object)):
     profile"""
     pass
 
+  @abc.abstractmethod
+  def get_exec_summary(self, operation_handle):  # noqa: U100
+    pass
+
+  def get_exec_summary_table(self, operation_handle):
+    summary_table = list()
+    summary = self.get_exec_summary(operation_handle)
+    if summary:
+        summary_table = build_summary_table_from_thrift(summary)
+    return summary_table
+
 
 # Represents a connection to Impala using the Beeswax API.
 class BeeswaxConnection(ImpalaConnection):
@@ -455,11 +484,15 @@ class BeeswaxConnection(ImpalaConnection):
     self.log_handle(operation_handle, 'closing DML query')
     self.__beeswax_client.close_dml(operation_handle.get_handle())
 
-  def execute(self, sql_stmt, user=None, fetch_profile_after_close=False):
+  def execute(self, sql_stmt, user=None, fetch_profile_after_close=False,
+              fetch_exec_summary=False, profile_format=TRuntimeProfileFormat.STRING):
+    assert profile_format == TRuntimeProfileFormat.STRING, (
+      "Beeswax client only supports getting runtime profile in STRING format.")
     self.log_client(u"executing against {0}\n{1}".format(
       self.__host_port, format_sql_for_logging(sql_stmt)))
     return self.__beeswax_client.execute(sql_stmt, user=user,
-        fetch_profile_after_close=fetch_profile_after_close)
+        fetch_profile_after_close=fetch_profile_after_close,
+        fetch_exec_summary=fetch_exec_summary)
 
   def execute_async(self, sql_stmt, user=None):
     self.log_client(u"executing async {0}\n{1}".format(
@@ -485,7 +518,7 @@ class BeeswaxConnection(ImpalaConnection):
   def get_runtime_profile(self, operation_handle,
                           profile_format=TRuntimeProfileFormat.STRING):
     assert profile_format == TRuntimeProfileFormat.STRING, (
-      "Beeswax client only support getting runtime profile in STRING format.")
+      "Beeswax client only supports getting runtime profile in STRING format.")
     self.log_handle(operation_handle, 'getting runtime profile operation')
    return self.__beeswax_client.get_runtime_profile(operation_handle.get_handle())
 
@@ -690,8 +723,8 @@ class ImpylaHS2Connection(ImpalaConnection):
          format_sql_for_logging(sql_stmt))
     )
 
-  def execute(self, sql_stmt, user=None, profile_format=TRuntimeProfileFormat.STRING,
-      fetch_profile_after_close=False):
+  def execute(self, sql_stmt, user=None, fetch_profile_after_close=False,
+              fetch_exec_summary=False, profile_format=TRuntimeProfileFormat.STRING):
     cursor = self.__cursor
     result = None
     try:
@@ -704,8 +737,8 @@ class ImpylaHS2Connection(ImpalaConnection):
       self.log_handle(handle, "started query in session {0}".format(
         self.__get_session_id(cursor)))
       result = self.__fetch_results_and_profile(
-        handle, profile_format=profile_format,
-        fetch_profile_after_close=fetch_profile_after_close)
+        handle, fetch_profile_after_close=fetch_profile_after_close,
+        fetch_exec_summary=fetch_exec_summary, profile_format=profile_format)
     finally:
       cursor.close_operation()
       if cursor != self.__cursor:
@@ -713,11 +746,12 @@ class ImpylaHS2Connection(ImpalaConnection):
     return result
 
   def __fetch_results_and_profile(
-      self, operation_handle, profile_format=TRuntimeProfileFormat.STRING,
-      fetch_profile_after_close=False):
+      self, operation_handle, fetch_profile_after_close=False,
+      fetch_exec_summary=False, profile_format=TRuntimeProfileFormat.STRING):
     r = None
     try:
-      r = self.__fetch_results(operation_handle, profile_format=profile_format)
+      r = self.__fetch_results(operation_handle, fetch_exec_summary=fetch_exec_summary,
+                               profile_format=profile_format)
     finally:
       if r is None:
        # Try to close the query handle but ignore any exceptions not to replace the
@@ -860,6 +894,7 @@ class ImpylaHS2Connection(ImpalaConnection):
 
   def __fetch_results(self, handle, max_rows=-1,
                       discard_results=False,
+                      fetch_exec_summary=False,
                       profile_format=TRuntimeProfileFormat.STRING):
     """Implementation of result fetching from handle."""
     cursor = handle.get_handle()
@@ -877,20 +912,25 @@ class ImpylaHS2Connection(ImpalaConnection):
       else:
         result_tuples = cursor.fetchmany(max_rows)
 
-    if not self._is_hive and self._collect_profile_and_log:
-      log = self.get_log(handle)
-      profile = self.get_runtime_profile(handle, profile_format=profile_format)
-    else:
-      log = None
-      profile = None
     result = None
-
     if discard_results:
       return result
+
+    log = None
+    profile = None
+    exec_summary = None
+    if not self._is_hive:
+      if fetch_exec_summary:
+        exec_summary = self.get_exec_summary_table(handle)
+      if self._collect_profile_and_log:
+        log = self.get_log(handle)
+        profile = self.get_runtime_profile(handle, profile_format=profile_format)
+
     result = ImpylaHS2ResultSet(success=True, result_tuples=result_tuples,
                                column_labels=column_labels, column_types=column_types,
                                query=handle.sql_stmt(), log=log, profile=profile,
-                                query_id=self.get_query_id(handle))
+                                query_id=self.get_query_id(handle),
+                                exec_summary=exec_summary)
     return result
 
 
@@ -898,7 +938,7 @@ class ImpylaHS2ResultSet(object):
   """This emulates the interface of ImpalaBeeswaxResult so that it can be used 
in
   place of it. TODO: when we deprecate/remove Beeswax, clean this up."""
  def __init__(self, success, result_tuples, column_labels, column_types, query, log,
-      profile, query_id):
+      profile, query_id, exec_summary):
     self.success = success
     self.column_labels = column_labels
     self.column_types = column_types
@@ -913,6 +953,7 @@ class ImpylaHS2ResultSet(object):
     self.data = None
     if result_tuples is not None:
       self.data = [self.__convert_result_row(tuple) for tuple in result_tuples]
+    self.exec_summary = exec_summary
 
   def tuples(self):
     """Return the raw HS2 result set, which is a list of tuples."""
@@ -995,7 +1036,9 @@ class MinimalHS2Connection(ImpalaConnection):
     finally:
       self.__conn.close()
 
-  def execute(self, sql_stmt):  # noqa: U100
+  def execute(self, sql_stmt, user=None, fetch_profile_after_close=False,  # noqa: U100
+              fetch_exec_summary=False,  # noqa: U100
+              profile_format=TRuntimeProfileFormat.STRING):  # noqa: U100
     raise NotImplementedError()
 
   def execute_async(self, sql_stmt):
@@ -1085,3 +1128,6 @@ class MinimalHS2Connection(ImpalaConnection):
 
  def wait_for_admission_control(self, operation_handle, timeout_s=60):  # noqa: U100
     raise NotImplementedError()
+
+  def get_exec_summary(self, operation_handle):  # noqa: U100
+    raise NotImplementedError()
diff --git a/tests/custom_cluster/test_coordinators.py b/tests/custom_cluster/test_coordinators.py
index f9b882919..1f3c4b053 100644
--- a/tests/custom_cluster/test_coordinators.py
+++ b/tests/custom_cluster/test_coordinators.py
@@ -110,7 +110,8 @@ class TestCoordinators(CustomClusterTestSuite):
         client = coordinator.service.create_beeswax_client()
         assert client is not None
         query = "select count(*) from functional.alltypesagg"
-        result = self.execute_query_expect_success(client, query)
+        result = client.execute(query, fetch_exec_summary=True)
+        assert result.success
         # Verify that SCAN and AGG are executed on the expected number of
         # executor nodes
         for rows in result.exec_summary:
diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py
index af08f4890..48df14ad4 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -987,18 +987,17 @@ class TestExecutorGroups(CustomClusterTestSuite):
                                         exec_group_set_prefix="root.large") == 1
 
   def _set_query_options(self, query_options):
-    """Set query options by running it as an SQL statement.
-    To mimic impala-shell behavior, use self.client.set_configuration() instead.
-    """
+    """Set query options by setting client configuration."""
     for k, v in query_options.items():
-      self.execute_query_expect_success(self.client, "SET {}='{}'".format(k, v))
+      self.hs2_client.set_configuration_option(k, v)
 
-  def _run_query_and_verify_profile(self, query,
-      expected_strings_in_profile, not_expected_in_profile=[]):
+  def _run_query_and_verify_profile(self, query, expected_strings_in_profile,
+                                    not_expected_in_profile=[],
+                                    fetch_exec_summary=False):
     """Run 'query' and assert existence of 'expected_strings_in_profile' and
     nonexistence of 'not_expected_in_profile' in query profile.
-    Caller is reponsible to close self.client at the end of test."""
-    result = self.execute_query_expect_success(self.client, query)
+    Caller is responsible to close self.hs2_client at the end of test."""
+    result = self.hs2_client.execute(query, fetch_exec_summary=fetch_exec_summary)
     profile = str(result.runtime_profile)
     for expected_profile in expected_strings_in_profile:
       assert expected_profile in profile, (
@@ -1050,7 +1049,6 @@ class TestExecutorGroups(CustomClusterTestSuite):
   def test_query_cpu_count_divisor_default(self, unique_database):
     coordinator_test_args = "-gen_experimental_profile=true"
     self._setup_three_exec_group_cluster(coordinator_test_args)
-    self.client.clear_configuration()
 
     # The default query options for this test.
    # Some test case will change these options along the test, but should eventually
@@ -1091,7 +1089,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
    # Create small table based on tpcds_parquet.store_sales that will be used later
     # for COMPUTE STATS test. Forcing large parallelism to speed up CTAS.
     # Otherwise, query will go to tiny pool.
-    self.client.set_configuration({
+    self._set_query_options({
       'REQUEST_POOL': 'root.large',
       'PROCESSING_COST_MIN_THREADS': '4'})
     self._run_query_and_verify_profile(
@@ -1102,7 +1100,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
       ["Executor Group: root.large", "ExecutorGroupsConsidered: 1",
        "Verdict: query option REQUEST_POOL=root.large is set. "
        "Memory and cpu limit checking is skipped."])
-    self.client.set_configuration({
+    self._set_query_options({
       'REQUEST_POOL': '',
       'PROCESSING_COST_MIN_THREADS': ''})
 
@@ -1128,14 +1126,13 @@ class TestExecutorGroups(CustomClusterTestSuite):
 
     # Test that child queries follow REQUEST_POOL that is set through client
     # configuration. Two child queries should all run in root.small.
-    self.client.set_configuration({'REQUEST_POOL': 'root.small'})
+    self._set_query_options({'REQUEST_POOL': 'root.small'})
     self._run_query_and_verify_profile(compute_stats_query,
         ["Query Options (set by configuration): REQUEST_POOL=root.small",
          "ExecutorGroupsConsidered: 1",
          "Verdict: Assign to first group because query is not auto-scalable"],
         ["EffectiveParallelism:", "CpuAsk:", "AvgAdmissionSlotsPerExecutor:"])
     self._verify_total_admitted_queries("root.small", 2)
-    self.client.clear_configuration()
 
    # Test that child queries follow REQUEST_POOL that is set through SQL statement.
     # Two child queries should all run in root.large.
@@ -1239,7 +1236,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
          "ExecutorGroupsConsidered: 1", "AvgAdmissionSlotsPerExecutor: 2",
          "CpuAsk: 2", "CpuAskBounded: 2"])
     self._set_query_options({'MAX_FRAGMENT_INSTANCES_PER_NODE': '1'})
-    result = self.execute_query_expect_failure(self.client, CPU_TEST_QUERY)
+    result = self.execute_query_expect_failure(self.hs2_client, CPU_TEST_QUERY)
     status = (r"PROCESSING_COST_MIN_THREADS \(2\) can not be larger than "
               r"MAX_FRAGMENT_INSTANCES_PER_NODE \(1\).")
     assert re.search(status, str(result))
@@ -1387,7 +1384,6 @@ class TestExecutorGroups(CustomClusterTestSuite):
   def test_query_cpu_count_on_insert(self, unique_database):
     coordinator_test_args = "-gen_experimental_profile=true"
     self._setup_three_exec_group_cluster(coordinator_test_args)
-    self.client.clear_configuration()
 
     # The default query options for this test.
    # Some test case will change these options along the test, but should eventually
@@ -1404,7 +1400,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
        "select id, year from functional_parquet.alltypes"
        ).format(unique_database, "test_ctas1"),
       ["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1",
-       "Verdict: Match", "CpuAsk: 1", "AvgAdmissionSlotsPerExecutor: 1"])
+       "Verdict: Match", "CpuAsk: 1", "AvgAdmissionSlotsPerExecutor: 1"],
+      fetch_exec_summary=True)
     self.__verify_fs_writers(result, 1, [0, 1])
 
     # Test unpartitioned insert, small scan, no MAX_FS_WRITER, with limit.
@@ -1414,7 +1411,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
        "select id, year from functional_parquet.alltypes limit 100000"
        ).format(unique_database, "test_ctas2"),
       ["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1",
-       "Verdict: Match", "CpuAsk: 2", "AvgAdmissionSlotsPerExecutor: 2"])
+       "Verdict: Match", "CpuAsk: 2", "AvgAdmissionSlotsPerExecutor: 2"],
+      fetch_exec_summary=True)
     self.__verify_fs_writers(result, 1, [0, 2])
 
     # Test partitioned insert, small scan, no MAX_FS_WRITER.
@@ -1424,7 +1422,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
        "select id, year from functional_parquet.alltypes"
        ).format(unique_database, "test_ctas3"),
       ["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1",
-       "Verdict: Match", "CpuAsk: 1", "AvgAdmissionSlotsPerExecutor: 1"])
+       "Verdict: Match", "CpuAsk: 1", "AvgAdmissionSlotsPerExecutor: 1"],
+      fetch_exec_summary=True)
     self.__verify_fs_writers(result, 1, [0, 1])
 
     store_sales_no_part_col = (
@@ -1445,7 +1444,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
       ("create table {0}.{1} as {2}").format(
          unique_database, "test_ctas4", big_select),
       ["Executor Group: root.small-group", "ExecutorGroupsConsidered: 2",
-       "Verdict: Match", "CpuAsk: 9", "CpuAskBounded: 2"])
+       "Verdict: Match", "CpuAsk: 9", "CpuAskBounded: 2"],
+      fetch_exec_summary=True)
     self.__verify_fs_writers(result, 2, [0, 1, 1])
 
     # Test partitioned insert, large scan, no MAX_FS_WRITER.
@@ -1453,7 +1453,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
       ("create table {0}.{1} partitioned by (ss_store_sk) as {2}").format(
          unique_database, "test_ctas5", big_select),
       ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
-       "Verdict: Match", "CpuAsk: 9", "CpuAskBounded: 9"])
+       "Verdict: Match", "CpuAsk: 9", "CpuAskBounded: 9"],
+      fetch_exec_summary=True)
     self.__verify_fs_writers(result, 3, [0, 3, 3, 3])
 
     # Test partitioned insert, large scan, high MAX_FS_WRITER.
@@ -1462,7 +1463,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
       ("create table {0}.{1} partitioned by (ss_store_sk) as {2}").format(
          unique_database, "test_ctas6", big_select),
       ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
-       "Verdict: Match", "CpuAsk: 9", "CpuAskBounded: 9"])
+       "Verdict: Match", "CpuAsk: 9", "CpuAskBounded: 9"],
+      fetch_exec_summary=True)
     self.__verify_fs_writers(result, 3, [0, 3, 3, 3])
 
     # Test partitioned insert, large scan, low MAX_FS_WRITER.
@@ -1471,7 +1473,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
       ("create table {0}.{1} partitioned by (ss_store_sk) as {2}").format(
          unique_database, "test_ctas7", big_select),
       ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
-       "Verdict: Match", "CpuAsk: 9", "CpuAskBounded: 8"])
+       "Verdict: Match", "CpuAsk: 9", "CpuAskBounded: 8"],
+      fetch_exec_summary=True)
     self.__verify_fs_writers(result, 2, [0, 2, 3, 3])
 
     # Test unpartitioned insert overwrite. MAX_FS_WRITER=2.
@@ -1479,7 +1482,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
       ("insert overwrite {0}.{1} {2}").format(
          unique_database, "test_ctas4", big_select),
       ["Executor Group: root.small-group", "ExecutorGroupsConsidered: 2",
-       "Verdict: Match", "CpuAsk: 9", "CpuAskBounded: 2"])
+       "Verdict: Match", "CpuAsk: 9", "CpuAskBounded: 2"],
+      fetch_exec_summary=True)
     self.__verify_fs_writers(result, 2, [0, 1, 1])
 
     # Test partitioned insert overwrite. MAX_FS_WRITER=2.
@@ -1487,7 +1491,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
       ("insert overwrite {0}.{1} ({2}) partition (ss_store_sk) {3}").format(
         unique_database, "test_ctas7", store_sales_no_part_col, big_select),
       ["Executor Group: root.large-group", "ExecutorGroupsConsidered: 3",
-       "Verdict: Match", "CpuAsk: 8", "CpuAskBounded: 8"])
+       "Verdict: Match", "CpuAsk: 8", "CpuAskBounded: 8"],
+      fetch_exec_summary=True)
     self.__verify_fs_writers(result, 2, [0, 2, 3, 3])
 
     # Test unpartitioned insert overwrite. MAX_FS_WRITER=1.
@@ -1496,7 +1501,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
       ("insert overwrite {0}.{1} {2}").format(
          unique_database, "test_ctas4", big_select),
       ["Executor Group: root.small-group", "ExecutorGroupsConsidered: 2",
-       "Verdict: Match", "CpuAsk: 7", "CpuAskBounded: 7"])
+       "Verdict: Match", "CpuAsk: 7", "CpuAskBounded: 7"],
+      fetch_exec_summary=True)
     self.__verify_fs_writers(result, 1, [0, 3, 4])
 
     # Unset MAX_FS_WRITERS.
@@ -1510,7 +1516,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
        "where ss_store_sk=1").format(
          unique_database, "test_ctas7", store_sales_no_part_col, 
store_sales_columns),
       ["Executor Group: root.tiny-group", "ExecutorGroupsConsidered: 1",
-       "Verdict: Match", "CpuAsk: 1", "CpuAskBounded: 1", "|  partitions=6"])
+       "Verdict: Match", "CpuAsk: 1", "CpuAskBounded: 1", "|  partitions=6"],
+      fetch_exec_summary=True)
     self.__verify_fs_writers(result, 1, [0, 1])
     # END testing insert + MAX_FS_WRITER
 
@@ -1628,7 +1635,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
     self._set_query_options({
       'COMPUTE_PROCESSING_COST': 'true',
       'SLOT_COUNT_STRATEGY': 'PLANNER_CPU_ASK'})
-    result = self.execute_query_expect_failure(self.client, CPU_TEST_QUERY)
+    result = self.execute_query_expect_failure(self.hs2_client, CPU_TEST_QUERY)
     assert ("AnalysisException: The query does not fit largest executor group 
sets. "
         "Reason: not enough cpu cores (require=300, max=192).") in str(result)
 
diff --git a/tests/query_test/test_hash_join_timer.py b/tests/query_test/test_hash_join_timer.py
index 5a340d0a9..7deddd5e4 100644
--- a/tests/query_test/test_hash_join_timer.py
+++ b/tests/query_test/test_hash_join_timer.py
@@ -21,6 +21,7 @@ import re
 
 from tests.common.impala_cluster import ImpalaCluster
 from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.test_dimensions import hs2_client_protocol_dimension
 from tests.common.test_vector import ImpalaTestDimension
 from tests.util.parse_util import parse_duration_string_ms
 from tests.verifiers.metric_verifier import MetricVerifier
@@ -45,7 +46,9 @@ class TestHashJoinTimer(ImpalaTestSuite):
   # Fully hint the queries so that the plan will not change.
 
   # Each test case contain a query, the join type.
-  TEST_CASES = [["select /*+straight_join*/ count(*) from"
+  TEST_CASES = \
+            [
+             ["select /*+straight_join*/ count(*) from"
               " (select distinct * from functional.alltypes where int_col >= 
sleep(5)) a"
               " join /* +SHUFFLE */ functional.alltypes b on (a.id=b.id)",
               "HASH JOIN"],
@@ -64,7 +67,7 @@ class TestHashJoinTimer(ImpalaTestSuite):
               " (select distinct * from functional.alltypes where int_col >= 
sleep(5)) b"
               " where a.id>b.id and a.id=99",
               "NESTED LOOP JOIN"]
-             ]
+            ]
  # IMPALA-2973: For non-code-coverage builds, 1000 milliseconds are sufficient, but more
   # time is needed in code-coverage builds.
   HASH_JOIN_UPPER_BOUND_MS = 2000
@@ -83,15 +86,16 @@ class TestHashJoinTimer(ImpalaTestSuite):
     super(TestHashJoinTimer, cls).add_test_dimensions()
     cls.ImpalaTestMatrix.add_dimension(
         ImpalaTestDimension('test cases', *cls.TEST_CASES))
+    cls.ImpalaTestMatrix.add_dimension(hs2_client_protocol_dimension())
    cls.ImpalaTestMatrix.add_constraint(lambda v: cls.__is_valid_test_vector(v))
 
   @classmethod
   def __is_valid_test_vector(cls, vector):
-    return vector.get_value('table_format').file_format == 'text' and \
-        vector.get_value('table_format').compression_codec == 'none' and \
-        vector.get_value('exec_option')['batch_size'] == 0 and \
-        vector.get_value('exec_option')['disable_codegen'] == False and \
-        vector.get_value('exec_option')['num_nodes'] == 0
+    return (vector.get_value('table_format').file_format == 'text'
+            and vector.get_value('table_format').compression_codec == 'none'
+            and vector.get_value('exec_option')['batch_size'] == 0
+            and not vector.get_value('exec_option')['disable_codegen']
+            and vector.get_value('exec_option')['num_nodes'] == 0)
 
   @pytest.mark.execute_serially
   def test_hash_join_timer(self, vector):
@@ -109,7 +113,9 @@ class TestHashJoinTimer(ImpalaTestSuite):
       verifier.wait_for_metric("impala-server.num-fragments-in-flight", 0)
 
     # Execute the query. The query summary and profile are stored in 'result'.
-    result = self.execute_query(query, vector.get_value('exec_option'))
+    result = None
+    with self.create_impala_client_from_vector(vector) as client:
+      result = client.execute(query, fetch_exec_summary=True)
 
     # Parse the query summary; The join node is "id=3".
     # In the ExecSummary, search for the join operator's summary and verify the
diff --git a/tests/query_test/test_insert.py b/tests/query_test/test_insert.py
index db24c0b53..4db607d8d 100644
--- a/tests/query_test/test_insert.py
+++ b/tests/query_test/test_insert.py
@@ -37,6 +37,7 @@ from tests.common.test_dimensions import (
 from tests.common.test_result_verifier import (
     QueryTestResult,
     parse_result_rows)
+from tests.common.test_vector import HS2
 from tests.verifiers.metric_verifier import MetricVerifier
 
 
@@ -237,8 +238,8 @@ class TestInsertWideTable(ImpalaTestSuite):
 
    # Don't run on core. This test is very slow (IMPALA-864) and we are unlikely to
     # regress here.
-    if cls.exploration_strategy() == 'core':
-      cls.ImpalaTestMatrix.add_constraint(lambda v: False)
+    if cls.exploration_strategy() != 'exhaustive':
+      pytest.skip("Test only run in exhaustive exploration.")
 
   @SkipIfLocal.parquet_file_size
   def test_insert_wide_table(self, vector, unique_database):
@@ -437,6 +438,10 @@ class TestInsertHdfsWriterLimit(ImpalaTestSuite):
   def get_workload(self):
     return 'functional-query'
 
+  @classmethod
+  def default_test_protocol(cls):
+    return HS2
+
   @classmethod
   def add_test_dimensions(cls):
     super(TestInsertHdfsWriterLimit, cls).add_test_dimensions()
@@ -544,7 +549,7 @@ class TestInsertHdfsWriterLimit(ImpalaTestSuite):
    # and running.
    self.impalad_test_service.wait_for_metric_value("cluster-membership.backends.total",
                                                     3)
-    result = self.client.execute(query)
+    result = self.client.execute(query, fetch_exec_summary=True)
+    assert 'HDFS WRITER' in result.exec_summary[0]['operator'], result.runtime_profile
     if (max_fs_writers > 0):
       num_writers = int(result.exec_summary[0]['num_instances'])
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index 7a4ea2561..34b2d1505 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -22,7 +22,8 @@ from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.impala_cluster import ImpalaCluster
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import SkipIfFS, SkipIfLocal, SkipIfNotHdfsMinicluster
-from tests.util.filesystem_utils import IS_EC, WAREHOUSE
+from tests.common.test_vector import HS2
+from tests.util.filesystem_utils import WAREHOUSE
 from tests.util.parse_util import get_duration_us_from_str
 from time import sleep, time
 from RuntimeProfile.ttypes import TRuntimeProfileFormat
@@ -35,6 +36,10 @@ class TestObservability(ImpalaTestSuite):
   def get_workload(self):
     return 'functional-query'
 
+  @classmethod
+  def default_test_protocol(cls):
+    return HS2
+
   def test_merge_exchange_num_rows(self):
     """Regression test for IMPALA-1473 - checks that the exec summary for a 
merging
     exchange with a limit reports the number of rows returned as equal to the 
limit,
@@ -42,7 +47,7 @@ class TestObservability(ImpalaTestSuite):
     of rows returned correctly."""
     query = """select tinyint_col, count(*) from functional.alltypes
         group by tinyint_col order by tinyint_col limit 5"""
-    result = self.execute_query(query)
+    result = self.client.execute(query, fetch_exec_summary=True)
     exchange = result.exec_summary[1]
     assert exchange['operator'] == '05:MERGING-EXCHANGE'
     assert exchange['num_rows'] == 5
@@ -61,7 +66,7 @@ class TestObservability(ImpalaTestSuite):
     query = """select distinct a.int_col, a.string_col from 
functional.alltypes a
         inner join functional.alltypessmall b on (a.id = b.id)
         where a.year = 2009 and b.month = 2"""
-    result = self.execute_query(query)
+    result = self.client.execute(query, fetch_exec_summary=True)
     exchange = result.exec_summary[8]
     assert exchange['operator'] == '04:EXCHANGE'
     assert exchange['num_rows'] == 25
@@ -98,21 +103,21 @@ class TestObservability(ImpalaTestSuite):
     """IMPALA-4499: Checks that the exec summary for scans show the table 
name."""
     # HDFS table
     query = "select count(*) from functional.alltypestiny"
-    result = self.execute_query(query)
+    result = self.client.execute(query, fetch_exec_summary=True)
     scan_idx = len(result.exec_summary) - 1
     assert result.exec_summary[scan_idx]['operator'] == '00:SCAN HDFS'
     assert result.exec_summary[scan_idx]['detail'] == 'functional.alltypestiny'
 
     # KUDU table
     query = "select count(*) from functional_kudu.alltypestiny"
-    result = self.execute_query(query)
+    result = self.client.execute(query, fetch_exec_summary=True)
     scan_idx = len(result.exec_summary) - 1
     assert result.exec_summary[scan_idx]['operator'] == '00:SCAN KUDU'
    assert result.exec_summary[scan_idx]['detail'] == 'functional_kudu.alltypestiny'
 
     # HBASE table
     query = "select count(*) from functional_hbase.alltypestiny"
-    result = self.execute_query(query)
+    result = self.client.execute(query, fetch_exec_summary=True)
     scan_idx = len(result.exec_summary) - 1
     assert result.exec_summary[scan_idx]['operator'] == '00:SCAN HBASE'
    assert result.exec_summary[scan_idx]['detail'] == 'functional_hbase.alltypestiny'
@@ -121,7 +126,7 @@ class TestObservability(ImpalaTestSuite):
     """IMPALA-1048: Checks that the exec summary contains sinks."""
     # SELECT query.
     query = "select count(*) from functional.alltypes"
-    result = self.execute_query(query)
+    result = self.client.execute(query, fetch_exec_summary=True)
     # Sanity-check the root sink.
     root_sink = result.exec_summary[0]
     assert root_sink['operator'] == 'F01:ROOT'
@@ -147,7 +152,7 @@ class TestObservability(ImpalaTestSuite):
     # INSERT query.
     query = "create table {0}.tmp as select count(*) from 
functional.alltypes".format(
         unique_database)
-    result = self.execute_query(query)
+    result = self.client.execute(query, fetch_exec_summary=True)
     # Sanity-check the HDFS writer sink.
     assert result.exec_summary[0]['operator'] == 'F01:HDFS WRITER'
     assert result.exec_summary[0]['max_time'] >= 0
@@ -633,7 +638,7 @@ class TestObservability(ImpalaTestSuite):
         "create table %s as select * from functional.alltypestiny" % 
table_name)
     results = self.execute_query("compute stats %s" % table_name)
     # Search for all query ids (max length 33) in the profile.
-    matches = re.findall("Query \(id=.{,33}\)", results.runtime_profile)
+    matches = re.findall(r"Query \(id=.{,33}\)", results.runtime_profile)
     query_ids = []
     for query_id in matches:
       if query_id not in query_ids:
@@ -673,7 +678,7 @@ class TestObservability(ImpalaTestSuite):
       for key in keys:
         if key in line:
           # Match byte count within parentheses
-          m = re.search("\(([0-9]+)\)", line)
+          m = re.search(r"\(([0-9]+)\)", line)
 
           # If a match was not found, then the value of the key should be 0
           if not m:
@@ -686,8 +691,8 @@ class TestObservability(ImpalaTestSuite):
     # All counters have values
     assert all(counters[key] > 0 for key in keys)
 
-    assert counters["TotalBytesSent"] == (counters["TotalScanBytesSent"] +
-                                          counters["TotalInnerBytesSent"])
+    assert counters["TotalBytesSent"] == (counters["TotalScanBytesSent"]
+                                          + counters["TotalInnerBytesSent"])
 
   def test_query_profile_contains_host_resource_usage(self):
     """Tests that the profile contains a sub-profile with per node resource 
usage."""
@@ -914,12 +919,12 @@ class TestObservability(ImpalaTestSuite):
     assert "Travel:" in runtime_profile
     assert "HashCollisions:" in runtime_profile
     assert "Resizes:" in runtime_profile
-    nprobes = re.search('Probes:.*\((\d+)\)', runtime_profile)
+    nprobes = re.search(r'Probes:.*\((\d+)\)', runtime_profile)
     # Probes and travel can be 0. The number can be an integer or float with K.
     # The number extracted is the number inside parenthesis, which is always
     # an integer.
     assert nprobes and len(nprobes.groups()) == 1 and int(nprobes.group(1)) >= 0
-    ntravel = re.search('Travel:.*\((\d+)\)', runtime_profile)
+    ntravel = re.search(r'Travel:.*\((\d+)\)', runtime_profile)
     assert ntravel and len(ntravel.groups()) == 1 and int(ntravel.group(1)) >= 0
 
   def test_query_profle_hashtable(self):
@@ -955,7 +960,7 @@ class TestObservability(ImpalaTestSuite):
     assert results.success
 
     "When the skew summary is seen, look for the details"
-    skews_found = 'skew\(s\) found at:.*HASH_JOIN.*HASH_JOIN.*HDFS_SCAN_NODE'
+    skews_found = r'skew\(s\) found at:.*HASH_JOIN.*HASH_JOIN.*HDFS_SCAN_NODE'
     if len(re.findall(skews_found, results.runtime_profile, re.M)) == 1:
 
       "Expect to see skew details twice at the hash join nodes."
@@ -981,20 +986,21 @@ class TestObservability(ImpalaTestSuite):
   @SkipIfNotHdfsMinicluster.plans
   def test_reduced_cardinality_by_filter(self):
     """IMPALA-12702: Check that ExecSummary shows the reduced cardinality 
estimation."""
-    query_opts = {'compute_processing_cost': True}
     query = """select STRAIGHT_JOIN count(*) from
         (select l_orderkey from tpch_parquet.lineitem) a
         join (select o_orderkey, o_custkey from tpch_parquet.orders) l1
           on a.l_orderkey = l1.o_orderkey
         where l1.o_custkey < 1000"""
-    result = self.execute_query(query, query_opts)
-    scan = result.exec_summary[10]
-    assert '00:SCAN' in scan['operator']
-    assert scan['num_rows'] == 39563
-    assert scan['est_num_rows'] == 575771
-    assert scan['detail'] == 'tpch_parquet.lineitem'
-    runtime_profile = result.runtime_profile
-    assert "cardinality=575.77K(filtered from 6.00M)" in runtime_profile
+    with self.create_impala_client() as client:
+      client.set_configuration_option('compute_processing_cost', 1)
+      result = client.execute(query, fetch_exec_summary=True)
+      scan = result.exec_summary[10]
+      assert '00:SCAN' in scan['operator']
+      assert scan['num_rows'] == 39563
+      assert scan['est_num_rows'] == 575771
+      assert scan['detail'] == 'tpch_parquet.lineitem'
+      runtime_profile = result.runtime_profile
+      assert "cardinality=575.77K(filtered from 6.00M)" in runtime_profile
 
   def test_query_profile_contains_get_inflight_profile_counter(self):
     """Test that counter for getting inflight profiles appears in the 
profile"""
@@ -1080,4 +1086,4 @@ class TestQueryStates(ImpalaTestSuite):
   def __is_line_in_profile(self, line, profile):
     """Returns true if the given 'line' is in the given 'profile'. A single 
line of the
     profile must exactly match the given 'line' (excluding whitespaces)."""
-    return re.search("^\s*{0}\s*$".format(line), profile, re.M)
+    return re.search(r"^\s*{0}\s*$".format(line), profile, re.M)
