This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit ea8f74a6acc1a60f7efb2bd8790bc5ec22c84c0f Author: Riza Suminto <[email protected]> AuthorDate: Tue Mar 11 09:34:05 2025 -0700 IMPALA-13861: Standardize workload management tests This patch standardizes tests against workload management tables (sys.impala_query_log and sys.impala_query_live) to use a common superclass named WorkloadManagementTestSuite. The setup_method of this superclass waits for workload management init completion (wait_for_wm_init_complete()), while the teardown_method waits until the impala-server.completed-queries.queued metric reaches 0 (wait_for_wm_idle()). test_query_log.py and test_workload_mgmt_sql_details.py are refactored to extend from WorkloadManagementTestSuite. Tests that assert the query log table flush behavior are grouped together in TestQueryLogTableFlush. test_workload_mgmt_sql_details.py::TestWorkloadManagementSQLDetails now uses a single minicluster instance for all tests. test_workload_mgmt_init.py does not extend from WorkloadManagementTestSuite because it tests cluster start and restart scenarios. This patch only adds wait_for_wm_idle() in teardown_method where it makes sense to do so. test_query_live.py does not extend from WorkloadManagementTestSuite because most of its test methods require a long --query_log_write_interval_s so that DML queries from the workload management worker do not disturb sys.impala_query_live. The workload_mgmt parameter in CustomClusterTestSuite.with_args() is standardized to set up appropriate default flags in cluster_setup() rather than passing it down to _start_impala_cluster(): IMPALAD_ARGS --enable_workload_mgmt=true --query_log_write_interval_s=1 \ --shutdown_grace_period_s=0 --shutdown_deadline_s=60 and CATALOGD_ARGS --enable_workload_mgmt=true Note that the IMPALAD_ARGS and CATALOGD_ARGS flags added by the workload_mgmt parameter (including the graceful shutdown flags formerly added by the impalad_graceful_shutdown parameter) can still be overridden with different values by explicitly adding them to the impalad_args and catalogd_args parameters. 
Setting workload_mgmt=True now automatically enables graceful shutdown for the test. Thus, impalad_graceful_shutdown=True is now removed. With beeswax protocol deprecated, this patch also changes the protocol under test from beeswax to hs2. TestQueryLogTableBeeswax is now renamed to TestQueryLogTableBasic. Additionally, print total wait time in wait_for_metric_value(). Testing: - Run modified tests and pass. Change-Id: Iecf6452fa963304e263805ebeb017c843d17dd16 Reviewed-on: http://gerrit.cloudera.org:8080/22617 Reviewed-by: Riza Suminto <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/service/workload-management-worker.cc | 1 + tests/common/custom_cluster_test_suite.py | 64 +-- tests/common/impala_cluster.py | 20 +- tests/common/impala_service.py | 22 +- tests/common/wm_test_suite.py | 40 ++ tests/custom_cluster/test_admission_controller.py | 23 +- tests/custom_cluster/test_query_live.py | 103 ++-- tests/custom_cluster/test_query_log.py | 553 +++++++++------------ tests/custom_cluster/test_workload_mgmt_init.py | 263 +++++----- .../test_workload_mgmt_sql_details.py | 70 +-- 10 files changed, 576 insertions(+), 583 deletions(-) diff --git a/be/src/service/workload-management-worker.cc b/be/src/service/workload-management-worker.cc index c4a849a00..6ceb85532 100644 --- a/be/src/service/workload-management-worker.cc +++ b/be/src/service/workload-management-worker.cc @@ -502,6 +502,7 @@ void ImpalaServer::ShutdownWorkloadManagement() { // chance to flush the in-memory queue to the completed queries table. 
if (workload_mgmt_state_ == WorkloadManagementState::RUNNING) { workload_mgmt_state_ = WorkloadManagementState::SHUTTING_DOWN; + LOG(INFO) << "Workload management is shutting down"; _completed_queries_cv.notify_all(); _completed_queries_shutdown_cv.wait_for(l, chrono::seconds(FLAGS_query_log_shutdown_timeout_s), diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py index 30cba724a..e6a1957ef 100644 --- a/tests/common/custom_cluster_test_suite.py +++ b/tests/common/custom_cluster_test_suite.py @@ -30,7 +30,6 @@ import subprocess from glob import glob from impala_py_lib.helpers import find_all_files, is_core_dump from re import search -from signal import SIGRTMIN from subprocess import check_call from tests.common.file_utils import cleanup_tmp_test_dir, make_tmp_test_dir from tests.common.impala_test_suite import ImpalaTestSuite @@ -69,11 +68,6 @@ IMPALAD_TIMEOUT_S = 'impalad_timeout_s' EXPECT_CORES = 'expect_cores' # Additional arg to determine whether we should reset the Ranger policy repository. RESET_RANGER = 'reset_ranger' -# By default, Impalad processes are left running at the end of the test. The next test run -# terminates the processes when calling the `bin/start-impala-cluster.py`` script. Setting -# this to `True` causes Impalad processes to be sent the SIGRTMIN signal at the end of the -# test which runs the Impalad shutdown steps instead of abruptly ending the process. -IMPALAD_GRACEFUL_SHUTDOWN = 'impalad_graceful_shutdown' # Decorator key to support temporary dir creation. TMP_DIR_PLACEHOLDERS = 'tmp_dir_placeholders' # Indicates if a failure to start is acceptable or not. If set to `True` and the cluster @@ -95,6 +89,15 @@ DEFAULT_STATESTORE_ARGS = ('--statestore_update_frequency_ms=50 ' '--statestore_priority_update_frequency_ms=50 ' '--statestore_heartbeat_frequency_ms=50') +# Additional flags appended to impalad_args if workload_mgmt=True. 
+# IMPALA-13051: Add faster default graceful shutdown options before processing +# explicit args. Impala doesn't start graceful shutdown until the grace period +# has passed, and most tests that use graceful shutdown are testing flushing +# the query log, which doesn't start until after the grace period has passed. +WORKLOAD_MGMT_IMPALAD_FLAGS = ( + '--enable_workload_mgmt=true --query_log_write_interval_s=1 ' + '--shutdown_grace_period_s=0 --shutdown_deadline_s=60 ') + class CustomClusterTestSuite(ImpalaTestSuite): """Runs tests with a custom Impala cluster. There are two modes: @@ -155,7 +158,7 @@ class CustomClusterTestSuite(ImpalaTestSuite): impala_log_dir=None, hive_conf_dir=None, cluster_size=None, num_exclusive_coordinators=None, kudu_args=None, statestored_timeout_s=None, impalad_timeout_s=None, expect_cores=None, reset_ranger=False, - impalad_graceful_shutdown=False, tmp_dir_placeholders=[], + tmp_dir_placeholders=[], expect_startup_fail=False, disable_log_buffering=False, log_symlinks=False, workload_mgmt=False): """Records arguments to be passed to a cluster by adding them to the decorated @@ -191,8 +194,6 @@ class CustomClusterTestSuite(ImpalaTestSuite): args[EXPECT_CORES] = expect_cores if reset_ranger: args[RESET_RANGER] = True - if impalad_graceful_shutdown: - args[IMPALAD_GRACEFUL_SHUTDOWN] = True if tmp_dir_placeholders: args[TMP_DIR_PLACEHOLDERS] = tmp_dir_placeholders if expect_startup_fail: @@ -280,18 +281,17 @@ class CustomClusterTestSuite(ImpalaTestSuite): del cls.TMP_DIRS[name] cls.make_tmp_dir(name) - if args.get(IMPALAD_GRACEFUL_SHUTDOWN, False): - # IMPALA-13051: Add faster default graceful shutdown options before processing - # explicit args. Impala doesn't start graceful shutdown until the grace period has - # passed, and most tests that use graceful shutdown are testing flushing the query - # log, which doesn't start until after the grace period has passed. 
- cluster_args.append( - "--impalad=--shutdown_grace_period_s=0 --shutdown_deadline_s=15") impala_daemons = [IMPALAD_ARGS, STATESTORED_ARGS, CATALOGD_ARGS, ADMISSIOND_ARGS] for arg in (impala_daemons + [JVM_ARGS]): val = '' + if args.get(WORKLOAD_MGMT, False): + if arg == CATALOGD_ARGS: + val += '--enable_workload_mgmt=true ' + if arg == IMPALAD_ARGS: + val += WORKLOAD_MGMT_IMPALAD_FLAGS if arg in impala_daemons and disable_log_buffering: val += '--logbuflevel=-1 ' + # append this the very last so it can override anything above. if arg in args: val += (args[arg] if arg not in ACCEPT_FORMATTING else args[arg].format(**cls.TMP_DIRS)) @@ -339,10 +339,6 @@ class CustomClusterTestSuite(ImpalaTestSuite): if IMPALAD_TIMEOUT_S in args: kwargs[IMPALAD_TIMEOUT_S] = args[IMPALAD_TIMEOUT_S] - if args.get(WORKLOAD_MGMT, False): - if IMPALAD_ARGS or CATALOGD_ARGS in args: - kwargs[WORKLOAD_MGMT] = True - if args.get(EXPECT_CORES, False): # Make a note of any core files that already exist possible_cores = find_all_files('*core*') @@ -384,11 +380,8 @@ class CustomClusterTestSuite(ImpalaTestSuite): @classmethod def cluster_teardown(cls, name, args): - if args.get(IMPALAD_GRACEFUL_SHUTDOWN, False): - for impalad in cls.cluster.impalads: - impalad.kill(SIGRTMIN) - for impalad in cls.cluster.impalads: - impalad.wait_for_exit() + if args.get(WORKLOAD_MGMT, False): + cls.cluster.graceful_shutdown_impalads() cls.clear_tmp_dirs() @@ -437,6 +430,20 @@ class CustomClusterTestSuite(ImpalaTestSuite): backoff=1), "Did not find table '{}' in local catalog of coordinator " \ "'{}:{}'.".format(tbl, coord.hostname, coord.get_webserver_port()) + def wait_for_wm_idle(self, coordinators=[], timeout_s=370): + """Wait until workload management worker in each coordinator becomes idle. + The 'timeout_s' is applied on each coordinator wait. 
Default 'timeout_s' is: + query_log_dml_exec_timeout_s * query_log_max_insert_attempts + 10 = 370 + It is intentionally set high to avoid HMS deadlock in the event of slow insert + followed by ungraceful shutdown (IMPALA-13842).""" + if not coordinators: + # Refresh cluster in case cluster has changed. + self.cluster.refresh() + coordinators = self.cluster.get_all_coordinators() + for coord in coordinators: + coord.service.wait_for_metric_value( + "impala-server.completed-queries.queued", 0, timeout=timeout_s, interval=1) + @classmethod def _stop_impala_cluster(cls): # TODO: Figure out a better way to handle case where processes are just starting @@ -532,8 +539,7 @@ class CustomClusterTestSuite(ImpalaTestSuite): impalad_timeout_s=60, ignore_pid_on_log_rotation=False, wait_for_backends=True, - log_symlinks=False, - workload_mgmt=False): + log_symlinks=False): cls.impala_log_dir = impala_log_dir # We ignore TEST_START_CLUSTER_ARGS here. Custom cluster tests specifically test that # certain custom startup arguments work and we want to keep them independent of dev @@ -563,10 +569,6 @@ class CustomClusterTestSuite(ImpalaTestSuite): cmd.append("--impalad_args=--use_local_catalog=1") cmd.append("--catalogd_args=--catalog_topic_mode=minimal") - if workload_mgmt: - cmd.append("--impalad_args=--enable_workload_mgmt=true") - cmd.append("--catalogd_args=--enable_workload_mgmt=true") - default_query_option_kvs = [] # Put any defaults first, then any arguments after that so they can override defaults. 
if default_query_options is not None: diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py index f08c9fcff..7979c4754 100644 --- a/tests/common/impala_cluster.py +++ b/tests/common/impala_cluster.py @@ -29,7 +29,7 @@ import time import requests from getpass import getuser from random import choice -from signal import SIGKILL +from signal import SIGKILL, SIGRTMIN from subprocess import check_call, check_output from time import sleep @@ -279,6 +279,14 @@ class ImpalaCluster(object): if msg: raise RuntimeError(msg) + def graceful_shutdown_impalads(self): + # Refresh cluster in case cluster has changed. + self.refresh() + for impalad in self.impalads: + impalad.kill(SIGRTMIN) + for impalad in self.impalads: + impalad.wait_for_exit() + def __build_impala_process_lists(self): """ Gets all the running Impala procs (with start arguments) on the machine. @@ -500,9 +508,9 @@ class Process(object): LOG.info("Starting container: {0}".format(self.container_id)) check_call(["docker", "container", "start", self.container_id]) - def restart(self): + def restart(self, signal=SIGKILL): """Kills and restarts the process""" - self.kill() + self.kill(signal=signal) self.wait_for_exit() self.start() @@ -632,8 +640,8 @@ class ImpaladProcess(BaseImpalaProcess): """Waits for client ports to be opened. 
Assumes that the webservice ports are open.""" start_time = time.time() LOG.info( - "Waiting for coordinator client services " + - "- hs2 port: %d hs2-http port: %d beeswax port: %d", + "Waiting for coordinator client services " + + "- hs2 port: %d hs2-http port: %d beeswax port: %d", self.service.hs2_port, self.service.hs2_http_port, self.service.beeswax_port) while time.time() - start_time < CLUSTER_WAIT_TIMEOUT_IN_SECONDS: beeswax_port_is_open = self.service.beeswax_port_is_open() @@ -752,7 +760,7 @@ def find_user_processes(binaries): except KeyError as e: if "uid not found" not in str(e): raise - except psutil.NoSuchProcess as e: + except psutil.NoSuchProcess: # Ignore the case when a process no longer exists. pass diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py index 65f0ea0f7..49c059c5b 100644 --- a/tests/common/impala_service.py +++ b/tests/common/impala_service.py @@ -116,7 +116,8 @@ class BaseImpalaService(object): def wait_for_metric_value(self, metric_name, expected_value, timeout=10, interval=1, allow_greater=False): start_time = time() - while (time() - start_time < timeout): + total_wait = 0 + while (total_wait < timeout): LOG.info("Getting metric: %s from %s:%s" % (metric_name, self.hostname, self.webserver_port)) value = None @@ -127,21 +128,22 @@ class BaseImpalaService(object): # if allow_greater is True we wait until the metric value becomes >= the expected # value. - if allow_greater: - if value >= expected_value: - LOG.info("Metric '%s' has reached desired value: %s" % (metric_name, value)) - return value - elif value == expected_value: - LOG.info("Metric '%s' has reached desired value: %s" % (metric_name, value)) + if (value == expected_value) or (allow_greater and value >= expected_value): + LOG.info("Metric '{0}' has reached desired value: {1}. total_wait: {2}s".format( + metric_name, value, total_wait)) return value else: - LOG.info("Waiting for metric value '%s'%s%s. 
Current value: %s" % - (metric_name, '>=' if allow_greater else '=', expected_value, value)) + LOG.info("Waiting for metric value '{0}'{1}{2}. Current value: {3}. " + "total_wait: {4}s".format( + metric_name, ('>=' if allow_greater else '='), expected_value, value, + total_wait)) LOG.info("Sleeping %ds before next retry." % interval) sleep(interval) + total_wait = time() - start_time LOG.info("Metric {0} did not reach value {1} in {2}s. Actual value was '{3}'. " - "Failing...".format(metric_name, expected_value, timeout, value)) + "total_wait: {4}s. Failing...".format( + metric_name, expected_value, timeout, value, total_wait)) self.__metric_timeout_assert(metric_name, expected_value, timeout, value) def __request_minidump(self, pid): diff --git a/tests/common/wm_test_suite.py b/tests/common/wm_test_suite.py new file mode 100644 index 000000000..e2152ece5 --- /dev/null +++ b/tests/common/wm_test_suite.py @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from __future__ import absolute_import, division, print_function + +from tests.common.custom_cluster_test_suite import CustomClusterTestSuite + + +class WorkloadManagementTestSuite(CustomClusterTestSuite): + """Base class for tests that exercise workload management behavior. + The setup_method waits for workload management init completion, while the + teardown_method waits until impala-server.completed-queries.queued metric + reaches 0.""" + + def setup_method(self, method): + super(WorkloadManagementTestSuite, self).setup_method(method) + self.wait_for_wm_init_complete() + + def teardown_method(self, method): + self.wait_for_wm_idle() + super(WorkloadManagementTestSuite, self).teardown_method(method) + + def get_client(self, protocol): + """Retrieves the default Impala client for the specified protocol. This client is + automatically closed after the test completes.""" + return self.default_impala_client(protocol=protocol) diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py index dff30367c..d7673e580 100644 --- a/tests/custom_cluster/test_admission_controller.py +++ b/tests/custom_cluster/test_admission_controller.py @@ -38,6 +38,7 @@ from tests.common.custom_cluster_test_suite import ( ADMISSIOND_ARGS, IMPALAD_ARGS, START_ARGS, + WORKLOAD_MGMT_IMPALAD_FLAGS, CustomClusterTestSuite) from tests.common.environ import build_flavor_timeout, ImpalaTestClusterProperties from tests.common.impala_connection import ( @@ -1901,7 +1902,7 @@ class TestAdmissionController(TestAdmissionControllerBase): workload_mgmt=True, impalad_args=impalad_admission_ctrl_config_args( fs_allocation_file="fair-scheduler-onlycoords.xml", llama_site_file="llama-site-onlycoords.xml"), - statestored_args=_STATESTORED_ARGS) + statestored_args=_STATESTORED_ARGS) def test_coord_only_pool_happy_path(self, vector): """Asserts queries set to use an only coordinators request pool run all the fragment instances on all coordinators and no 
executors even if the query includes @@ -1926,7 +1927,7 @@ class TestAdmissionController(TestAdmissionControllerBase): workload_mgmt=True, impalad_args=impalad_admission_ctrl_config_args( fs_allocation_file="fair-scheduler-onlycoords.xml", llama_site_file="llama-site-onlycoords.xml"), - statestored_args=_STATESTORED_ARGS) + statestored_args=_STATESTORED_ARGS) def test_coord_only_pool_no_executors(self, vector): """Asserts queries that only select from the active queries table run even if no executors are running.""" @@ -1938,7 +1939,7 @@ class TestAdmissionController(TestAdmissionControllerBase): workload_mgmt=True, impalad_args=impalad_admission_ctrl_config_args( fs_allocation_file="fair-scheduler-onlycoords.xml", llama_site_file="llama-site-onlycoords.xml"), - statestored_args=_STATESTORED_ARGS) + statestored_args=_STATESTORED_ARGS) def test_coord_only_pool_one_quiescing_coord(self, vector): """Asserts quiescing coordinators do not run fragment instances for queries that only select from the active queries table.""" @@ -1960,7 +1961,7 @@ class TestAdmissionController(TestAdmissionControllerBase): workload_mgmt=True, impalad_args=impalad_admission_ctrl_config_args( fs_allocation_file="fair-scheduler-onlycoords.xml", llama_site_file="llama-site-onlycoords.xml"), - statestored_args=_STATESTORED_ARGS) + statestored_args=_STATESTORED_ARGS) def test_coord_only_pool_one_coord_terminate(self, vector): """Asserts a force terminated coordinator is eventually removed from the list of active coordinators.""" @@ -2003,7 +2004,7 @@ class TestAdmissionController(TestAdmissionControllerBase): workload_mgmt=True, impalad_args=impalad_admission_ctrl_config_args( fs_allocation_file="fair-scheduler-onlycoords.xml", llama_site_file="llama-site-onlycoords.xml"), - statestored_args=_STATESTORED_ARGS) + statestored_args=_STATESTORED_ARGS) def test_coord_only_pool_add_coord(self, vector): self.wait_for_wm_init_complete() @@ -2011,15 +2012,16 @@ class 
TestAdmissionController(TestAdmissionControllerBase): cluster_size = len(self.cluster.impalads) self._start_impala_cluster( options=[ - "--impalad_args=s{}".format(impalad_admission_ctrl_config_args( + "--impalad_args={0} {1}".format( + WORKLOAD_MGMT_IMPALAD_FLAGS, + impalad_admission_ctrl_config_args( fs_allocation_file="fair-scheduler-onlycoords.xml", llama_site_file="llama-site-onlycoords.xml"))], add_impalads=True, cluster_size=6, num_coordinators=1, use_exclusive_coordinators=True, - wait_for_backends=False, - workload_mgmt=True) + wait_for_backends=False) self.assert_log_contains("impalad_node" + str(cluster_size), "INFO", "join Impala Service pool") @@ -2035,8 +2037,7 @@ class TestAdmissionController(TestAdmissionControllerBase): additional_args="--expected_executor_group_sets=root.group-set-small:1," "root.group-set-large:2 " "--num_expected_executors=2 --executor_groups=coordinator"), - impalad_graceful_shutdown=True, - statestored_args=_STATESTORED_ARGS) + statestored_args=_STATESTORED_ARGS) def test_coord_only_pool_exec_groups(self, vector): """Asserts queries using only coordinators request pools can run successfully when executor groups are configured.""" @@ -2075,8 +2076,6 @@ class TestAdmissionController(TestAdmissionControllerBase): expected_subscribers=expected_subscribers, expected_num_impalads=expected_num_impalads) self.__run_assert_systables_query(vector) - # Refresh cluster to include those two new impalad for graceful shutdown. 
- self.cluster.refresh() class TestAdmissionControllerWithACService(TestAdmissionController): diff --git a/tests/custom_cluster/test_query_live.py b/tests/custom_cluster/test_query_live.py index 8b3892aac..5f72337f0 100644 --- a/tests/custom_cluster/test_query_live.py +++ b/tests/custom_cluster/test_query_live.py @@ -23,13 +23,20 @@ from getpass import getuser from signal import SIGRTMIN from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.common.impala_cluster import DEFAULT_KRPC_PORT -from tests.common.impala_connection import FINISHED, PENDING +from tests.common.impala_connection import ERROR, FINISHED, PENDING +from tests.common.test_vector import HS2 from tests.util.workload_management import assert_query, redaction_rules_file from time import sleep class TestQueryLive(CustomClusterTestSuite): - """Tests to assert the query live table is correctly populated.""" + """Tests to assert the query live table is correctly populated. + This test class does not extend from WorkloadManagementTestSuite due to long + --query_log_write_interval_s requirement in most test method.""" + + @classmethod + def default_test_protocol(cls): + return HS2 def setup_method(self, method): super(TestQueryLive, self).setup_method(method) @@ -74,9 +81,9 @@ class TestQueryLive(CustomClusterTestSuite): assert False, "did not find host {}".format(host) assert len(actual_hosts) == 0, "did not find all expected hosts" - @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " + @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 " "--cluster_id=test_query_live", - catalogd_args="--enable_workload_mgmt", + workload_mgmt=True, disable_log_buffering=True) def test_query_live_hs2(self): """Asserts the query live table shows and allows filtering queries. 
Uses the hs2 @@ -88,9 +95,9 @@ class TestQueryLive(CustomClusterTestSuite): assert_query('sys.impala_query_live', self.hs2_client, 'test_query_live', result1.runtime_profile) - @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " + @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 " "--cluster_id=test_query_live", - catalogd_args="--enable_workload_mgmt", + workload_mgmt=True, disable_log_buffering=True) def test_query_live(self): """Asserts the query live table shows and allows filtering queries. Uses the default @@ -186,11 +193,11 @@ class TestQueryLive(CustomClusterTestSuite): self.execute_query_expect_success(self.client, 'drop table sys.impala_query_live') # Must come directly after "drop table sys.impala_query_live" - @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " + @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 " "--cluster_id=test_query_live " "--use_local_catalog=true", - catalogd_args="--enable_workload_mgmt " - "--catalog_topic_mode=minimal", + catalogd_args="--catalog_topic_mode=minimal", + workload_mgmt=True, default_query_options=[ ('default_transactional_type', 'insert_only')], disable_log_buffering=True) @@ -203,11 +210,11 @@ class TestQueryLive(CustomClusterTestSuite): result.runtime_profile) self.assert_describe_extended() - @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " + @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 " "--cluster_id=test_query_live " "--use_local_catalog=true", - catalogd_args="--enable_workload_mgmt " - "--catalog_topic_mode=minimal", + catalogd_args="--catalog_topic_mode=minimal", + workload_mgmt=True, disable_log_buffering=True) def test_local_catalog(self): """Asserts the query live table works with local catalog mode.""" @@ -216,12 +223,11 @@ class TestQueryLive(CustomClusterTestSuite): assert_query('sys.impala_query_live', self.client, 'test_query_live', 
result.runtime_profile) - @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " + @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 " "--cluster_id=test_query_live " "--redaction_rules_file={}" .format(redaction_rules_file()), - catalogd_args="--enable_workload_mgmt", - impalad_graceful_shutdown=True, + workload_mgmt=True, disable_log_buffering=True) def test_redaction(self): """Asserts the query live table table redacts the statement.""" @@ -231,9 +237,9 @@ class TestQueryLive(CustomClusterTestSuite): assert_query('sys.impala_query_live', self.client, 'test_query_live', result.runtime_profile) - @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " + @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 " "--cluster_id=test_query_live", - catalogd_args="--enable_workload_mgmt", + workload_mgmt=True, disable_log_buffering=True) def test_alter(self): """Asserts alter works on query live table.""" @@ -267,9 +273,9 @@ class TestQueryLive(CustomClusterTestSuite): select_column2 = self.execute_query('select * from sys.impala_query_live') assert len(select_column2.data) > 1 - @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " + @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 " "--cluster_id=test_query_live", - catalogd_args="--enable_workload_mgmt", + workload_mgmt=True, cluster_size=3, num_exclusive_coordinators=2, disable_log_buffering=True) @@ -301,17 +307,21 @@ class TestQueryLive(CustomClusterTestSuite): client2.close_query(handle2) client2.close() - @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " + @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 " "--cluster_id=test_query_live", - catalogd_args="--enable_workload_mgmt", + workload_mgmt=True, cluster_size=3, num_exclusive_coordinators=2, disable_log_buffering=True) def test_executor_groups(self): """Asserts 
scans are performed only on coordinators with multiple executor groups.""" # Add a (non-dedicated) coordinator and executor in a different executor group. - self._start_impala_cluster(options=['--impalad_args=--executor_groups=extra', - '--impalad_args=--cluster_id=test_query_live'], + self._start_impala_cluster(options=['--impalad_args=--executor_groups=extra ' + '--enable_workload_mgmt ' + '--query_log_write_interval_s=300 ' + '--shutdown_grace_period_s=0 ' + '--shutdown_deadline_s=15 ' + '--cluster_id=test_query_live '], cluster_size=1, add_executors=True, expected_num_impalads=4) @@ -322,9 +332,9 @@ class TestQueryLive(CustomClusterTestSuite): assert len(result.data) == 1 self.assert_only_coordinators(result.runtime_profile, coords=[0, 1], execs=[2, 3]) - @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " + @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 " "--cluster_id=test_query_live", - catalogd_args="--enable_workload_mgmt", + workload_mgmt=True, disable_log_buffering=True) def test_query_entries_are_unique(self): """Asserts queries in the query live table are unique.""" @@ -364,9 +374,9 @@ class TestQueryLive(CustomClusterTestSuite): client2.close_query(handle2) client2.close() - @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " - "--cluster_id=test_query_live", - catalogd_args="--enable_workload_mgmt", + @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 " + "--cluster_id=test_query_live ", + workload_mgmt=True, cluster_size=3, num_exclusive_coordinators=2, disable_log_buffering=True) @@ -377,10 +387,13 @@ class TestQueryLive(CustomClusterTestSuite): handle = self.execute_query_async(query, query_options={ 'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@3000'}) - # Wait for query to compile and assign ranges, then kill impalad during debug delay. 
+ # Wait for query to compile and assign ranges, then kill impalad + # in non-graceful manner during debug delay. self.client.wait_for_impala_state(handle, PENDING, 3) self.cluster.impalads[1].kill() + # Wait for query to pass admission control before fetching. + self.client.wait_for_any_impala_state(handle, [FINISHED, ERROR], 60) result = self.client.fetch(query, handle) assert len(result.data) == 1 expected_message = 'is no longer available for system table scan assignment' @@ -389,9 +402,9 @@ class TestQueryLive(CustomClusterTestSuite): self.close_query(handle) - @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " + @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 " "--cluster_id=test_query_live", - catalogd_args="--enable_workload_mgmt", + workload_mgmt=True, disable_log_buffering=True) def test_shutdown_coordinator(self): """Asserts query fails if a coordinator disappears after scheduling. Depends on @@ -404,19 +417,24 @@ class TestQueryLive(CustomClusterTestSuite): self.client.wait_for_impala_state(handle, PENDING, 3) # Ensure enough time for scheduling to assign ranges. sleep(1) - # Kill impalad during debug delay. + # Kill impalad in non-graceful manner during debug delay. self.cluster.impalads[1].kill() try: + # Wait for query to pass admission control before fetching. + self.client.wait_for_any_impala_state(handle, [FINISHED, ERROR], 60) self.client.fetch(query, handle) assert False, "fetch should fail" except Exception as e: assert "Network error: Client connection negotiation failed" in str(e) - # Beeswax client closes the query on failure. + # client closes the query on failure. - @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " - "--cluster_id=test_query_live", - catalogd_args="--enable_workload_mgmt", + @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 " + "--cluster_id=test_query_live " + # Override flag set by + # workload_mgmt arg. 
+ "--shutdown_grace_period_s=120", + workload_mgmt=True, disable_log_buffering=True) def test_graceful_shutdown_coordinator(self): """Asserts query succeeds if another coordinator is shutdown gracefully after @@ -445,11 +463,8 @@ class TestQueryLive(CustomClusterTestSuite): assert len(shutdown.data) == 2 self.assert_impalads(shutdown.runtime_profile, present=[0, 2], absent=[1]) - @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " - "--query_log_write_interval_s=1 " - "--cluster_id=test_query_live", - catalogd_args="--enable_workload_mgmt", - impalad_graceful_shutdown=True, + @CustomClusterTestSuite.with_args(impalad_args="--cluster_id=test_query_live ", + workload_mgmt=True, cluster_size=3, num_exclusive_coordinators=2, disable_log_buffering=True) @@ -482,9 +497,9 @@ class TestQueryLive(CustomClusterTestSuite): # in scans. self.assert_fragment_instances(result.runtime_profile, [3, 2, 2]) - @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " - "--cluster_id=test_query_live", - catalogd_args="--enable_workload_mgmt", + @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300 " + "--cluster_id=test_query_live ", + workload_mgmt=True, cluster_size=3, num_exclusive_coordinators=2, disable_log_buffering=True) diff --git a/tests/custom_cluster/test_query_log.py b/tests/custom_cluster/test_query_log.py index c5d2a9ffa..afe1a6501 100644 --- a/tests/custom_cluster/test_query_log.py +++ b/tests/custom_cluster/test_query_log.py @@ -33,7 +33,9 @@ from tests.common.cluster_config import impalad_admission_ctrl_config_args from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.common.impala_connection import FINISHED from tests.common.impala_test_suite import IMPALAD_HS2_HOST_PORT +from tests.common.test_dimensions import hs2_client_protocol_dimension from tests.common.test_vector import ImpalaTestDimension +from tests.common.wm_test_suite import WorkloadManagementTestSuite from 
tests.util.retry import retry from tests.util.workload_management import ( assert_query, @@ -42,42 +44,14 @@ from tests.util.workload_management import ( redaction_rules_file) -class TestQueryLogTableBase(CustomClusterTestSuite): - """Base class for all query log tests. Sets up the tests to use the Beeswax and HS2 - client protocols.""" - - PROTOCOL_BEESWAX = "beeswax" - PROTOCOL_HS2 = "hs2" - - @classmethod - def add_test_dimensions(cls): - super(TestQueryLogTableBase, cls).add_test_dimensions() - cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('protocol', - cls.PROTOCOL_BEESWAX, cls.PROTOCOL_HS2)) - - def setup_method(self, method): - super(TestQueryLogTableBase, self).setup_method(method) - self.wait_for_wm_init_complete() - - def get_client(self, protocol): - """Retrieves the default Impala client for the specified protocol. This client is - automatically closed after the test completes.""" - if protocol == self.PROTOCOL_BEESWAX: - return self.client - elif protocol == self.PROTOCOL_HS2: - return self.hs2_client - raise Exception("unknown protocol: {0}".format(protocol)) - - -class TestQueryLogTableBeeswax(TestQueryLogTableBase): +class TestQueryLogTableBasic(WorkloadManagementTestSuite): """Tests to assert the query log table is correctly populated when using the Beeswax client protocol.""" @classmethod def add_test_dimensions(cls): - super(TestQueryLogTableBeeswax, cls).add_test_dimensions() - cls.ImpalaTestMatrix.add_constraint(lambda v: - v.get_value('protocol') == 'beeswax') + super(TestQueryLogTableBasic, cls).add_test_dimensions() + cls.ImpalaTestMatrix.add_dimension(hs2_client_protocol_dimension()) @classmethod def get_workload(self): @@ -85,17 +59,12 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase): MAX_SQL_PLAN_LEN = 2000 LOG_DIR_MAX_WRITES = 'max_attempts_exceeded' - FLUSH_MAX_RECORDS_CLUSTER_ID = "test_query_log_max_records_" + str(int(time())) - FLUSH_MAX_RECORDS_QUERY_COUNT = 30 - 
@CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " - "--query_log_write_interval_s=1 " - "--cluster_id=test_max_select " + @CustomClusterTestSuite.with_args(impalad_args="--cluster_id=test_max_select " "--query_log_max_sql_length={0} " "--query_log_max_plan_length={0}" .format(MAX_SQL_PLAN_LEN), - catalogd_args="--enable_workload_mgmt", - impalad_graceful_shutdown=True, + workload_mgmt=True, disable_log_buffering=True) def test_lower_max_sql_plan(self, vector): """Asserts that length limits on the sql and plan columns in the completed queries @@ -128,11 +97,8 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase): assert len(data[1]) == self.MAX_SQL_PLAN_LEN - data[1].count("\n") - 1, \ "incorrect plan length" - @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " - "--query_log_write_interval_s=1 " - "--cluster_id=test_max_select", - catalogd_args="--enable_workload_mgmt", - impalad_graceful_shutdown=True, + @CustomClusterTestSuite.with_args(impalad_args="--cluster_id=test_max_select", + workload_mgmt=True, disable_log_buffering=True) def test_sql_plan_too_long(self, vector): """Asserts that very long queries have their corresponding plan and sql columns @@ -164,13 +130,10 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase): # Newline characters are not counted by Impala's length function. 
assert len(data[1]) == 16777216 - data[1].count("\n") - 1 - @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " - "--query_log_write_interval_s=1 " - "--cluster_id=test_query_hist_1 " + @CustomClusterTestSuite.with_args(impalad_args="--cluster_id=test_query_hist_1 " "--query_log_size=0 " "--query_log_size_in_bytes=0", - catalogd_args="--enable_workload_mgmt", - impalad_graceful_shutdown=True, + workload_mgmt=True, disable_log_buffering=True) def test_no_query_log(self, vector): """Asserts queries are written to the completed queries table when the in-memory @@ -194,13 +157,10 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase): assert len(actual.data) == 1 assert actual.data[0] == select_sql - @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " - "--query_log_write_interval_s=1 " - "--cluster_id=test_query_hist_2 " + @CustomClusterTestSuite.with_args(impalad_args="--cluster_id=test_query_hist_2 " "--always_use_data_cache " "--data_cache={query_data_cache}:5GB", - catalogd_args="--enable_workload_mgmt", - impalad_graceful_shutdown=True, + workload_mgmt=True, cluster_size=1, tmp_dir_placeholders=['query_data_cache'], disable_log_buffering=True) @@ -240,11 +200,9 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase): assert data["BYTES_READ_CACHE_TOTAL"] != "0", "bytes read from cache total was " \ "zero, test did not assert anything" - @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " - "--query_log_write_interval_s=5", + @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=5", impala_log_dir=("{" + LOG_DIR_MAX_WRITES + "}"), - catalogd_args="--enable_workload_mgmt", - impalad_graceful_shutdown=True, + workload_mgmt=True, tmp_dir_placeholders=[LOG_DIR_MAX_WRITES], disable_log_buffering=True) def test_max_attempts_exceeded(self, vector): @@ -290,80 +248,9 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase): assert impalad.service.get_metric_value( 
"impala-server.completed-queries.written") == 0 - @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " - "--query_log_max_queued={0} " - "--query_log_write_interval_s=9999 " - "--cluster_id={1} " - "--query_log_expression_limit=5000" - .format(FLUSH_MAX_RECORDS_QUERY_COUNT, - FLUSH_MAX_RECORDS_CLUSTER_ID), - catalogd_args="--enable_workload_mgmt", - default_query_options=[ - ('statement_expression_limit', 1024)], - impalad_graceful_shutdown=True, - disable_log_buffering=True) - def test_flush_on_queued_count_exceeded(self, vector): - """Asserts that queries that have completed are written to the query log table when - the maximum number of queued records is reached. Also verifies that writing - completed queries is not limited by default statement_expression_limit.""" - - impalad = self.cluster.get_first_impalad() - client = self.get_client(vector.get_value('protocol')) - - rand_str = "{0}-{1}".format(vector.get_value('protocol'), time()) - - test_sql = "select '{0}','{1}'".format(rand_str, - self.FLUSH_MAX_RECORDS_CLUSTER_ID) - test_sql_assert = "select '{0}', count(*) from {1} where sql='{2}'".format( - rand_str, QUERY_TBL_LOG, test_sql.replace("'", r"\'")) - - for _ in range(0, self.FLUSH_MAX_RECORDS_QUERY_COUNT): - res = client.execute(test_sql) - assert res.success - - # Running this query results in the number of queued completed queries to exceed - # the max and thus all completed queries will be written to the query log table. - res = client.execute(test_sql_assert) - assert res.success - assert 1 == len(res.data) - assert "0" == res.data[0].split("\t")[1] - - # Wait until the completed queries have all been written out because the max queued - # count was exceeded. 
- impalad.service.wait_for_metric_value( - "impala-server.completed-queries.max-records-writes", 1, 60) - self.cluster.get_first_impalad().service.wait_for_metric_value( - "impala-server.completed-queries.written", - self.FLUSH_MAX_RECORDS_QUERY_COUNT + 1, 60) - - # Force Impala to process the inserts to the completed queries table. - client.execute("refresh " + QUERY_TBL_LOG) - - # This query will remain queued due to the long write interval and max queued - # records limit not being reached. - res = client.execute(r"select count(*) from {0} where sql like 'select \'{1}\'%'" - .format(QUERY_TBL_LOG, rand_str)) - assert res.success - assert 1 == len(res.data) - assert str(self.FLUSH_MAX_RECORDS_QUERY_COUNT + 1) == res.data[0] - impalad.service.wait_for_metric_value( - "impala-server.completed-queries.queued", 2, 60) - - assert impalad.service.get_metric_value( - "impala-server.completed-queries.max-records-writes") == 1 - assert impalad.service.get_metric_value( - "impala-server.completed-queries.scheduled-writes") == 0 - assert impalad.service.get_metric_value("impala-server.completed-queries.written") \ - == self.FLUSH_MAX_RECORDS_QUERY_COUNT + 1 - assert impalad.service.get_metric_value( - "impala-server.completed-queries.queued") == 2 - - @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " - "--query_log_write_interval_s=1", - cluster_size=3, + @CustomClusterTestSuite.with_args(cluster_size=3, num_exclusive_coordinators=2, - catalogd_args="--enable_workload_mgmt", - impalad_graceful_shutdown=True, + workload_mgmt=True, disable_log_buffering=True) def test_dedicated_coordinator_no_mt_dop(self, vector): """Asserts the values written to the query log table match the values from the @@ -386,12 +273,9 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase): finally: client2.close() - @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " - "--query_log_write_interval_s=1", - cluster_size=3, + 
@CustomClusterTestSuite.with_args(cluster_size=3, num_exclusive_coordinators=2, - catalogd_args="--enable_workload_mgmt", - impalad_graceful_shutdown=True, + workload_mgmt=True, disable_log_buffering=True) def test_dedicated_coordinator_with_mt_dop(self, vector): """Asserts the values written to the query log table match the values from the @@ -416,26 +300,24 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase): finally: client2.close() - @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " - "--query_log_write_interval_s=1 " - "--redaction_rules_file={}" + @CustomClusterTestSuite.with_args(impalad_args="--redaction_rules_file={}" .format(redaction_rules_file()), - catalogd_args="--enable_workload_mgmt", - impalad_graceful_shutdown=True, + workload_mgmt=True, disable_log_buffering=True) - def test_redaction(self): + def test_redaction(self, vector): """Asserts the query log table redacts the statement.""" - result = self.client.execute( + client = self.get_client(vector.get_value('protocol')) + result = client.execute( "select *, 'supercalifragilisticexpialidocious' from functional.alltypes", fetch_profile_after_close=True) assert result.success self.cluster.get_first_impalad().service.wait_for_metric_value( "impala-server.completed-queries.written", 1, 60) - assert_query(QUERY_TBL_LOG, self.client, raw_profile=result.runtime_profile) + assert_query(QUERY_TBL_LOG, client, raw_profile=result.runtime_profile) -class TestQueryLogOtherTable(TestQueryLogTableBase): +class TestQueryLogOtherTable(WorkloadManagementTestSuite): """Tests to assert that query_log_table_name works with non-default value.""" OTHER_TBL = "completed_queries_table_{0}".format(int(time())) @@ -443,19 +325,15 @@ class TestQueryLogOtherTable(TestQueryLogTableBase): @classmethod def add_test_dimensions(cls): super(TestQueryLogOtherTable, cls).add_test_dimensions() - cls.ImpalaTestMatrix.add_constraint(lambda v: - v.get_value('protocol') == 'beeswax') + 
cls.ImpalaTestMatrix.add_dimension(hs2_client_protocol_dimension()) - @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " - "--query_log_write_interval_s=1 " - "--blacklisted_dbs=information_schema " + @CustomClusterTestSuite.with_args(impalad_args="--blacklisted_dbs=information_schema " "--query_log_table_name={0}" .format(OTHER_TBL), - catalogd_args="--enable_workload_mgmt " - "--blacklisted_dbs=information_schema " + catalogd_args="--blacklisted_dbs=information_schema " "--query_log_table_name={0}" .format(OTHER_TBL), - impalad_graceful_shutdown=True, + workload_mgmt=True, disable_log_buffering=True) def test_renamed_log_table(self, vector): """Asserts that the completed queries table can be renamed.""" @@ -479,7 +357,7 @@ class TestQueryLogOtherTable(TestQueryLogTableBase): client.execute("drop table {0}.{1} purge".format(WM_DB, self.OTHER_TBL)) -class TestQueryLogTableHS2(TestQueryLogTableBase): +class TestQueryLogTableHS2(WorkloadManagementTestSuite): """Tests to assert the query log table is correctly populated when using the HS2 client protocol.""" @@ -488,16 +366,12 @@ class TestQueryLogTableHS2(TestQueryLogTableBase): @classmethod def add_test_dimensions(cls): super(TestQueryLogTableHS2, cls).add_test_dimensions() - cls.ImpalaTestMatrix.add_constraint(lambda v: - v.get_value('protocol') == 'hs2') + cls.ImpalaTestMatrix.add_dimension(hs2_client_protocol_dimension()) - @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " - "--query_log_write_interval_s=1 " - "--cluster_id={}" + @CustomClusterTestSuite.with_args(impalad_args="--cluster_id={}" .format(HS2_OPERATIONS_CLUSTER_ID), - catalogd_args="--enable_workload_mgmt", cluster_size=2, - impalad_graceful_shutdown=True, + workload_mgmt=True, disable_log_buffering=True) def test_hs2_metadata_operations(self, vector): """Certain HS2 operations appear to Impala as a special kind of query. 
Specifically, @@ -620,12 +494,9 @@ class TestQueryLogTableHS2(TestQueryLogTableBase): assert assert_results.success assert assert_results.data[0] == "1" - @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " - "--query_log_write_interval_s=1 " - "--cluster_id=test_query_hist_mult", - catalogd_args="--enable_workload_mgmt", + @CustomClusterTestSuite.with_args(impalad_args="--cluster_id=test_query_hist_mult", cluster_size=2, - impalad_graceful_shutdown=True, + workload_mgmt=True, disable_log_buffering=True) def test_query_multiple_tables(self, vector): """Asserts the values written to the query log table match the values from the @@ -650,11 +521,8 @@ class TestQueryLogTableHS2(TestQueryLogTableBase): finally: client2.close() - @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " - "--query_log_write_interval_s=1 " - "--cluster_id=test_query_hist_3", - catalogd_args="--enable_workload_mgmt", - impalad_graceful_shutdown=True, + @CustomClusterTestSuite.with_args(impalad_args="--cluster_id=test_query_hist_3", + workload_mgmt=True, disable_log_buffering=True) def test_insert_select(self, vector, unique_database, unique_name): @@ -684,117 +552,12 @@ class TestQueryLogTableHS2(TestQueryLogTableBase): finally: client2.close() - @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " - "--query_log_write_interval_s=15", - catalogd_args="--enable_workload_mgmt", - impalad_graceful_shutdown=True, - disable_log_buffering=True) - def test_flush_on_interval(self, vector): - """Asserts that queries that have completed are written to the query log table - after the specified write interval elapses.""" - - client = self.get_client(vector.get_value('protocol')) - - query_count = 10 - - for i in range(query_count): - res = client.execute("select sleep(1000)") - assert res.success - - # Wait for at least one iteration of the workload management processing loop to write - # to the completed queries table. 
- self.cluster.get_first_impalad().service.wait_for_metric_value( - "impala-server.completed-queries.written", query_count, 20) - - @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " - "--query_log_write_interval_s=9999 " - "--shutdown_grace_period_s=0 " - "--shutdown_deadline_s=15 " - "--debug_actions=" - "WM_SHUTDOWN_DELAY:SLEEP@5000", - catalogd_args="--enable_workload_mgmt", - disable_log_buffering=True) - def test_flush_on_shutdown(self, vector): - """Asserts that queries that have completed but are not yet written to the query - log table are flushed to the table before the coordinator exits. Graceful shutdown - for 2nd coordinator not needed because query_log_write_interval_s is very long.""" - - impalad = self.cluster.get_first_impalad() - client = self.get_client(vector.get_value('protocol')) - - # Execute sql statements to ensure all get written to the query log table. - sql1 = client.execute("select 1") - assert sql1.success - - sql2 = client.execute("select 2") - assert sql2.success - - sql3 = client.execute("select 3") - assert sql3.success - - impalad.service.wait_for_metric_value("impala-server.completed-queries.queued", 3, - 60) - - impalad.kill_and_wait_for_exit(SIGRTMIN) - self.assert_impalad_log_contains("INFO", r'Workload management shutdown successful', - timeout_s=60) - - client2 = self.create_client_for_nth_impalad(1, vector.get_value('protocol')) - - try: - def assert_func(): - results = client2.execute("select query_id,sql from {0} where query_id in " - "('{1}','{2}','{3}')".format(QUERY_TBL_LOG, - sql1.query_id, sql2.query_id, sql3.query_id)) - - return len(results.data) == 3 - - assert retry(func=assert_func, max_attempts=5, sleep_time_s=3) - finally: - client2.close() - - @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " - "--query_log_write_interval_s=9999 " - "--shutdown_grace_period_s=0 " - "--query_log_shutdown_timeout_s=3 " - "--shutdown_deadline_s=15 " - "--debug_actions=" - 
"WM_SHUTDOWN_DELAY:SLEEP@10000", - catalogd_args="--enable_workload_mgmt", - disable_log_buffering=True) - def test_shutdown_flush_timed_out(self, vector): - """Asserts that queries that have completed but are not yet written to the query - log table are lost if the completed queries queue drain takes too long and that - the coordinator logs the estimated number of queries lost.""" - - impalad = self.cluster.get_first_impalad() - client = self.get_client(vector.get_value('protocol')) - - # Execute sql statements to ensure all get written to the query log table. - sql1 = client.execute("select 1") - assert sql1.success - - sql2 = client.execute("select 2") - assert sql2.success - - sql3 = client.execute("select 3") - assert sql3.success - - impalad.service.wait_for_metric_value("impala-server.completed-queries.queued", 3, - 60) - - impalad.kill_and_wait_for_exit(SIGRTMIN) - self.assert_impalad_log_contains("INFO", r"Workload management shutdown timed out. " - r"Up to '3' queries may have been lost", - timeout_s=60) - @CustomClusterTestSuite.with_args( - impalad_args="--enable_workload_mgmt " - "--query_log_write_interval_s=3 " + impalad_args="--query_log_write_interval_s=3 " "--query_log_dml_exec_timeout_s=1 " "--debug_actions=INTERNAL_SERVER_AFTER_SUBMIT:SLEEP@2000", - catalogd_args="--enable_workload_mgmt", - impalad_graceful_shutdown=True, cluster_size=1, disable_log_buffering=True) + workload_mgmt=True, + cluster_size=1, disable_log_buffering=True) def test_exec_timeout(self, vector): """Asserts the --query_log_dml_exec_timeout_s startup flag is added to the workload management insert DML and the DML will be cancelled when its execution time exceeds @@ -820,15 +583,17 @@ class TestQueryLogTableHS2(TestQueryLogTableBase): "limit of 1s000ms".format(self.insert_query_id)) -class TestQueryLogTableAll(TestQueryLogTableBase): +class TestQueryLogTableAll(WorkloadManagementTestSuite): """Tests to assert the query log table is correctly populated when using all the 
client protocols.""" - @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " - "--query_log_write_interval_s=1 " - "--cluster_id=test_query_hist_2", - catalogd_args="--enable_workload_mgmt", - impalad_graceful_shutdown=True, + @classmethod + def add_test_dimensions(cls): + super(TestQueryLogTableAll, cls).add_test_dimensions() + cls.ImpalaTestMatrix.add_dimension(hs2_client_protocol_dimension()) + + @CustomClusterTestSuite.with_args(impalad_args="--cluster_id=test_query_hist_2", + workload_mgmt=True, disable_log_buffering=True) def test_ddl(self, vector, unique_database, unique_name): """Asserts the values written to the query log table match the values from the @@ -851,11 +616,8 @@ class TestQueryLogTableAll(TestQueryLogTableBase): finally: client2.close() - @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " - "--query_log_write_interval_s=1 " - "--cluster_id=test_query_hist_3", - catalogd_args="--enable_workload_mgmt", - impalad_graceful_shutdown=True, + @CustomClusterTestSuite.with_args(impalad_args="--cluster_id=test_query_hist_3", + workload_mgmt=True, disable_log_buffering=True) def test_dml(self, vector, unique_database, unique_name): """Asserts the values written to the query log table match the values from the @@ -885,11 +647,8 @@ class TestQueryLogTableAll(TestQueryLogTableBase): finally: client2.close() - @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " - "--query_log_write_interval_s=1 " - "--cluster_id=test_query_hist_2", - catalogd_args="--enable_workload_mgmt", - impalad_graceful_shutdown=True, + @CustomClusterTestSuite.with_args(impalad_args="--cluster_id=test_query_hist_2", + workload_mgmt=True, disable_log_buffering=True) def test_invalid_query(self, vector): """Asserts correct values are written to the completed queries table for a failed @@ -918,10 +677,7 @@ class TestQueryLogTableAll(TestQueryLogTableBase): assert_query(query_tbl=QUERY_TBL_LOG, client=client, 
expected_cluster_id="test_query_hist_2", impalad=impalad, query_id=result.data[0]) - @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " - "--query_log_write_interval_s=1", - catalogd_args="--enable_workload_mgmt", - impalad_graceful_shutdown=True, + @CustomClusterTestSuite.with_args(workload_mgmt=True, disable_log_buffering=True) def test_ignored_sqls_not_written(self, vector): """Asserts that expected queries are not written to the query log table.""" @@ -1013,10 +769,7 @@ class TestQueryLogTableAll(TestQueryLogTableBase): assert self.cluster.get_first_impalad().service.get_metric_value( "impala-server.completed-queries.failure") == 0 - @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " - "--query_log_write_interval_s=1", - catalogd_args="--enable_workload_mgmt", - impalad_graceful_shutdown=True, + @CustomClusterTestSuite.with_args(workload_mgmt=True, disable_log_buffering=True) def test_sql_injection_attempts(self, vector): client = self.get_client(vector.get_value('protocol')) @@ -1089,21 +842,19 @@ class TestQueryLogTableAll(TestQueryLogTableBase): .format(esc_sql, test_case) -class TestQueryLogTableBufferPool(TestQueryLogTableBase): +class TestQueryLogTableBufferPool(WorkloadManagementTestSuite): """Base class for all query log tests that set the buffer pool query option.""" @classmethod def add_test_dimensions(cls): super(TestQueryLogTableBufferPool, cls).add_test_dimensions() + cls.ImpalaTestMatrix.add_dimension(hs2_client_protocol_dimension()) cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('buffer_pool_limit', None, "14.97MB")) - @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " - "--query_log_write_interval_s=1 " - "--cluster_id=test_query_hist_1 " + @CustomClusterTestSuite.with_args(impalad_args="--cluster_id=test_query_hist_1 " "--scratch_dirs={scratch_dir}:5G", - catalogd_args="--enable_workload_mgmt", - impalad_graceful_shutdown=True, + workload_mgmt=True, 
tmp_dir_placeholders=['scratch_dir'], disable_log_buffering=True) def test_select(self, vector): @@ -1152,16 +903,21 @@ class TestQueryLogTableBufferPool(TestQueryLogTableBase): "was zero, test did not assert anything" -class TestQueryLogQueuedQueries(CustomClusterTestSuite): +class TestQueryLogQueuedQueries(WorkloadManagementTestSuite): """Simulates a cluster that is under load and has queries that are queueing in admission control.""" + @classmethod + def add_test_dimensions(cls): + super(TestQueryLogQueuedQueries, cls).add_test_dimensions() + cls.ImpalaTestMatrix.add_dimension(hs2_client_protocol_dimension()) + @CustomClusterTestSuite.with_args( impalad_args=impalad_admission_ctrl_config_args( fs_allocation_file="fair-scheduler-one-query.xml", llama_site_file="llama-site-one-query.xml", additional_args="--query_log_write_interval_s=5"), - impalad_graceful_shutdown=True, workload_mgmt=True, + workload_mgmt=True, num_exclusive_coordinators=1, cluster_size=2, default_query_options=[('fetch_rows_timeout_ms', '1000')]) def test_query_queued(self): @@ -1233,6 +989,183 @@ class TestQueryLogQueuedQueries(CustomClusterTestSuite): backoff=1) +class TestQueryLogTableFlush(CustomClusterTestSuite): + """Tests to assert the query log table flush correctly under some shutdown + scenario. They are separated from others because test may stop impalad individually. 
+ This test class does not extend from WorkloadManagementTestSuite because it must not + wait until workload management is idle to begin graceful shutdown.""" + + FLUSH_MAX_RECORDS_CLUSTER_ID = "test_query_log_flush_max_records_" + str(int(time())) + FLUSH_MAX_RECORDS_QUERY_COUNT = 30 + + def setup_method(self, method): + super(TestQueryLogTableFlush, self).setup_method(method) + self.wait_for_wm_init_complete() + + @CustomClusterTestSuite.with_args(impalad_args="--query_log_max_queued={0} " + "--query_log_write_interval_s=9999 " + "--cluster_id={1} " + "--query_log_expression_limit=5000" + .format(FLUSH_MAX_RECORDS_QUERY_COUNT, + FLUSH_MAX_RECORDS_CLUSTER_ID), + default_query_options=[ + ('statement_expression_limit', 1024)], + workload_mgmt=True, + disable_log_buffering=True) + def test_flush_on_queued_count_exceeded(self): + """Asserts that queries that have completed are written to the query log table when + the maximum number of queued records is reached. Also verifies that writing + completed queries is not limited by default statement_expression_limit.""" + + impalad = self.cluster.get_first_impalad() + client = impalad.service.create_hs2_client() + + rand_str = "{0}-{1}".format(client.get_test_protocol(), time()) + + test_sql = "select '{0}','{1}'".format(rand_str, + self.FLUSH_MAX_RECORDS_CLUSTER_ID) + test_sql_assert = "select '{0}', count(*) from {1} where sql='{2}'".format( + rand_str, QUERY_TBL_LOG, test_sql.replace("'", r"\'")) + + for _ in range(0, self.FLUSH_MAX_RECORDS_QUERY_COUNT): + res = client.execute(test_sql) + assert res.success + + # Running this query results in the number of queued completed queries to exceed + # the max and thus all completed queries will be written to the query log table. + res = client.execute(test_sql_assert) + assert res.success + assert 1 == len(res.data) + assert "0" == res.data[0].split("\t")[1] + + # Wait until the completed queries have all been written out because the max queued + # count was exceeded. 
+ impalad.service.wait_for_metric_value( + "impala-server.completed-queries.max-records-writes", 1, 60) + self.cluster.get_first_impalad().service.wait_for_metric_value( + "impala-server.completed-queries.written", + self.FLUSH_MAX_RECORDS_QUERY_COUNT + 1, 60) + + # Force Impala to process the inserts to the completed queries table. + client.execute("refresh " + QUERY_TBL_LOG) + + # This query will remain queued due to the long write interval and max queued + # records limit not being reached. + res = client.execute(r"select count(*) from {0} where sql like 'select \'{1}\'%'" + .format(QUERY_TBL_LOG, rand_str)) + assert res.success + assert 1 == len(res.data) + assert str(self.FLUSH_MAX_RECORDS_QUERY_COUNT + 1) == res.data[0] + impalad.service.wait_for_metric_value( + "impala-server.completed-queries.queued", 2, 60) + + assert impalad.service.get_metric_value( + "impala-server.completed-queries.max-records-writes") == 1 + assert impalad.service.get_metric_value( + "impala-server.completed-queries.scheduled-writes") == 0 + assert impalad.service.get_metric_value("impala-server.completed-queries.written") \ + == self.FLUSH_MAX_RECORDS_QUERY_COUNT + 1 + assert impalad.service.get_metric_value( + "impala-server.completed-queries.queued") == 2 + + @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=15", + workload_mgmt=True, + disable_log_buffering=True) + def test_flush_on_interval(self): + """Asserts that queries that have completed are written to the query log table + after the specified write interval elapses.""" + + impalad = self.cluster.get_first_impalad() + client = impalad.service.create_hs2_client() + + query_count = 10 + + for i in range(query_count): + res = client.execute("select sleep(1000)") + assert res.success + + # Wait for at least one iteration of the workload management processing loop to write + # to the completed queries table. 
+ self.cluster.get_first_impalad().service.wait_for_metric_value( + "impala-server.completed-queries.written", query_count, 20) + + @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=9999 " + "--shutdown_grace_period_s=0 " + "--shutdown_deadline_s=15 " + "--debug_actions=" + "WM_SHUTDOWN_DELAY:SLEEP@5000", + workload_mgmt=True, + disable_log_buffering=True) + def test_flush_on_shutdown(self): + """Asserts that queries that have completed but are not yet written to the query + log table are flushed to the table before the coordinator exits. Graceful shutdown + for 2nd coordinator not needed because query_log_write_interval_s is very long.""" + + impalad = self.cluster.get_first_impalad() + client = impalad.service.create_hs2_client() + + # Execute sql statements to ensure all get written to the query log table. + sql1 = client.execute("select 1") + assert sql1.success + + sql2 = client.execute("select 2") + assert sql2.success + + sql3 = client.execute("select 3") + assert sql3.success + + impalad.service.wait_for_metric_value("impala-server.completed-queries.queued", 3, + 60) + + impalad.kill_and_wait_for_exit(SIGRTMIN) + self.assert_impalad_log_contains("INFO", r'Workload management shutdown successful', + timeout_s=60) + + with self.cluster.impalads[1].service.create_hs2_client() as client2: + def assert_func(): + results = client2.execute("select query_id,sql from {0} where query_id in " + "('{1}','{2}','{3}')".format(QUERY_TBL_LOG, + sql1.query_id, sql2.query_id, sql3.query_id)) + + return len(results.data) == 3 + + assert retry(func=assert_func, max_attempts=5, sleep_time_s=3) + + @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=9999 " + "--shutdown_grace_period_s=0 " + "--query_log_shutdown_timeout_s=3 " + "--shutdown_deadline_s=15 " + "--debug_actions=" + "WM_SHUTDOWN_DELAY:SLEEP@10000", + workload_mgmt=True, + disable_log_buffering=True) + def test_shutdown_flush_timed_out(self): + """Asserts that 
queries that have completed but are not yet written to the query + log table are lost if the completed queries queue drain takes too long and that + the coordinator logs the estimated number of queries lost.""" + + impalad = self.cluster.get_first_impalad() + client = impalad.service.create_hs2_client() + + # Execute sql statements to ensure all get written to the query log table. + sql1 = client.execute("select 1") + assert sql1.success + + sql2 = client.execute("select 2") + assert sql2.success + + sql3 = client.execute("select 3") + assert sql3.success + + impalad.service.wait_for_metric_value("impala-server.completed-queries.queued", 3, + 60) + + impalad.kill_and_wait_for_exit(SIGRTMIN) + self.assert_impalad_log_contains("INFO", r"Workload management shutdown timed out. " + r"Up to '3' queries may have been lost", + timeout_s=60) + + # Helper function to determine if a query from the debug UI is a workload management # insert DML. def _is_insert_query(query): diff --git a/tests/custom_cluster/test_workload_mgmt_init.py b/tests/custom_cluster/test_workload_mgmt_init.py index 76c16d7f2..45ef09bd5 100644 --- a/tests/custom_cluster/test_workload_mgmt_init.py +++ b/tests/custom_cluster/test_workload_mgmt_init.py @@ -24,7 +24,11 @@ from subprocess import CalledProcessError from logging import getLogger from SystemTables.ttypes import TQueryTableColumn -from tests.common.custom_cluster_test_suite import CustomClusterTestSuite +from tests.common.custom_cluster_test_suite import ( + WORKLOAD_MGMT_IMPALAD_FLAGS, + CustomClusterTestSuite) +from tests.common.test_dimensions import hs2_client_protocol_dimension +from tests.common.test_vector import HS2 from tests.util.workload_management import ( assert_query, WM_DB, @@ -35,27 +39,33 @@ from tests.util.workload_management import ( LOG = getLogger(__name__) QUERY_TBL_ALL = "{},{}".format(QUERY_TBL_LOG_NAME, QUERY_TBL_LIVE_NAME) +# Latest workload_mgmt_schema_version. 
+LATEST_SCHEMA = "1.2.0" class TestWorkloadManagementInitBase(CustomClusterTestSuite): - """Defines common setup and methods for all workload management init tests.""" - - LATEST_SCHEMA = "1.2.0" + """Defines common setup and methods for all workload management init tests. + This test class does not extend WorkloadManagementTestSuite because its subclasses + define its own setup_method and teardown_method.""" @classmethod def get_workload(self): return 'functional-query' + @classmethod + def default_test_protocol(cls): + return HS2 + @classmethod def add_test_dimensions(cls): super(TestWorkloadManagementInitBase, cls).add_test_dimensions() - cls.ImpalaTestMatrix.add_constraint(lambda v: v.get_value('protocol') == 'beeswax') + cls.ImpalaTestMatrix.add_dimension(hs2_client_protocol_dimension()) def setup_method(self, method): super(TestWorkloadManagementInitBase, self).setup_method(method) - def restart_cluster(self, vector, schema_version="", wait_for_init_complete=True, + def restart_cluster(self, schema_version="", wait_for_init_complete=True, cluster_size=3, additional_impalad_opts="", wait_for_backends=True, additional_catalogd_opts="", expect_startup_err=False, log_symlinks=False): """Wraps the existing custom cluster _start_impala_cluster function to restart the @@ -64,7 +74,8 @@ class TestWorkloadManagementInitBase(CustomClusterTestSuite): function blocks until the workload management init process completes. If additional_impalad_opts is specified, that string is appended to the impala_args startup flag.""" - coord_opts = "--impalad_args=--enable_workload_mgmt --logbuflevel=-1 " + coord_opts = "--impalad_args=--logbuflevel=-1 {} ".format( + WORKLOAD_MGMT_IMPALAD_FLAGS) coord_opts += additional_impalad_opts catalog_opts = "--catalogd_args=--enable_workload_mgmt --logbuflevel=-1 " @@ -123,7 +134,7 @@ class TestWorkloadManagementInitBase(CustomClusterTestSuite): table. 
The regex is passed the fully qualified table name using python string substitution.""" for table in (QUERY_TBL_LOG, QUERY_TBL_LIVE): - self.assert_catalogd_log_contains("INFO", line_regex.format(table)) + self.assert_catalogd_log_contains(level, line_regex.format(table)) def check_schema(self, schema_ver, vector, multiple_impalad=False): """Asserts that all workload management tables have the correct columns and are at the @@ -143,55 +154,57 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): super(TestWorkloadManagementInitWait, self).setup_method(method) self.wait_for_wm_init_complete() + def teardown_method(self, method): + self.wait_for_wm_idle() + super(TestWorkloadManagementInitWait, self).teardown_method(method) + @CustomClusterTestSuite.with_args( - impalad_args="--enable_workload_mgmt", - catalogd_args="--enable_workload_mgmt --workload_mgmt_schema_version=1.1.0", + workload_mgmt=True, disable_log_buffering=True) def test_no_upgrade(self, vector): """Tests that no upgrade happens when starting a cluster where the workload management tables are already at the latest version.""" - self.restart_cluster(vector, schema_version=self.LATEST_SCHEMA, log_symlinks=True) - self.check_schema(self.LATEST_SCHEMA, vector) + self.restart_cluster(schema_version=LATEST_SCHEMA, log_symlinks=True) + self.check_schema(LATEST_SCHEMA, vector) self.assert_catalogd_log_contains("INFO", r"Workload management table .*? 
will be " r"upgraded", expected_count=0) - @CustomClusterTestSuite.with_args(cluster_size=10, disable_log_buffering=True, - log_symlinks=True, - impalad_args="--enable_workload_mgmt --workload_mgmt_schema_version=1.0.0", - catalogd_args="--enable_workload_mgmt " - "--workload_mgmt_schema_version=1.0.0 " + @CustomClusterTestSuite.with_args( + cluster_size=10, disable_log_buffering=True, + log_symlinks=True, workload_mgmt=True, + impalad_args="--workload_mgmt_schema_version=1.0.0", + catalogd_args="--workload_mgmt_schema_version=1.0.0 " "--workload_mgmt_drop_tables={}".format(QUERY_TBL_ALL)) def test_create_on_version_1_0_0(self, vector): """Asserts that workload management tables are properly created on version 1.0.0 using a 10 node cluster when no tables exist.""" self.check_schema("1.0.0", vector, multiple_impalad=True) - @CustomClusterTestSuite.with_args(cluster_size=10, disable_log_buffering=True, - log_symlinks=True, - impalad_args="--enable_workload_mgmt --workload_mgmt_schema_version=1.1.0", - catalogd_args="--enable_workload_mgmt " - "--workload_mgmt_schema_version=1.1.0 " + @CustomClusterTestSuite.with_args( + cluster_size=10, disable_log_buffering=True, + log_symlinks=True, workload_mgmt=True, + impalad_args="--workload_mgmt_schema_version=1.1.0", + catalogd_args="--workload_mgmt_schema_version=1.1.0 " "--workload_mgmt_drop_tables={}".format(QUERY_TBL_ALL)) def test_create_on_version_1_1_0(self, vector): """Asserts that workload management tables are properly created on version 1.1.0 using a 10 node cluster when no tables exist.""" self.check_schema("1.1.0", vector, multiple_impalad=True) - @CustomClusterTestSuite.with_args(cluster_size=10, disable_log_buffering=True, - log_symlinks=True, - impalad_args="--enable_workload_mgmt", - catalogd_args="--enable_workload_mgmt " - "--workload_mgmt_drop_tables={}".format(QUERY_TBL_ALL)) + @CustomClusterTestSuite.with_args( + cluster_size=10, disable_log_buffering=True, + log_symlinks=True, workload_mgmt=True, + 
catalogd_args="--workload_mgmt_drop_tables={}".format(QUERY_TBL_ALL)) def test_create_on_version_1_2_0(self, vector): """Asserts that workload management tables are properly created on the latest version using a 10 node cluster when no tables exist.""" self.check_schema("1.2.0", vector, multiple_impalad=True) - @CustomClusterTestSuite.with_args(cluster_size=1, - impalad_args="--enable_workload_mgmt --workload_mgmt_schema_version=1.0.0", - catalogd_args="--enable_workload_mgmt " - "--workload_mgmt_schema_version=1.0.0 " + @CustomClusterTestSuite.with_args( + cluster_size=1, workload_mgmt=True, + impalad_args="--workload_mgmt_schema_version=1.0.0", + catalogd_args="--workload_mgmt_schema_version=1.0.0 " "--workload_mgmt_drop_tables={}".format(QUERY_TBL_ALL), disable_log_buffering=True) def test_upgrade_1_0_0_to_1_1_0(self, vector): @@ -203,8 +216,7 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): self.assert_log_contains("catalogd", "WARNING", r"Target schema version '1.0.0' is " r"not the latest schema version '\d+\.\d+\.\d+'") - self.restart_cluster(vector, schema_version="1.1.0", cluster_size=1, - log_symlinks=True) + self.restart_cluster(schema_version="1.1.0", cluster_size=1, log_symlinks=True) # Assert the upgrade process ran. 
self.assert_catalogd_all_tables(r"Workload management table '{}' is at version " @@ -212,10 +224,10 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): self.check_schema("1.1.0", vector) - @CustomClusterTestSuite.with_args(cluster_size=1, - impalad_args="--enable_workload_mgmt --workload_mgmt_schema_version=1.1.0", - catalogd_args="--enable_workload_mgmt " - "--workload_mgmt_schema_version=1.1.0 " + @CustomClusterTestSuite.with_args( + cluster_size=1, workload_mgmt=True, + impalad_args="--workload_mgmt_schema_version=1.1.0", + catalogd_args="--workload_mgmt_schema_version=1.1.0 " "--workload_mgmt_drop_tables={}".format(QUERY_TBL_ALL), disable_log_buffering=True) def test_upgrade_1_1_0_to_1_2_0(self, vector): @@ -227,8 +239,7 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): self.assert_log_contains("catalogd", "WARNING", r"Target schema version '1.1.0' is " r"not the latest schema version '\d+\.\d+\.\d+'") - self.restart_cluster(vector, schema_version="1.2.0", cluster_size=1, - log_symlinks=True) + self.restart_cluster(schema_version="1.2.0", cluster_size=1, log_symlinks=True) # Assert the upgrade process ran. 
self.assert_catalogd_all_tables(r"Workload management table '{}' is at version " @@ -236,10 +247,10 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): self.check_schema("1.2.0", vector) - @CustomClusterTestSuite.with_args(cluster_size=1, - impalad_args="--enable_workload_mgmt --workload_mgmt_schema_version=1.0.0", - catalogd_args="--enable_workload_mgmt " - "--workload_mgmt_schema_version=1.0.0 " + @CustomClusterTestSuite.with_args( + cluster_size=1, workload_mgmt=True, + impalad_args="--workload_mgmt_schema_version=1.0.0", + catalogd_args="--workload_mgmt_schema_version=1.0.0 " "--workload_mgmt_drop_tables={}".format(QUERY_TBL_ALL), disable_log_buffering=True) def test_upgrade_1_0_0_to_1_2_0(self, vector): @@ -251,8 +262,7 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): self.assert_log_contains("catalogd", "WARNING", r"Target schema version '1.0.0' is " r"not the latest schema version '\d+\.\d+\.\d+'") - self.restart_cluster(vector, schema_version="1.2.0", cluster_size=1, - log_symlinks=True) + self.restart_cluster(schema_version="1.2.0", cluster_size=1, log_symlinks=True) # Assert the upgrade process ran. 
self.assert_catalogd_all_tables(r"Workload management table '{}' is at version " @@ -260,20 +270,21 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): self.check_schema("1.2.0", vector) - @CustomClusterTestSuite.with_args(cluster_size=1, impalad_args="--enable_workload_mgmt", - catalogd_args="--enable_workload_mgmt", disable_log_buffering=True) + @CustomClusterTestSuite.with_args( + cluster_size=1, workload_mgmt=True, disable_log_buffering=True) def test_log_table_newer_schema_version(self, vector): """Asserts a catalog startup flag version that is older than the workload management table schema version will write only the fields associated with the startup flag version.""" - self.restart_cluster(vector, schema_version="1.0.0", cluster_size=1, - log_symlinks=True, additional_impalad_opts="--query_log_write_interval_s=15") + self.restart_cluster( + schema_version="1.0.0", cluster_size=1, log_symlinks=True, + additional_impalad_opts="--query_log_write_interval_s=15") self.assert_catalogd_log_contains("WARNING", "Target schema version '1.0.0' is not " - "the latest schema version '{}'".format(self.LATEST_SCHEMA)) + "the latest schema version '{}'".format(LATEST_SCHEMA)) # The workload management tables will be on the latest schema version. - self.check_schema(self.LATEST_SCHEMA, vector) + self.check_schema(LATEST_SCHEMA, vector) # The workload management processing will be running on schema version 1.0.0. 
self.assert_catalogd_all_tables(r"Target schema version '1.0.0' of the '{}' table is " @@ -309,11 +320,10 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): TQueryTableColumn.COORDINATOR_SLOTS: "NULL", TQueryTableColumn.EXECUTOR_SLOTS: "NULL"}) - @CustomClusterTestSuite.with_args(cluster_size=1, disable_log_buffering=True, - log_symlinks=True, - impalad_args="--enable_workload_mgmt", - catalogd_args="--enable_workload_mgmt " - "--query_log_table_props=\"foo=bar,foo1=bar1\" " + @CustomClusterTestSuite.with_args( + cluster_size=1, disable_log_buffering=True, + log_symlinks=True, workload_mgmt=True, + catalogd_args="--query_log_table_props=\"foo=bar,foo1=bar1\" " "--workload_mgmt_drop_tables={}".format(QUERY_TBL_ALL)) def test_create_table_with_custom_props(self): """Asserts that creating workload management tables with additional properties @@ -322,21 +332,20 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): self.assert_table_prop(QUERY_TBL_LOG, "foo", "bar") self.assert_table_prop(QUERY_TBL_LIVE, "foo", "bar") - @CustomClusterTestSuite.with_args(cluster_size=1, disable_log_buffering=True, - log_symlinks=True, - impalad_args="--enable_workload_mgmt", - catalogd_args="--enable_workload_mgmt " - "--workload_mgmt_drop_tables={}".format(QUERY_TBL_ALL)) + @CustomClusterTestSuite.with_args( + cluster_size=1, disable_log_buffering=True, + log_symlinks=True, workload_mgmt=True, + catalogd_args="--workload_mgmt_drop_tables={}".format(QUERY_TBL_ALL)) def test_create_from_scratch(self, vector): """Tests the conditions that exist when workload management is first started by deleteing the workload management tables and the sys db and restarting.""" assert self.client.execute("drop database {} cascade" .format(WM_DB)).success - self.restart_cluster(vector, log_symlinks=True) - self.check_schema(self.LATEST_SCHEMA, vector) + self.restart_cluster(log_symlinks=True) + self.check_schema(LATEST_SCHEMA, vector) - def 
_run_invalid_table_prop_test(self, table, prop_name, vector, expect_success=False): + def _run_invalid_table_prop_test(self, table, prop_name, expect_success=False): """Runs a test where one of the workload management schema version table properties on a workload management table has been reset to an invalid value.""" try: @@ -347,7 +356,8 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): "{}".format(table)) tmp_dir = self.get_tmp_dir('invalid_schema') - self.restart_cluster(vector, wait_for_init_complete=False, cluster_size=1, + self.restart_cluster( + wait_for_init_complete=False, cluster_size=1, wait_for_backends=False, expect_startup_err=True, log_symlinks=True, additional_catalogd_opts="--minidump_path={}".format(tmp_dir), additional_impalad_opts="--minidump_path={}".format(tmp_dir)) @@ -362,54 +372,59 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): assert len(os.listdir("{}/catalogd".format(tmp_dir))) == 0, \ "Found minidumps but none should exist." 
finally: - self.restart_cluster(vector, cluster_size=1, - additional_catalogd_opts="--workload_mgmt_drop_tables={}".format(QUERY_TBL_ALL)) + self.restart_cluster( + cluster_size=1, + additional_catalogd_opts="--workload_mgmt_drop_tables={}".format( + QUERY_TBL_ALL)) - @CustomClusterTestSuite.with_args(cluster_size=1, log_symlinks=True, - impalad_args="--enable_workload_mgmt --minidump_path={invalid_schema}", - catalogd_args="--enable_workload_mgmt --minidump_path={invalid_schema}", + @CustomClusterTestSuite.with_args( + cluster_size=1, log_symlinks=True, workload_mgmt=True, + impalad_args="--minidump_path={invalid_schema}", + catalogd_args="--minidump_path={invalid_schema}", tmp_dir_placeholders=['invalid_schema'], disable_log_buffering=True) - def test_invalid_schema_version_log_table_prop(self, vector): + def test_invalid_schema_version_log_table_prop(self): """Tests that startup succeeds when the 'schema_version' table property on the sys.impala_query_log table contains an invalid value but the wm_schema_version table property contains a valid value.""" - self._run_invalid_table_prop_test(QUERY_TBL_LOG, "schema_version", vector, True) + self._run_invalid_table_prop_test(QUERY_TBL_LOG, "schema_version", True) - @CustomClusterTestSuite.with_args(cluster_size=1, log_symlinks=True, - impalad_args="--enable_workload_mgmt --minidump_path={invalid_schema}", - catalogd_args="--enable_workload_mgmt --minidump_path={invalid_schema}", + @CustomClusterTestSuite.with_args( + cluster_size=1, log_symlinks=True, workload_mgmt=True, + impalad_args="--minidump_path={invalid_schema}", + catalogd_args="--minidump_path={invalid_schema}", tmp_dir_placeholders=['invalid_schema'], disable_log_buffering=True) - def test_invalid_wm_schema_version_log_table_prop(self, vector): + def test_invalid_wm_schema_version_log_table_prop(self): """Tests that startup fails when the 'wm_schema_version' table property on the sys.impala_query_log table contains an invalid value.""" - 
self._run_invalid_table_prop_test(QUERY_TBL_LOG, "wm_schema_version", vector) + self._run_invalid_table_prop_test(QUERY_TBL_LOG, "wm_schema_version") - @CustomClusterTestSuite.with_args(cluster_size=1, log_symlinks=True, - impalad_args="--enable_workload_mgmt --minidump_path={invalid_schema}", - catalogd_args="--enable_workload_mgmt --minidump_path={invalid_schema}", + @CustomClusterTestSuite.with_args( + cluster_size=1, log_symlinks=True, workload_mgmt=True, + impalad_args="--minidump_path={invalid_schema}", + catalogd_args="--minidump_path={invalid_schema}", tmp_dir_placeholders=['invalid_schema'], disable_log_buffering=True) - def test_invalid_schema_version_live_table_prop(self, vector): + def test_invalid_schema_version_live_table_prop(self): """Tests that startup succeeds when the 'schema_version' table property on the sys.impala_query_live table contains an invalid value but the wm_schema_version table property contains a valid value.""" - self._run_invalid_table_prop_test(QUERY_TBL_LIVE, "schema_version", vector, True) + self._run_invalid_table_prop_test(QUERY_TBL_LIVE, "schema_version", True) - @CustomClusterTestSuite.with_args(cluster_size=1, log_symlinks=True, - impalad_args="--enable_workload_mgmt --minidump_path={invalid_schema}", - catalogd_args="--enable_workload_mgmt --minidump_path={invalid_schema}", + @CustomClusterTestSuite.with_args( + cluster_size=1, log_symlinks=True, workload_mgmt=True, + impalad_args="--minidump_path={invalid_schema}", + catalogd_args="--minidump_path={invalid_schema}", tmp_dir_placeholders=['invalid_schema'], disable_log_buffering=True) - def test_invalid_wm_schema_version_live_table_prop(self, vector): + def test_invalid_wm_schema_version_live_table_prop(self): """Tests that startup fails when the 'wm_schema_version' table property on the sys.impala_query_live table contains an invalid value.""" - self._run_invalid_table_prop_test(QUERY_TBL_LIVE, "wm_schema_version", vector) + 
self._run_invalid_table_prop_test(QUERY_TBL_LIVE, "wm_schema_version") - @CustomClusterTestSuite.with_args(cluster_size=1, disable_log_buffering=True, - impalad_args="--enable_workload_mgmt", - catalogd_args="--enable_workload_mgmt") + @CustomClusterTestSuite.with_args( + cluster_size=1, disable_log_buffering=True, workload_mgmt=True) def test_upgrade_to_latest_from_previous_binary(self, vector): """Simulated an upgrade situation from workload management tables created by previous builds of Impala.""" @@ -423,9 +438,10 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): create_sql = f.read() assert self.client.execute(create_sql).success - self.restart_cluster(vector, cluster_size=1, log_symlinks=True, - additional_impalad_opts="--query_log_write_interval_s=30") - self.check_schema(self.LATEST_SCHEMA, vector) + self.restart_cluster( + cluster_size=1, log_symlinks=True, + additional_impalad_opts="--query_log_write_interval_s=30") + self.check_schema(LATEST_SCHEMA, vector) # Run a query and ensure it does not populate fields from the latest schema. 
res = self.client.execute("select * from functional.alltypes") @@ -441,10 +457,9 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): "impala-server.completed-queries.written", 2, 60) assert_query(QUERY_TBL_LOG, self.client, impalad=impalad, query_id=res.query_id) - @CustomClusterTestSuite.with_args(cluster_size=1, disable_log_buffering=True, - impalad_args="--enable_workload_mgmt", - catalogd_args="--enable_workload_mgmt") - def test_start_at_1_0_0(self, vector): + @CustomClusterTestSuite.with_args( + cluster_size=1, disable_log_buffering=True, workload_mgmt=True) + def test_start_at_1_0_0(self): """Tests the situation where workload management tables were created by the original workload management code, and the current code is started at workload management schema version 1.0.0 (even though that version is not the latest).""" @@ -458,8 +473,9 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): create_sql = f.read() assert self.client.execute(create_sql).success - self.restart_cluster(vector, schema_version="1.0.0", log_symlinks=True, - additional_impalad_opts="--query_log_write_interval_s=15") + self.restart_cluster( + schema_version="1.0.0", log_symlinks=True, + additional_impalad_opts="--query_log_write_interval_s=15") for table in (QUERY_TBL_LOG, QUERY_TBL_LIVE): self.assert_table_prop(table, "schema_version", "1.0.0") @@ -491,11 +507,10 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase): data = log_results.data[0].split("\t") assert len(data) == len(log_results.column_labels) - @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt", - catalogd_args="--enable_workload_mgmt", + @CustomClusterTestSuite.with_args( statestored_args="--use_subscriber_id_as_catalogd_priority=true", start_args="--enable_statestored_ha", - disable_log_buffering=True, log_symlinks=True) + disable_log_buffering=True, log_symlinks=True, workload_mgmt=True) def test_statestore_ha(self): """Asserts workload 
management initialization completes successfully when statestore ha is enabled.""" @@ -514,12 +529,17 @@ class TestWorkloadManagementInitNoWait(TestWorkloadManagementInitBase): def setup_method(self, method): super(TestWorkloadManagementInitNoWait, self).setup_method(method) - @CustomClusterTestSuite.with_args(cluster_size=1, log_symlinks=True, - impalad_args="--enable_workload_mgmt --query_log_write_interval_s=3", - catalogd_args="--enable_workload_mgmt " - "--workload_mgmt_drop_tables={} " + def teardown_method(self, method): + self.wait_for_wm_idle() + super(TestWorkloadManagementInitNoWait, self).teardown_method(method) + + @CustomClusterTestSuite.with_args( + cluster_size=1, log_symlinks=True, + impalad_args="--query_log_write_interval_s=3", + catalogd_args="--workload_mgmt_drop_tables={} " "--debug_actions=CATALOG_WORKLOADMGMT_STARTUP:SLEEP@15000" .format(QUERY_TBL_ALL), + workload_mgmt=True, disable_log_buffering=True) def test_catalog_init_delay(self): # Workload management init is slightly delayed after catalogd startup, wait for the @@ -543,12 +563,14 @@ class TestWorkloadManagementInitNoWait(TestWorkloadManagementInitBase): assert res.success impalad.wait_for_metric_value("impala-server.completed-queries.written", 1, 15) - @CustomClusterTestSuite.with_args(cluster_size=1, expect_startup_fail=True, + @CustomClusterTestSuite.with_args( + cluster_size=1, expect_startup_fail=True, impalad_timeout_s=60, log_symlinks=True, - impalad_args="--enable_workload_mgmt --workload_mgmt_schema_version=foo " + impalad_args="--workload_mgmt_schema_version=foo " "--minidump_path={minidumps}", - catalogd_args="--enable_workload_mgmt --workload_mgmt_schema_version=foo " + catalogd_args="--workload_mgmt_schema_version=foo " "--minidump_path={minidumps}", tmp_dir_placeholders=['minidumps'], + workload_mgmt=True, disable_log_buffering=True) def test_start_invalid_version(self): """Asserts that starting a cluster with an invalid workload management version @@ -563,10 +585,11 @@ 
class TestWorkloadManagementInitNoWait(TestWorkloadManagementInitBase): @CustomClusterTestSuite.with_args(cluster_size=1, expect_startup_fail=True, impalad_timeout_s=60, log_symlinks=True, - impalad_args="--enable_workload_mgmt --workload_mgmt_schema_version=0.0.1 " + impalad_args="--workload_mgmt_schema_version=0.0.1 " "--minidump_path={minidumps}", - catalogd_args="--enable_workload_mgmt --workload_mgmt_schema_version=0.0.1 " + catalogd_args="--workload_mgmt_schema_version=0.0.1 " "--minidump_path={minidumps}", tmp_dir_placeholders=['minidumps'], + workload_mgmt=True, disable_log_buffering=True) def test_start_unknown_version(self): """Asserts that starting a cluster with an unknown workload management version errors. @@ -579,7 +602,8 @@ class TestWorkloadManagementInitNoWait(TestWorkloadManagementInitBase): self.assert_catalogd_log_contains("FATAL", r"Workload management schema version " r"'0.0.1' is not one of the known versions: '1.0.0', '1.1.0', '1.2.0'$") - @CustomClusterTestSuite.with_args(start_args="--enable_catalogd_ha", + @CustomClusterTestSuite.with_args( + start_args="--enable_catalogd_ha", statestored_args="--use_subscriber_id_as_catalogd_priority=true", disable_log_buffering=True, log_symlinks=True) def test_catalog_ha_no_workload_mgmt(self): @@ -628,10 +652,14 @@ class TestWorkloadManagementCatalogHA(TestWorkloadManagementInitBase): r"Skipping workload management initialization since catalogd HA is enabled and " r"this catalogd is not active", node_index=self.standby_catalog) - @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt", - catalogd_args="--enable_workload_mgmt", start_args="--enable_catalogd_ha", + def teardown_method(self, method): + self.wait_for_wm_idle() + super(TestWorkloadManagementCatalogHA, self).teardown_method(method) + + @CustomClusterTestSuite.with_args( + start_args="--enable_catalogd_ha", statestored_args="--use_subscriber_id_as_catalogd_priority=true", - disable_log_buffering=True, log_symlinks=True) + 
disable_log_buffering=True, log_symlinks=True, workload_mgmt=True) def test_catalog_ha_failover(self): """Asserts workload management initialization is not run a second time when catalogd failover happens.""" @@ -652,11 +680,10 @@ class TestWorkloadManagementCatalogHA(TestWorkloadManagementInitBase): self.assert_catalogd_log_contains("INFO", r"Starting workload management " r"initialization", expected_count=0, node_index=self.standby_catalog) - @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt", - catalogd_args="--enable_workload_mgmt", + @CustomClusterTestSuite.with_args( statestored_args="--use_subscriber_id_as_catalogd_priority=true", start_args="--enable_catalogd_ha --enable_statestored_ha", - disable_log_buffering=True, log_symlinks=True) + disable_log_buffering=True, log_symlinks=True, workload_mgmt=True) def test_catalog_statestore_ha(self): """Asserts workload management initialization is only done on the active catalogd when both catalog and statestore ha is enabled.""" diff --git a/tests/custom_cluster/test_workload_mgmt_sql_details.py b/tests/custom_cluster/test_workload_mgmt_sql_details.py index cddb18fd5..7938fc03c 100644 --- a/tests/custom_cluster/test_workload_mgmt_sql_details.py +++ b/tests/custom_cluster/test_workload_mgmt_sql_details.py @@ -21,11 +21,15 @@ import os from SystemTables.ttypes import TQueryTableColumn from tests.common.custom_cluster_test_suite import CustomClusterTestSuite -from test_query_log import TestQueryLogTableBase +from tests.common.test_dimensions import hs2_client_protocol_dimension +from tests.common.wm_test_suite import WorkloadManagementTestSuite from tests.util.workload_management import assert_csv_col, QUERY_TBL_LOG -class TestWorkloadManagementSQLDetails(TestQueryLogTableBase): [email protected]_args( + cluster_size=1, disable_log_buffering=True, workload_mgmt=True, + impalad_args="--query_log_max_queued=1") +class TestWorkloadManagementSQLDetails(WorkloadManagementTestSuite): """Tests that 
ensure the workload management data describing the details of the sql statement values (columns tables_queried, select_columns, where_columns, join_columns, aggregate_columns, and orderby_columns) in the completed @@ -37,12 +41,7 @@ class TestWorkloadManagementSQLDetails(TestQueryLogTableBase): @classmethod def add_test_dimensions(cls): super(TestWorkloadManagementSQLDetails, cls).add_test_dimensions() - cls.ImpalaTestMatrix.add_constraint(lambda v: - v.get_value("protocol") == "beeswax") - - def setup_method(self, method): - super(TestWorkloadManagementSQLDetails, self).setup_method(method) - self.wait_for_wm_init_complete() + cls.ImpalaTestMatrix.add_dimension(hs2_client_protocol_dimension()) # Assert tables queried. def _assert_all(self, vector, query, expected_tables_queried, expected_select_cols, @@ -60,7 +59,7 @@ class TestWorkloadManagementSQLDetails(TestQueryLogTableBase): # Wait for the query to be written to the completed queries table. self.cluster.get_first_impalad().service.wait_for_metric_value( - "impala-server.completed-queries.written", 1, 60) + "impala-server.completed-queries.queued", 0, 60) # Assert tables queried. assert_csv_col(client, QUERY_TBL_LOG, TQueryTableColumn.TABLES_QUERIED, res.query_id, @@ -101,10 +100,6 @@ class TestWorkloadManagementSQLDetails(TestQueryLogTableBase): self._assert_all(vector, query_text, expected_tables_queried, expected_select, expected_where, expected_join, expected_aggregate, expected_orderby) - @CustomClusterTestSuite.with_args( - cluster_size=1, impalad_graceful_shutdown=True, disable_log_buffering=True, - impalad_args="--enable_workload_mgmt --query_log_write_interval_s=1", - catalogd_args="--enable_workload_mgmt") def test_tpcds_1(self, vector): """Asserts the values inserted into the workload management table columns match what is expected for TPCDS query 1. @@ -134,10 +129,6 @@ class TestWorkloadManagementSQLDetails(TestQueryLogTableBase): # Expected order by columns. 
["customer.c_customer_id"]) - @CustomClusterTestSuite.with_args( - cluster_size=1, impalad_graceful_shutdown=True, disable_log_buffering=True, - impalad_args="--enable_workload_mgmt --query_log_write_interval_s=1", - catalogd_args="--enable_workload_mgmt") def test_tpcds_3(self, vector): """Asserts the values inserted into the workload management table columns match what is expected for TPCDS query 3. @@ -165,10 +156,6 @@ class TestWorkloadManagementSQLDetails(TestQueryLogTableBase): # Expected order by columns. ["date_dim.d_year", "store_sales.ss_ext_sales_price", "item.i_brand_id"]) - @CustomClusterTestSuite.with_args( - cluster_size=1, impalad_graceful_shutdown=True, disable_log_buffering=True, - impalad_args="--enable_workload_mgmt --query_log_write_interval_s=1", - catalogd_args="--enable_workload_mgmt") def test_tpcds_51(self, vector): """Asserts the values inserted into the workload management table columns match what is expected for TPCDS query 51. @@ -198,10 +185,6 @@ class TestWorkloadManagementSQLDetails(TestQueryLogTableBase): # Expected order by columns. ["web_sales.ws_item_sk", "store_sales.ss_item_sk", "date_dim.d_date"]) - @CustomClusterTestSuite.with_args( - cluster_size=1, impalad_graceful_shutdown=True, disable_log_buffering=True, - impalad_args="--enable_workload_mgmt --query_log_write_interval_s=1", - catalogd_args="--enable_workload_mgmt") def test_tpcds_62(self, vector): """Asserts the values inserted into the workload management table columns match what is expected for TPCDS query 62. @@ -232,10 +215,6 @@ class TestWorkloadManagementSQLDetails(TestQueryLogTableBase): # Expected order by columns. 
["warehouse.w_warehouse_name", "ship_mode.sm_type", "web_site.web_name"]) - @CustomClusterTestSuite.with_args( - cluster_size=1, impalad_graceful_shutdown=True, disable_log_buffering=True, - impalad_args="--enable_workload_mgmt --query_log_write_interval_s=1", - catalogd_args="--enable_workload_mgmt") def test_tpcds_64(self, vector): """Asserts the values inserted into the workload management table columns match what is expected for TPCDS query 64. @@ -303,10 +282,6 @@ class TestWorkloadManagementSQLDetails(TestQueryLogTableBase): # Expected order by columns. ["item.i_product_name", "store.s_store_name", "store_sales.ss_wholesale_cost"]) - @CustomClusterTestSuite.with_args( - cluster_size=1, impalad_graceful_shutdown=True, disable_log_buffering=True, - impalad_args="--enable_workload_mgmt --query_log_write_interval_s=1", - catalogd_args="--enable_workload_mgmt") def test_tpcds_66(self, vector): """Asserts the values inserted into the workload management table columns match what is expected for TPCDS query 66. @@ -349,10 +324,6 @@ class TestWorkloadManagementSQLDetails(TestQueryLogTableBase): # Expected order by columns. ["warehouse.w_warehouse_name"]) - @CustomClusterTestSuite.with_args( - cluster_size=1, impalad_graceful_shutdown=True, disable_log_buffering=True, - impalad_args="--enable_workload_mgmt --query_log_write_interval_s=1", - catalogd_args="--enable_workload_mgmt") def test_tpcds_75(self, vector): """Asserts the values inserted into the workload management table columns match what is expected for TPCDS query 75. 
@@ -404,10 +375,6 @@ class TestWorkloadManagementSQLDetails(TestQueryLogTableBase): "web_returns.wr_return_amt", "web_returns.wr_return_quantity", "web_sales.ws_ext_sales_price", "web_sales.ws_quantity"]) - @CustomClusterTestSuite.with_args( - cluster_size=1, impalad_graceful_shutdown=True, disable_log_buffering=True, - impalad_args="--enable_workload_mgmt --query_log_write_interval_s=1", - catalogd_args="--enable_workload_mgmt") def test_sql_having(self, vector): """Asserts the values inserted into the workload management table columns match what is expected when the sql contains both a group by and a having clause that @@ -422,10 +389,6 @@ class TestWorkloadManagementSQLDetails(TestQueryLogTableBase): ["store_sales.ss_item_sk", "store_sales.ss_quantity"], ["store_sales.ss_quantity"]) - @CustomClusterTestSuite.with_args( - cluster_size=1, impalad_graceful_shutdown=True, disable_log_buffering=True, - impalad_args="--enable_workload_mgmt --query_log_write_interval_s=1", - catalogd_args="--enable_workload_mgmt") def test_complex_types(self, vector): """Asserts that queries using subtypes of complex type columns only report the column name and do not also report the subtype.""" @@ -439,10 +402,6 @@ class TestWorkloadManagementSQLDetails(TestQueryLogTableBase): [], "functional") - @CustomClusterTestSuite.with_args( - cluster_size=1, impalad_graceful_shutdown=True, disable_log_buffering=True, - impalad_args="--enable_workload_mgmt --query_log_write_interval_s=1", - catalogd_args="--enable_workload_mgmt") def test_arithmetic(self, vector): """Asserts that arithmetic operations record the columns used in them.""" self._assert_all(vector, "select (tinyint_col + smallint_col), sum(float_col) from " @@ -456,10 +415,17 @@ class TestWorkloadManagementSQLDetails(TestQueryLogTableBase): [], "functional") + +class TestWorkloadManagementSQLDetailsCalcite(WorkloadManagementTestSuite): + """Variant of TestWorkloadManagementSQLDetails using calcite planner.""" + + @classmethod + def 
add_test_dimensions(cls): + super(TestWorkloadManagementSQLDetailsCalcite, cls).add_test_dimensions() + cls.ImpalaTestMatrix.add_dimension(hs2_client_protocol_dimension()) + @CustomClusterTestSuite.with_args(start_args="--use_calcite_planner=true", - cluster_size=1, impalad_graceful_shutdown=True, - impalad_args="--enable_workload_mgmt --query_log_write_interval_s=1", - catalogd_args="--enable_workload_mgmt") + cluster_size=1, workload_mgmt=True) def test_tpcds_8_decimal(self, vector): """Runs the tpcds-decimal_v2-q8 query using the calcite planner and asserts the query completes successfully. See IMPALA-13505 for details on why this query in
