This is an automated email from the ASF dual-hosted git repository. csringhofer pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 9cb9bae84e8888203e4bdfd3e20ee3e61c6059fe Author: Riza Suminto <[email protected]> AuthorDate: Thu Feb 13 11:19:31 2025 -0800 IMPALA-13758: Use context manager in ImpalaTestSuite.change_database ImpalaTestSuite.change_database is responsible for pointing the Impala client to the database under test. However, it left the client pointing to that database after the test without reverting it back to the default database. This patch does the reversal by changing ImpalaTestSuite.change_database to use a context manager. This patch changes the behavior of execute_query_using_client() and execute_query_async_using_client(). They used to change the database according to the given vector parameter, but not anymore after this patch. In practice, this behavior change does not affect many tests because most queries going through these functions already use fully qualified table names. Going forward, querying through functions other than run_test_case() should try to use fully qualified table names as much as possible. Retain the behavior of ImpalaTestSuite._get_table_location() since a considerable number of tests rely on it (changing the database when called). Removed unused test fixtures and fixed several flake8 issues in modified test files. Testing: - Moved nested-types-subplan-single-node.test. This allows the test framework to point to the right tpch_nested* database. - Pass exhaustive tests except IMPALA-13752 and IMPALA-13761. They will be fixed in a separate patch. 
Change-Id: I75bec7403cc302728a630efe3f95e852a84594e2 Reviewed-on: http://gerrit.cloudera.org:8080/22487 Reviewed-by: Csaba Ringhofer <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../nested-types-subplan-single-node.test | 3 - tests/common/impala_test_suite.py | 90 ++++++++++++++++------ tests/common/test_vector.py | 1 + tests/conftest.py | 3 - tests/custom_cluster/test_admission_controller.py | 20 ++--- .../test_blacklisted_dbs_and_tables.py | 10 +-- tests/custom_cluster/test_data_cache.py | 43 ++++++----- tests/failure/test_failpoints.py | 15 ++-- tests/query_test/test_aggregation.py | 14 ++-- tests/query_test/test_cancellation.py | 42 +++++----- tests/query_test/test_mem_usage_scaling.py | 14 ++-- tests/query_test/test_mt_dop.py | 13 ++-- tests/query_test/test_nested_types.py | 5 +- tests/query_test/test_runtime_filters.py | 20 +++-- tests/query_test/test_scanners.py | 29 +++---- tests/query_test/test_tablesample.py | 15 ++-- tests/util/cancel_util.py | 14 +++- 17 files changed, 207 insertions(+), 144 deletions(-) diff --git a/testdata/workloads/functional-query/queries/QueryTest/nested-types-subplan-single-node.test b/testdata/workloads/tpch_nested/queries/QueryTest/nested-types-subplan-single-node.test similarity index 97% rename from testdata/workloads/functional-query/queries/QueryTest/nested-types-subplan-single-node.test rename to testdata/workloads/tpch_nested/queries/QueryTest/nested-types-subplan-single-node.test index ece9c7007..3a9ab06ad 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/nested-types-subplan-single-node.test +++ b/testdata/workloads/tpch_nested/queries/QueryTest/nested-types-subplan-single-node.test @@ -1,8 +1,5 @@ ==== ---- QUERY -use tpch_nested_parquet -==== ----- QUERY # IMPALA-2289: Test proper handling of AtCapacity() inside the subplan node. 
# num_nodes is set to 1 in the python test to make it very likely to hit the once buggy # code path because a single scan node instance must process all input files. diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py index 7c5495f4f..0cc5d4ea5 100644 --- a/tests/common/impala_test_suite.py +++ b/tests/common/impala_test_suite.py @@ -19,6 +19,7 @@ from __future__ import absolute_import, division, print_function from builtins import range, round +import contextlib import glob import grp import hashlib @@ -63,7 +64,7 @@ from tests.common.test_result_verifier import ( verify_raw_results, verify_runtime_profile) from tests.common.test_vector import ( - EXEC_OPTION, PROTOCOL, TABLE_FORMAT, + EXEC_OPTION, PROTOCOL, TABLE_FORMAT, VECTOR, BEESWAX, HS2, HS2_HTTP, ImpalaTestDimension) from tests.performance.query import Query @@ -752,8 +753,9 @@ class ImpalaTestSuite(BaseTestSuite): # Change the database to reflect the file_format, compression codec etc, or the # user specified database for all targeted impalad. for impalad_client in target_impalad_clients: - ImpalaTestSuite.change_database(impalad_client, - table_format_info, use_db, pytest.config.option.scale_factor) + ImpalaTestSuite.__change_client_database( + impalad_client, table_format=table_format_info, db_name=use_db, + scale_factor=pytest.config.option.scale_factor) impalad_client.set_configuration(exec_options) def __exec_in_impala(query, user=None): @@ -984,6 +986,10 @@ class ImpalaTestSuite(BaseTestSuite): os.makedirs(output_dir) write_test_file(output_file, sections, encoding=encoding) + # Revert target_impalad_clients back to default database. + for impalad_client in target_impalad_clients: + ImpalaTestSuite.__change_client_database(impalad_client, db_name='default') + def get_query_lineage(self, query_id, lineage_dir): """Walks through the lineage files in lineage_dir to look for a given query_id. 
This is an expensive operation is lineage_dir is large, so use carefully.""" @@ -1010,18 +1016,50 @@ class ImpalaTestSuite(BaseTestSuite): def get_db_name_from_format(table_format, scale_factor=''): return QueryTestSectionReader.get_db_name(table_format, scale_factor) - @classmethod - def change_database(cls, impala_client, table_format=None, - db_name=None, scale_factor=None): + @staticmethod + def __change_client_database(impala_client, table_format=None, db_name=None, + scale_factor=None): + """Change 'impala_client' to point to either 'db_name' or workload-specific + database described by 'table_format' and 'scale_vector'. + Restore client configuration back to its default. This is intended for internal use + within ImpalaTestSuite. For changing database in ImpalaTestSuite subclasses, please + refer to ImpalaTestSuite.change_database(), which provide a more consistent way to + temporarily change database and reverting back to default database. + """ if db_name is None: assert table_format is not None - db_name = QueryTestSectionReader.get_db_name(table_format, - scale_factor if scale_factor else '') - query = 'use %s' % db_name + db_name = ImpalaTestSuite.get_db_name_from_format( + table_format, scale_factor if scale_factor else '') # Clear the exec_options before executing a USE statement. # The USE statement should not fail for negative exec_option tests. impala_client.clear_configuration() - impala_client.execute(query) + impala_client.execute('use ' + db_name) + + @classmethod + @contextlib.contextmanager + def change_database(cls, impala_client, table_format=None, db_name=None, + scale_factor=None): + """Change impala_client to point to approriate database under test and revert + it to default database once it exit the with scope. + Restore client configuration back to its default when getting in and getting out of + 'with' scope. 
Test method should try to use fully qualified table name in the test + query as much as possible and only use this context manager function when it is + not practical to do so. + Sample usage: + + with ImpalaTestSuite.change_database(client, db_name='functional_parquet'): + # client clear configuration and pointing to 'functional_parquet' database here. + client.execute('show tables') + # client clear configuration and pointing to 'default' database after it exit the + # 'with' scope. + client.execute('show tables') + """ + try: + cls.__change_client_database(impala_client, table_format=table_format, + db_name=db_name, scale_factor=scale_factor) + yield + finally: + cls.__change_client_database(impala_client, db_name='default') def execute_wrapper(function): """ @@ -1031,6 +1069,8 @@ class ImpalaTestSuite(BaseTestSuite): remaining the same. A use database is issued before query execution. As such, database names need to be build pre execution, this method wraps around the different execute methods and provides a common interface to issue the proper use command. + IMPALA-13766: Remove this function decorator and let test method to change database + by themself. 
""" @wraps(function) def wrapper(*args, **kwargs): @@ -1038,13 +1078,16 @@ class ImpalaTestSuite(BaseTestSuite): if kwargs.get(TABLE_FORMAT): table_format = kwargs.get(TABLE_FORMAT) del kwargs[TABLE_FORMAT] - if kwargs.get('vector'): - table_format = kwargs.get('vector').get_table_format() - del kwargs['vector'] + if kwargs.get(VECTOR): + ImpalaTestSuite.validate_exec_option_dimension(kwargs.get(VECTOR)) + table_format = kwargs.get(VECTOR).get_table_format() + del kwargs[VECTOR] # self is the implicit first argument if table_format is not None: - args[0].change_database(args[0].client, table_format) - return function(*args, **kwargs) + with ImpalaTestSuite.change_database(args[0].client, table_format): + return function(*args, **kwargs) + else: + return function(*args, **kwargs) return wrapper @classmethod @@ -1101,16 +1144,16 @@ class ImpalaTestSuite(BaseTestSuite): def execute_query_using_client(self, client, query, vector): self.validate_exec_option_dimension(vector) - self.change_database(client, vector.get_table_format()) query_options = vector.get_value(EXEC_OPTION) - if query_options is not None: client.set_configuration(query_options) + if query_options is not None: + client.set_configuration(query_options) return client.execute(query) def execute_query_async_using_client(self, client, query, vector): self.validate_exec_option_dimension(vector) - self.change_database(client, vector.get_table_format()) query_options = vector.get_value(EXEC_OPTION) - if query_options is not None: client.set_configuration(query_options) + if query_options is not None: + client.set_configuration(query_options) return client.execute_async(query) def close_query_using_client(self, client, query): @@ -1248,7 +1291,10 @@ class ImpalaTestSuite(BaseTestSuite): assert abs(a - b) / float(max(abs(a), abs(b))) <= diff_perc def _get_table_location(self, table_name, vector): - """ Returns the HDFS location of the table """ + """ Returns the HDFS location of the table. 
+ This method changes self.client to point to the dabatase described by 'vector'.""" + db_name = self.get_db_name_from_format(vector.get_table_format()) + self.__change_client_database(self.client, db_name=db_name) result = self.execute_query_using_client(self.client, "describe formatted %s" % table_name, vector) for row in result.data: @@ -1626,7 +1672,8 @@ class ImpalaTestSuite(BaseTestSuite): str(e)) time.sleep(1) - def validate_exec_option_dimension(self, vector): + @staticmethod + def validate_exec_option_dimension(vector): """Validate that test dimension with name matching query option name is also registered in 'exec_option' dimension.""" option_dim_names = [] @@ -1641,7 +1688,6 @@ class ImpalaTestSuite(BaseTestSuite): return for name in option_dim_names: - # TODO: enforce these warnings by changing them into pytest.fail() if name not in exec_option: pytest.fail("Exec option {} declared as independent dimension but not inserted " "into {} dimension. Consider using helper function " diff --git a/tests/common/test_vector.py b/tests/common/test_vector.py index 900f3a072..12f87bcb6 100644 --- a/tests/common/test_vector.py +++ b/tests/common/test_vector.py @@ -63,6 +63,7 @@ import logging LOG = logging.getLogger(__name__) +VECTOR = 'vector' # Literal constants to refer to some standard dimension names. EXEC_OPTION = 'exec_option' PROTOCOL = 'protocol' diff --git a/tests/conftest.py b/tests/conftest.py index aa179c365..626d68b2d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -402,9 +402,6 @@ def unique_database(request, testid_checksum): request.instance.filesystem_client.delete_file_dir(db_location, recursive=True) def cleanup(): - # Make sure we don't try to drop the current session database - # TODO: clean this up via IMPALA-13758. 
- request.instance.execute_query_expect_success(request.instance.client, "use default") with request.cls.create_impala_client(protocol=HS2) as client: client.set_configuration({'sync_ddl': sync_ddl}) for db_name in db_names: diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py index 7c6c9aed4..5223a421d 100644 --- a/tests/custom_cluster/test_admission_controller.py +++ b/tests/custom_cluster/test_admission_controller.py @@ -41,7 +41,6 @@ from tests.common.custom_cluster_test_suite import ( START_ARGS, CustomClusterTestSuite) from tests.common.environ import build_flavor_timeout, ImpalaTestClusterProperties -from tests.common.impala_test_suite import ImpalaTestSuite from tests.common.resource_pool_config import ResourcePoolConfig from tests.common.skip import SkipIfFS, SkipIfEC, SkipIfNotHdfsMinicluster from tests.common.test_dimensions import ( @@ -74,8 +73,8 @@ SLOW_QUERY = "select count(*) from functional.alltypes where int_col = sleep(200 # The unpartitioned fragments are both interior fragments that consume input # from a scan fragment and non-interior fragments with a constant UNION. 
QUERY_WITH_UNPARTITIONED_FRAGMENTS = """ - select *, (select count(distinct int_col) from alltypestiny) subquery1, - (select count(distinct int_col) from alltypes) subquery2, + select *, (select count(distinct int_col) from functional.alltypestiny) subquery1, + (select count(distinct int_col) from functional.alltypes) subquery2, (select 1234) subquery3 from (""" + QUERY + """) v""" @@ -579,7 +578,6 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): def test_sanity_checks_dedicated_coordinator(self, vector, unique_database): """Sanity tests for verifying targeted dedicated coordinator memory estimations and behavior.""" - ImpalaTestSuite.change_database(self.client, vector.get_value('table_format')) self.client.set_configuration_option('request_pool', "root.regularPool") exec_options = vector.get_value('exec_option') # Make sure query option MAX_MEM_ESTIMATE_FOR_ADMISSION is enforced on the dedicated @@ -625,8 +623,8 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): the actual vs expected values for mem admitted and mem limit for both coord and executor. Also verifies that those memory values are different if 'using_dedicated_coord_estimates' is true.""" - ImpalaTestSuite.change_database(self.client, vector.get_value('table_format')) - self.client.set_configuration_option('request_pool', "root.regularPool") + vector.set_exec_option('request_pool', 'root.regularPool') + self.client.set_configuration(vector.get_exec_option_dict()) # Use a test query that has unpartitioned non-coordinator fragments to make # sure those are handled correctly (IMPALA-10036). 
for query in [QUERY, QUERY_WITH_UNPARTITIONED_FRAGMENTS]: @@ -707,10 +705,9 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): @SkipIfNotHdfsMinicluster.tuned_for_minicluster @pytest.mark.execute_serially @CustomClusterTestSuite.with_args(num_exclusive_coordinators=1, cluster_size=2) - def test_mem_limit_executors(self, vector): + def test_mem_limit_executors(self): """Verify that the query option mem_limit_executors is only enforced on the executors.""" - ImpalaTestSuite.change_database(self.client, vector.get_value('table_format')) expected_exec_mem_limit = "999999999" self.client.set_configuration({"MEM_LIMIT_EXECUTORS": expected_exec_mem_limit}) handle = self.client.execute_async(QUERY.format(1)) @@ -726,10 +723,9 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): @CustomClusterTestSuite.with_args(num_exclusive_coordinators=1, cluster_size=2, impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=1, pool_max_mem=2 * 1024 * 1024 * 1024, proc_mem_limit=3 * 1024 * 1024 * 1024)) - def test_mem_limit_coordinators(self, vector): + def test_mem_limit_coordinators(self): """Verify that the query option mem_limit_coordinators is only enforced on the coordinators.""" - ImpalaTestSuite.change_database(self.client, vector.get_value('table_format')) expected_exec_mem_limit = "999999999" expected_coord_mem_limit = "111111111" self.client.set_configuration({"MEM_LIMIT_EXECUTORS": expected_exec_mem_limit, @@ -747,10 +743,9 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): @CustomClusterTestSuite.with_args(num_exclusive_coordinators=1, cluster_size=2, impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=1, pool_max_mem=2 * 1024 * 1024 * 1024, proc_mem_limit=3 * 1024 * 1024 * 1024)) - def test_mem_limits(self, vector): + def test_mem_limits(self): """Verify that the query option mem_limit_coordinators and mem_limit_executors are ignored when mem_limit is set.""" - 
ImpalaTestSuite.change_database(self.client, vector.get_value('table_format')) exec_mem_limit = "999999999" coord_mem_limit = "111111111" mem_limit = "888888888" @@ -2475,7 +2470,6 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase): query = QUERY.format(self.query_num) self.query_state = 'SUBMITTING' client = self.impalad.service.create_beeswax_client() - ImpalaTestSuite.change_database(client, self.vector.get_value('table_format')) client.set_configuration(exec_options) LOG.info("Submitting query %s with ending behavior %s", diff --git a/tests/custom_cluster/test_blacklisted_dbs_and_tables.py b/tests/custom_cluster/test_blacklisted_dbs_and_tables.py index 8ce0e5969..f5ea32da6 100644 --- a/tests/custom_cluster/test_blacklisted_dbs_and_tables.py +++ b/tests/custom_cluster/test_blacklisted_dbs_and_tables.py @@ -95,7 +95,7 @@ class TestBlacklistedDbsAndTables(CustomClusterTestSuite): "--blacklisted_tables=functional.alltypes,functional_parquet.alltypes", catalogd_args="--blacklisted_dbs=functional_rc,functional_seq " "--blacklisted_tables=functional.alltypes,functional_parquet.alltypes") - def test_blacklisted_dbs_and_tables(self, vector): + def test_blacklisted_dbs_and_tables(self): self.__check_db_not_visible("functional_rc") self.__check_db_not_visible("functional_seq") self.__check_table_not_visible("functional", "alltypes") @@ -128,7 +128,7 @@ class TestBlacklistedDbsAndTables(CustomClusterTestSuite): @CustomClusterTestSuite.with_args( impalad_args="--blacklisted_tables=alltypes_def,functional.alltypes", catalogd_args="--blacklisted_tables=alltypes_def,functional.alltypes") - def test_resolving_default_database(self, vector): + def test_resolving_default_database(self): # Check that "alltypes_def" is resolved as "default.alltypes_def" table = self.hive_client.get_table("functional", "alltypes") table.dbName = "default" @@ -138,8 +138,8 @@ class TestBlacklistedDbsAndTables(CustomClusterTestSuite): self.hive_client.drop_table("default", 
"alltypes_def", True) # Check non fully qualified table names are recognized correctly - self.change_database(self.client, db_name="functional") - self.__check_create_drop_table(use_fully_qualified_table_name=False) + with self.change_database(self.client, db_name="functional"): + self.__check_create_drop_table(use_fully_qualified_table_name=False) @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( @@ -147,7 +147,7 @@ class TestBlacklistedDbsAndTables(CustomClusterTestSuite): "--blacklisted_tables=functional.alltypes", impalad_args="--blacklisted_dbs=functional_seq " "--blacklisted_tables=functional.alltypestiny") - def test_inconsistent_blacklist(self, vector): + def test_inconsistent_blacklist(self): """Test the error handling when blacklists are accidentally set differently between coordinators and the catalogd""" self.__expect_error_in_query( diff --git a/tests/custom_cluster/test_data_cache.py b/tests/custom_cluster/test_data_cache.py index 734d4c049..a020bfc68 100644 --- a/tests/custom_cluster/test_data_cache.py +++ b/tests/custom_cluster/test_data_cache.py @@ -82,7 +82,8 @@ class TestDataCache(CustomClusterTestSuite): # Expect all cache hits results in no opened files. 
opened_file_handles_metric = 'impala-server.io.mgr.cached-file-handles-miss-count' baseline = self.get_metric(opened_file_handles_metric) - self.execute_query("select count(distinct l_orderkey) from test_parquet") + self.execute_query("select count(distinct l_orderkey) from {0}.test_parquet".format( + unique_database)) assert self.get_metric(opened_file_handles_metric) == baseline @pytest.mark.execute_serially @@ -106,7 +107,7 @@ class TestDataCache(CustomClusterTestSuite): def test_data_cache_deterministic_no_file_handle_cache(self, vector, unique_database): self.__test_data_cache_deterministic(vector, unique_database) - def __test_data_cache(self, vector): + def __test_data_cache(self): """ This test scans the same table twice and verifies the cache hit count metrics are correct. The exact number of bytes hit is non-deterministic between runs due to different mtime of files and multiple shards in the cache. @@ -132,16 +133,16 @@ class TestDataCache(CustomClusterTestSuite): impalad_args=get_impalad_args("LRU", high_write_concurrency=False, force_single_shard=False), start_args=CACHE_START_ARGS, cluster_size=1) - def test_data_cache_lru(self, vector): - self.__test_data_cache(vector) + def test_data_cache_lru(self): + self.__test_data_cache() @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args=get_impalad_args("LIRS", high_write_concurrency=False, force_single_shard=False), start_args=CACHE_START_ARGS, cluster_size=1) - def test_data_cache_lirs(self, vector): - self.__test_data_cache(vector) + def test_data_cache_lirs(self): + self.__test_data_cache() def __test_data_cache_disablement(self, vector): # Verifies that the cache metrics are all zero. 
@@ -185,7 +186,7 @@ class TestDataCache(CustomClusterTestSuite): impalad_args=get_impalad_args("LIRS", high_write_concurrency=False), start_args="--data_cache_dir=/tmp --data_cache_size=9MB", cluster_size=1) - def test_data_cache_lirs_instant_evictions(self, vector): + def test_data_cache_lirs_instant_evictions(self): # The setup for this test is intricate. For Allocate() to succeed, the request # needs to be smaller than the protected size (95% of the cache). For Insert() to # fail, the request needs to be larger than the unprotected size (5% of the cache). @@ -215,7 +216,7 @@ class TestDataCache(CustomClusterTestSuite): assert self.get_data_cache_metric('num-writes') >= 0 assert self.get_data_cache_metric('total-bytes') >= 0 - def __test_data_cache_keep_across_restarts(self, vector, test_reduce_size=False): + def __test_data_cache_keep_across_restarts(self, test_reduce_size=False): QUERY = "select * from tpch_parquet.lineitem" # Execute a query, record the total bytes and the number of entries of cache before # cache dump. 
@@ -260,31 +261,31 @@ class TestDataCache(CustomClusterTestSuite): @CustomClusterTestSuite.with_args( impalad_args=get_impalad_args("LRU", keep_across_restarts=True), start_args=CACHE_START_ARGS, cluster_size=1) - def test_data_cache_keep_across_restarts_lru(self, vector): - self.__test_data_cache_keep_across_restarts(vector) + def test_data_cache_keep_across_restarts_lru(self): + self.__test_data_cache_keep_across_restarts() @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args=get_impalad_args("LIRS", keep_across_restarts=True), start_args=CACHE_START_ARGS, cluster_size=1) - def test_data_cache_keep_across_restarts_lirs(self, vector): - self.__test_data_cache_keep_across_restarts(vector) + def test_data_cache_keep_across_restarts_lirs(self): + self.__test_data_cache_keep_across_restarts() @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args=get_impalad_args("LRU", keep_across_restarts=True), start_args=CACHE_START_ARGS, cluster_size=1) - def test_data_cache_reduce_size_restarts_lru(self, vector): - self.__test_data_cache_keep_across_restarts(vector, test_reduce_size=True) + def test_data_cache_reduce_size_restarts_lru(self): + self.__test_data_cache_keep_across_restarts(test_reduce_size=True) @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args=get_impalad_args("LIRS", keep_across_restarts=True), start_args=CACHE_START_ARGS, cluster_size=1) - def test_data_cache_reduce_size_restarts_lirs(self, vector): - self.__test_data_cache_keep_across_restarts(vector, test_reduce_size=True) + def test_data_cache_reduce_size_restarts_lirs(self): + self.__test_data_cache_keep_across_restarts(test_reduce_size=True) - def __test_data_cache_readonly(self, vector): + def __test_data_cache_readonly(self): QUERY = "select * from tpch_parquet.lineitem" # Execute the query asynchronously, wait a short while, and do gracefully shutdown # immediately to test the race between cache writes and setting cache 
read-only. @@ -310,12 +311,12 @@ class TestDataCache(CustomClusterTestSuite): @CustomClusterTestSuite.with_args( impalad_args=get_impalad_args("LRU", keep_across_restarts=True), start_args=CACHE_START_ARGS, cluster_size=1, disable_log_buffering=True) - def test_data_cache_readonly_lru(self, vector): - self.__test_data_cache_readonly(vector) + def test_data_cache_readonly_lru(self): + self.__test_data_cache_readonly() @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args=get_impalad_args("LIRS", keep_across_restarts=True), start_args=CACHE_START_ARGS, cluster_size=1, disable_log_buffering=True) - def test_data_cache_readonly_lirs(self, vector): - self.__test_data_cache_readonly(vector) + def test_data_cache_readonly_lirs(self): + self.__test_data_cache_readonly() diff --git a/tests/failure/test_failpoints.py b/tests/failure/test_failpoints.py index 38f0919e5..d5bb1bf9e 100644 --- a/tests/failure/test_failpoints.py +++ b/tests/failure/test_failpoints.py @@ -97,6 +97,10 @@ class TestFailpoints(ImpalaTestSuite): # killer on machines with 30GB RAM. This makes the test run in 4 minutes instead of 1-2. @pytest.mark.execute_serially def test_failpoints(self, vector): + with self.change_database(self.client, vector.get_table_format()): + self.__run_failpoints(vector) + + def __run_failpoints(self, vector): query = vector.get_value('query') action = vector.get_value('action') location = vector.get_value('location') @@ -131,9 +135,8 @@ class TestFailpoints(ImpalaTestSuite): def __parse_plan_nodes_from_explain(self, query, vector): """Parses the EXPLAIN <query> output and returns a list of node ids. 
Expects format of <ID>:<NAME>""" - explain_result =\ - self.execute_query("explain " + query, vector.get_value('exec_option'), - table_format=vector.get_value('table_format')) + explain_result = self.execute_query("explain " + query, + vector.get_value('exec_option')) node_ids = [] for row in explain_result.data: match = re.search(r'\s*(?P<node_id>\d+)\:(?P<node_type>\S+\s*\S+)', row) @@ -190,8 +193,7 @@ class TestFailpoints(ImpalaTestSuite): def __execute_fail_action(self, query, vector): try: - self.execute_query(query, vector.get_value('exec_option'), - table_format=vector.get_value('table_format')) + self.execute_query(query, vector.get_value('exec_option')) assert 'Expected Failure' except ImpalaBeeswaxException as e: LOG.debug(e) @@ -200,8 +202,7 @@ class TestFailpoints(ImpalaTestSuite): def __execute_cancel_action(self, query, vector): LOG.info('Starting async query execution') - handle = self.execute_query_async(query, vector.get_value('exec_option'), - table_format=vector.get_value('table_format')) + handle = self.execute_query_async(query, vector.get_value('exec_option')) LOG.info('Sleeping') sleep(3) cancel_result = self.client.cancel(handle) diff --git a/tests/query_test/test_aggregation.py b/tests/query_test/test_aggregation.py index 2e98f4737..bb5711a53 100644 --- a/tests/query_test/test_aggregation.py +++ b/tests/query_test/test_aggregation.py @@ -147,11 +147,11 @@ class TestAggregation(ImpalaTestSuite): exec_option = vector.get_value('exec_option') disable_codegen = exec_option['disable_codegen'] data_type, agg_func = (vector.get_value('data_type'), vector.get_value('agg_func')) + db_name = self.get_db_name_from_format(vector.get_table_format()) - query = 'select %s(%s_col) from alltypesagg where day is not null' % (agg_func, - data_type) - result = self.execute_query(query, exec_option, - table_format=vector.get_value('table_format')) + query = 'select {0}({1}_col) from {2}.alltypesagg where day is not null'.format( + agg_func, data_type, db_name) 
+ result = self.execute_query(query, exec_option) assert len(result.data) == 1 self.verify_agg_result(agg_func, data_type, False, result.data[0]) @@ -160,9 +160,9 @@ class TestAggregation(ImpalaTestSuite): # It is deliberately disabled for the merge aggregation. assert_codegen_enabled(result.runtime_profile, [1]) - query = 'select %s(DISTINCT(%s_col)) from alltypesagg where day is not null' % ( - agg_func, data_type) - result = self.execute_query(query, vector.get_value('exec_option')) + query = ('select {0}(DISTINCT({1}_col)) from {2}.alltypesagg ' + 'where day is not null').format(agg_func, data_type, db_name) + result = self.execute_query(query, exec_option) assert len(result.data) == 1 self.verify_agg_result(agg_func, data_type, True, result.data[0]) diff --git a/tests/query_test/test_cancellation.py b/tests/query_test/test_cancellation.py index 342a39f03..49efa053f 100644 --- a/tests/query_test/test_cancellation.py +++ b/tests/query_test/test_cancellation.py @@ -21,10 +21,8 @@ from __future__ import absolute_import, division, print_function from builtins import range import pytest -import threading from time import sleep from RuntimeProfile.ttypes import TRuntimeProfileFormat -from tests.beeswax.impala_beeswax import ImpalaBeeswaxException from tests.common.test_dimensions import add_mandatory_exec_option from tests.common.test_vector import ImpalaTestDimension from tests.common.impala_test_suite import ImpalaTestSuite @@ -57,9 +55,6 @@ QUERY_TYPE = ["SELECT", "CTAS"] # are prone to occur more often when the time between RPCs is small. CANCEL_DELAY_IN_SECONDS = [0, 0.01, 0.1, 1, 4] -# Number of times to execute/cancel each query under test -NUM_CANCELATION_ITERATIONS = 1 - # Test cancellation on both running and hung queries. 
Node ID 0 is the scan node WAIT_ACTIONS = [None, '0:GETNEXT:WAIT'] @@ -77,7 +72,7 @@ JOIN_BEFORE_CLOSE = [False, True] # Extra dimensions to test order by without limit SORT_QUERY = 'select * from lineitem order by l_orderkey' SORT_CANCEL_DELAY = list(range(6, 10)) -SORT_BUFFER_POOL_LIMIT = ['0', '300m'] # Test spilling and non-spilling sorts. +SORT_BUFFER_POOL_LIMIT = ['0', '300m'] # Test spilling and non-spilling sorts. # Test with and without multithreading MT_DOP_VALUES = [0, 4] @@ -87,6 +82,7 @@ MT_DOP_VALUES = [0, 4] # True: Execute a KILL QUERY statement. USE_KILL_QUERY_STATEMENT = [False, True] + class TestCancellation(ImpalaTestSuite): @classmethod def get_workload(self): @@ -115,17 +111,21 @@ class TestCancellation(ImpalaTestSuite): ImpalaTestDimension('mt_dop', *MT_DOP_VALUES)) cls.ImpalaTestMatrix.add_dimension( ImpalaTestDimension('use_kill_query_statement', *USE_KILL_QUERY_STATEMENT)) + # Number of times to execute/cancel each query under test + num_iterations = 1 if cls.exploration_strategy() == 'core' else 3 + cls.ImpalaTestMatrix.add_dimension( + ImpalaTestDimension('num_cancellation_iterations', num_iterations)) cls.ImpalaTestMatrix.add_constraint( - lambda v: v.get_value('query_type') != 'CTAS' or (\ + lambda v: v.get_value('query_type') != 'CTAS' or ( v.get_value('table_format').file_format in ['text', 'parquet', 'kudu', 'json'] and v.get_value('table_format').compression_codec == 'none')) cls.ImpalaTestMatrix.add_constraint( lambda v: v.get_value('exec_option')['batch_size'] == 0) # Ignore 'compute stats' queries for the CTAS query type. cls.ImpalaTestMatrix.add_constraint( - lambda v: not (v.get_value('query_type') == 'CTAS' and - v.get_value('query').startswith('compute stats'))) + lambda v: not (v.get_value('query_type') == 'CTAS' + and v.get_value('query').startswith('compute stats'))) # 'use_kill_query_statement' and 'join_before_close' cannot be both True, since # the KILL QUERY statement will also close the query. 
cls.ImpalaTestMatrix.add_constraint( @@ -134,16 +134,14 @@ class TestCancellation(ImpalaTestSuite): # Ignore CTAS on Kudu if there is no PRIMARY KEY specified. cls.ImpalaTestMatrix.add_constraint( - lambda v: not (v.get_value('query_type') == 'CTAS' and - v.get_value('table_format').file_format == 'kudu' and - QUERIES[v.get_value('query')] is None)) + lambda v: not (v.get_value('query_type') == 'CTAS' + and v.get_value('table_format').file_format == 'kudu' + and QUERIES[v.get_value('query')] is None)) # tpch tables are not generated for hbase as the data loading takes a very long time. # TODO: Add cancellation tests for hbase. - cls.ImpalaTestMatrix.add_constraint(lambda v:\ + cls.ImpalaTestMatrix.add_constraint(lambda v: v.get_value('table_format').file_format != 'hbase') - if cls.exploration_strategy() != 'core': - NUM_CANCELATION_ITERATIONS = 3 def cleanup_test_table(self, table_format): self.execute_query("drop table if exists ctas_cancel", table_format=table_format) @@ -176,7 +174,7 @@ class TestCancellation(ImpalaTestSuite): vector.get_value('exec_option')['mt_dop'] = vector.get_value('mt_dop') # Execute the query multiple times, cancelling it each time. - for i in range(NUM_CANCELATION_ITERATIONS): + for i in range(vector.get_value('num_cancellation_iterations')): cancel_query_and_validate_state(self.client, query, vector.get_value('exec_option'), vector.get_value('table_format'), vector.get_value('cancel_delay'), vector.get_value('join_before_close'), @@ -188,7 +186,8 @@ class TestCancellation(ImpalaTestSuite): # Executing the same query without canceling should work fine. 
Only do this if the # query has a limit or aggregation if not debug_action and ('count' in query or 'limit' in query): - self.execute_query(query, vector.get_value('exec_option')) + with self.change_database(self.client, vector.get_value('table_format')): + self.execute_query(query, vector.get_value('exec_option')) @pytest.mark.execute_serially def test_misformatted_profile_text(self): @@ -226,6 +225,7 @@ class TestCancellation(ImpalaTestSuite): # fail. Introducing a small delay allows everything to quiesce. # TODO: Figure out a better way to address this sleep(1) + super(TestCancellation, self).teardown_method(method) class TestCancellationParallel(TestCancellation): @@ -237,6 +237,7 @@ class TestCancellationParallel(TestCancellation): def test_cancel_select(self, vector): self.execute_cancel_test(vector) + class TestCancellationSerial(TestCancellation): @classmethod def add_test_dimensions(cls): @@ -268,6 +269,7 @@ class TestCancellationSerial(TestCancellation): metric_verifier = MetricVerifier(self.impalad_test_service) metric_verifier.verify_no_open_files(timeout=10) + class TestCancellationFullSort(TestCancellation): @classmethod def add_test_dimensions(cls): @@ -283,9 +285,9 @@ class TestCancellationFullSort(TestCancellation): ImpalaTestDimension('buffer_pool_limit', *SORT_BUFFER_POOL_LIMIT)) cls.ImpalaTestMatrix.add_constraint( lambda v: v.get_value('fail_rpc_action') == FAIL_RPC_ACTIONS[0]) - cls.ImpalaTestMatrix.add_constraint(lambda v:\ - v.get_value('table_format').file_format =='parquet' and\ - v.get_value('table_format').compression_codec == 'none') + cls.ImpalaTestMatrix.add_constraint(lambda v: + v.get_value('table_format').file_format == 'parquet' + and v.get_value('table_format').compression_codec == 'none') def test_cancel_sort(self, vector): self.execute_cancel_test(vector) diff --git a/tests/query_test/test_mem_usage_scaling.py b/tests/query_test/test_mem_usage_scaling.py index ab4e1af32..07c35a588 100644 --- 
a/tests/query_test/test_mem_usage_scaling.py +++ b/tests/query_test/test_mem_usage_scaling.py @@ -327,7 +327,7 @@ class TestScanMemLimit(ImpalaTestSuite): cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension()) cls.ImpalaTestMatrix.add_dimension(create_avro_snappy_dimension(cls.get_workload())) - def test_wide_avro_mem_usage(self, vector, unique_database): + def test_wide_avro_mem_usage(self, unique_database): """Create a wide avro table with large strings and test scans that can cause OOM.""" if self.exploration_strategy() != 'exhaustive': pytest.skip("only run resource-intensive query on exhaustive") @@ -409,12 +409,12 @@ class TestHashJoinMemLimit(ImpalaTestSuite): """Selective scan with hash join and aggregate above it. Regression test for IMPALA-9712 - before the fix this ran out of memory.""" OPTS = {'mem_limit': "80MB", 'mt_dop': 1} - self.change_database(self.client, vector.get_value('table_format')) - result = self.execute_query_expect_success(self.client, - """select sum(l_extendedprice * (1 - l_discount)) as revenue - from lineitem join part on p_partkey = l_partkey - where l_comment like 'ab%'""", query_options=OPTS) - assert result.data[0] == '440443181.0505' + with self.change_database(self.client, vector.get_value('table_format')): + result = self.execute_query_expect_success(self.client, + """select sum(l_extendedprice * (1 - l_discount)) as revenue + from lineitem join part on p_partkey = l_partkey + where l_comment like 'ab%'""", query_options=OPTS) + assert result.data[0] == '440443181.0505' @SkipIfNotHdfsMinicluster.tuned_for_minicluster diff --git a/tests/query_test/test_mt_dop.py b/tests/query_test/test_mt_dop.py index 535adf5c9..dc8ff9347 100644 --- a/tests/query_test/test_mt_dop.py +++ b/tests/query_test/test_mt_dop.py @@ -85,11 +85,12 @@ class TestMtDop(ImpalaTestSuite): expected_results = "Updated 24 partition(s) and 11 column(s)." else: # Create a second table in the same format pointing to the same data files. 
- # This function switches to the format-specific DB in 'vector'. - table_loc = self._get_table_location("alltypes", vector) - self.execute_query_using_client(self.client, - "create external table %s like alltypes location '%s'" - % (fq_table_name, table_loc), vector) + # This switches the client momentarily to the format-specific DB in 'vector'. + with self.change_database(self.client, vector.get_table_format()): + table_loc = self._get_table_location("alltypes", vector) + self.execute_query_using_client(self.client, + "create external table %s like alltypes location '%s'" + % (fq_table_name, table_loc), vector) # Recover partitions for HDFS tables. self.execute_query("alter table %s recover partitions" % fq_table_name) expected_results = "Updated 24 partition(s) and 11 column(s)." @@ -152,7 +153,7 @@ class TestMtDopNonZeroParquet(ImpalaTestSuite): test_file_vars={'$ORIGINAL_DB': ImpalaTestSuite .get_db_name_from_format(vector.get_value('table_format'))}) - def test_mt_dop_only_joins(self, vector, unique_database): + def test_mt_dop_only_joins(self, vector): """MT_DOP specific tests for joins.""" new_vector = deepcopy(vector) # Allow test to override num_nodes. diff --git a/tests/query_test/test_nested_types.py b/tests/query_test/test_nested_types.py index 0d4f5ba19..6f541c780 100644 --- a/tests/query_test/test_nested_types.py +++ b/tests/query_test/test_nested_types.py @@ -91,9 +91,10 @@ class TestNestedTypes(ImpalaTestSuite): class TestNestedTypesSingleNode(ImpalaTestSuite): """Functional tests for nested types, run for all file formats that support nested types. 
All tests here runs with single node only.""" + @classmethod - def get_workload(self): - return 'functional-query' + def get_workload(cls): + return 'tpch_nested' @classmethod def add_test_dimensions(cls): diff --git a/tests/query_test/test_runtime_filters.py b/tests/query_test/test_runtime_filters.py index 83a0031b7..6e5b4d394 100644 --- a/tests/query_test/test_runtime_filters.py +++ b/tests/query_test/test_runtime_filters.py @@ -116,8 +116,10 @@ class TestRuntimeFilters(ImpalaTestSuite): get woken up and exit promptly when the query is cancelled.""" # Make sure the cluster is quiesced before we start this test self._verify_no_fragments_running() + with self.change_database(self.client, vector.get_value('table_format')): + self.__run_wait_time_cancellation(vector) - self.change_database(self.client, vector.get_value('table_format')) + def __run_wait_time_cancellation(self, vector): # Set up a query where a scan (plan node 0, scanning alltypes) will wait # indefinitely for a filter to arrive. The filter arrival is delayed # by adding a wait to the scan of alltypestime (plan node 0). 
@@ -150,8 +152,11 @@ class TestRuntimeFilters(ImpalaTestSuite): def test_file_filtering(self, vector): if 'kudu' in str(vector.get_value('table_format')): - return - self.change_database(self.client, vector.get_value('table_format')) + pytest.skip('Skip test against kudu') + with self.change_database(self.client, vector.get_value('table_format')): + self.__run_file_filtering(vector) + + def __run_file_filtering(self, vector): self.execute_query("SET RUNTIME_FILTER_MODE=GLOBAL") self.execute_query("SET RUNTIME_FILTER_WAIT_TIME_MS=10000") result = self.execute_query("""select STRAIGHT_JOIN * from alltypes inner join @@ -165,8 +170,11 @@ class TestRuntimeFilters(ImpalaTestSuite): """Test that late-arriving filters are applied to files when the scanner starts processing each scan range.""" if 'kudu' in str(vector.get_value('table_format')): - return - self.change_database(self.client, vector.get_value('table_format')) + pytest.skip('Skip test against kudu') + with self.change_database(self.client, vector.get_value('table_format')): + self.__run_file_filtering_late_arriving_filter(vector) + + def __run_file_filtering_late_arriving_filter(self, vector): self.execute_query("SET RUNTIME_FILTER_MODE=GLOBAL") self.execute_query("SET RUNTIME_FILTER_WAIT_TIME_MS=1") self.execute_query("SET NUM_SCANNER_THREADS=1") @@ -357,7 +365,7 @@ class TestOverlapMinMaxFilters(ImpalaTestSuite): unique_database, test_file_vars=DEFAULT_VARS) @SkipIfLocal.hdfs_client - def test_partition_column_in_parquet_data_file(self, vector, unique_database): + def test_partition_column_in_parquet_data_file(self, unique_database): """IMPALA-11147: Test that runtime min/max filters still work on data files that contain the partitioning columns.""" tbl_name = "part_col_in_data_file" diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py index 85b53951e..825cb09fa 100644 --- a/tests/query_test/test_scanners.py +++ b/tests/query_test/test_scanners.py @@ -323,9 +323,10 @@ class 
TestUnmatchedSchema(ImpalaTestSuite): 'invalidate metadata {0}.jointbl_test'.format(unique_database), vector) else: + origin_db = self.get_db_name_from_format(vector.get_table_format()) self.execute_query_using_client(self.client, - "create external table {0}.jointbl_test like jointbl".format( - unique_database), vector) + "create external table {0}.jointbl_test like {1}.jointbl".format( + unique_database, origin_db), vector) # Update the location of the new table to point the same location as the old table location = self._get_table_location('jointbl', vector) @@ -439,8 +440,8 @@ class TestHdfsScannerSkew(ImpalaTestSuite): super(TestHdfsScannerSkew, cls).add_test_dimensions() add_mandatory_exec_option(cls, 'mt_dop', 2) cls.ImpalaTestMatrix.add_constraint(lambda v: - v.get_value('table_format').file_format in ('text') and - v.get_value('table_format').compression_codec == 'none') + v.get_value('table_format').file_format in ('text') + and v.get_value('table_format').compression_codec == 'none') @SkipIfLocal.multiple_impalad @pytest.mark.execute_serially @@ -1499,9 +1500,11 @@ class TestTextScanRangeLengths1(ImpalaTestSuite): # the count(col) result because if the scan range is split right after the escape # char, the escape char has no effect because we cannot scan backwards to the # previous scan range. - for t in self.ESCAPE_TABLE_LIST: - expected_result = self.client.execute("select count(col) from " + t) - result = self.client.execute("select count(*) from " + t) + db_name = self.get_db_name_from_format(vector.get_table_format()) + for table_name in self.ESCAPE_TABLE_LIST: + fq_table_name = db_name + '.' + table_name + expected_result = self.client.execute("select count(col) from " + fq_table_name) + result = self.client.execute("select count(*) from " + fq_table_name) assert result.data == expected_result.data @@ -1548,10 +1551,10 @@ class TestTextSplitDelimiters(ImpalaTestSuite): in the main text parsing algorithm. 
The second scan range exercises correctly identifying a split delimiter as the first in a scan range.""" DEFAULT_IO_BUFFER_SIZE = 8 * 1024 * 1024 - data = ('a' * (DEFAULT_IO_BUFFER_SIZE - 1) + "\r\n" + # first scan range - 'b' * (DEFAULT_IO_BUFFER_SIZE - 3) + "\r\n" + - 'a' * (DEFAULT_IO_BUFFER_SIZE - 1) + "\r\n" + # second scan range - 'b' * (DEFAULT_IO_BUFFER_SIZE - 1)) + data = ('a' * (DEFAULT_IO_BUFFER_SIZE - 1) + "\r\n" # first scan range + + 'b' * (DEFAULT_IO_BUFFER_SIZE - 3) + "\r\n" + + 'a' * (DEFAULT_IO_BUFFER_SIZE - 1) + "\r\n" # second scan range + + 'b' * (DEFAULT_IO_BUFFER_SIZE - 1)) assert len(data) == DEFAULT_IO_BUFFER_SIZE * 4 max_scan_range_length = DEFAULT_IO_BUFFER_SIZE * 2 @@ -1594,8 +1597,8 @@ class TestTextScanRangeLengths2(ImpalaTestSuite): def add_test_dimensions(cls): super(TestTextScanRangeLengths2, cls).add_test_dimensions() cls.ImpalaTestMatrix.add_constraint(lambda v: - v.get_value('table_format').file_format == 'text' and - v.get_value('table_format').compression_codec in ['none', 'gzip']) + v.get_value('table_format').file_format == 'text' + and v.get_value('table_format').compression_codec in ['none', 'gzip']) def test_text_scanner_with_header(self, vector, unique_database): # Remove to allow .test file to set abort_on_error. diff --git a/tests/query_test/test_tablesample.py b/tests/query_test/test_tablesample.py index 08e73ccb7..3cab33be3 100644 --- a/tests/query_test/test_tablesample.py +++ b/tests/query_test/test_tablesample.py @@ -18,12 +18,12 @@ # Tests the TABLESAMPLE clause. 
from __future__ import absolute_import, division, print_function -import pytest import subprocess from tests.common.impala_test_suite import ImpalaTestSuite from tests.common.test_vector import ImpalaTestDimension + class TestTableSample(ImpalaTestSuite): @classmethod def get_workload(cls): @@ -36,19 +36,23 @@ class TestTableSample(ImpalaTestSuite): cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('filtered', *[True, False])) # Tablesample is only supported on HDFS tables. cls.ImpalaTestMatrix.add_constraint(lambda v: - v.get_value('table_format').file_format != 'kudu' and - v.get_value('table_format').file_format != 'hbase') + v.get_value('table_format').file_format != 'kudu' + and v.get_value('table_format').file_format != 'hbase') if cls.exploration_strategy() != 'exhaustive': # Cut down on core testing time by limiting the file formats. cls.ImpalaTestMatrix.add_constraint(lambda v: - v.get_value('table_format').file_format == 'parquet' or - v.get_value('table_format').file_format == 'text') + v.get_value('table_format').file_format == 'parquet' + or v.get_value('table_format').file_format == 'text') def test_tablesample(self, vector): # Do not use a .test to avoid making this test flaky. # 1. Queries without the repeatable clause are non-deterministic. # 2. The results of queries without a repeatable clause could change due to # changes in data loading that affect the number or size of files. 
+ with self.change_database(self.client, vector.get_value('table_format')): + self.__run_tablesample(vector) + + def __run_tablesample(self, vector): repeatable = vector.get_value('repeatable') filtered = vector.get_value('filtered') @@ -56,7 +60,6 @@ class TestTableSample(ImpalaTestSuite): if filtered: where_clause = "where month between 1 and 6" - ImpalaTestSuite.change_database(self.client, vector.get_value('table_format')) result = self.client.execute("select count(*) from alltypes %s" % where_clause) baseline_count = int(result.data[0]) prev_count = None diff --git a/tests/util/cancel_util.py b/tests/util/cancel_util.py index 5957ce757..fc1ed4e8b 100644 --- a/tests/util/cancel_util.py +++ b/tests/util/cancel_util.py @@ -102,9 +102,17 @@ def cancel_query_and_validate_state(client, query, exec_option, table_format, statement will be executed to cancel and close the query, instead of sending the Thrift RPCs directly. """ + assert table_format is not None + with ImpalaTestSuite.change_database(client, table_format): + __run_cancel_query_and_validate_state( + client, query, exec_option, cancel_delay, join_before_close, + use_kill_query_statement) + + +def __run_cancel_query_and_validate_state(client, query, exec_option, + cancel_delay, join_before_close=False, use_kill_query_statement=False): assert not (join_before_close and use_kill_query_statement) - if table_format: ImpalaTestSuite.change_database(client, table_format) if exec_option: client.set_configuration(exec_option) handle = client.execute_async(query) @@ -160,8 +168,8 @@ def cancel_query_and_validate_state(client, query, exec_option, table_format, # that the synchronous phase of query unregistration has finished and the profile # is final. 
profile = client.get_runtime_profile(handle) - if ("- Completed admission: " in profile and - ("- First row fetched:" in profile or "- Request finished:" in profile)): + if ("- Completed admission: " in profile + and ("- First row fetched:" in profile or "- Request finished:" in profile)): # TotalBytesRead is a sentinel that will only be created if ComputeQuerySummary() # has been run by the cancelling thread. assert "- TotalBytesRead:" in profile, profile
