This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 9cb9bae84e8888203e4bdfd3e20ee3e61c6059fe
Author: Riza Suminto <[email protected]>
AuthorDate: Thu Feb 13 11:19:31 2025 -0800

    IMPALA-13758: Use context manager in ImpalaTestSuite.change_database
    
    ImpalaTestSuite.change_database is responsible for pointing the impala
    client to the database under test. However, it left the client pointing
    to that database after the test without reverting it back to the default
    database. This patch does the reversal by changing
    ImpalaTestSuite.change_database to use a context manager.
    
    This patch changes the behavior of execute_query_using_client() and
    execute_query_async_using_client(). They used to change the database
    according to the given vector parameter, but not anymore after this
    patch. In practice, this behavior change does not affect many tests
    because most queries going through these functions already use fully
    qualified table names. Going forward, querying through functions other
    than run_test_case() should try to use fully qualified table names as
    much as possible.
    
    Retain the behavior of ImpalaTestSuite._get_table_location() since a
    considerable number of tests rely on it (changing the database when
    called).
    
    Removed unused test fixtures and fixed several flake8 issues in modified
    test files.
    
    Testing:
    - Moved nested-types-subplan-single-node.test. This allows the test
      framework to point to the right tpch_nested* database.
    - Pass exhaustive test except IMPALA-13752 and IMPALA-13761. They will
      be fixed in separate patch.
    
    Change-Id: I75bec7403cc302728a630efe3f95e852a84594e2
    Reviewed-on: http://gerrit.cloudera.org:8080/22487
    Reviewed-by: Csaba Ringhofer <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../nested-types-subplan-single-node.test          |  3 -
 tests/common/impala_test_suite.py                  | 90 ++++++++++++++++------
 tests/common/test_vector.py                        |  1 +
 tests/conftest.py                                  |  3 -
 tests/custom_cluster/test_admission_controller.py  | 20 ++---
 .../test_blacklisted_dbs_and_tables.py             | 10 +--
 tests/custom_cluster/test_data_cache.py            | 43 ++++++-----
 tests/failure/test_failpoints.py                   | 15 ++--
 tests/query_test/test_aggregation.py               | 14 ++--
 tests/query_test/test_cancellation.py              | 42 +++++-----
 tests/query_test/test_mem_usage_scaling.py         | 14 ++--
 tests/query_test/test_mt_dop.py                    | 13 ++--
 tests/query_test/test_nested_types.py              |  5 +-
 tests/query_test/test_runtime_filters.py           | 20 +++--
 tests/query_test/test_scanners.py                  | 29 +++----
 tests/query_test/test_tablesample.py               | 15 ++--
 tests/util/cancel_util.py                          | 14 +++-
 17 files changed, 207 insertions(+), 144 deletions(-)

diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/nested-types-subplan-single-node.test
 
b/testdata/workloads/tpch_nested/queries/QueryTest/nested-types-subplan-single-node.test
similarity index 97%
rename from 
testdata/workloads/functional-query/queries/QueryTest/nested-types-subplan-single-node.test
rename to 
testdata/workloads/tpch_nested/queries/QueryTest/nested-types-subplan-single-node.test
index ece9c7007..3a9ab06ad 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/nested-types-subplan-single-node.test
+++ 
b/testdata/workloads/tpch_nested/queries/QueryTest/nested-types-subplan-single-node.test
@@ -1,8 +1,5 @@
 ====
 ---- QUERY
-use tpch_nested_parquet
-====
----- QUERY
 # IMPALA-2289: Test proper handling of AtCapacity() inside the subplan node.
 # num_nodes is set to 1 in the python test to make it very likely to hit the 
once buggy
 # code path because a single scan node instance must process all input files.
diff --git a/tests/common/impala_test_suite.py 
b/tests/common/impala_test_suite.py
index 7c5495f4f..0cc5d4ea5 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -19,6 +19,7 @@
 
 from __future__ import absolute_import, division, print_function
 from builtins import range, round
+import contextlib
 import glob
 import grp
 import hashlib
@@ -63,7 +64,7 @@ from tests.common.test_result_verifier import (
     verify_raw_results,
     verify_runtime_profile)
 from tests.common.test_vector import (
-  EXEC_OPTION, PROTOCOL, TABLE_FORMAT,
+  EXEC_OPTION, PROTOCOL, TABLE_FORMAT, VECTOR,
   BEESWAX, HS2, HS2_HTTP,
   ImpalaTestDimension)
 from tests.performance.query import Query
@@ -752,8 +753,9 @@ class ImpalaTestSuite(BaseTestSuite):
     # Change the database to reflect the file_format, compression codec etc, 
or the
     # user specified database for all targeted impalad.
     for impalad_client in target_impalad_clients:
-      ImpalaTestSuite.change_database(impalad_client,
-          table_format_info, use_db, pytest.config.option.scale_factor)
+      ImpalaTestSuite.__change_client_database(
+        impalad_client, table_format=table_format_info, db_name=use_db,
+        scale_factor=pytest.config.option.scale_factor)
       impalad_client.set_configuration(exec_options)
 
     def __exec_in_impala(query, user=None):
@@ -984,6 +986,10 @@ class ImpalaTestSuite(BaseTestSuite):
         os.makedirs(output_dir)
       write_test_file(output_file, sections, encoding=encoding)
 
+    # Revert target_impalad_clients back to default database.
+    for impalad_client in target_impalad_clients:
+      ImpalaTestSuite.__change_client_database(impalad_client, 
db_name='default')
+
   def get_query_lineage(self, query_id, lineage_dir):
     """Walks through the lineage files in lineage_dir to look for a given 
query_id.
     This is an expensive operation is lineage_dir is large, so use 
carefully."""
@@ -1010,18 +1016,50 @@ class ImpalaTestSuite(BaseTestSuite):
   def get_db_name_from_format(table_format, scale_factor=''):
     return QueryTestSectionReader.get_db_name(table_format, scale_factor)
 
-  @classmethod
-  def change_database(cls, impala_client, table_format=None,
-      db_name=None, scale_factor=None):
+  @staticmethod
+  def __change_client_database(impala_client, table_format=None, db_name=None,
+                               scale_factor=None):
+    """Change 'impala_client' to point to either 'db_name' or workload-specific
+    database described by 'table_format' and 'scale_factor'.
+    Restore client configuration back to its default. This is intended for 
internal use
+    within ImpalaTestSuite. For changing database in ImpalaTestSuite 
subclasses, please
+    refer to ImpalaTestSuite.change_database(), which provides a more 
consistent way to
+    temporarily change the database and revert back to the default database.
+    """
     if db_name is None:
       assert table_format is not None
-      db_name = QueryTestSectionReader.get_db_name(table_format,
-          scale_factor if scale_factor else '')
-    query = 'use %s' % db_name
+      db_name = ImpalaTestSuite.get_db_name_from_format(
+        table_format, scale_factor if scale_factor else '')
     # Clear the exec_options before executing a USE statement.
     # The USE statement should not fail for negative exec_option tests.
     impala_client.clear_configuration()
