This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 00dc79adf6881677e6b7a694de021e29489b51e1
Author: Riza Suminto <[email protected]>
AuthorDate: Fri Mar 7 12:18:04 2025 -0800

    IMPALA-13907: Remove reference to create_beeswax_client
    
    This patch replaces create_beeswax_client() references with
    create_hs2_client() or vector-based client creation to prepare for the
    hs2 test migration.
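
    As an illustrative sketch (not itself part of the patch), the typical
    mechanical change looks like this:

      # Before: protocol-specific beeswax client.
      client = impalad.service.create_beeswax_client()
      # After: hs2 client, or a vector-based client so the protocol test
      # dimension selects the implementation.
      client = impalad.service.create_hs2_client()
      client = impalad.service.create_client_from_vector(vector)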
    
    test_session_expiration_with_queued_query is changed to use
    impala.dbapi directly from Impyla due to a limitation in
    ImpylaHS2Connection.
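
    For reference, a minimal sketch of driving Impala directly through
    Impyla's DBAPI (the host and port values here are placeholders for the
    coordinator under test):

      from impala.dbapi import connect
      conn = connect(host='localhost', port=21050)
      cursor = conn.cursor()
      cursor.execute('select 1')
      print(cursor.fetchall())
      cursor.close()
      conn.close()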
    
    TestAdmissionControllerRawHS2 is migrated to use hs2 as the default
    test protocol.
    
    Modify test_query_expiration.py to set query options through the
    client instead of SET queries. test_query_expiration is slightly
    modified due to a behavior difference in the hs2 ImpylaHS2Connection.
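
    For example, a sketch mirroring the change in test_query_expiration.py:

      # Before: query option set via a SQL SET statement.
      client.execute("SET EXEC_TIME_LIMIT_S=3")
      # After: query option set through the connection API.
      client.set_configuration_option("EXEC_TIME_LIMIT_S", "3")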
    
    Remove remaining references to BeeswaxConnection.QueryState.
    
    Fix a bug in ImpylaHS2Connection.wait_for_finished_timeout().
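
    (The fix is visible in the tests/common/impala_connection.py hunk
    below: the error log is now fetched through the operation handle
    rather than via a raw get_log() RPC before raising OperationalError.)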
    
    Fix some easy flake8 issues caught through this command:
    git show HEAD --name-only | grep '^tests.*py' \
      | xargs -I {} impala-flake8 {} \
      | grep -e U100 -e E111 -e E301 -e E302 -e E303 -e F...
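
    (For context: E111 and E301-E303 are pycodestyle indentation and
    blank-line checks, the F codes come from pyflakes, and U100 is the
    unused-argument check from the flake8-unused-arguments plugin.)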
    
    Testing:
    - Pass exhaustive tests.
    
    Change-Id: I1d84251835d458cc87fb8fedfc20ee15aae18d51
    Reviewed-on: http://gerrit.cloudera.org:8080/22700
    Reviewed-by: Riza Suminto <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 tests/common/impala_cluster.py                     |   2 +-
 tests/common/impala_connection.py                  |   3 +-
 tests/common/impala_service.py                     |  16 +--
 tests/common/resource_pool_config.py               |   2 +-
 tests/custom_cluster/test_admission_controller.py  |  11 +-
 tests/custom_cluster/test_ai_generate_text.py      |   4 +-
 tests/custom_cluster/test_blacklist.py             |   6 +-
 tests/custom_cluster/test_catalog_hms_failures.py  |  14 +--
 tests/custom_cluster/test_catalog_wait.py          |  11 +-
 tests/custom_cluster/test_catalogd_ha.py           |   4 +-
 tests/custom_cluster/test_client_ssl.py            |   5 +-
 .../custom_cluster/test_compact_catalog_updates.py |   5 +-
 tests/custom_cluster/test_concurrent_ddls.py       |   7 +-
 tests/custom_cluster/test_coordinators.py          |  21 ++--
 tests/custom_cluster/test_exchange_eos.py          |   2 +-
 .../test_frontend_connection_limit.py              |   8 +-
 tests/custom_cluster/test_hdfs_fd_caching.py       |   5 +-
 tests/custom_cluster/test_insert_behaviour.py      |   6 +-
 tests/custom_cluster/test_local_catalog.py         |  27 ++---
 tests/custom_cluster/test_mem_reservations.py      |   5 +-
 tests/custom_cluster/test_process_failures.py      |  16 +--
 tests/custom_cluster/test_query_expiration.py      | 118 +++++++++++----------
 tests/custom_cluster/test_query_retries.py         |  13 +--
 .../test_refresh_invalid_partition.py              |   6 +-
 tests/custom_cluster/test_restart_services.py      |  29 +++--
 tests/custom_cluster/test_s3a_access.py            |   4 +-
 tests/custom_cluster/test_scratch_disk.py          |  34 +++---
 tests/custom_cluster/test_seq_file_filtering.py    |   5 +-
 tests/custom_cluster/test_session_expiration.py    |  78 ++++++++------
 tests/custom_cluster/test_statestored_ha.py        |   8 +-
 tests/custom_cluster/test_web_pages.py             |   6 +-
 tests/metadata/test_compute_stats.py               |  22 ++--
 tests/metadata/test_ddl.py                         |   9 +-
 tests/query_test/test_rows_availability.py         |   7 +-
 tests/query_test/test_udfs.py                      |  13 ++-
 tests/stress/query_retries_stress_runner.py        |   6 +-
 tests/util/cancel_util.py                          |   2 +-
 37 files changed, 277 insertions(+), 263 deletions(-)

diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py
index 7979c4754..142cf123e 100644
--- a/tests/common/impala_cluster.py
+++ b/tests/common/impala_cluster.py
@@ -193,7 +193,7 @@ class ImpalaCluster(object):
     n = 0
     for impalad in self.impalads:
       try:
-        client = impalad.service.create_beeswax_client()
+        client = impalad.service.create_hs2_client()
         result = client.execute("select 1")
         assert result.success
         ++n
diff --git a/tests/common/impala_connection.py b/tests/common/impala_connection.py
index 5e1d06266..323d2a2b8 100644
--- a/tests/common/impala_connection.py
+++ b/tests/common/impala_connection.py
@@ -849,8 +849,7 @@ class ImpylaHS2Connection(ImpalaConnection):
         return True
       elif impala_state == ERROR:
         try:
-          error_log = self.__do_rpc(
-            lambda: self.imp_service.get_log(operation_handle.log_context))
+          error_log = operation_handle.get_handle().get_log()
           raise impyla_error.OperationalError(error_log, None)
         finally:
           self.close_query(operation_handle)
diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py
index 49c059c5b..d873742c1 100644
--- a/tests/common/impala_service.py
+++ b/tests/common/impala_service.py
@@ -32,6 +32,7 @@ from datetime import datetime
 from time import sleep, time
 
 from tests.common.impala_connection import create_connection, create_ldap_connection
+from tests.common.test_vector import BEESWAX, HS2, HS2_HTTP
 from thrift.transport.TSocket import TSocket
 from thrift.transport.TTransport import TBufferedTransport
 
@@ -440,9 +441,10 @@ class ImpaladService(BaseImpalaService):
     return self.is_port_open(self.webserver_port)
 
   def create_beeswax_client(self, use_kerberos=False):
-    """Creates a new beeswax client connection to the impalad"""
+    """Creates a new beeswax client connection to the impalad.
+    DEPRECATED: Use create_hs2_client() instead."""
     client = create_connection('%s:%d' % (self.hostname, self.beeswax_port),
-                               use_kerberos, 'beeswax')
+                               use_kerberos, BEESWAX)
     client.connect()
     return client
 
@@ -468,7 +470,7 @@ class ImpaladService(BaseImpalaService):
 
   def create_hs2_client(self):
     """Creates a new HS2 client connection to the impalad"""
-    client = create_connection('%s:%d' % (self.hostname, self.hs2_port), protocol='hs2')
+    client = create_connection('%s:%d' % (self.hostname, self.hs2_port), protocol=HS2)
     client.connect()
     return client
 
@@ -495,11 +497,11 @@ class ImpaladService(BaseImpalaService):
 
   def create_client(self, protocol):
     """Creates a new client connection for given protocol to this impalad"""
-    port = self.beeswax_port
-    if protocol == 'hs2':
-      port = self.hs2_port
-    elif protocol == 'hs2-http':
+    port = self.hs2_port
+    if protocol == HS2_HTTP:
       port = self.hs2_http_port
+    if protocol == BEESWAX:
+      port = self.beeswax_port
     client = create_connection('%s:%d' % (self.hostname, port), protocol=protocol)
     client.connect()
     return client
diff --git a/tests/common/resource_pool_config.py b/tests/common/resource_pool_config.py
index 1e267b696..162a7b8e0 100644
--- a/tests/common/resource_pool_config.py
+++ b/tests/common/resource_pool_config.py
@@ -61,7 +61,7 @@ class ResourcePoolConfig(object):
     if impala as picked up the change to that metric and is now equal to the
     'target'val'. Times out after 'timeout' seconds"""
     metric_str = self.CONFIG_TO_METRIC_STR_MAPPING[config_str]
-    client = self.impala_service.create_beeswax_client()
+    client = self.impala_service.create_hs2_client()
     client.set_configuration_option('request_pool', pool_name)
     # set mem_limit to something above the proc limit so that the query always gets
     # rejected.
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index b5266c675..72c1d6534 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -47,7 +47,7 @@ from tests.common.impala_connection import (
 from tests.common.resource_pool_config import ResourcePoolConfig
 from tests.common.skip import SkipIfFS, SkipIfEC, SkipIfNotHdfsMinicluster
 from tests.common.test_dimensions import (
-    HS2, BEESWAX,
+    HS2,
     add_mandatory_exec_option,
     create_exec_option_dimension,
     create_single_exec_option_dimension,
@@ -204,13 +204,6 @@ class TestAdmissionControllerBase(CustomClusterTestSuite):
 
 class TestAdmissionControllerRawHS2(TestAdmissionControllerBase, HS2TestSuite):
 
-  @classmethod
-  def default_test_protocol(cls):
-    # HS2TestSuite override self.hs2_client with a raw Impala hs2 thrift client.
-    # This will set self.client = self.beeswax_client.
-    # Do not change this. Multiple test method has been hardcoded under this assumption.
-    return BEESWAX
-
   def __check_pool_rejected(self, client, pool, expected_error_re):
     try:
       client.set_configuration({'request_pool': pool})
@@ -286,7 +279,7 @@ class TestAdmissionControllerRawHS2(TestAdmissionControllerBase, HS2TestSuite):
     to require a specific pool, and validate that the per-pool configurations were
     applied."""
     impalad = self.cluster.impalads[0]
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
     # Expected default mem limit for queueA, used in several tests below
     queueA_mem_limit = "MEM_LIMIT=%s" % (128 * 1024 * 1024)
     try:
diff --git a/tests/custom_cluster/test_ai_generate_text.py b/tests/custom_cluster/test_ai_generate_text.py
index 5b8d82504..51c70da13 100644
--- a/tests/custom_cluster/test_ai_generate_text.py
+++ b/tests/custom_cluster/test_ai_generate_text.py
@@ -49,7 +49,7 @@ class TestAIGenerateText(CustomClusterTestSuite):
       '--impalad_args=--ai_additional_platforms="bad.site" '
       '--ai_endpoint="https://bad.site";'])
     impalad = self.cluster.get_any_impalad()
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
     err = self.execute_query_expect_failure(client, self.ai_generate_text_default_query)
     assert re.search(re.escape(self.AI_GENERATE_COMMON_ERR_PREFIX), str(err))
     assert re.search(re.escape(self.AI_CURL_NETWORK_ERR), str(err))
@@ -61,7 +61,7 @@ class TestAIGenerateText(CustomClusterTestSuite):
       '--ai_endpoint="https://api.openai.com/v1/chat/completions"; '
       '--ai_api_key_jceks_secret=""'])
     impalad = self.cluster.get_any_impalad()
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
     err = self.execute_query_expect_failure(client, self.ai_generate_text_default_query)
     assert re.search(re.escape(self.AI_GENERATE_COMMON_ERR_PREFIX), str(err))
     assert re.search(re.escape(self.AI_CURL_NETWORK_ERR), str(err))
diff --git a/tests/custom_cluster/test_blacklist.py b/tests/custom_cluster/test_blacklist.py
index 9891bcd4e..6a80ab611 100644
--- a/tests/custom_cluster/test_blacklist.py
+++ b/tests/custom_cluster/test_blacklist.py
@@ -233,13 +233,13 @@ class TestBlacklistFaultyDisk(CustomClusterTestSuite):
       print("Generated dir " + dir_path)
     return result
 
-  def setup_method(self, method):
+  def setup_method(self, method):  # noqa: U100
     # Don't call the superclass method to prevent starting Impala before each test. In
     # this class, each test is responsible for doing that because we want to generate
     # the parameter string to start-impala-cluster in each test method.
     pass
 
-  def teardown_method(self, method):
+  def teardown_method(self, method):  # noqa: U100
     self.clear_tmp_dirs()
 
   @SkipIfBuildType.not_dev_build
@@ -264,7 +264,7 @@ class TestBlacklistFaultyDisk(CustomClusterTestSuite):
     # First set debug_action for query as empty.
     vector.get_value('exec_option')['debug_action'] = ''
     coord_impalad = self.cluster.get_first_impalad()
-    client = coord_impalad.service.create_beeswax_client()
+    client = coord_impalad.service.create_client_from_vector(vector)
 
     # Expect spill to disk to success with debug_action as empty. Verify all nodes are
     # active.
diff --git a/tests/custom_cluster/test_catalog_hms_failures.py b/tests/custom_cluster/test_catalog_hms_failures.py
index 6feea8d08..e288fd887 100644
--- a/tests/custom_cluster/test_catalog_hms_failures.py
+++ b/tests/custom_cluster/test_catalog_hms_failures.py
@@ -21,10 +21,10 @@ import time
 import threading
 
 from subprocess import check_call
-from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.custom_cluster_test_suite import (
     CustomClusterTestSuite,
     DEFAULT_CLUSTER_SIZE)
+from tests.common.impala_connection import IMPALA_CONNECTION_EXCEPTION
 from tests.common.skip import SkipIf
 from tests.util.event_processor_utils import EventProcessorUtils
 
@@ -74,7 +74,7 @@ class TestHiveMetaStoreFailure(CustomClusterTestSuite):
 
     try:
       self.client.execute("describe %s" % tbl_name)
-    except ImpalaBeeswaxException as e:
+    except IMPALA_CONNECTION_EXCEPTION as e:
       print(str(e))
       assert "Failed to load metadata for table: %s. Running 'invalidate 
metadata %s' "\
           "may resolve this problem." % (tbl_name, tbl_name) in str(e)
@@ -104,7 +104,7 @@ class TestHiveMetaStoreFailure(CustomClusterTestSuite):
     for _ in range(2):
       try:
         self.client.execute("describe {0}".format(table))
-      except ImpalaBeeswaxException as e:
+      except IMPALA_CONNECTION_EXCEPTION as e:
         assert "Failed to load metadata for table: %s. "\
                "Running 'invalidate metadata %s' may resolve this problem." \
                % (table, table) in str(e)
@@ -203,7 +203,7 @@ class TestCatalogHMSFailures(CustomClusterTestSuite):
     again"""
     # Make sure that catalogd is connected to HMS
     impalad = self.cluster.get_any_impalad()
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
     self.reload_metadata(client)
 
     # Kill HMS
@@ -214,7 +214,7 @@ class TestCatalogHMSFailures(CustomClusterTestSuite):
     start = time.time()
     try:
       self.reload_metadata(client)
-    except ImpalaBeeswaxException as e:
+    except IMPALA_CONNECTION_EXCEPTION as e:
       assert "Connection refused" in str(e)
     else:
       assert False, "Metadata load should have failed"
@@ -237,7 +237,7 @@ class TestCatalogHMSFailures(CustomClusterTestSuite):
     HMS is started a little later"""
     # Make sure that catalogd is connected to HMS
     impalad = self.cluster.get_any_impalad()
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
     self.reload_metadata(client)
 
     # Kill HMS
@@ -279,7 +279,7 @@ class TestCatalogHMSFailures(CustomClusterTestSuite):
     catalogd fails"""
     # Make sure that catalogd is connected to HMS
     impalad = self.cluster.get_any_impalad()
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
     self.reload_metadata(client)
 
     # Kill HMS
diff --git a/tests/custom_cluster/test_catalog_wait.py b/tests/custom_cluster/test_catalog_wait.py
index eb1cc542e..1625e78fe 100644
--- a/tests/custom_cluster/test_catalog_wait.py
+++ b/tests/custom_cluster/test_catalog_wait.py
@@ -18,10 +18,10 @@
 from __future__ import absolute_import, division, print_function
 import pytest
 
-from time import sleep
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.skip import SkipIfBuildType
 
+
 @SkipIfBuildType.not_dev_build
 class TestCatalogWait(CustomClusterTestSuite):
   """Impalad coordinators must wait for their local replica of the catalog to 
be
@@ -29,14 +29,9 @@ class TestCatalogWait(CustomClusterTestSuite):
      This test simulates a failed or slow catalog on impalad startup."""
 
   def expect_connection(self, impalad):
-    impalad.service.create_beeswax_client()
     impalad.service.create_hs2_client()
 
   def expect_no_connection(self, impalad):
-    with pytest.raises(Exception) as e:
-      impalad.service.create_beeswax_client()
-      assert 'Could not connect to' in str(e.value)
-
     with pytest.raises(Exception) as e:
       impalad.service.create_hs2_client()
       assert 'Could not connect to' in str(e.value)
@@ -71,8 +66,8 @@ class TestCatalogWait(CustomClusterTestSuite):
     # and does not prematurely register itself as an executor. The former is
     # verified via query fragment metrics and the latter would fail if registered
     # but unable to process fragments.
-    client0 = self.cluster.impalads[0].service.create_beeswax_client()
-    client1 = self.cluster.impalads[1].service.create_beeswax_client()
+    client0 = self.cluster.impalads[0].service.create_hs2_client()
+    client1 = self.cluster.impalads[1].service.create_hs2_client()
 
     self.execute_query_expect_success(client0, "select * from functional.alltypes");
     self.execute_query_expect_success(client1, "select * from functional.alltypes");
diff --git a/tests/custom_cluster/test_catalogd_ha.py b/tests/custom_cluster/test_catalogd_ha.py
index 689e12c47..0bcb7dc29 100644
--- a/tests/custom_cluster/test_catalogd_ha.py
+++ b/tests/custom_cluster/test_catalogd_ha.py
@@ -462,10 +462,10 @@ class TestCatalogdHA(CustomClusterTestSuite):
     assert(not catalogd_service_2.get_metric_value("catalog-server.active-status"))
 
     # Run DDL with SYNC_DDL enabled.
-    client = self.cluster.impalads[0].service.create_beeswax_client()
+    client = self.cluster.impalads[0].service.create_hs2_client()
     assert client is not None
     try:
-      self.execute_query_expect_success(client, "set SYNC_DDL=1")
+      client.set_configuration_option('sync_ddl', 1)
       ddl_query = "CREATE TABLE {database}.failover_sync_ddl (c int)"
       handle = client.execute_async(ddl_query.format(database=unique_database))
 
diff --git a/tests/custom_cluster/test_client_ssl.py b/tests/custom_cluster/test_client_ssl.py
index dd2b05e93..832b547ae 100644
--- a/tests/custom_cluster/test_client_ssl.py
+++ b/tests/custom_cluster/test_client_ssl.py
@@ -33,6 +33,7 @@ from tests.common.environ import IS_REDHAT_DERIVATIVE
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.impala_service import ImpaladService
 from tests.common.test_dimensions import create_client_protocol_dimension
+from tests.common.test_vector import BEESWAX
 from tests.shell.util import run_impala_shell_cmd, run_impala_shell_cmd_no_expect, \
     ImpalaShell, create_impala_shell_executable_dimension
 
@@ -53,6 +54,7 @@ else:
   SKIP_SSL_MSG = None
 CERT_DIR = "%s/be/src/testutil" % os.environ['IMPALA_HOME']
 
+
 class TestClientSsl(CustomClusterTestSuite):
   """Tests for a client using SSL (particularly, the Impala Shell) """
 
@@ -93,7 +95,6 @@ class TestClientSsl(CustomClusterTestSuite):
       pytest.skip("Python version does not support tls 1.2")
     super(TestClientSsl, cls).setup_class()
 
-
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(impalad_args=SSL_ARGS, statestored_args=SSL_ARGS,
                                     catalogd_args=SSL_ARGS)
@@ -156,7 +157,7 @@ class TestClientSsl(CustomClusterTestSuite):
     cls.ImpalaTestMatrix.add_dimension(
         create_impala_shell_executable_dimension(dev_only=True))
     cls.ImpalaTestMatrix.add_constraint(lambda v:
-        v.get_value('protocol') != 'beeswax')
+        v.get_value('protocol') != BEESWAX)
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(impalad_args=WEBSERVER_SSL_ARGS,
diff --git a/tests/custom_cluster/test_compact_catalog_updates.py b/tests/custom_cluster/test_compact_catalog_updates.py
index de3a6c50d..52b481ca6 100644
--- a/tests/custom_cluster/test_compact_catalog_updates.py
+++ b/tests/custom_cluster/test_compact_catalog_updates.py
@@ -22,6 +22,7 @@ import pytest
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 
+
 class TestCompactCatalogUpdates(CustomClusterTestSuite):
   @classmethod
   def get_workload(cls):
@@ -47,8 +48,8 @@ class TestCompactCatalogUpdates(CustomClusterTestSuite):
       impalad2 = self.cluster.impalads[1]
       assert impalad2.service.get_metric_value("catalog.curr-version") > 0
 
-      client1 = impalad1.service.create_beeswax_client()
-      client2 = impalad2.service.create_beeswax_client()
+      client1 = impalad1.service.create_hs2_client()
+      client2 = impalad2.service.create_hs2_client()
       query_options = {"sync_ddl" : 1}
       self.execute_query_expect_success(client1, "refresh functional.alltypes",
           query_options)
diff --git a/tests/custom_cluster/test_concurrent_ddls.py b/tests/custom_cluster/test_concurrent_ddls.py
index 3630db369..167c47d4e 100644
--- a/tests/custom_cluster/test_concurrent_ddls.py
+++ b/tests/custom_cluster/test_concurrent_ddls.py
@@ -25,8 +25,10 @@ from multiprocessing import TimeoutError
 
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.impala_connection import ERROR, FINISHED
 from tests.util.shell_util import dump_server_stacktraces
 
+
 class TestConcurrentDdls(CustomClusterTestSuite):
   """Test concurrent DDLs with invalidate metadata"""
 
@@ -216,7 +218,6 @@ class TestConcurrentDdls(CustomClusterTestSuite):
     for i in range(10):
       self.execute_query("invalidate metadata " + tbl)
       # Always keep a concurrent REFRESH statement running
-      refresh_state = self.client.get_state(refresh_handle)
-      if refresh_state == self.client.QUERY_STATES['FINISHED']\
-          or refresh_state == self.client.QUERY_STATES['EXCEPTION']:
+      refresh_state = self.client.get_impala_exec_state(refresh_handle)
+      if refresh_state in (FINISHED, ERROR):
         refresh_handle = self.client.execute_async(refresh_stmt)
diff --git a/tests/custom_cluster/test_coordinators.py b/tests/custom_cluster/test_coordinators.py
index 1f3c4b053..e95cc0c0e 100644
--- a/tests/custom_cluster/test_coordinators.py
+++ b/tests/custom_cluster/test_coordinators.py
@@ -31,6 +31,7 @@ from tests.common.test_result_verifier import error_msg_startswith
 LOG = logging.getLogger('test_coordinators')
 LOG.setLevel(level=logging.DEBUG)
 
+
 class TestCoordinators(CustomClusterTestSuite):
   @pytest.mark.execute_serially
   def test_multiple_coordinators(self):
@@ -49,7 +50,7 @@ class TestCoordinators(CustomClusterTestSuite):
     # Verify that Beeswax and HS2 client connections can't be established at a worker node
     beeswax_client = None
     try:
-      beeswax_client = worker.service.create_beeswax_client()
+      beeswax_client = worker.service.create_hs2_client()
     except Exception as e:
       LOG.info("Caught exception {0}".format(e))
     finally:
@@ -65,8 +66,8 @@ class TestCoordinators(CustomClusterTestSuite):
 
     # Verify that queries can successfully run on coordinator nodes
     try:
-      client1 = coordinator1.service.create_beeswax_client()
-      client2 = coordinator2.service.create_beeswax_client()
+      client1 = coordinator1.service.create_hs2_client()
+      client2 = coordinator2.service.create_hs2_client()
 
       # select queries
       self.execute_query_expect_success(client1, "select 1")
@@ -107,7 +108,7 @@ class TestCoordinators(CustomClusterTestSuite):
       coordinator = self.cluster.impalads[0]
       client = None
       try:
-        client = coordinator.service.create_beeswax_client()
+        client = coordinator.service.create_hs2_client()
         assert client is not None
         query = "select count(*) from functional.alltypesagg"
         result = client.execute(query, fetch_exec_summary=True)
@@ -157,7 +158,7 @@ class TestCoordinators(CustomClusterTestSuite):
 
     coordinator = self.cluster.impalads[0]
     try:
-      client = coordinator.service.create_beeswax_client()
+      client = coordinator.service.create_hs2_client()
 
       # create the database
       self.execute_query_expect_success(client,
@@ -270,7 +271,7 @@ class TestCoordinators(CustomClusterTestSuite):
 
     client = None
     try:
-      client = coordinator.service.create_beeswax_client()
+      client = coordinator.service.create_hs2_client()
       assert client is not None
 
       client.execute("SET EXPLAIN_LEVEL=2")
@@ -332,12 +333,12 @@ class TestCoordinators(CustomClusterTestSuite):
                                     impalad_args="-num_expected_executors=10")
   def test_num_expected_executors_flag(self):
     """Verifies that the '-num_expected_executors' flag is effective."""
-    client = self.cluster.impalads[0].service.create_beeswax_client()
-    client.execute("set explain_level=2")
+    client = self.cluster.impalads[0].service.create_hs2_client()
+    client.set_configuration_option("explain_level", "2")
     ret = client.execute("explain select * from functional.alltypes a inner 
join "
                          "functional.alltypes b on a.id = b.id;")
     num_hosts = "hosts=10 instances=10"
-    assert num_hosts in str(ret)
+    assert num_hosts in str(ret.tuples())
 
   @SkipIfFS.hbase
   @SkipIf.skip_hbase
@@ -346,7 +347,7 @@ class TestCoordinators(CustomClusterTestSuite):
     """Verifies HBase tables can be scanned by executor only impalads."""
     self._start_impala_cluster([], cluster_size=3, num_coordinators=1,
                          use_exclusive_coordinators=True)
-    client = self.cluster.impalads[0].service.create_beeswax_client()
+    client = self.cluster.impalads[0].service.create_hs2_client()
     query = "select count(*) from functional_hbase.alltypes"
     result = self.execute_query_expect_success(client, query)
     assert result.data == ['7300']
diff --git a/tests/custom_cluster/test_exchange_eos.py b/tests/custom_cluster/test_exchange_eos.py
index c6e2999dd..439d9b50f 100644
--- a/tests/custom_cluster/test_exchange_eos.py
+++ b/tests/custom_cluster/test_exchange_eos.py
@@ -49,7 +49,7 @@ class TestExchangeEos(CustomClusterTestSuite):
 
     cluster = ImpalaCluster.get_e2e_test_cluster()
     coordinator = cluster.get_first_impalad()
-    client = coordinator.service.create_beeswax_client()
+    client = coordinator.service.create_hs2_client()
 
     vector.get_value('exec_option')['spool_query_results'] = 'true'
     for query in ["select * from tpch.lineitem order by l_orderkey limit 
10000",
diff --git a/tests/custom_cluster/test_frontend_connection_limit.py b/tests/custom_cluster/test_frontend_connection_limit.py
index 409b2b9a1..86e873a26 100644
--- a/tests/custom_cluster/test_frontend_connection_limit.py
+++ b/tests/custom_cluster/test_frontend_connection_limit.py
@@ -20,7 +20,6 @@ import pytest
 
 from threading import Thread
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
-from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 
 # This custom cluster test exercises the behavior of the front end thrift
 # server on how a new client connection request is handled, after the maximum
@@ -42,13 +41,8 @@ class TestFrontendConnectionLimit(CustomClusterTestSuite):
     super(TestFrontendConnectionLimit, cls).add_test_dimensions()
 
   def _connect_and_query(self, query, impalad):
-    client = impalad.service.create_beeswax_client()
-    try:
+    with impalad.service.create_hs2_client() as client:
       client.execute(query)
-    except Exception as e:
-      client.close()
-      raise ImpalaBeeswaxException(str(e))
-    client.close()
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
diff --git a/tests/custom_cluster/test_hdfs_fd_caching.py b/tests/custom_cluster/test_hdfs_fd_caching.py
index 9cb6936a2..8574183e4 100644
--- a/tests/custom_cluster/test_hdfs_fd_caching.py
+++ b/tests/custom_cluster/test_hdfs_fd_caching.py
@@ -52,8 +52,7 @@ class TestHdfsFdCaching(CustomClusterTestSuite):
 
   def setup_method(self, method):
     super(TestHdfsFdCaching, self).setup_method(method)
-    impalad = self.cluster.impalads[0]
-    client = impalad.service.create_beeswax_client()
+    client = self.hs2_client
 
     self.client = client
     client.execute("drop database if exists cachefd cascade")
@@ -63,8 +62,8 @@ class TestHdfsFdCaching(CustomClusterTestSuite):
     self.create_n_files(1)
 
   def teardown_method(self, method):
-    super(TestHdfsFdCaching, self).teardown_method(method)
     self.client.execute("drop database if exists cachefd cascade")
+    super(TestHdfsFdCaching, self).teardown_method(method)
 
   def run_fd_caching_test(self, vector, caching_expected, cache_capacity,
       eviction_timeout_secs):
diff --git a/tests/custom_cluster/test_insert_behaviour.py b/tests/custom_cluster/test_insert_behaviour.py
index f74ba80d2..77937450c 100644
--- a/tests/custom_cluster/test_insert_behaviour.py
+++ b/tests/custom_cluster/test_insert_behaviour.py
@@ -50,7 +50,7 @@ class TestInsertBehaviourCustomCluster(CustomClusterTestSuite):
 
   def _get_impala_client(self):
     impalad = self.cluster.get_any_impalad()
-    return impalad.service.create_beeswax_client()
+    return impalad.service.create_hs2_client()
 
   def _create_test_tbl(self):
     client = self._get_impala_client()
@@ -116,7 +116,7 @@ class TestInsertBehaviourCustomCluster(CustomClusterTestSuite):
   def test_insert_inherit_permission_disabled(self):
     """Check that turning off insert permission inheritance works correctly."""
     impalad = self.cluster.get_any_impalad()
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
     try:
       ls = self.hdfs_client.get_file_dir_status("test-warehouse/%s/p1=1/" % TEST_TBL)
       default_perms = ls['FileStatus']['permission']
@@ -129,7 +129,7 @@ class TestInsertBehaviourCustomCluster(CustomClusterTestSuite):
         self._check_partition_perms("p1=1/p2=3/", default_perms)
       self._check_partition_perms("p1=1/p2=3/p3=4/", default_perms)
     finally:
-       client.close()
+      client.close()
 
 
 @SkipIfFS.hive
diff --git a/tests/custom_cluster/test_local_catalog.py b/tests/custom_cluster/test_local_catalog.py
index d83a17f53..f9b8858da 100644
--- a/tests/custom_cluster/test_local_catalog.py
+++ b/tests/custom_cluster/test_local_catalog.py
@@ -82,8 +82,8 @@ class TestLocalCatalogCompactUpdates(CustomClusterTestSuite):
     try:
       impalad1 = self.cluster.impalads[0]
       impalad2 = self.cluster.impalads[1]
-      client1 = impalad1.service.create_beeswax_client()
-      client2 = impalad2.service.create_beeswax_client()
+      client1 = impalad1.service.create_hs2_client()
+      client2 = impalad2.service.create_hs2_client()
 
       view = "%s.my_view" % unique_database
 
@@ -126,7 +126,7 @@ class TestLocalCatalogCompactUpdates(CustomClusterTestSuite):
     """
     try:
       impalad = self.cluster.impalads[0]
-      client = impalad.service.create_beeswax_client()
+      client = impalad.service.create_hs2_client()
 
       view = "%s.my_view" % unique_database
       self.execute_query_expect_success(client, "create view %s as select 1" % 
view)
@@ -223,8 +223,8 @@ class TestLocalCatalogCompactUpdates(CustomClusterTestSuite):
     try:
       impalad1 = self.cluster.impalads[0]
       impalad2 = self.cluster.impalads[1]
-      client1 = impalad1.service.create_beeswax_client()
-      client2 = impalad2.service.create_beeswax_client()
+      client1 = impalad1.service.create_hs2_client()
+      client2 = impalad2.service.create_hs2_client()
 
       # Create something to make the cache not empty.
       self.execute_query_expect_success(
@@ -265,8 +265,8 @@ class TestLocalCatalogRetries(CustomClusterTestSuite):
     # Tracks query failures for all other reasons.
     failed_queries = queue.Queue()
     try:
-      client1 = self.cluster.impalads[0].service.create_beeswax_client()
-      client2 = self.cluster.impalads[1].service.create_beeswax_client()
+      client1 = self.cluster.impalads[0].service.create_hs2_client()
+      client2 = self.cluster.impalads[1].service.create_hs2_client()
 
       def stress_thread(client):
         # Loops, picks a random query in each iteration, runs it,
@@ -277,7 +277,7 @@ class TestLocalCatalogRetries(CustomClusterTestSuite):
           attempt += 1
           try:
             print('Attempt', attempt, 'client', str(client))
-            ret = self.execute_query_unchecked(client, q)
+            self.execute_query_unchecked(client, q)
           except Exception as e:
             if 'InconsistentMetadataFetchException' in str(e):
               with inconsistent_seen_lock:
@@ -354,8 +354,8 @@ class TestLocalCatalogRetries(CustomClusterTestSuite):
     try:
       impalad1 = self.cluster.impalads[0]
       impalad2 = self.cluster.impalads[1]
-      client1 = impalad1.service.create_beeswax_client()
-      client2 = impalad2.service.create_beeswax_client()
+      client1 = impalad1.service.create_hs2_client()
+      client2 = impalad2.service.create_hs2_client()
 
       # Create a view in client 1, cache the table list including that view in
       # client 2, and then drop it in client 1. While we've still cached the
@@ -472,7 +472,7 @@ class TestLocalCatalogRetries(CustomClusterTestSuite):
     self.execute_query(
         "insert into {0}.tbl partition(p) values 
(0,0)".format(unique_database))
 
-    def read_part(i):
+    def read_part(i):  # noqa: U100
       self.execute_query_expect_success(
           tls.c, "select * from {0}.tbl where p=0".format(unique_database))
 
@@ -486,6 +486,7 @@ class TestLocalCatalogRetries(CustomClusterTestSuite):
       # Refresh to invalidate the partition in local catalog cache
       self.execute_query("refresh {0}.tbl 
partition(p=0)".format(unique_database))
 
+
 class TestLocalCatalogObservability(CustomClusterTestSuite):
   def get_catalog_cache_metrics(self, impalad):
     """ Returns catalog cache metrics as a dict by scraping the json metrics 
page on the
@@ -518,7 +519,7 @@ class TestLocalCatalogObservability(CustomClusterTestSuite):
       # Make sure /catalog_object endpoint is disabled on web UI.
       assert 'No URI handler for &apos;/catalog_object&apos;' \
         in impalad.service.read_debug_webpage('/catalog_object')
-      client = impalad.service.create_beeswax_client()
+      client = impalad.service.create_hs2_client()
       cache_hit_rate_metric_key = "catalog.cache.hit-rate"
       cache_miss_rate_metric_key = "catalog.cache.miss-rate"
       cache_hit_count_metric_key = "catalog.cache.hit-count"
@@ -594,6 +595,7 @@ class TestLocalCatalogObservability(CustomClusterTestSuite):
         % test_tbl
     self.assert_impalad_log_contains('INFO', log_regex)
 
+
 class TestFullAcid(CustomClusterTestSuite):
   @classmethod
   def get_workload(self):
@@ -617,6 +619,7 @@ class TestFullAcid(CustomClusterTestSuite):
   def test_full_acid_scans(self, vector, unique_database):
     self.run_test_case('QueryTest/full-acid-scans', vector, use_db=unique_database)
 
+
 class TestReusePartitionMetadata(CustomClusterTestSuite):
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
diff --git a/tests/custom_cluster/test_mem_reservations.py b/tests/custom_cluster/test_mem_reservations.py
index 36996f910..14674532e 100644
--- a/tests/custom_cluster/test_mem_reservations.py
+++ b/tests/custom_cluster/test_mem_reservations.py
@@ -24,6 +24,7 @@ from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.impala_test_suite import LOG
 from tests.verifiers.metric_verifier import MetricVerifier
 
+
 class TestMemReservations(CustomClusterTestSuite):
   """Tests for memory reservations that require custom cluster arguments."""
 
@@ -34,7 +35,7 @@ class TestMemReservations(CustomClusterTestSuite):
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args="--buffer_pool_limit=2g 
--memory_maintenance_sleep_time_ms=100")
-  def test_per_backend_min_reservation(self, vector):
+  def test_per_backend_min_reservation(self):
     """Tests that the per-backend minimum reservations are used (IMPALA-4833).
        The test sets the buffer_pool_limit very low (2gb), and then runs a query against
        two different coordinators. The query was created to have different minimum
@@ -82,7 +83,7 @@ class TestMemReservations(CustomClusterTestSuite):
         self.error = None
 
       def run(self):
-        client = self.coordinator.service.create_beeswax_client()
+        client = self.coordinator.service.create_hs2_client()
         try:
           client.set_configuration(CONFIG_MAP)
           for i in range(20):
diff --git a/tests/custom_cluster/test_process_failures.py b/tests/custom_cluster/test_process_failures.py
index 01668d139..4092d48d4 100644
--- a/tests/custom_cluster/test_process_failures.py
+++ b/tests/custom_cluster/test_process_failures.py
@@ -47,7 +47,7 @@ class TestProcessFailures(CustomClusterTestSuite):
   def test_restart_coordinator(self):
     """Restarts the coordinator between queries."""
     impalad = self.cluster.get_any_impalad()
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
 
     self.execute_query_expect_success(client, QUERY)
 
@@ -56,7 +56,7 @@ class TestProcessFailures(CustomClusterTestSuite):
     statestored.service.wait_for_live_subscribers(DEFAULT_NUM_SUBSCRIBERS, timeout=60)
 
     # Reconnect
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
     impalad.service.wait_for_metric_value('catalog.ready', 1, timeout=60)
     self.execute_query_expect_success(client, QUERY)
 
@@ -67,7 +67,7 @@ class TestProcessFailures(CustomClusterTestSuite):
     """"Tests that when a coordinator running multiple queries is killed, all
     running fragments on executors are cancelled."""
     impalad = self.cluster.impalads[0]
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
     assert client is not None
     # A query which is cancelable and takes long time to execute
     query = "select * from tpch.lineitem t1, tpch.lineitem t2, tpch.lineitem 
t3 " \
@@ -100,7 +100,7 @@ class TestProcessFailures(CustomClusterTestSuite):
   def test_restart_statestore(self):
     """Tests the cluster still functions when the statestore dies."""
     impalad = self.cluster.get_any_impalad()
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
     statestored = self.cluster.statestored
     statestored.kill()
     impalad.service.wait_for_metric_value(
@@ -128,7 +128,7 @@ class TestProcessFailures(CustomClusterTestSuite):
   def test_kill_restart_worker(self):
     """Verifies a worker is able to be killed."""
     impalad = self.cluster.get_any_impalad()
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
     self.execute_query_expect_success(client, QUERY)
 
     # select a different impalad and restart it
@@ -182,7 +182,7 @@ class TestProcessFailures(CustomClusterTestSuite):
   def test_restart_catalogd(self):
     # Choose a random impalad verify a query can run against it.
     impalad = self.cluster.get_any_impalad()
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
     self.execute_query_expect_success(client, QUERY)
 
     # Kill the catalogd.
@@ -208,7 +208,7 @@ class TestProcessFailures(CustomClusterTestSuite):
   def test_restart_all_impalad(self):
     """Restarts all the impalads and runs a query"""
     impalad = self.cluster.get_any_impalad()
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
     self.execute_query_expect_success(client, QUERY)
 
     # Kill each impalad and wait for the statestore to register the failures.
@@ -229,7 +229,7 @@ class TestProcessFailures(CustomClusterTestSuite):
     for impalad in self.cluster.impalads:
       impalad.service.wait_for_num_known_live_backends(DEFAULT_CLUSTER_SIZE, timeout=60)
       impalad.service.wait_for_metric_value('catalog.ready', True, timeout=60)
-      client = impalad.service.create_beeswax_client()
+      client = impalad.service.create_hs2_client()
       self.execute_query_expect_success(client, QUERY)
       # Make sure the catalog service is actually back up by executing an operation
       # against it.
diff --git a/tests/custom_cluster/test_query_expiration.py b/tests/custom_cluster/test_query_expiration.py
index ae0eaf02d..3f088713f 100644
--- a/tests/custom_cluster/test_query_expiration.py
+++ b/tests/custom_cluster/test_query_expiration.py
@@ -25,6 +25,8 @@ import threading
 from time import sleep, time
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.impala_connection import ERROR, FINISHED
+
 
 class TestQueryExpiration(CustomClusterTestSuite):
   """Tests query expiration logic"""
@@ -33,26 +35,32 @@ class TestQueryExpiration(CustomClusterTestSuite):
     in_flight_queries = impalad.service.get_in_flight_queries()
     # Guard against too few in-flight queries.
     assert expected <= len(in_flight_queries)
+    executing_ids = list()
+    waiting_ids = list()
     actual = waiting = 0
     for query in in_flight_queries:
       if query["executing"]:
         actual += 1
+        executing_ids.append(query["query_id"])
       else:
         assert query["waiting"]
         waiting += 1
-    assert actual == expected, '%s out of %s queries executing (expected %s)' \
-        % (actual, len(in_flight_queries), expected)
-    assert waiting == expect_waiting, '%s out of %s queries waiting (expected %s)' \
-        % (waiting, len(in_flight_queries), expect_waiting)
+        waiting_ids.append(query["query_id"])
+    assert actual == expected, (
+        '{0} out of {1} queries executing (expected {2}). query_id={3}').format(
+            actual, len(in_flight_queries), expected, str(executing_ids))
+    assert waiting == expect_waiting, (
+        '{0} out of {1} queries waiting (expected {2}). query_id={3}').format(
+            waiting, len(in_flight_queries), expect_waiting, str(waiting_ids))
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args="--idle_query_timeout=8",
       disable_log_buffering=True)
-  def test_query_expiration(self, vector):
+  def test_query_expiration(self):
     """Confirm that single queries expire if not fetched"""
     impalad = self.cluster.get_first_impalad()
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
     num_expired = impalad.service.get_metric_value('impala-server.num-queries-expired')
     handles = []
 
@@ -62,29 +70,29 @@ class TestQueryExpiration(CustomClusterTestSuite):
     handles.append(default_timeout_expire_handle)
 
     # This query will hit a lower time limit.
-    client.execute("SET EXEC_TIME_LIMIT_S=3")
+    client.set_configuration_option("EXEC_TIME_LIMIT_S", "3")
     time_limit_expire_handle = client.execute_async(query1)
     handles.append(time_limit_expire_handle)
 
     # This query will hit a lower idle timeout instead of the default timeout or time
     # limit.
-    client.execute("SET EXEC_TIME_LIMIT_S=5")
-    client.execute("SET QUERY_TIMEOUT_S=3")
+    client.set_configuration_option("EXEC_TIME_LIMIT_S", "5")
+    client.set_configuration_option("QUERY_TIMEOUT_S", "3")
     short_timeout_expire_handle = client.execute_async("SELECT SLEEP(2000000)")
     handles.append(short_timeout_expire_handle)
-    client.execute("SET EXEC_TIME_LIMIT_S=0")
+    client.set_configuration_option("EXEC_TIME_LIMIT_S", "0")
 
     # Set a huge timeout, to check that the server bounds it by --idle_query_timeout
-    client.execute("SET QUERY_TIMEOUT_S=1000")
+    client.set_configuration_option("QUERY_TIMEOUT_S", "1000")
+    default_timeout_expire_handle2 = client.execute_async("SELECT SLEEP(3000000)")
     handles.append(default_timeout_expire_handle2)
     self._check_num_executing(impalad, len(handles))
 
     # Run a query that fails, and will timeout due to client inactivity.
-    client.execute("SET QUERY_TIMEOUT_S=1")
-    client.execute('SET MEM_LIMIT=1')
+    client.set_configuration_option("QUERY_TIMEOUT_S", "1")
+    client.set_configuration_option('MEM_LIMIT', '1')
     exception_handle = client.execute_async("select count(*) from functional.alltypes")
-    client.execute('SET MEM_LIMIT=1g')
+    client.set_configuration_option('MEM_LIMIT', '1g')
     handles.append(exception_handle)
 
     before = time()
@@ -94,16 +102,13 @@ class TestQueryExpiration(CustomClusterTestSuite):
     # still be running.
     assert num_expired + 3 == impalad.service.get_metric_value(
       'impala-server.num-queries-expired')
-    assert (client.get_state(short_timeout_expire_handle) ==
-            client.QUERY_STATES['EXCEPTION'])
-    assert (client.get_state(time_limit_expire_handle) ==
-            client.QUERY_STATES['EXCEPTION'])
-    assert (client.get_state(exception_handle) == client.QUERY_STATES['EXCEPTION'])
-    assert (client.get_state(default_timeout_expire_handle) ==
-            client.QUERY_STATES['FINISHED'])
-    assert (client.get_state(default_timeout_expire_handle2) ==
-            client.QUERY_STATES['FINISHED'])
-    # The query cancelled by exec_time_limit_s should be waiting to be closed.
+    assert (client.get_impala_exec_state(short_timeout_expire_handle) == ERROR)
+    assert (client.get_impala_exec_state(time_limit_expire_handle) == ERROR)
+    assert (client.get_impala_exec_state(exception_handle) == ERROR)
+    assert (client.get_impala_exec_state(default_timeout_expire_handle) == FINISHED)
+    assert (client.get_impala_exec_state(default_timeout_expire_handle2) == FINISHED)
+    # The query cancelled by exec_time_limit_s (time_limit_expire_handle) should be
+    # waiting to be closed.
     self._check_num_executing(impalad, 2, 1)
     self.__expect_expired(client, query1, short_timeout_expire_handle,
         r"Query [0-9a-f]+:[0-9a-f]+ expired due to "
@@ -113,6 +118,9 @@ class TestQueryExpiration(CustomClusterTestSuite):
     self.__expect_expired(client, query1, exception_handle,
         r"minimum memory reservation is greater than memory available.*\nQuery 
"
         + r"[0-9a-f]+:[0-9a-f]+ expired due to client inactivity \(timeout is 
1s000ms\)")
+    # hs2 client does not automatically close time_limit_expire_handle.
+    # Manually close it.
+    client.close_query(time_limit_expire_handle)
     self._check_num_executing(impalad, 2)
     # Both queries with query_timeout_s < 4 should generate this message.
     self.assert_impalad_log_contains('INFO', "Expiring query due to client inactivity: "
@@ -128,16 +136,14 @@ class TestQueryExpiration(CustomClusterTestSuite):
     # The metric and client state are not atomically maintained. Since the
     # expiration metric has just been reached, accessing the client state
     # is guarded in a loop to avoid flaky false negatives.
-    self.__expect_client_state(client, default_timeout_expire_handle,
-                               client.QUERY_STATES['EXCEPTION'])
-    self.__expect_client_state(client, default_timeout_expire_handle2,
-                               client.QUERY_STATES['EXCEPTION'])
+    self.__expect_client_state(client, default_timeout_expire_handle, ERROR)
+    self.__expect_client_state(client, default_timeout_expire_handle2, ERROR)
 
     # Check that we didn't wait too long to be expired (double the timeout is sufficiently
     # large to avoid most noise in measurement)
     assert time() - before < 16
 
-    client.execute("SET QUERY_TIMEOUT_S=0")
+    client.set_configuration_option("QUERY_TIMEOUT_S", "0")
     # Synchronous execution; calls fetch() and query should not time out.
     # Note: could be flakey if execute() takes too long to call fetch() etc after the
     # query completes.
@@ -148,6 +154,9 @@ class TestQueryExpiration(CustomClusterTestSuite):
         == num_expired + len(handles)
     self._check_num_executing(impalad, 0)
     for handle in handles:
+      if handle == time_limit_expire_handle:
+        # This is manually closed already.
+        continue
       try:
         client.close_query(handle)
         assert False, "Close should always throw an exception"
@@ -165,37 +174,37 @@ class TestQueryExpiration(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--idle_query_timeout=0")
-  def test_query_expiration_no_default(self, vector):
+  def test_query_expiration_no_default(self):
     """Confirm that single queries expire if no default is set, but a per-query
     expiration or time limit is set"""
     impalad = self.cluster.get_any_impalad()
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
     num_expired = impalad.service.get_metric_value('impala-server.num-queries-expired')
     query = "SELECT SLEEP(1000000)"
-    client.execute("SET QUERY_TIMEOUT_S=1")
+    client.set_configuration_option("QUERY_TIMEOUT_S", "1")
     timeout_handle = client.execute_async(query)
-    client.execute("SET QUERY_TIMEOUT_S=0")
+    client.set_configuration_option("QUERY_TIMEOUT_S", "0")
 
-    client.execute("SET EXEC_TIME_LIMIT_S=1")
+    client.set_configuration_option("EXEC_TIME_LIMIT_S", "1")
     time_limit_handle = client.execute_async(query)
-    client.execute("SET EXEC_TIME_LIMIT_S=0")
+    client.set_configuration_option("EXEC_TIME_LIMIT_S", "0")
 
     # Set a huge timeout, server should not expire the query while this test is running
-    client.execute("SET QUERY_TIMEOUT_S=1000")
+    client.set_configuration_option("QUERY_TIMEOUT_S", "1000")
     no_timeout_handle = client.execute_async(query)
 
-    before = time()
     sleep(4)
 
     # Query with timeout of 1 should have expired, other query should still be running.
     assert num_expired + 2 == impalad.service.get_metric_value(
       'impala-server.num-queries-expired')
 
-    assert client.get_state(timeout_handle) == client.QUERY_STATES['EXCEPTION']
-    assert client.get_state(time_limit_handle) == client.QUERY_STATES['EXCEPTION']
-    assert client.get_state(no_timeout_handle) == client.QUERY_STATES['FINISHED']
+    assert client.get_impala_exec_state(timeout_handle) == ERROR
+    assert client.get_impala_exec_state(time_limit_handle) == ERROR
+    assert client.get_impala_exec_state(no_timeout_handle) == FINISHED
     self.__expect_expired(client, query, timeout_handle,
-        "Query [0-9a-f]+:[0-9a-f]+ expired due to client inactivity \(timeout 
is 1s000ms\)")
+        r"Query [0-9a-f]+:[0-9a-f]+ expired due to client inactivity "
+        r"\(timeout is 1s000ms\)")
     self.__expect_expired(client, query, time_limit_handle,
         "Query [0-9a-f]+:[0-9a-f]+ expired due to execution time limit of 
1s000ms")
 
@@ -211,14 +220,14 @@ class TestQueryExpiration(CustomClusterTestSuite):
     """Try to fetch 'expected_state' from 'client' within 'timeout' seconds.
     Fail if unable."""
     start_time = time()
-    actual_state = client.get_state(handle)
+    actual_state = client.get_impala_exec_state(handle)
     while (actual_state != expected_state and time() - start_time < timeout):
-      actual_state = client.get_state(handle)
+      actual_state = client.get_impala_exec_state(handle)
     assert expected_state == actual_state
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--idle_query_timeout=1")
-  def test_concurrent_query_expiration(self, vector):
+  def test_concurrent_query_expiration(self):
     """Confirm that multiple concurrent queries are correctly expired if not 
fetched"""
     class ExpiringQueryThread(threading.Thread):
       """Thread that runs a query and does not fetch so it will time out."""
@@ -250,9 +259,9 @@ class TestQueryExpiration(CustomClusterTestSuite):
 
       def run(self):
         # Query will not be idle but will hit time limit.
-        self.client.execute("SET EXEC_TIME_LIMIT_S=1")
+        self.client.set_configuration_option("EXEC_TIME_LIMIT_S", "1")
         try:
-          result = self.client.execute("SELECT SLEEP(2500)")
+          self.client.execute("SELECT SLEEP(2500)")
           assert "Expected to hit time limit"
         except Exception as e:
           self.exception = e
@@ -267,23 +276,23 @@ class TestQueryExpiration(CustomClusterTestSuite):
 
       def run(self):
         # Query will complete before time limit.
-        self.client.execute("SET EXEC_TIME_LIMIT_S=10")
+        self.client.set_configuration_option("EXEC_TIME_LIMIT_S", "10")
         result = self.client.execute("SELECT count(*) FROM 
functional.alltypes")
         self.success = result.success
         self.data = result.data
 
     impalad = self.cluster.get_any_impalad()
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
     num_expired = impalad.service.get_metric_value('impala-server.num-queries-expired')
     non_expiring_threads = \
-        [NonExpiringQueryThread(impalad.service.create_beeswax_client())
+        [NonExpiringQueryThread(impalad.service.create_hs2_client())
          for _ in range(5)]
-    expiring_threads = [ExpiringQueryThread(impalad.service.create_beeswax_client())
+    expiring_threads = [ExpiringQueryThread(impalad.service.create_hs2_client())
                         for _ in range(5)]
-    time_limit_threads = [TimeLimitThread(impalad.service.create_beeswax_client())
+    time_limit_threads = [TimeLimitThread(impalad.service.create_hs2_client())
                         for _ in range(5)]
     non_expiring_time_limit_threads = [
-        NonExpiringTimeLimitThread(impalad.service.create_beeswax_client())
+        NonExpiringTimeLimitThread(impalad.service.create_hs2_client())
         for _ in range(5)]
     all_threads = non_expiring_threads + expiring_threads + time_limit_threads +\
         non_expiring_time_limit_threads
@@ -296,14 +305,14 @@ class TestQueryExpiration(CustomClusterTestSuite):
     for t in non_expiring_threads:
       assert t.success
     for t in expiring_threads:
-      self.__expect_client_state(client, t.handle, client.QUERY_STATES['EXCEPTION'])
+      self.__expect_client_state(client, t.handle, ERROR)
     for t in time_limit_threads:
       assert re.search(
           "Query [0-9a-f]+:[0-9a-f]+ expired due to execution time limit of 
1s000ms",
           str(t.exception))
     for t in non_expiring_time_limit_threads:
       assert t.success
-      assert t.data[0] == '7300' # Number of rows in alltypes.
+      assert t.data[0] == '7300'  # Number of rows in alltypes.
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args()
@@ -330,5 +339,4 @@ class TestQueryExpiration(CustomClusterTestSuite):
 
     assert time() - before < 10
 
-    self.__expect_client_state(self.client, handle,
-                               self.client.QUERY_STATES['EXCEPTION'])
+    self.__expect_client_state(self.client, handle, ERROR)
diff --git a/tests/custom_cluster/test_query_retries.py b/tests/custom_cluster/test_query_retries.py
index 4cfc278e5..3599cc715 100644
--- a/tests/custom_cluster/test_query_retries.py
+++ b/tests/custom_cluster/test_query_retries.py
@@ -932,7 +932,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     # Kill an impalad, and run a query. The query should be retried.
     self.cluster.impalads[1].kill()
     query = self._count_query
-    client = self.cluster.get_first_impalad().service.create_beeswax_client()
+    client = self.cluster.get_first_impalad().service.create_hs2_client()
     client.set_configuration({'retry_failed_queries': 'true'})
     handle = client.execute_async(query)
     client.wait_for_impala_state(handle, FINISHED, 60)
@@ -950,10 +950,11 @@ class TestQueryRetries(CustomClusterTestSuite):
     try:
       client.fetch(query, handle)
     except Exception as e:
-      assert "Client session expired" in str(e)
+      assert "Invalid or unknown query handle: {}".format(query_id) in str(e)
 
     # Assert that the impalad metrics show one expired session.
-    assert impalad_service.get_metric_value('impala-server.num-sessions-expired') == 1
+    # hs2_client opens a new session on each execute_async(), so there should be 2.
+    assert impalad_service.get_metric_value('impala-server.num-sessions-expired') == 2
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
@@ -1218,13 +1219,13 @@ class TestQueryRetriesFaultyDisk(CustomClusterTestSuite):
       order by o_orderdate
       """
 
-  def setup_method(self, method):
+  def setup_method(self, method):  # noqa: U100
     # Don't call the superclass method to prevent starting Impala before each test. In
     # this class, each test is responsible for doing that because we want to generate
     # the parameter string to start-impala-cluster in each test method.
     pass
 
-  def teardown_method(self, method):
+  def teardown_method(self, method):  # noqa: U100
     self.clear_tmp_dirs()
 
   def __generate_scratch_dir(self, num):
@@ -1267,7 +1268,7 @@ class TestQueryRetriesFaultyDisk(CustomClusterTestSuite):
         expected_count=1)
 
     coord_impalad = self.cluster.get_first_impalad()
-    client = coord_impalad.service.create_beeswax_client()
+    client = coord_impalad.service.create_hs2_client()
 
     disk_failure_impalad = self.cluster.impalads[1]
     assert disk_failure_impalad.service.krpc_port == FAILED_KRPC_PORT
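
Note the session accounting above: ImpylaHS2Connection opens a new HS2 session
per execute_async() call, so two sessions expire where beeswax reported one,
and fetching afterwards surfaces a stale-handle error. Sketched (the assert
after fetch() is illustrative, not part of the patch):

    try:
      client.fetch(query, handle)
      assert False, "fetch should fail once the session has expired"
    except Exception as e:
      # hs2 reports the unknown handle, not "Client session expired".
      assert "Invalid or unknown query handle" in str(e)
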
diff --git a/tests/custom_cluster/test_refresh_invalid_partition.py b/tests/custom_cluster/test_refresh_invalid_partition.py
index 57b26fded..131a6f181 100644
--- a/tests/custom_cluster/test_refresh_invalid_partition.py
+++ b/tests/custom_cluster/test_refresh_invalid_partition.py
@@ -29,7 +29,7 @@ class TestRefreshInvalidPartition(CustomClusterTestSuite):
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
     catalogd_args="--topic_update_log_gc_frequency=10")
-  def test_refresh_invalid_partition_with_sync_ddl(self, vector, unique_database):
+  def test_refresh_invalid_partition_with_sync_ddl(self, unique_database):
     """
     Regression test for IMPALA-12448. Avoid getting stuck when refreshing a
     non-existent partition with sync_ddl.
@@ -67,8 +67,8 @@ class TestRefreshInvalidPartition(CustomClusterTestSuite):
   @CustomClusterTestSuite.with_args(
     statestored_args="--statestore_update_frequency_ms=5000")
   def test_refresh_missing_partition(self, unique_database):
-    client1 = self.cluster.impalads[1].service.create_beeswax_client()
-    client2 = self.cluster.impalads[2].service.create_beeswax_client()
+    client1 = self.cluster.impalads[1].service.create_hs2_client()
+    client2 = self.cluster.impalads[2].service.create_hs2_client()
     self.client.execute('create table {}.tbl (i int) partitioned by (p int)'
         .format(unique_database))
     self.execute_query(
diff --git a/tests/custom_cluster/test_restart_services.py b/tests/custom_cluster/test_restart_services.py
index 8f962d3c5..5ae9232e4 100644
--- a/tests/custom_cluster/test_restart_services.py
+++ b/tests/custom_cluster/test_restart_services.py
@@ -34,9 +34,9 @@ from time import sleep
 from impala.error import HiveServer2Error
 from TCLIService import TCLIService
 
-from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
-from tests.common.impala_connection import ERROR, RUNNING
+from tests.common.impala_connection import (
+    ERROR, FINISHED, IMPALA_CONNECTION_EXCEPTION, RUNNING)
 from tests.common.skip import SkipIfNotHdfsMinicluster, SkipIfFS
 from tests.hs2.hs2_test_suite import HS2TestSuite, needs_session
 
@@ -80,7 +80,7 @@ class TestRestart(CustomClusterTestSuite):
       self._start_impala_cluster([], num_coordinators=1, cluster_size=3)
       assert len(self.cluster.impalads) == 3
 
-      client = self.cluster.impalads[0].service.create_beeswax_client()
+      client = self.cluster.impalads[0].service.create_hs2_client()
       assert client is not None
 
       for i in range(5):
@@ -107,7 +107,7 @@ class TestRestart(CustomClusterTestSuite):
         pytest.skip()
 
       assert len(self.cluster.impalads) == 3
-      client = self.cluster.impalads[0].service.create_beeswax_client()
+      client = self.cluster.impalads[0].service.create_hs2_client()
       assert client is not None
 
       handle = client.execute_async(
@@ -436,11 +436,11 @@ class TestRestart(CustomClusterTestSuite):
     slow_query = \
       "select distinct * from tpch_parquet.lineitem where l_orderkey > 
sleep(1000)"
     impalad = self.cluster.impalads[0]
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
     try:
       handle = client.execute_async(slow_query)
       # Make sure query starts running.
-      self.client.wait_for_impala_state(handle, RUNNING, 1000)
+      client.wait_for_impala_state(handle, RUNNING, 1000)
       profile = client.get_runtime_profile(handle)
       assert "NumBackends: 3" in profile, profile
       # Restart Statestore and wait till the grace period ends + some buffer.
@@ -457,7 +457,7 @@ class TestRestart(CustomClusterTestSuite):
       try:
         client.wait_for_finished_timeout(handle, 100)
         assert False, "Query expected to fail"
-      except ImpalaBeeswaxException as e:
+      except IMPALA_CONNECTION_EXCEPTION as e:
         assert "Failed due to unreachable impalad" in str(e), str(e)
         assert time.time() - start_time > self.CANCELLATION_GRACE_PERIOD_S + \
                                      self.SUBSCRIBER_TIMEOUT_S, \
@@ -469,7 +469,7 @@ class TestRestart(CustomClusterTestSuite):
       catalogd_version = self.cluster.catalogd.service.get_catalog_version()
       impalad.service.wait_for_metric_value("catalog.curr-version", catalogd_version)
       handle = client.execute_async(slow_query)
-      self.client.wait_for_impala_state(handle, RUNNING, 1000)
+      client.wait_for_impala_state(handle, RUNNING, 1000)
       profile = client.get_runtime_profile(handle)
       assert "NumBackends: 2" in profile, profile
       start_time = time.time()
@@ -480,7 +480,7 @@ class TestRestart(CustomClusterTestSuite):
       try:
         client.wait_for_finished_timeout(handle, 100)
         assert False, "Query expected to fail"
-      except ImpalaBeeswaxException as e:
+      except IMPALA_CONNECTION_EXCEPTION as e:
         assert "Failed due to unreachable impalad" in str(e), str(e)
         assert time.time() - start_time > self.CANCELLATION_GRACE_PERIOD_S + \
                                      self.SUBSCRIBER_TIMEOUT_S, \
@@ -885,7 +885,7 @@ class TestGracefulShutdown(CustomClusterTestSuite, HS2TestSuite):
     def expect_beeswax_shutdown_error(fn):
       try:
         fn()
-      except ImpalaBeeswaxException as e:
+      except IMPALA_CONNECTION_EXCEPTION as e:
         assert SHUTDOWN_ERROR_PREFIX in str(e)
     expect_beeswax_shutdown_error(lambda: self.client.execute("select 1"))
     expect_beeswax_shutdown_error(lambda: self.client.execute_async("select 1"))
@@ -925,8 +925,7 @@ class TestGracefulShutdown(CustomClusterTestSuite, HS2TestSuite):
 
     # Make sure that the beeswax query is still executing, then close it to allow the
     # coordinator to shut down.
-    self.impalad_test_service.wait_for_query_state(self.client, before_shutdown_handle,
-          self.client.QUERY_STATES['FINISHED'], timeout=20)
+    self.client.wait_for_impala_state(before_shutdown_handle, FINISHED, 20)
     self.client.close_query(before_shutdown_handle)
     self.cluster.impalads[0].wait_for_exit()
 
@@ -1009,15 +1008,13 @@ class TestGracefulShutdown(CustomClusterTestSuite, HS2TestSuite):
     'timeout' controls how long we will wait"""
     # Fix number of scanner threads to make runtime more deterministic.
     handle = self.execute_query_async(query, {'num_scanner_threads': 1})
-    self.impalad_test_service.wait_for_query_state(self.client, handle,
-                self.client.QUERY_STATES['RUNNING'], timeout=timeout)
+    self.client.wait_for_impala_state(handle, RUNNING, timeout)
     return handle
 
   def __fetch_and_get_num_backends(self, query, handle, delay_s=0, timeout_s=20):
     """Fetch the results of 'query' from the beeswax handle 'handle', close the
     query and return the number of backends obtained from the profile."""
-    self.impalad_test_service.wait_for_query_state(self.client, handle,
-                self.client.QUERY_STATES['FINISHED'], timeout=timeout_s)
+    self.client.wait_for_impala_state(handle, FINISHED, timeout_s)
     if delay_s > 0:
       LOG.info("sleeping for {0}s".format(delay_s))
       time.sleep(delay_s)
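
IMPALA_CONNECTION_EXCEPTION is the protocol-neutral exception exported by
tests.common.impala_connection, so these tests no longer import
ImpalaBeeswaxException. The replacement pattern used in the hunks above,
condensed into one sketch:

    from tests.common.impala_connection import IMPALA_CONNECTION_EXCEPTION

    try:
      client.wait_for_finished_timeout(handle, 100)
      assert False, "Query expected to fail"
    except IMPALA_CONNECTION_EXCEPTION as e:
      assert "Failed due to unreachable impalad" in str(e), str(e)
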
diff --git a/tests/custom_cluster/test_s3a_access.py b/tests/custom_cluster/test_s3a_access.py
index 75491a9d8..8364ef856 100644
--- a/tests/custom_cluster/test_s3a_access.py
+++ b/tests/custom_cluster/test_s3a_access.py
@@ -28,10 +28,12 @@ from tests.util.filesystem_utils import WAREHOUSE
 tmp = tempfile.NamedTemporaryFile(delete=False)
 BAD_KEY_FILE = tmp.name
 
+
 @SkipIf.not_s3
 class TestS3AAccess(CustomClusterTestSuite):
 
   cmd_filename = ""
+
   @classmethod
   def setup_class(cls):
     super(TestS3AAccess, cls).setup_class()
@@ -49,7 +51,7 @@ class TestS3AAccess(CustomClusterTestSuite):
 
   def _get_impala_client(self):
     impalad = self.cluster.get_any_impalad()
-    return impalad.service.create_beeswax_client()
+    return impalad.service.create_hs2_client()
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
diff --git a/tests/custom_cluster/test_scratch_disk.py b/tests/custom_cluster/test_scratch_disk.py
index 05bf1e410..964a082fe 100644
--- a/tests/custom_cluster/test_scratch_disk.py
+++ b/tests/custom_cluster/test_scratch_disk.py
@@ -91,13 +91,13 @@ class TestScratchDir(CustomClusterTestSuite):
         os.chmod(dir_path, stat.S_IREAD)
     return result
 
-  def setup_method(self, method):
+  def setup_method(self, method):  # noqa: U100
     # Don't call the superclass method to prevent starting Impala before each test. In
     # this file, each test is responsible for doing that because we want to generate
     # the parameter string to start-impala-cluster in each test method.
     pass
 
-  def teardown_method(self, method):
+  def teardown_method(self, method):  # noqa: U100
     self.clear_tmp_dirs()
     self.check_deleted_file_fd()
 
@@ -117,7 +117,7 @@ class TestScratchDir(CustomClusterTestSuite):
     exec_option = vector.get_value('exec_option')
     exec_option['buffer_pool_limit'] = self.buffer_pool_limit
     impalad = self.cluster.get_any_impalad()
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
     self.execute_query_expect_success(client, self.spill_query, exec_option)
     assert self.count_nonempty_dirs(normal_dirs) == 1
 
@@ -130,7 +130,7 @@ class TestScratchDir(CustomClusterTestSuite):
     exec_option = vector.get_value('exec_option')
     exec_option['buffer_pool_limit'] = self.buffer_pool_limit
     impalad = self.cluster.get_any_impalad()
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
     # Expect spill to disk to fail
     self.execute_query_expect_failure(client, self.spill_query, exec_option)
     # Should be able to execute in-memory query
@@ -159,7 +159,7 @@ class TestScratchDir(CustomClusterTestSuite):
     # disk.
     exec_option['spool_query_results'] = '0'
     impalad = self.cluster.get_any_impalad()
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
     # Expect spill to disk to fail
     self.execute_query_expect_failure(client, self.spill_query, exec_option)
     # Should be able to execute in-memory query
@@ -185,7 +185,7 @@ class TestScratchDir(CustomClusterTestSuite):
     # disk.
     exec_option['spool_query_results'] = '0'
     impalad = self.cluster.get_any_impalad()
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
     # Expect spill to disk to fail
     self.execute_query_expect_failure(client, self.spill_query, exec_option)
     # Should be able to execute in-memory query
@@ -215,7 +215,7 @@ class TestScratchDir(CustomClusterTestSuite):
 
     # Should still be able to spill to the third directory.
     impalad = self.cluster.get_any_impalad()
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
     self.execute_query_expect_success(client, self.spill_query, exec_option)
     # Restore second directory mod for cleanup later.
     for dirpath, dirnames, filenames in os.walk(dirs[1]):
@@ -236,7 +236,7 @@ class TestScratchDir(CustomClusterTestSuite):
                                     expected_count=len(normal_dirs))
     vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
     impalad = self.cluster.impalads[0]
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
     handle = self.execute_query_async_using_client(client, self.spill_query, vector)
     verifier = MetricVerifier(impalad.service)
     verifier.wait_for_metric("impala-server.num-fragments-in-flight", 2)
@@ -266,7 +266,7 @@ class TestScratchDir(CustomClusterTestSuite):
                                     expected_count=len(normal_dirs))
     vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
     impalad = self.cluster.impalads[0]
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
     handle = self.execute_query_async_using_client(client, self.spill_query, vector)
     verifier = MetricVerifier(impalad.service)
     verifier.wait_for_metric("impala-server.num-fragments-in-flight", 2)
@@ -335,7 +335,7 @@ class TestScratchDir(CustomClusterTestSuite):
                                     expected_count=len(normal_dirs) - 1)
     vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
     impalad = self.cluster.impalads[0]
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
     handle = self.execute_query_async_using_client(client, self.spill_query, vector)
     verifier = MetricVerifier(impalad.service)
     verifier.wait_for_metric("impala-server.num-fragments-in-flight", 2)
@@ -366,7 +366,7 @@ class TestScratchDir(CustomClusterTestSuite):
                                     expected_count=len(normal_dirs) - 1)
     vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
     impalad = self.cluster.impalads[0]
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
     handle = self.execute_query_async_using_client(client, self.spill_query, vector)
     verifier = MetricVerifier(impalad.service)
     verifier.wait_for_metric("impala-server.num-fragments-in-flight", 2)
@@ -400,7 +400,7 @@ class TestScratchDir(CustomClusterTestSuite):
                                     expected_count=len(normal_dirs) - 1)
     vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
     impalad = self.cluster.impalads[0]
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
     handle = self.execute_query_async_using_client(client, self.spill_query, vector)
     verifier = MetricVerifier(impalad.service)
     verifier.wait_for_metric("impala-server.num-fragments-in-flight", 2)
@@ -433,7 +433,7 @@ class TestScratchDir(CustomClusterTestSuite):
                                     expected_count=len(normal_dirs) - 1)
     vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
     impalad = self.cluster.impalads[0]
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
     handle = self.execute_query_async_using_client(client, self.spill_query, vector)
     verifier = MetricVerifier(impalad.service)
     verifier.wait_for_metric("impala-server.num-fragments-in-flight", 2)
@@ -472,7 +472,7 @@ class TestScratchDir(CustomClusterTestSuite):
     handle_name = 'handle'
     for i in range(num):
         impalad = self.cluster.impalads[i - 1]
-        locals()[client_name + str(i)] = impalad.service.create_beeswax_client()
+        locals()[client_name + str(i)] = impalad.service.create_hs2_client()
 
     for i in range(num):
         client = locals()[client_name + str(i)]
@@ -517,7 +517,7 @@ class TestScratchDir(CustomClusterTestSuite):
                                     expected_count=len(normal_dirs) - 1)
     vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
     impalad = self.cluster.impalads[0]
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
     handle = self.execute_query_async_using_client(client, self.spill_query, vector)
     verifier = MetricVerifier(impalad.service)
     verifier.wait_for_metric("impala-server.num-fragments-in-flight", 2)
@@ -546,7 +546,7 @@ class TestScratchDir(CustomClusterTestSuite):
                                     expected_count=len(normal_dirs) - 1)
     vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
     impalad = self.cluster.impalads[0]
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
     self.execute_query_async_using_client(client, self.spill_query_big_table, vector)
     verifier = MetricVerifier(impalad.service)
     verifier.wait_for_metric("impala-server.num-fragments-in-flight", 2)
@@ -587,7 +587,7 @@ class TestScratchDir(CustomClusterTestSuite):
                                     expected_count=len(normal_dirs) - 1)
     vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
     impalad = self.cluster.impalads[0]
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
     self.execute_query_async_using_client(client, self.spill_query_big_table, vector)
     verifier = MetricVerifier(impalad.service)
     verifier.wait_for_metric("impala-server.num-fragments-in-flight", 2)
diff --git a/tests/custom_cluster/test_seq_file_filtering.py b/tests/custom_cluster/test_seq_file_filtering.py
index fa731b7c5..3eaa43159 100644
--- a/tests/custom_cluster/test_seq_file_filtering.py
+++ b/tests/custom_cluster/test_seq_file_filtering.py
@@ -21,6 +21,7 @@ import pytest
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.skip import SkipIfBuildType
 
+
 class TestImpala3798(CustomClusterTestSuite):
   """Regression test for IMPALA-3798, which is a hang that occurs when an Avro 
file is not
   filtered by a runtime filter, but its header split is (this only occurs when 
the filter
@@ -36,9 +37,9 @@ class TestImpala3798(CustomClusterTestSuite):
   @SkipIfBuildType.not_dev_build
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--skip_file_runtime_filtering=true")
-  def test_sequence_file_filtering_race(self, vector):
+  def test_sequence_file_filtering_race(self):
     impalad = self.cluster.get_any_impalad()
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
     client.execute("SET RUNTIME_FILTER_MODE=GLOBAL")
     client.execute("SET RUNTIME_FILTER_WAIT_TIME_MS=10000")
 
diff --git a/tests/custom_cluster/test_session_expiration.py b/tests/custom_cluster/test_session_expiration.py
index e76f3457d..cf468891e 100644
--- a/tests/custom_cluster/test_session_expiration.py
+++ b/tests/custom_cluster/test_session_expiration.py
@@ -24,26 +24,31 @@ import socket
 import re
 from time import sleep
 
+from impala.dbapi import connect
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.impala_cluster import DEFAULT_HS2_PORT
+from tests.util.thrift_util import op_handle_to_query_id
 
 
 class TestSessionExpiration(CustomClusterTestSuite):
   """Tests query expiration logic"""
+  PROFILE_PAGE = "http://localhost:{0}/query_profile?query_id={1}&json"
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--idle_session_timeout=6 "
       "--idle_client_poll_period_s=0")
-  def test_session_expiration(self, vector):
+  def test_session_expiration(self):
     impalad = self.cluster.get_any_impalad()
     self.close_impala_clients()
     num_expired = impalad.service.get_metric_value("impala-server.num-sessions-expired")
     num_connections = impalad.service.get_metric_value(
         "impala.thrift-server.beeswax-frontend.connections-in-use")
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
+    client.execute('select 1')
     # Sleep for half the expiration time to confirm that the session is not expired early
     # (see IMPALA-838)
     sleep(3)
+    assert client is not None
     assert num_expired == impalad.service.get_metric_value(
         "impala-server.num-sessions-expired")
     # Wait for session expiration. Impala will poll the session expiry queue every second
@@ -51,25 +56,25 @@ class TestSessionExpiration(CustomClusterTestSuite):
         "impala-server.num-sessions-expired", num_expired + 1, 20)
     # Verify that the idle connection is not closed.
     assert 1 + num_connections == impalad.service.get_metric_value(
-        "impala.thrift-server.beeswax-frontend.connections-in-use")
+        "impala.thrift-server.hiveserver2-frontend.connections-in-use")
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--idle_session_timeout=3 "
       "--idle_client_poll_period_s=0")
-  def test_session_expiration_with_set(self, vector):
+  def test_session_expiration_with_set(self):
     impalad = self.cluster.get_any_impalad()
     self.close_impala_clients()
     num_expired = impalad.service.get_metric_value("impala-server.num-sessions-expired")
 
     # Test if we can set a shorter timeout than the process-wide option
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
     client.execute("SET IDLE_SESSION_TIMEOUT=1")
     sleep(2.5)
     assert num_expired + 1 == impalad.service.get_metric_value(
       "impala-server.num-sessions-expired")
 
     # Test if we can set a longer timeout than the process-wide option
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
     client.execute("SET IDLE_SESSION_TIMEOUT=10")
     sleep(5)
     assert num_expired + 1 == impalad.service.get_metric_value(
@@ -78,13 +83,13 @@ class TestSessionExpiration(CustomClusterTestSuite):
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--idle_session_timeout=5 "
        "--idle_client_poll_period_s=0")
-  def test_unsetting_session_expiration(self, vector):
+  def test_unsetting_session_expiration(self):
     impalad = self.cluster.get_any_impalad()
     self.close_impala_clients()
     num_expired = impalad.service.get_metric_value("impala-server.num-sessions-expired")
 
     # Test unsetting IDLE_SESSION_TIMEOUT
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
     client.execute("SET IDLE_SESSION_TIMEOUT=1")
 
     # Unset to 5 sec
@@ -98,34 +103,47 @@ class TestSessionExpiration(CustomClusterTestSuite):
     assert num_expired + 1 == impalad.service.get_metric_value(
       "impala-server.num-sessions-expired")
 
+  def _get_fast_timeout_cursor_from_hs2_client(self, connection, idle_session_timeout=3):
+    """Get a fast timing out HiveServer2Cursor from a HiveServer2Connection."""
+    cursor = connection.cursor()
+    # Disable trivial query admission, otherwise "select 1" would be admitted
+    # as a trivial query.
+    cursor.execute('set enable_trivial_query_for_admission=false')
+    cursor.execute('set idle_session_timeout={}'.format(idle_session_timeout))
+    return cursor
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--default_pool_max_requests=1 "
       "--idle_client_poll_period_s=0")
-  def test_session_expiration_with_queued_query(self, vector):
+  def test_session_expiration_with_queued_query(self):
     """Ensure that a query waiting in queue gets cancelled if the session 
expires."""
+    # It is currently not possible to run two successive execute_async() calls
+    # within a single session using ImpylaHS2Connection. Therefore, we obtain
+    # two HiveServer2Cursors from a HiveServer2Connection instead.
     impalad = self.cluster.get_any_impalad()
-    client = impalad.service.create_beeswax_client()
-    client.execute("SET IDLE_SESSION_TIMEOUT=3")
-    # Set disable the trivial query otherwise "select 1" would be admitted as a
-    # trivial query.
-    client.execute("set enable_trivial_query_for_admission=false")
-    client.execute_async("select sleep(10000)")
-    queued_handle = client.execute_async("select 1")
-    impalad.service.wait_for_metric_value(
-      "admission-controller.local-num-queued.default-pool", 1)
-    sleep(3)
-    impalad.service.wait_for_metric_value(
-      "admission-controller.local-num-queued.default-pool", 0)
-    impalad.service.wait_for_metric_value(
-      "admission-controller.agg-num-running.default-pool", 0)
-    queued_query_profile = impalad.service.create_beeswax_client().get_runtime_profile(
-      queued_handle)
-    assert "Admission result: Cancelled (queued)" in queued_query_profile
+    with connect(port=impalad.service.hs2_port) as conn:
+      timeout = 3
+      debug_cursor = self._get_fast_timeout_cursor_from_hs2_client(conn, timeout)
+      queued_cursor = self._get_fast_timeout_cursor_from_hs2_client(conn, timeout)
+      debug_cursor.execute_async("select sleep(10000)")
+      queued_cursor.execute_async("select 1")
+      impalad.service.wait_for_metric_value(
+          "admission-controller.local-num-queued.default-pool", 1)
+      sleep(timeout)
+      impalad.service.wait_for_metric_value(
+          "admission-controller.local-num-queued.default-pool", 0)
+      impalad.service.wait_for_metric_value(
+          "admission-controller.agg-num-running.default-pool", 0)
+      queued_query_id = op_handle_to_query_id(queued_cursor._last_operation.handle)
+      assert queued_query_id is not None
+      json_summary = self.get_debug_page(
+          self.PROFILE_PAGE.format(impalad.service.webserver_port, queued_query_id))
+      assert "Admission result: Cancelled (queued)" in json_summary['profile']
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(impalad_args="--idle_session_timeout=10 "
       "--idle_client_poll_period_s=1", cluster_size=1)
-  def test_closing_idle_connection(self, vector):
+  def test_closing_idle_connection(self):
     """ IMPALA-7802: verifies that connections of idle sessions are closed
     after the sessions have expired."""
     impalad = self.cluster.get_any_impalad()
@@ -139,10 +157,8 @@ class TestSessionExpiration(CustomClusterTestSuite):
 
       # Connect to Impala using either beeswax or HS2 client and verify the number of
       # opened connections.
-      if protocol == 'beeswax':
-        client = impalad.service.create_beeswax_client()
-      else:
-        client = impalad.service.create_hs2_client()
+      client = impalad.service.create_client(
+          protocol=('hs2' if protocol == 'hiveserver2' else protocol))
       client.execute("select 1")
       impalad.service.wait_for_metric_value(num_connections_metrics_name,
            num_connections + 1, 20)
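
The queued-query test now drives Impyla's DB-API layer directly because
ImpylaHS2Connection cannot keep two execute_async() calls in flight at once.
A sketch of the workaround's shape (each cursor is assumed to carry its own
HS2 session and idle timeout, per Impyla's behavior):

    from impala.dbapi import connect

    with connect(port=impalad.service.hs2_port) as conn:
      running, queued = conn.cursor(), conn.cursor()
      for cur in (running, queued):
        cur.execute('set idle_session_timeout=3')
      # The first query holds the single admission slot; the second waits
      # in the queue until its session expires and it is cancelled.
      running.execute_async('select sleep(10000)')
      queued.execute_async('select 1')
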
diff --git a/tests/custom_cluster/test_statestored_ha.py b/tests/custom_cluster/test_statestored_ha.py
index 004ba6768..d4b9c499b 100644
--- a/tests/custom_cluster/test_statestored_ha.py
+++ b/tests/custom_cluster/test_statestored_ha.py
@@ -20,12 +20,12 @@ import logging
 import pytest
 import time
 
-from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.environ import build_flavor_timeout, ImpalaTestClusterProperties
 from tests.common.impala_cluster import (
     DEFAULT_CATALOG_SERVICE_PORT, DEFAULT_STATESTORE_SERVICE_PORT)
-from tests.common.impala_connection import ERROR, RUNNING
+from tests.common.impala_connection import (
+    ERROR, IMPALA_CONNECTION_EXCEPTION, RUNNING)
 from tests.common.skip import SkipIfBuildType, SkipIfNotHdfsMinicluster
 from time import sleep
 
@@ -744,7 +744,7 @@ class TestStatestoredHA(CustomClusterTestSuite):
     slow_query = \
         "select distinct * from tpch_parquet.lineitem where l_orderkey > 
sleep(1000)"
     impalad = self.cluster.impalads[0]
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_hs2_client()
     try:
       # Run a slow query
       handle = client.execute_async(slow_query)
@@ -769,7 +769,7 @@ class TestStatestoredHA(CustomClusterTestSuite):
       try:
         client.wait_for_finished_timeout(handle, 100)
         assert False, "Query expected to fail"
-      except ImpalaBeeswaxException as e:
+      except IMPALA_CONNECTION_EXCEPTION as e:
         assert "Failed due to unreachable impalad" in str(e), str(e)
 
       # Restart original active statestored. Verify that the statestored does not resume
diff --git a/tests/custom_cluster/test_web_pages.py b/tests/custom_cluster/test_web_pages.py
index adc8a74c4..681f1929a 100644
--- a/tests/custom_cluster/test_web_pages.py
+++ b/tests/custom_cluster/test_web_pages.py
@@ -24,10 +24,10 @@ import psutil
 import pytest
 import time
 
-from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.custom_cluster_test_suite import (
   DEFAULT_CLUSTER_SIZE,
   CustomClusterTestSuite)
+from tests.common.impala_connection import IMPALA_CONNECTION_EXCEPTION
 from tests.common.skip import SkipIfFS
 from tests.shell.util import run_impala_shell_cmd
 
@@ -212,7 +212,7 @@ class TestWebPage(CustomClusterTestSuite):
     statestored_args="--logtostderr=true --redirect_stdout_stderr=false",
     catalogd_args="--logtostderr=true --redirect_stdout_stderr=false"
   )
-  def test_webserver_hide_logs_link(self, vector):
+  def test_webserver_hide_logs_link(self):
     """Validate that there is no /logs link when we use --logtostderr=true """
     ports = ["25000", "25010", "25020"]
     for port in ports:
@@ -375,7 +375,7 @@ class TestWebPage(CustomClusterTestSuite):
       self.execute_query("refresh functional.alltypes", {
         "debug_action": "catalogd_refresh_hdfs_listing_delay:SLEEP@100"
       })
-    except ImpalaBeeswaxException as e:
+    except IMPALA_CONNECTION_EXCEPTION as e:
       assert "RPC recv timed out" in str(e)
     # In impalad side, the query fails by the above error. However, in catalogd side,
     # the RPCs are still running. Check the in-flight operations.
diff --git a/tests/metadata/test_compute_stats.py b/tests/metadata/test_compute_stats.py
index 93b815f2d..5c094184c 100644
--- a/tests/metadata/test_compute_stats.py
+++ b/tests/metadata/test_compute_stats.py
@@ -17,6 +17,7 @@
 
 from __future__ import absolute_import, division, print_function
 from builtins import range
+import os
 import pytest
 from hive_metastore.ttypes import (
     ColumnStatistics, ColumnStatisticsDesc, ColumnStatisticsData,
@@ -32,8 +33,6 @@ from tests.common.test_dimensions import (
     create_uncompressed_text_dimension)
 from CatalogObjects.ttypes import THdfsCompression
 
-import os
-
 
 IMPALA_TEST_CLUSTER_PROPERTIES = ImpalaTestClusterProperties.get_instance()
 
@@ -109,7 +108,7 @@ class TestComputeStats(ImpalaTestSuite):
     finally:
       self.cleanup_db("parquet", sync_ddl=0)
 
-  def test_compute_stats_compression_codec(self, vector, unique_database):
+  def test_compute_stats_compression_codec(self, unique_database):
     """IMPALA-8254: Tests that running compute stats with compression_codec set
     should not throw an error."""
     table = "{0}.codec_tbl".format(unique_database)
@@ -122,7 +121,7 @@ class TestComputeStats(ImpalaTestSuite):
         self.execute_query_expect_success(self.client, "drop stats {0}".format(table))
 
   @SkipIfFS.hive
-  def test_compute_stats_impala_2201(self, vector, unique_database):
+  def test_compute_stats_impala_2201(self, unique_database):
     """IMPALA-2201: Tests that the results of compute incremental stats are 
properly
     persisted when the data was loaded from Hive with 
hive.stats.autogather=true.
     """
@@ -193,11 +192,11 @@ class TestComputeStats(ImpalaTestSuite):
     # not zero, for all scans.
     for i in range(len(explain_result.data)):
       if ("SCAN HDFS" in explain_result.data[i]):
-         assert(hdfs_physical_properties_template in explain_result.data[i + 1])
-         assert("cardinality=0" not in explain_result.data[i + 2])
+        assert hdfs_physical_properties_template in explain_result.data[i + 1]
+        assert "cardinality=0" not in explain_result.data[i + 2]
 
   @SkipIfFS.hive
-  def test_corrupted_stats_in_partitioned_hive_tables(self, vector, unique_database):
+  def test_corrupted_stats_in_partitioned_hive_tables(self, unique_database):
     """IMPALA-9744: Tests that the partition stats corruption in Hive tables
     (row count=0, partition size>0, persisted when the data was loaded with
     hive.stats.autogather=true) is handled at the table scan level.
@@ -240,7 +239,7 @@ class TestComputeStats(ImpalaTestSuite):
             table_name, 2, 2)
 
   @SkipIfFS.hive
-  def test_corrupted_stats_in_unpartitioned_hive_tables(self, vector, unique_database):
+  def test_corrupted_stats_in_unpartitioned_hive_tables(self, unique_database):
     """IMPALA-9744: Tests that the stats corruption in unpartitioned Hive
     tables (row count=0, partition size>0, persisted when the data was loaded
     with hive.stats.autogather=true) is handled at the table scan level.
@@ -280,13 +279,13 @@ class TestComputeStats(ImpalaTestSuite):
             table_name, 1, 1)
 
   @SkipIfCatalogV2.stats_pulling_disabled()
-  def test_pull_stats_profile(self, vector, unique_database):
+  def test_pull_stats_profile(self, unique_database):
     """Checks that the frontend profile includes metrics when computing
        incremental statistics.
     """
     try:
       impalad = ImpalaCluster.get_e2e_test_cluster().impalads[0]
-      client = impalad.service.create_beeswax_client()
+      client = impalad.service.create_hs2_client()
       create = "create table test like functional.alltypes"
       load = "insert into test partition(year, month) select * from 
functional.alltypes"
       insert = """insert into test partition(year=2009, month=1) values
@@ -329,6 +328,7 @@ class TestComputeStats(ImpalaTestSuite):
     finally:
       client.close()
 
+
 # Tests compute stats on HBase tables. This test is separate from TestComputeStats,
 # because we want to use the existing machanism to disable running tests on hbase/none
 # based on the filesystem type (S3, Isilon, etc.).
@@ -391,7 +391,7 @@ class TestIncompatibleColStats(ImpalaTestSuite):
     cls.ImpalaTestMatrix.add_dimension(
         create_uncompressed_text_dimension(cls.get_workload()))
 
-  def test_incompatible_col_stats(self, vector, unique_database):
+  def test_incompatible_col_stats(self, unique_database):
     """Tests Impala is able to use tables when the column stats data is not 
compatible
     with the column type. Regression test for IMPALA-588."""
 
diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py
index 068e41922..551312a29 100644
--- a/tests/metadata/test_ddl.py
+++ b/tests/metadata/test_ddl.py
@@ -580,7 +580,7 @@ class TestDdlStatements(TestDdlBase):
     else:
       num_attempts = 60
     for impalad in ImpalaCluster.get_e2e_test_cluster().impalads:
-      client = impalad.service.create_beeswax_client()
+      client = impalad.service.create_client_from_vector(vector)
       try:
         for attempt in itertools.count(1):
           assert attempt <= num_attempts, "ran out of attempts"
@@ -603,21 +603,20 @@ class TestDdlStatements(TestDdlBase):
     impala_cluster = ImpalaCluster.get_e2e_test_cluster()
     impalads = impala_cluster.impalads
     view_name = "%s.test_describe_view" % unique_database
-    query_opts = vector.get_value('exec_option')
-    first_client = impalads[0].service.create_beeswax_client()
+    first_client = impalads[0].service.create_client_from_vector(vector)
     try:
       # Create a view and verify it's visible.
       self.execute_query_expect_success(first_client,
                                         "create view {0} as "
                                         "select * from functional.alltypes"
-                                        .format(view_name), query_opts)
+                                        .format(view_name))
       self._verify_describe_view(vector, view_name, "select * from functional.alltypes")
 
       # Alter the view and verify the alter is visible.
       self.execute_query_expect_success(first_client,
                                         "alter view {0} as "
                                         "select * from functional.alltypesagg"
-                                        .format(view_name), query_opts)
+                                        .format(view_name))
       self._verify_describe_view(vector, view_name,
                                  "select * from functional.alltypesagg")
     finally:
diff --git a/tests/query_test/test_rows_availability.py b/tests/query_test/test_rows_availability.py
index 357929894..e68f95618 100644
--- a/tests/query_test/test_rows_availability.py
+++ b/tests/query_test/test_rows_availability.py
@@ -20,8 +20,10 @@ import pytest
 import re
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.test_vector import ImpalaTestDimension
+from tests.common.impala_connection import FINISHED
 from tests.util.parse_util import parse_duration_string_ms
 
+
 class TestRowsAvailability(ImpalaTestSuite):
   """Tests that the 'Rows available' timeline event is marked only after rows 
are
   truly available. We mark the 'Rows available' event once we advance the query
@@ -59,7 +61,7 @@ class TestRowsAvailability(ImpalaTestSuite):
     return vector.get_value('table_format').file_format == 'text' and\
         vector.get_value('table_format').compression_codec == 'none' and\
         vector.get_value('exec_option')['batch_size'] == 0 and\
-        vector.get_value('exec_option')['disable_codegen'] == False and\
+        vector.get_value('exec_option')['disable_codegen'] is False and\
         vector.get_value('exec_option')['num_nodes'] == 0
 
   @pytest.mark.execute_serially
@@ -70,8 +72,7 @@ class TestRowsAvailability(ImpalaTestSuite):
     query = vector.get_value('query')
     # Execute async to get a handle. Wait until the query has completed.
     handle = self.execute_query_async(query, vector.get_value('exec_option'))
-    self.impalad_test_service.wait_for_query_state(self.client, handle,
-        self.client.QUERY_STATES['FINISHED'], timeout=20)
+    self.client.wait_for_impala_state(handle, FINISHED, 20)
 
     profile = self.client.get_runtime_profile(handle)
     start_time_ms = None
diff --git a/tests/query_test/test_udfs.py b/tests/query_test/test_udfs.py
index 31bcb5873..e3e3ea548 100644
--- a/tests/query_test/test_udfs.py
+++ b/tests/query_test/test_udfs.py
@@ -49,7 +49,7 @@ class TestUdfBase(ImpalaTestSuite):
   def _run_query_all_impalads(self, exec_options, query, expected):
     impala_cluster = ImpalaCluster.get_e2e_test_cluster()
     for impalad in impala_cluster.impalads:
-      client = impalad.service.create_beeswax_client()
+      client = impalad.service.create_hs2_client()
       result = self.execute_query_expect_success(client, query, exec_options)
       assert result.data == expected, impalad
 
@@ -508,19 +508,18 @@ class TestUdfTargeted(TestUdfBase):
 
     cluster = ImpalaCluster.get_e2e_test_cluster()
     impalad = cluster.get_any_impalad()
-    client = impalad.service.create_beeswax_client()
+    client = impalad.service.create_client_from_vector(vector)
     # Create and drop functions with sync_ddl to make sure they are reflected
     # in every impalad.
-    exec_option = copy(vector.get_value('exec_option'))
-    exec_option['sync_ddl'] = 1
+    client.set_configuration_option('sync_ddl', 1)
 
-    self.execute_query_expect_success(client, drop_fn_stmt, exec_option)
-    self.execute_query_expect_success(client, create_fn_stmt, exec_option)
+    self.execute_query_expect_success(client, drop_fn_stmt)
+    self.execute_query_expect_success(client, create_fn_stmt)
     # Delete the udf jar
     check_call(["hadoop", "fs", "-rm", jar_path])
 
     different_impalad = cluster.get_different_impalad(impalad)
-    client = different_impalad.service.create_beeswax_client()
+    client = different_impalad.service.create_client_from_vector(vector)
     # Run a query using the udf from an impalad other than the one
     # we used to create the function. This is to bypass loading from
     # the cache
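
create_client_from_vector(vector) builds a client for whichever protocol the
test vector selects, and session-level options move onto the client instead of
being threaded through exec_option. A sketch of the intended usage, assuming
the helper behaves as in the hunks above:

    client = impalad.service.create_client_from_vector(vector)
    # Applies to every statement subsequently run through this client.
    client.set_configuration_option('sync_ddl', 1)
    self.execute_query_expect_success(client, drop_fn_stmt)
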
diff --git a/tests/stress/query_retries_stress_runner.py b/tests/stress/query_retries_stress_runner.py
index 840bf2e92..cd89e0cc1 100755
--- a/tests/stress/query_retries_stress_runner.py
+++ b/tests/stress/query_retries_stress_runner.py
@@ -125,7 +125,7 @@ def run_concurrent_workloads(concurrency, coordinator, database, queries):
     client = None
     try:
       # Create and setup the client.
-      client = coordinator.service.create_beeswax_client()
+      client = coordinator.service.create_hs2_client()
       LOG.info("Running workload: database={0} and coordinator=localhost:{1}, pid={2}"
           .format(database, coordinator.get_webserver_port(), coordinator.get_pid()))
       client.execute("use {0}".format(database))
@@ -347,9 +347,9 @@ When specifying a non-default scale, the job will look for a database of the for
     sys.exit(1)
 
   # Set the correct database.
-  if table_format is 'parquet':
+  if table_format == 'parquet':
     database = workload + scale + '_parquet'
-  elif workload is 'text':
+  elif workload == 'text':
     database = workload + scale
   else:
     parser.print_usage()
diff --git a/tests/util/cancel_util.py b/tests/util/cancel_util.py
index 4f98299ca..dbcba7627 100644
--- a/tests/util/cancel_util.py
+++ b/tests/util/cancel_util.py
@@ -120,7 +120,7 @@ def __run_cancel_query_and_validate_state(client, query, exec_option,
   thread.start()
 
   sleep(cancel_delay)
-  if client.get_state(handle) == client.QUERY_STATES['EXCEPTION']:
+  if client.is_error(handle):
      # If some error occurred before trying to cancel the query then we put an error
       # message together and fail the test.
       thread.join()
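
is_error(handle) is the protocol-agnostic replacement for comparing
get_state(handle) against QUERY_STATES['EXCEPTION']; both client
implementations expose the same predicate, so cancel_util stays
client-neutral. In sketch form:

    if client.is_error(handle):
      # Collect the failure from the fetch thread before failing the test.
      thread.join()
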