-    impala_client.execute(query)
+    impala_client.execute('use ' + db_name)
+
+  @classmethod
+  @contextlib.contextmanager
+  def change_database(cls, impala_client, table_format=None, db_name=None,
+                      scale_factor=None):
+    """Change impala_client to point to the appropriate database under test 
and revert
+    it to the default database once it exits the 'with' scope.
+    Restore client configuration back to its default when getting in and 
getting out of
+    'with' scope. Test methods should try to use fully qualified table names 
in the test
+    query as much as possible and only use this context manager function when 
it is
+    not practical to do so.
+    Sample usage:
+
+      with ImpalaTestSuite.change_database(client, 
db_name='functional_parquet'):
+        # client clear configuration and pointing to 'functional_parquet' 
database here.
+        client.execute('show tables')
+      # client clear configuration and pointing to 'default' database after it 
exit the
+      # 'with' scope.
+      client.execute('show tables')
+    """
+    try:
+      cls.__change_client_database(impala_client, table_format=table_format,
+                                   db_name=db_name, scale_factor=scale_factor)
+      yield
+    finally:
+      cls.__change_client_database(impala_client, db_name='default')
 
   def execute_wrapper(function):
     """
@@ -1031,6 +1069,8 @@ class ImpalaTestSuite(BaseTestSuite):
     remaining the same. A use database is issued before query execution. As 
such,
     database names need to be build pre execution, this method wraps around 
the different
     execute methods and provides a common interface to issue the proper use 
command.
+    IMPALA-13766: Remove this function decorator and let test methods change 
the database
+    by themselves.
     """
     @wraps(function)
     def wrapper(*args, **kwargs):
@@ -1038,13 +1078,16 @@ class ImpalaTestSuite(BaseTestSuite):
       if kwargs.get(TABLE_FORMAT):
         table_format = kwargs.get(TABLE_FORMAT)
         del kwargs[TABLE_FORMAT]
-      if kwargs.get('vector'):
-        table_format = kwargs.get('vector').get_table_format()
-        del kwargs['vector']
+      if kwargs.get(VECTOR):
+        ImpalaTestSuite.validate_exec_option_dimension(kwargs.get(VECTOR))
+        table_format = kwargs.get(VECTOR).get_table_format()
+        del kwargs[VECTOR]
         # self is the implicit first argument
       if table_format is not None:
-        args[0].change_database(args[0].client, table_format)
-      return function(*args, **kwargs)
+        with ImpalaTestSuite.change_database(args[0].client, table_format):
+          return function(*args, **kwargs)
+      else:
+        return function(*args, **kwargs)
     return wrapper
 
   @classmethod
@@ -1101,16 +1144,16 @@ class ImpalaTestSuite(BaseTestSuite):
 
   def execute_query_using_client(self, client, query, vector):
     self.validate_exec_option_dimension(vector)
-    self.change_database(client, vector.get_table_format())
     query_options = vector.get_value(EXEC_OPTION)
-    if query_options is not None: client.set_configuration(query_options)
+    if query_options is not None:
+      client.set_configuration(query_options)
     return client.execute(query)
 
   def execute_query_async_using_client(self, client, query, vector):
     self.validate_exec_option_dimension(vector)
-    self.change_database(client, vector.get_table_format())
     query_options = vector.get_value(EXEC_OPTION)
-    if query_options is not None: client.set_configuration(query_options)
+    if query_options is not None:
+      client.set_configuration(query_options)
     return client.execute_async(query)
 
   def close_query_using_client(self, client, query):
@@ -1248,7 +1291,10 @@ class ImpalaTestSuite(BaseTestSuite):
     assert abs(a - b) / float(max(abs(a), abs(b))) <= diff_perc
 
   def _get_table_location(self, table_name, vector):
-    """ Returns the HDFS location of the table """
+    """ Returns the HDFS location of the table.
+    This method changes self.client to point to the database described by 
'vector'."""
+    db_name = self.get_db_name_from_format(vector.get_table_format())
+    self.__change_client_database(self.client, db_name=db_name)
     result = self.execute_query_using_client(self.client,
         "describe formatted %s" % table_name, vector)
     for row in result.data:
@@ -1626,7 +1672,8 @@ class ImpalaTestSuite(BaseTestSuite):
             str(e))
         time.sleep(1)
 
-  def validate_exec_option_dimension(self, vector):
+  @staticmethod
+  def validate_exec_option_dimension(vector):
     """Validate that test dimension with name matching query option name is
     also registered in 'exec_option' dimension."""
     option_dim_names = []
@@ -1641,7 +1688,6 @@ class ImpalaTestSuite(BaseTestSuite):
       return
 
     for name in option_dim_names:
-      # TODO: enforce these warnings by changing them into pytest.fail()
       if name not in exec_option:
         pytest.fail("Exec option {} declared as independent dimension but not 
inserted "
             "into {} dimension. Consider using helper function "
diff --git a/tests/common/test_vector.py b/tests/common/test_vector.py
index 900f3a072..12f87bcb6 100644
--- a/tests/common/test_vector.py
+++ b/tests/common/test_vector.py
@@ -63,6 +63,7 @@ import logging
 
 
 LOG = logging.getLogger(__name__)
+VECTOR = 'vector'
 # Literal constants to refer to some standard dimension names.
 EXEC_OPTION = 'exec_option'
 PROTOCOL = 'protocol'
diff --git a/tests/conftest.py b/tests/conftest.py
index aa179c365..626d68b2d 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -402,9 +402,6 @@ def unique_database(request, testid_checksum):
     request.instance.filesystem_client.delete_file_dir(db_location, 
recursive=True)
 
   def cleanup():
-    # Make sure we don't try to drop the current session database
-    # TODO: clean this up via IMPALA-13758.
-    request.instance.execute_query_expect_success(request.instance.client, 
"use default")
     with request.cls.create_impala_client(protocol=HS2) as client:
       client.set_configuration({'sync_ddl': sync_ddl})
       for db_name in db_names:
diff --git a/tests/custom_cluster/test_admission_controller.py 
b/tests/custom_cluster/test_admission_controller.py
index 7c6c9aed4..5223a421d 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -41,7 +41,6 @@ from tests.common.custom_cluster_test_suite import (
     START_ARGS,
     CustomClusterTestSuite)
 from tests.common.environ import build_flavor_timeout, 
ImpalaTestClusterProperties
-from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.resource_pool_config import ResourcePoolConfig
 from tests.common.skip import SkipIfFS, SkipIfEC, SkipIfNotHdfsMinicluster
 from tests.common.test_dimensions import (
@@ -74,8 +73,8 @@ SLOW_QUERY = "select count(*) from functional.alltypes where 
int_col = sleep(200
 # The unpartitioned fragments are both interior fragments that consume input
 # from a scan fragment and non-interior fragments with a constant UNION.
 QUERY_WITH_UNPARTITIONED_FRAGMENTS = """
-    select *, (select count(distinct int_col) from alltypestiny) subquery1,
-           (select count(distinct int_col) from alltypes) subquery2,
+    select *, (select count(distinct int_col) from functional.alltypestiny) 
subquery1,
+           (select count(distinct int_col) from functional.alltypes) subquery2,
            (select 1234) subquery3
     from (""" + QUERY + """) v"""
 
@@ -579,7 +578,6 @@ class TestAdmissionController(TestAdmissionControllerBase, 
HS2TestSuite):
   def test_sanity_checks_dedicated_coordinator(self, vector, unique_database):
     """Sanity tests for verifying targeted dedicated coordinator memory 
estimations and
     behavior."""
-    ImpalaTestSuite.change_database(self.client, 
vector.get_value('table_format'))
     self.client.set_configuration_option('request_pool', "root.regularPool")
     exec_options = vector.get_value('exec_option')
     # Make sure query option MAX_MEM_ESTIMATE_FOR_ADMISSION is enforced on the 
dedicated
@@ -625,8 +623,8 @@ class TestAdmissionController(TestAdmissionControllerBase, 
HS2TestSuite):
     the actual vs expected values for mem admitted and mem limit for both 
coord and
     executor. Also verifies that those memory values are different if
     'using_dedicated_coord_estimates' is true."""
-    ImpalaTestSuite.change_database(self.client, 
vector.get_value('table_format'))
-    self.client.set_configuration_option('request_pool', "root.regularPool")
+    vector.set_exec_option('request_pool', 'root.regularPool')
+    self.client.set_configuration(vector.get_exec_option_dict())
     # Use a test query that has unpartitioned non-coordinator fragments to make
     # sure those are handled correctly (IMPALA-10036).
     for query in [QUERY, QUERY_WITH_UNPARTITIONED_FRAGMENTS]:
@@ -707,10 +705,9 @@ class TestAdmissionController(TestAdmissionControllerBase, 
HS2TestSuite):
   @SkipIfNotHdfsMinicluster.tuned_for_minicluster
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(num_exclusive_coordinators=1, 
cluster_size=2)
-  def test_mem_limit_executors(self, vector):
+  def test_mem_limit_executors(self):
     """Verify that the query option mem_limit_executors is only enforced on the
     executors."""
-    ImpalaTestSuite.change_database(self.client, 
vector.get_value('table_format'))
     expected_exec_mem_limit = "999999999"
     self.client.set_configuration({"MEM_LIMIT_EXECUTORS": 
expected_exec_mem_limit})
     handle = self.client.execute_async(QUERY.format(1))
@@ -726,10 +723,9 @@ class TestAdmissionController(TestAdmissionControllerBase, 
HS2TestSuite):
   @CustomClusterTestSuite.with_args(num_exclusive_coordinators=1, 
cluster_size=2,
       impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=1,
           pool_max_mem=2 * 1024 * 1024 * 1024, proc_mem_limit=3 * 1024 * 1024 
* 1024))
-  def test_mem_limit_coordinators(self, vector):
+  def test_mem_limit_coordinators(self):
     """Verify that the query option mem_limit_coordinators is only enforced on 
the
     coordinators."""
-    ImpalaTestSuite.change_database(self.client, 
vector.get_value('table_format'))
     expected_exec_mem_limit = "999999999"
     expected_coord_mem_limit = "111111111"
     self.client.set_configuration({"MEM_LIMIT_EXECUTORS": 
expected_exec_mem_limit,
@@ -747,10 +743,9 @@ class TestAdmissionController(TestAdmissionControllerBase, 
HS2TestSuite):
   @CustomClusterTestSuite.with_args(num_exclusive_coordinators=1, 
cluster_size=2,
       impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=1,
           pool_max_mem=2 * 1024 * 1024 * 1024, proc_mem_limit=3 * 1024 * 1024 
* 1024))
-  def test_mem_limits(self, vector):
+  def test_mem_limits(self):
     """Verify that the query option mem_limit_coordinators and 
mem_limit_executors are
     ignored when mem_limit is set."""
-    ImpalaTestSuite.change_database(self.client, 
vector.get_value('table_format'))
     exec_mem_limit = "999999999"
     coord_mem_limit = "111111111"
     mem_limit = "888888888"
@@ -2475,7 +2470,6 @@ class 
TestAdmissionControllerStress(TestAdmissionControllerBase):
           query = QUERY.format(self.query_num)
           self.query_state = 'SUBMITTING'
           client = self.impalad.service.create_beeswax_client()
-          ImpalaTestSuite.change_database(client, 
self.vector.get_value('table_format'))
           client.set_configuration(exec_options)
 
           LOG.info("Submitting query %s with ending behavior %s",
diff --git a/tests/custom_cluster/test_blacklisted_dbs_and_tables.py 
b/tests/custom_cluster/test_blacklisted_dbs_and_tables.py
index 8ce0e5969..f5ea32da6 100644
--- a/tests/custom_cluster/test_blacklisted_dbs_and_tables.py
+++ b/tests/custom_cluster/test_blacklisted_dbs_and_tables.py
@@ -95,7 +95,7 @@ class TestBlacklistedDbsAndTables(CustomClusterTestSuite):
                  
"--blacklisted_tables=functional.alltypes,functional_parquet.alltypes",
     catalogd_args="--blacklisted_dbs=functional_rc,functional_seq "
                   
"--blacklisted_tables=functional.alltypes,functional_parquet.alltypes")
-  def test_blacklisted_dbs_and_tables(self, vector):
+  def test_blacklisted_dbs_and_tables(self):
     self.__check_db_not_visible("functional_rc")
     self.__check_db_not_visible("functional_seq")
     self.__check_table_not_visible("functional", "alltypes")
@@ -128,7 +128,7 @@ class TestBlacklistedDbsAndTables(CustomClusterTestSuite):
   @CustomClusterTestSuite.with_args(
     impalad_args="--blacklisted_tables=alltypes_def,functional.alltypes",
     catalogd_args="--blacklisted_tables=alltypes_def,functional.alltypes")
-  def test_resolving_default_database(self, vector):
+  def test_resolving_default_database(self):
     # Check that "alltypes_def" is resolved as "default.alltypes_def"
     table = self.hive_client.get_table("functional", "alltypes")
     table.dbName = "default"
@@ -138,8 +138,8 @@ class TestBlacklistedDbsAndTables(CustomClusterTestSuite):
     self.hive_client.drop_table("default", "alltypes_def", True)
 
     # Check non fully qualified table names are recognized correctly
-    self.change_database(self.client, db_name="functional")
-    self.__check_create_drop_table(use_fully_qualified_table_name=False)
+    with self.change_database(self.client, db_name="functional"):
+      self.__check_create_drop_table(use_fully_qualified_table_name=False)
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
@@ -147,7 +147,7 @@ class TestBlacklistedDbsAndTables(CustomClusterTestSuite):
                   "--blacklisted_tables=functional.alltypes",
     impalad_args="--blacklisted_dbs=functional_seq "
                   "--blacklisted_tables=functional.alltypestiny")
-  def test_inconsistent_blacklist(self, vector):
+  def test_inconsistent_blacklist(self):
     """Test the error handling when blacklists are accidentally set 
differently between
     coordinators and the catalogd"""
     self.__expect_error_in_query(
diff --git a/tests/custom_cluster/test_data_cache.py 
b/tests/custom_cluster/test_data_cache.py
index 734d4c049..a020bfc68 100644
--- a/tests/custom_cluster/test_data_cache.py
+++ b/tests/custom_cluster/test_data_cache.py
@@ -82,7 +82,8 @@ class TestDataCache(CustomClusterTestSuite):
     # Expect all cache hits results in no opened files.
     opened_file_handles_metric = 
'impala-server.io.mgr.cached-file-handles-miss-count'
     baseline = self.get_metric(opened_file_handles_metric)
-    self.execute_query("select count(distinct l_orderkey) from test_parquet")
+    self.execute_query("select count(distinct l_orderkey) from 
{0}.test_parquet".format(
+      unique_database))
     assert self.get_metric(opened_file_handles_metric) == baseline
 
   @pytest.mark.execute_serially
@@ -106,7 +107,7 @@ class TestDataCache(CustomClusterTestSuite):
   def test_data_cache_deterministic_no_file_handle_cache(self, vector, 
unique_database):
     self.__test_data_cache_deterministic(vector, unique_database)
 
-  def __test_data_cache(self, vector):
+  def __test_data_cache(self):
     """ This test scans the same table twice and verifies the cache hit count 
metrics
     are correct. The exact number of bytes hit is non-deterministic between 
runs due
     to different mtime of files and multiple shards in the cache.
@@ -132,16 +133,16 @@ class TestDataCache(CustomClusterTestSuite):
       impalad_args=get_impalad_args("LRU", high_write_concurrency=False,
                                     force_single_shard=False),
       start_args=CACHE_START_ARGS, cluster_size=1)
-  def test_data_cache_lru(self, vector):
-    self.__test_data_cache(vector)
+  def test_data_cache_lru(self):
+    self.__test_data_cache()
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args=get_impalad_args("LIRS", high_write_concurrency=False,
                                     force_single_shard=False),
       start_args=CACHE_START_ARGS, cluster_size=1)
-  def test_data_cache_lirs(self, vector):
-    self.__test_data_cache(vector)
+  def test_data_cache_lirs(self):
+    self.__test_data_cache()
 
   def __test_data_cache_disablement(self, vector):
     # Verifies that the cache metrics are all zero.
@@ -185,7 +186,7 @@ class TestDataCache(CustomClusterTestSuite):
       impalad_args=get_impalad_args("LIRS", high_write_concurrency=False),
       start_args="--data_cache_dir=/tmp --data_cache_size=9MB",
       cluster_size=1)
-  def test_data_cache_lirs_instant_evictions(self, vector):
+  def test_data_cache_lirs_instant_evictions(self):
     # The setup for this test is intricate. For Allocate() to succeed, the 
request
     # needs to be smaller than the protected size (95% of the cache). For 
Insert() to
     # fail, the request needs to be larger than the unprotected size (5% of 
the cache).
@@ -215,7 +216,7 @@ class TestDataCache(CustomClusterTestSuite):
     assert self.get_data_cache_metric('num-writes') >= 0
     assert self.get_data_cache_metric('total-bytes') >= 0
 
-  def __test_data_cache_keep_across_restarts(self, vector, 
test_reduce_size=False):
+  def __test_data_cache_keep_across_restarts(self, test_reduce_size=False):
     QUERY = "select * from tpch_parquet.lineitem"
     # Execute a query, record the total bytes and the number of entries of 
cache before
     # cache dump.
@@ -260,31 +261,31 @@ class TestDataCache(CustomClusterTestSuite):
   @CustomClusterTestSuite.with_args(
       impalad_args=get_impalad_args("LRU", keep_across_restarts=True),
       start_args=CACHE_START_ARGS, cluster_size=1)
-  def test_data_cache_keep_across_restarts_lru(self, vector):
-    self.__test_data_cache_keep_across_restarts(vector)
+  def test_data_cache_keep_across_restarts_lru(self):
+    self.__test_data_cache_keep_across_restarts()
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args=get_impalad_args("LIRS", keep_across_restarts=True),
       start_args=CACHE_START_ARGS, cluster_size=1)
-  def test_data_cache_keep_across_restarts_lirs(self, vector):
-    self.__test_data_cache_keep_across_restarts(vector)
+  def test_data_cache_keep_across_restarts_lirs(self):
+    self.__test_data_cache_keep_across_restarts()
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args=get_impalad_args("LRU", keep_across_restarts=True),
       start_args=CACHE_START_ARGS, cluster_size=1)
-  def test_data_cache_reduce_size_restarts_lru(self, vector):
-    self.__test_data_cache_keep_across_restarts(vector, test_reduce_size=True)
+  def test_data_cache_reduce_size_restarts_lru(self):
+    self.__test_data_cache_keep_across_restarts(test_reduce_size=True)
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args=get_impalad_args("LIRS", keep_across_restarts=True),
       start_args=CACHE_START_ARGS, cluster_size=1)
-  def test_data_cache_reduce_size_restarts_lirs(self, vector):
-    self.__test_data_cache_keep_across_restarts(vector, test_reduce_size=True)
+  def test_data_cache_reduce_size_restarts_lirs(self):
+    self.__test_data_cache_keep_across_restarts(test_reduce_size=True)
 
-  def __test_data_cache_readonly(self, vector):
+  def __test_data_cache_readonly(self):
     QUERY = "select * from tpch_parquet.lineitem"
     # Execute the query asynchronously, wait a short while, and do gracefully 
shutdown
     # immediately to test the race between cache writes and setting cache 
read-only.
@@ -310,12 +311,12 @@ class TestDataCache(CustomClusterTestSuite):
   @CustomClusterTestSuite.with_args(
       impalad_args=get_impalad_args("LRU", keep_across_restarts=True),
       start_args=CACHE_START_ARGS, cluster_size=1, disable_log_buffering=True)
-  def test_data_cache_readonly_lru(self, vector):
-    self.__test_data_cache_readonly(vector)
+  def test_data_cache_readonly_lru(self):
+    self.__test_data_cache_readonly()
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args=get_impalad_args("LIRS", keep_across_restarts=True),
       start_args=CACHE_START_ARGS, cluster_size=1, disable_log_buffering=True)
-  def test_data_cache_readonly_lirs(self, vector):
-    self.__test_data_cache_readonly(vector)
+  def test_data_cache_readonly_lirs(self):
+    self.__test_data_cache_readonly()
diff --git a/tests/failure/test_failpoints.py b/tests/failure/test_failpoints.py
index 38f0919e5..d5bb1bf9e 100644
--- a/tests/failure/test_failpoints.py
+++ b/tests/failure/test_failpoints.py
@@ -97,6 +97,10 @@ class TestFailpoints(ImpalaTestSuite):
   # killer on machines with 30GB RAM. This makes the test run in 4 minutes 
instead of 1-2.
   @pytest.mark.execute_serially
   def test_failpoints(self, vector):
+    with self.change_database(self.client, vector.get_table_format()):
+      self.__run_failpoints(vector)
+
+  def __run_failpoints(self, vector):
     query = vector.get_value('query')
     action = vector.get_value('action')
     location = vector.get_value('location')
@@ -131,9 +135,8 @@ class TestFailpoints(ImpalaTestSuite):
   def __parse_plan_nodes_from_explain(self, query, vector):
     """Parses the EXPLAIN <query> output and returns a list of node ids.
     Expects format of <ID>:<NAME>"""
-    explain_result =\
-        self.execute_query("explain " + query, vector.get_value('exec_option'),
-                           table_format=vector.get_value('table_format'))
+    explain_result = self.execute_query("explain " + query,
+                                        vector.get_value('exec_option'))
     node_ids = []
     for row in explain_result.data:
       match = re.search(r'\s*(?P<node_id>\d+)\:(?P<node_type>\S+\s*\S+)', row)
@@ -190,8 +193,7 @@ class TestFailpoints(ImpalaTestSuite):
 
   def __execute_fail_action(self, query, vector):
     try:
-      self.execute_query(query, vector.get_value('exec_option'),
-                         table_format=vector.get_value('table_format'))
+      self.execute_query(query, vector.get_value('exec_option'))
       assert 'Expected Failure'
     except ImpalaBeeswaxException as e:
       LOG.debug(e)
@@ -200,8 +202,7 @@ class TestFailpoints(ImpalaTestSuite):
 
   def __execute_cancel_action(self, query, vector):
     LOG.info('Starting async query execution')
-    handle = self.execute_query_async(query, vector.get_value('exec_option'),
-                                      
table_format=vector.get_value('table_format'))
+    handle = self.execute_query_async(query, vector.get_value('exec_option'))
     LOG.info('Sleeping')
     sleep(3)
     cancel_result = self.client.cancel(handle)
diff --git a/tests/query_test/test_aggregation.py 
b/tests/query_test/test_aggregation.py
index 2e98f4737..bb5711a53 100644
--- a/tests/query_test/test_aggregation.py
+++ b/tests/query_test/test_aggregation.py
@@ -147,11 +147,11 @@ class TestAggregation(ImpalaTestSuite):
     exec_option = vector.get_value('exec_option')
     disable_codegen = exec_option['disable_codegen']
     data_type, agg_func = (vector.get_value('data_type'), 
vector.get_value('agg_func'))
+    db_name = self.get_db_name_from_format(vector.get_table_format())
 
-    query = 'select %s(%s_col) from alltypesagg where day is not null' % 
(agg_func,
-        data_type)
-    result = self.execute_query(query, exec_option,
-       table_format=vector.get_value('table_format'))
+    query = 'select {0}({1}_col) from {2}.alltypesagg where day is not 
null'.format(
+      agg_func, data_type, db_name)
+    result = self.execute_query(query, exec_option)
     assert len(result.data) == 1
     self.verify_agg_result(agg_func, data_type, False, result.data[0])
 
@@ -160,9 +160,9 @@ class TestAggregation(ImpalaTestSuite):
       # It is deliberately disabled for the merge aggregation.
       assert_codegen_enabled(result.runtime_profile, [1])
 
-    query = 'select %s(DISTINCT(%s_col)) from alltypesagg where day is not 
null' % (
-        agg_func, data_type)
-    result = self.execute_query(query, vector.get_value('exec_option'))
+    query = ('select {0}(DISTINCT({1}_col)) from {2}.alltypesagg '
+             'where day is not null').format(agg_func, data_type, db_name)
+    result = self.execute_query(query, exec_option)
     assert len(result.data) == 1
     self.verify_agg_result(agg_func, data_type, True, result.data[0])
 
diff --git a/tests/query_test/test_cancellation.py 
b/tests/query_test/test_cancellation.py
index 342a39f03..49efa053f 100644
--- a/tests/query_test/test_cancellation.py
+++ b/tests/query_test/test_cancellation.py
@@ -21,10 +21,8 @@
 from __future__ import absolute_import, division, print_function
 from builtins import range
 import pytest
-import threading
 from time import sleep
 from RuntimeProfile.ttypes import TRuntimeProfileFormat
-from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.test_dimensions import add_mandatory_exec_option
 from tests.common.test_vector import ImpalaTestDimension
 from tests.common.impala_test_suite import ImpalaTestSuite
@@ -57,9 +55,6 @@ QUERY_TYPE = ["SELECT", "CTAS"]
 # are prone to occur more often when the time between RPCs is small.
 CANCEL_DELAY_IN_SECONDS = [0, 0.01, 0.1, 1, 4]
 
-# Number of times to execute/cancel each query under test
-NUM_CANCELATION_ITERATIONS = 1
-
 # Test cancellation on both running and hung queries. Node ID 0 is the scan 
node
 WAIT_ACTIONS = [None, '0:GETNEXT:WAIT']
 
@@ -77,7 +72,7 @@ JOIN_BEFORE_CLOSE = [False, True]
 # Extra dimensions to test order by without limit
 SORT_QUERY = 'select * from lineitem order by l_orderkey'
 SORT_CANCEL_DELAY = list(range(6, 10))
-SORT_BUFFER_POOL_LIMIT = ['0', '300m'] # Test spilling and non-spilling sorts.
+SORT_BUFFER_POOL_LIMIT = ['0', '300m']  # Test spilling and non-spilling sorts.
 
 # Test with and without multithreading
 MT_DOP_VALUES = [0, 4]
@@ -87,6 +82,7 @@ MT_DOP_VALUES = [0, 4]
 # True: Execute a KILL QUERY statement.
 USE_KILL_QUERY_STATEMENT = [False, True]
 
+
 class TestCancellation(ImpalaTestSuite):
   @classmethod
   def get_workload(self):
@@ -115,17 +111,21 @@ class TestCancellation(ImpalaTestSuite):
         ImpalaTestDimension('mt_dop', *MT_DOP_VALUES))
     cls.ImpalaTestMatrix.add_dimension(
         ImpalaTestDimension('use_kill_query_statement', 
*USE_KILL_QUERY_STATEMENT))
+    # Number of times to execute/cancel each query under test
+    num_iterations = 1 if cls.exploration_strategy() == 'core' else 3
+    cls.ImpalaTestMatrix.add_dimension(
+        ImpalaTestDimension('num_cancellation_iterations', num_iterations))
 
     cls.ImpalaTestMatrix.add_constraint(
-        lambda v: v.get_value('query_type') != 'CTAS' or (\
+        lambda v: v.get_value('query_type') != 'CTAS' or (
             v.get_value('table_format').file_format in ['text', 'parquet', 
'kudu', 'json']
             and v.get_value('table_format').compression_codec == 'none'))
     cls.ImpalaTestMatrix.add_constraint(
         lambda v: v.get_value('exec_option')['batch_size'] == 0)
     # Ignore 'compute stats' queries for the CTAS query type.
     cls.ImpalaTestMatrix.add_constraint(
-        lambda v: not (v.get_value('query_type') == 'CTAS' and
-            v.get_value('query').startswith('compute stats')))
+        lambda v: not (v.get_value('query_type') == 'CTAS'
+            and v.get_value('query').startswith('compute stats')))
     # 'use_kill_query_statement' and 'join_before_close' cannot be both True, 
since
     # the KILL QUERY statement will also close the query.
     cls.ImpalaTestMatrix.add_constraint(
@@ -134,16 +134,14 @@ class TestCancellation(ImpalaTestSuite):
 
     # Ignore CTAS on Kudu if there is no PRIMARY KEY specified.
     cls.ImpalaTestMatrix.add_constraint(
-        lambda v: not (v.get_value('query_type') == 'CTAS' and
-            v.get_value('table_format').file_format == 'kudu' and
-            QUERIES[v.get_value('query')] is None))
+        lambda v: not (v.get_value('query_type') == 'CTAS'
+            and v.get_value('table_format').file_format == 'kudu'
+            and QUERIES[v.get_value('query')] is None))
 
     # tpch tables are not generated for hbase as the data loading takes a very 
long time.
     # TODO: Add cancellation tests for hbase.
-    cls.ImpalaTestMatrix.add_constraint(lambda v:\
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
         v.get_value('table_format').file_format != 'hbase')
-    if cls.exploration_strategy() != 'core':
-      NUM_CANCELATION_ITERATIONS = 3
 
   def cleanup_test_table(self, table_format):
     self.execute_query("drop table if exists ctas_cancel", 
table_format=table_format)
@@ -176,7 +174,7 @@ class TestCancellation(ImpalaTestSuite):
     vector.get_value('exec_option')['mt_dop'] = vector.get_value('mt_dop')
 
     # Execute the query multiple times, cancelling it each time.
-    for i in range(NUM_CANCELATION_ITERATIONS):
+    for i in range(vector.get_value('num_cancellation_iterations')):
       cancel_query_and_validate_state(self.client, query,
           vector.get_value('exec_option'), vector.get_value('table_format'),
           vector.get_value('cancel_delay'), 
vector.get_value('join_before_close'),
@@ -188,7 +186,8 @@ class TestCancellation(ImpalaTestSuite):
     # Executing the same query without canceling should work fine. Only do 
this if the
     # query has a limit or aggregation
     if not debug_action and ('count' in query or 'limit' in query):
-      self.execute_query(query, vector.get_value('exec_option'))
+      with self.change_database(self.client, vector.get_value('table_format')):
+        self.execute_query(query, vector.get_value('exec_option'))
 
   @pytest.mark.execute_serially
   def test_misformatted_profile_text(self):
@@ -226,6 +225,7 @@ class TestCancellation(ImpalaTestSuite):
     # fail. Introducing a small delay allows everything to quiesce.
     # TODO: Figure out a better way to address this
     sleep(1)
+    super(TestCancellation, self).teardown_method(method)
 
 
 class TestCancellationParallel(TestCancellation):
@@ -237,6 +237,7 @@ class TestCancellationParallel(TestCancellation):
   def test_cancel_select(self, vector):
     self.execute_cancel_test(vector)
 
+
 class TestCancellationSerial(TestCancellation):
   @classmethod
   def add_test_dimensions(cls):
@@ -268,6 +269,7 @@ class TestCancellationSerial(TestCancellation):
     metric_verifier = MetricVerifier(self.impalad_test_service)
     metric_verifier.verify_no_open_files(timeout=10)
 
+
 class TestCancellationFullSort(TestCancellation):
   @classmethod
   def add_test_dimensions(cls):
@@ -283,9 +285,9 @@ class TestCancellationFullSort(TestCancellation):
         ImpalaTestDimension('buffer_pool_limit', *SORT_BUFFER_POOL_LIMIT))
     cls.ImpalaTestMatrix.add_constraint(
         lambda v: v.get_value('fail_rpc_action') == FAIL_RPC_ACTIONS[0])
-    cls.ImpalaTestMatrix.add_constraint(lambda v:\
-       v.get_value('table_format').file_format =='parquet' and\
-       v.get_value('table_format').compression_codec == 'none')
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+       v.get_value('table_format').file_format == 'parquet'
+       and v.get_value('table_format').compression_codec == 'none')
 
   def test_cancel_sort(self, vector):
     self.execute_cancel_test(vector)
diff --git a/tests/query_test/test_mem_usage_scaling.py 
b/tests/query_test/test_mem_usage_scaling.py
index ab4e1af32..07c35a588 100644
--- a/tests/query_test/test_mem_usage_scaling.py
+++ b/tests/query_test/test_mem_usage_scaling.py
@@ -327,7 +327,7 @@ class TestScanMemLimit(ImpalaTestSuite):
     cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
     
cls.ImpalaTestMatrix.add_dimension(create_avro_snappy_dimension(cls.get_workload()))
 
-  def test_wide_avro_mem_usage(self, vector, unique_database):
+  def test_wide_avro_mem_usage(self, unique_database):
     """Create a wide avro table with large strings and test scans that can 
cause OOM."""
     if self.exploration_strategy() != 'exhaustive':
       pytest.skip("only run resource-intensive query on exhaustive")
@@ -409,12 +409,12 @@ class TestHashJoinMemLimit(ImpalaTestSuite):
     """Selective scan with hash join and aggregate above it. Regression test 
for
     IMPALA-9712 - before the fix this ran out of memory."""
     OPTS = {'mem_limit': "80MB", 'mt_dop': 1}
-    self.change_database(self.client, vector.get_value('table_format'))
-    result = self.execute_query_expect_success(self.client,
-        """select sum(l_extendedprice * (1 - l_discount)) as revenue
-           from lineitem join part on p_partkey = l_partkey
-           where l_comment like 'ab%'""", query_options=OPTS)
-    assert result.data[0] == '440443181.0505'
+    with self.change_database(self.client, vector.get_value('table_format')):
+      result = self.execute_query_expect_success(self.client,
+          """select sum(l_extendedprice * (1 - l_discount)) as revenue
+             from lineitem join part on p_partkey = l_partkey
+             where l_comment like 'ab%'""", query_options=OPTS)
+      assert result.data[0] == '440443181.0505'
 
 
 @SkipIfNotHdfsMinicluster.tuned_for_minicluster
diff --git a/tests/query_test/test_mt_dop.py b/tests/query_test/test_mt_dop.py
index 535adf5c9..dc8ff9347 100644
--- a/tests/query_test/test_mt_dop.py
+++ b/tests/query_test/test_mt_dop.py
@@ -85,11 +85,12 @@ class TestMtDop(ImpalaTestSuite):
       expected_results = "Updated 24 partition(s) and 11 column(s)."
     else:
       # Create a second table in the same format pointing to the same data 
files.
-      # This function switches to the format-specific DB in 'vector'.
-      table_loc = self._get_table_location("alltypes", vector)
-      self.execute_query_using_client(self.client,
-        "create external table %s like alltypes location '%s'"
-        % (fq_table_name, table_loc), vector)
+      # This switches the client momentarily to the format-specific DB in 
'vector'.
+      with self.change_database(self.client, vector.get_table_format()):
+        table_loc = self._get_table_location("alltypes", vector)
+        self.execute_query_using_client(self.client,
+          "create external table %s like alltypes location '%s'"
+          % (fq_table_name, table_loc), vector)
       # Recover partitions for HDFS tables.
       self.execute_query("alter table %s recover partitions" % fq_table_name)
       expected_results = "Updated 24 partition(s) and 11 column(s)."
@@ -152,7 +153,7 @@ class TestMtDopNonZeroParquet(ImpalaTestSuite):
         test_file_vars={'$ORIGINAL_DB': ImpalaTestSuite
         .get_db_name_from_format(vector.get_value('table_format'))})
 
-  def test_mt_dop_only_joins(self, vector, unique_database):
+  def test_mt_dop_only_joins(self, vector):
     """MT_DOP specific tests for joins."""
     new_vector = deepcopy(vector)
     # Allow test to override num_nodes.
diff --git a/tests/query_test/test_nested_types.py 
b/tests/query_test/test_nested_types.py
index 0d4f5ba19..6f541c780 100644
--- a/tests/query_test/test_nested_types.py
+++ b/tests/query_test/test_nested_types.py
@@ -91,9 +91,10 @@ class TestNestedTypes(ImpalaTestSuite):
 class TestNestedTypesSingleNode(ImpalaTestSuite):
   """Functional tests for nested types, run for all file formats that support 
nested
   types. All tests here runs with single node only."""
+
   @classmethod
-  def get_workload(self):
-    return 'functional-query'
+  def get_workload(cls):
+    return 'tpch_nested'
 
   @classmethod
   def add_test_dimensions(cls):
diff --git a/tests/query_test/test_runtime_filters.py 
b/tests/query_test/test_runtime_filters.py
index 83a0031b7..6e5b4d394 100644
--- a/tests/query_test/test_runtime_filters.py
+++ b/tests/query_test/test_runtime_filters.py
@@ -116,8 +116,10 @@ class TestRuntimeFilters(ImpalaTestSuite):
     get woken up and exit promptly when the query is cancelled."""
     # Make sure the cluster is quiesced before we start this test
     self._verify_no_fragments_running()
+    with self.change_database(self.client, vector.get_value('table_format')):
+      self.__run_wait_time_cancellation(vector)
 
-    self.change_database(self.client, vector.get_value('table_format'))
+  def __run_wait_time_cancellation(self, vector):
     # Set up a query where a scan (plan node 0, scanning alltypes) will wait
     # indefinitely for a filter to arrive. The filter arrival is delayed
     # by adding a wait to the scan of alltypestime (plan node 0).
@@ -150,8 +152,11 @@ class TestRuntimeFilters(ImpalaTestSuite):
 
   def test_file_filtering(self, vector):
     if 'kudu' in str(vector.get_value('table_format')):
-      return
-    self.change_database(self.client, vector.get_value('table_format'))
+      pytest.skip('Skip test against kudu')
+    with self.change_database(self.client, vector.get_value('table_format')):
+      self.__run_file_filtering(vector)
+
+  def __run_file_filtering(self, vector):
     self.execute_query("SET RUNTIME_FILTER_MODE=GLOBAL")
     self.execute_query("SET RUNTIME_FILTER_WAIT_TIME_MS=10000")
     result = self.execute_query("""select STRAIGHT_JOIN * from alltypes inner 
join
@@ -165,8 +170,11 @@ class TestRuntimeFilters(ImpalaTestSuite):
     """Test that late-arriving filters are applied to files when the scanner 
starts processing
     each scan range."""
     if 'kudu' in str(vector.get_value('table_format')):
-      return
-    self.change_database(self.client, vector.get_value('table_format'))
+      pytest.skip('Skip test against kudu')
+    with self.change_database(self.client, vector.get_value('table_format')):
+      self.__run_file_filtering_late_arriving_filter(vector)
+
+  def __run_file_filtering_late_arriving_filter(self, vector):
     self.execute_query("SET RUNTIME_FILTER_MODE=GLOBAL")
     self.execute_query("SET RUNTIME_FILTER_WAIT_TIME_MS=1")
     self.execute_query("SET NUM_SCANNER_THREADS=1")
@@ -357,7 +365,7 @@ class TestOverlapMinMaxFilters(ImpalaTestSuite):
                        unique_database, test_file_vars=DEFAULT_VARS)
 
   @SkipIfLocal.hdfs_client
-  def test_partition_column_in_parquet_data_file(self, vector, 
unique_database):
+  def test_partition_column_in_parquet_data_file(self, unique_database):
     """IMPALA-11147: Test that runtime min/max filters still work on data 
files that
     contain the partitioning columns."""
     tbl_name = "part_col_in_data_file"
diff --git a/tests/query_test/test_scanners.py 
b/tests/query_test/test_scanners.py
index 85b53951e..825cb09fa 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -323,9 +323,10 @@ class TestUnmatchedSchema(ImpalaTestSuite):
           'invalidate metadata {0}.jointbl_test'.format(unique_database),
           vector)
     else:
+      origin_db = self.get_db_name_from_format(vector.get_table_format())
       self.execute_query_using_client(self.client,
-          "create external table {0}.jointbl_test like jointbl".format(
-              unique_database), vector)
+          "create external table {0}.jointbl_test like {1}.jointbl".format(
+              unique_database, origin_db), vector)
 
       # Update the location of the new table to point the same location as the 
old table
       location = self._get_table_location('jointbl', vector)
@@ -439,8 +440,8 @@ class TestHdfsScannerSkew(ImpalaTestSuite):
     super(TestHdfsScannerSkew, cls).add_test_dimensions()
     add_mandatory_exec_option(cls, 'mt_dop', 2)
     cls.ImpalaTestMatrix.add_constraint(lambda v:
-        v.get_value('table_format').file_format in ('text') and
-        v.get_value('table_format').compression_codec == 'none')
+        v.get_value('table_format').file_format in ('text')
+        and v.get_value('table_format').compression_codec == 'none')
 
   @SkipIfLocal.multiple_impalad
   @pytest.mark.execute_serially
@@ -1499,9 +1500,11 @@ class TestTextScanRangeLengths1(ImpalaTestSuite):
     # the count(col) result because if the scan range is split right after the 
escape
     # char, the escape char has no effect because we cannot scan backwards to 
the
     # previous scan range.
-    for t in self.ESCAPE_TABLE_LIST:
-      expected_result = self.client.execute("select count(col) from " + t)
-      result = self.client.execute("select count(*) from " + t)
+    db_name = self.get_db_name_from_format(vector.get_table_format())
+    for table_name in self.ESCAPE_TABLE_LIST:
+      fq_table_name = db_name + '.' + table_name
+      expected_result = self.client.execute("select count(col) from " + 
fq_table_name)
+      result = self.client.execute("select count(*) from " + fq_table_name)
       assert result.data == expected_result.data
 
 
@@ -1548,10 +1551,10 @@ class TestTextSplitDelimiters(ImpalaTestSuite):
     in the main text parsing algorithm. The second scan range exercises 
correctly
     identifying a split delimiter as the first in a scan range."""
     DEFAULT_IO_BUFFER_SIZE = 8 * 1024 * 1024
-    data = ('a' * (DEFAULT_IO_BUFFER_SIZE - 1) + "\r\n" +  # first scan range
-            'b' * (DEFAULT_IO_BUFFER_SIZE - 3) + "\r\n" +
-            'a' * (DEFAULT_IO_BUFFER_SIZE - 1) + "\r\n" +  # second scan range
-            'b' * (DEFAULT_IO_BUFFER_SIZE - 1))
+    data = ('a' * (DEFAULT_IO_BUFFER_SIZE - 1) + "\r\n"  # first scan range
+            + 'b' * (DEFAULT_IO_BUFFER_SIZE - 3) + "\r\n"
+            + 'a' * (DEFAULT_IO_BUFFER_SIZE - 1) + "\r\n"  # second scan range
+            + 'b' * (DEFAULT_IO_BUFFER_SIZE - 1))
     assert len(data) == DEFAULT_IO_BUFFER_SIZE * 4
 
     max_scan_range_length = DEFAULT_IO_BUFFER_SIZE * 2
@@ -1594,8 +1597,8 @@ class TestTextScanRangeLengths2(ImpalaTestSuite):
   def add_test_dimensions(cls):
     super(TestTextScanRangeLengths2, cls).add_test_dimensions()
     cls.ImpalaTestMatrix.add_constraint(lambda v:
-        v.get_value('table_format').file_format == 'text' and
-        v.get_value('table_format').compression_codec in ['none', 'gzip'])
+        v.get_value('table_format').file_format == 'text'
+        and v.get_value('table_format').compression_codec in ['none', 'gzip'])
 
   def test_text_scanner_with_header(self, vector, unique_database):
     # Remove to allow .test file to set abort_on_error.
diff --git a/tests/query_test/test_tablesample.py 
b/tests/query_test/test_tablesample.py
index 08e73ccb7..3cab33be3 100644
--- a/tests/query_test/test_tablesample.py
+++ b/tests/query_test/test_tablesample.py
@@ -18,12 +18,12 @@
 # Tests the TABLESAMPLE clause.
 
 from __future__ import absolute_import, division, print_function
-import pytest
 import subprocess
 
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.test_vector import ImpalaTestDimension
 
+
 class TestTableSample(ImpalaTestSuite):
   @classmethod
   def get_workload(cls):
@@ -36,19 +36,23 @@ class TestTableSample(ImpalaTestSuite):
     cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('filtered', *[True, 
False]))
     # Tablesample is only supported on HDFS tables.
     cls.ImpalaTestMatrix.add_constraint(lambda v:
-      v.get_value('table_format').file_format != 'kudu' and
-      v.get_value('table_format').file_format != 'hbase')
+      v.get_value('table_format').file_format != 'kudu'
+      and v.get_value('table_format').file_format != 'hbase')
     if cls.exploration_strategy() != 'exhaustive':
       # Cut down on core testing time by limiting the file formats.
       cls.ImpalaTestMatrix.add_constraint(lambda v:
-        v.get_value('table_format').file_format == 'parquet' or
-        v.get_value('table_format').file_format == 'text')
+        v.get_value('table_format').file_format == 'parquet'
+        or v.get_value('table_format').file_format == 'text')
 
   def test_tablesample(self, vector):
     # Do not use a .test to avoid making this test flaky.
     # 1. Queries without the repeatable clause are non-deterministic.
     # 2. The results of queries without a repeatable clause could change due to
     # changes in data loading that affect the number or size of files.
+    with self.change_database(self.client, vector.get_value('table_format')):
+      self.__run_tablesample(vector)
+
+  def __run_tablesample(self, vector):
     repeatable = vector.get_value('repeatable')
     filtered = vector.get_value('filtered')
 
@@ -56,7 +60,6 @@ class TestTableSample(ImpalaTestSuite):
     if filtered:
       where_clause = "where month between 1 and 6"
 
-    ImpalaTestSuite.change_database(self.client, 
vector.get_value('table_format'))
     result = self.client.execute("select count(*) from alltypes %s" % 
where_clause)
     baseline_count = int(result.data[0])
     prev_count = None
diff --git a/tests/util/cancel_util.py b/tests/util/cancel_util.py
index 5957ce757..fc1ed4e8b 100644
--- a/tests/util/cancel_util.py
+++ b/tests/util/cancel_util.py
@@ -102,9 +102,17 @@ def cancel_query_and_validate_state(client, query, 
exec_option, table_format,
   statement will be executed to cancel and close the query, instead of sending 
the Thrift
   RPCs directly.
   """
+  assert table_format is not None
+  with ImpalaTestSuite.change_database(client, table_format):
+    __run_cancel_query_and_validate_state(
+      client, query, exec_option, cancel_delay, join_before_close,
+      use_kill_query_statement)
+
+
+def __run_cancel_query_and_validate_state(client, query, exec_option,
+    cancel_delay, join_before_close=False, use_kill_query_statement=False):
   assert not (join_before_close and use_kill_query_statement)
 
-  if table_format: ImpalaTestSuite.change_database(client, table_format)
   if exec_option: client.set_configuration(exec_option)
   handle = client.execute_async(query)
 
@@ -160,8 +168,8 @@ def cancel_query_and_validate_state(client, query, 
exec_option, table_format,
   # that the synchronous phase of query unregistration has finished and the 
profile
   # is final.
   profile = client.get_runtime_profile(handle)
-  if ("- Completed admission: " in profile and
-      ("- First row fetched:" in profile or "- Request finished:" in profile)):
+  if ("- Completed admission: " in profile
+      and ("- First row fetched:" in profile or "- Request finished:" in 
profile)):
     # TotalBytesRead is a sentinel that will only be created if 
ComputeQuerySummary()
     # has been run by the cancelling thread.
     assert "- TotalBytesRead:" in profile, profile

Reply via email to