This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit ea8f74a6acc1a60f7efb2bd8790bc5ec22c84c0f
Author: Riza Suminto <[email protected]>
AuthorDate: Tue Mar 11 09:34:05 2025 -0700

    IMPALA-13861: Standardize workload management tests
    
    This patch standardizes tests against workload management tables
    (sys.impala_query_log and sys.impala_query_live) to use a common
    superclass named WorkloadManagementTestSuite. The setup_method of this
    superclass waits for workload management init completion
    (wait_for_wm_init_complete()), while the teardown_method waits until
    the impala-server.completed-queries.queued metric reaches
    0 (wait_for_wm_idle()).
    
    test_query_log.py and test_workload_mgmt_sql_details.py are refactored
    to extend from WorkloadManagementTestSuite. Tests to assert the query
    log table flush behavior are grouped together in TestQueryLogTableFlush.
    test_workload_mgmt_sql_details.py::TestWorkloadManagementSQLDetails now
    uses 1 minicluster instance for all tests.
    
    test_workload_mgmt_init.py does not extend from
    WorkloadManagementTestSuite because it tests cluster start and
    restart scenarios. This patch only adds wait_for_wm_idle() to
    teardown_method where it makes sense to do so.
    
    test_query_live.py does not extend from WorkloadManagementTestSuite
    because most of its test methods require a long
    --query_log_write_interval_s so that DML queries from the workload
    management worker do not disturb sys.impala_query_live.
    
    The workload_mgmt parameter in CustomClusterTestSuite.with_args() is
    standardized to set up appropriate default flags in cluster_setup()
    rather than being passed down to _start_impala_cluster():
    IMPALAD_ARGS
      --enable_workload_mgmt=true --query_log_write_interval_s=1 \
      --shutdown_grace_period_s=0 --shutdown_deadline_s=60
    and CATALOGD_ARGS
      --enable_workload_mgmt=true
    
    Note that the IMPALAD_ARGS and CATALOGD_ARGS flags added by the
    workload_mgmt and impalad_graceful_shutdown parameters can still be
    overridden with different values by explicitly adding them to the
    impalad_args and catalogd_args parameters. Setting workload_mgmt=True
    now automatically enables graceful shutdown for the test. Thus,
    impalad_graceful_shutdown=True is now removed.
    
    With the beeswax protocol deprecated, this patch also changes the
    protocol under test from beeswax to hs2. TestQueryLogTableBeeswax is
    now renamed to TestQueryLogTableBasic.
    
    Additionally, print total wait time in wait_for_metric_value().
    
    Testing:
    - Ran the modified tests; they pass.
    
    Change-Id: Iecf6452fa963304e263805ebeb017c843d17dd16
    Reviewed-on: http://gerrit.cloudera.org:8080/22617
    Reviewed-by: Riza Suminto <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/service/workload-management-worker.cc       |   1 +
 tests/common/custom_cluster_test_suite.py          |  64 +--
 tests/common/impala_cluster.py                     |  20 +-
 tests/common/impala_service.py                     |  22 +-
 tests/common/wm_test_suite.py                      |  40 ++
 tests/custom_cluster/test_admission_controller.py  |  23 +-
 tests/custom_cluster/test_query_live.py            | 103 ++--
 tests/custom_cluster/test_query_log.py             | 553 +++++++++------------
 tests/custom_cluster/test_workload_mgmt_init.py    | 263 +++++-----
 .../test_workload_mgmt_sql_details.py              |  70 +--
 10 files changed, 576 insertions(+), 583 deletions(-)

diff --git a/be/src/service/workload-management-worker.cc 
b/be/src/service/workload-management-worker.cc
index c4a849a00..6ceb85532 100644
--- a/be/src/service/workload-management-worker.cc
+++ b/be/src/service/workload-management-worker.cc
@@ -502,6 +502,7 @@ void ImpalaServer::ShutdownWorkloadManagement() {
   // chance to flush the in-memory queue to the completed queries table.
   if (workload_mgmt_state_ == WorkloadManagementState::RUNNING) {
     workload_mgmt_state_ = WorkloadManagementState::SHUTTING_DOWN;
+    LOG(INFO) << "Workload management is shutting down";
     _completed_queries_cv.notify_all();
     _completed_queries_shutdown_cv.wait_for(l,
         chrono::seconds(FLAGS_query_log_shutdown_timeout_s),
diff --git a/tests/common/custom_cluster_test_suite.py 
b/tests/common/custom_cluster_test_suite.py
index 30cba724a..e6a1957ef 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -30,7 +30,6 @@ import subprocess
 from glob import glob
 from impala_py_lib.helpers import find_all_files, is_core_dump
 from re import search
-from signal import SIGRTMIN
 from subprocess import check_call
 from tests.common.file_utils import cleanup_tmp_test_dir, make_tmp_test_dir
 from tests.common.impala_test_suite import ImpalaTestSuite
@@ -69,11 +68,6 @@ IMPALAD_TIMEOUT_S = 'impalad_timeout_s'
 EXPECT_CORES = 'expect_cores'
 # Additional arg to determine whether we should reset the Ranger policy 
repository.
 RESET_RANGER = 'reset_ranger'
-# By default, Impalad processes are left running at the end of the test. The 
next test run
-# terminates the processes when calling the `bin/start-impala-cluster.py`` 
script. Setting
-# this to `True` causes Impalad processes to be sent the SIGRTMIN signal at 
the end of the
-# test which runs the Impalad shutdown steps instead of abruptly ending the 
process.
-IMPALAD_GRACEFUL_SHUTDOWN = 'impalad_graceful_shutdown'
 # Decorator key to support temporary dir creation.
 TMP_DIR_PLACEHOLDERS = 'tmp_dir_placeholders'
 # Indicates if a failure to start is acceptable or not. If set to `True` and 
the cluster
@@ -95,6 +89,15 @@ DEFAULT_STATESTORE_ARGS = 
('--statestore_update_frequency_ms=50 '
                            '--statestore_priority_update_frequency_ms=50 '
                            '--statestore_heartbeat_frequency_ms=50')
 
+# Additional flags appended to impalad_args if workload_mgmt=True.
+# IMPALA-13051: Add faster default graceful shutdown options before processing
+# explicit args. Impala doesn't start graceful shutdown until the grace period
+# has passed, and most tests that use graceful shutdown are testing flushing
+# the query log, which doesn't start until after the grace period has passed.
+WORKLOAD_MGMT_IMPALAD_FLAGS = (
+  '--enable_workload_mgmt=true --query_log_write_interval_s=1 '
+  '--shutdown_grace_period_s=0 --shutdown_deadline_s=60 ')
+
 
 class CustomClusterTestSuite(ImpalaTestSuite):
   """Runs tests with a custom Impala cluster. There are two modes:
@@ -155,7 +158,7 @@ class CustomClusterTestSuite(ImpalaTestSuite):
       impala_log_dir=None, hive_conf_dir=None, cluster_size=None,
       num_exclusive_coordinators=None, kudu_args=None, 
statestored_timeout_s=None,
       impalad_timeout_s=None, expect_cores=None, reset_ranger=False,
-      impalad_graceful_shutdown=False, tmp_dir_placeholders=[],
+      tmp_dir_placeholders=[],
       expect_startup_fail=False, disable_log_buffering=False, 
log_symlinks=False,
       workload_mgmt=False):
     """Records arguments to be passed to a cluster by adding them to the 
decorated
@@ -191,8 +194,6 @@ class CustomClusterTestSuite(ImpalaTestSuite):
       args[EXPECT_CORES] = expect_cores
     if reset_ranger:
       args[RESET_RANGER] = True
-    if impalad_graceful_shutdown:
-      args[IMPALAD_GRACEFUL_SHUTDOWN] = True
     if tmp_dir_placeholders:
       args[TMP_DIR_PLACEHOLDERS] = tmp_dir_placeholders
     if expect_startup_fail:
@@ -280,18 +281,17 @@ class CustomClusterTestSuite(ImpalaTestSuite):
           del cls.TMP_DIRS[name]
         cls.make_tmp_dir(name)
 
-    if args.get(IMPALAD_GRACEFUL_SHUTDOWN, False):
-      # IMPALA-13051: Add faster default graceful shutdown options before 
processing
-      # explicit args. Impala doesn't start graceful shutdown until the grace 
period has
-      # passed, and most tests that use graceful shutdown are testing flushing 
the query
-      # log, which doesn't start until after the grace period has passed.
-      cluster_args.append(
-          "--impalad=--shutdown_grace_period_s=0 --shutdown_deadline_s=15")
     impala_daemons = [IMPALAD_ARGS, STATESTORED_ARGS, CATALOGD_ARGS, 
ADMISSIOND_ARGS]
     for arg in (impala_daemons + [JVM_ARGS]):
       val = ''
+      if args.get(WORKLOAD_MGMT, False):
+        if arg == CATALOGD_ARGS:
+          val += '--enable_workload_mgmt=true '
+        if arg == IMPALAD_ARGS:
+          val += WORKLOAD_MGMT_IMPALAD_FLAGS
       if arg in impala_daemons and disable_log_buffering:
         val += '--logbuflevel=-1 '
+      # append this the very last so it can override anything above.
       if arg in args:
         val += (args[arg] if arg not in ACCEPT_FORMATTING
                else args[arg].format(**cls.TMP_DIRS))
@@ -339,10 +339,6 @@ class CustomClusterTestSuite(ImpalaTestSuite):
     if IMPALAD_TIMEOUT_S in args:
       kwargs[IMPALAD_TIMEOUT_S] = args[IMPALAD_TIMEOUT_S]
 
-    if args.get(WORKLOAD_MGMT, False):
-      if IMPALAD_ARGS or CATALOGD_ARGS in args:
-        kwargs[WORKLOAD_MGMT] = True
-
     if args.get(EXPECT_CORES, False):
       # Make a note of any core files that already exist
       possible_cores = find_all_files('*core*')
@@ -384,11 +380,8 @@ class CustomClusterTestSuite(ImpalaTestSuite):
 
   @classmethod
   def cluster_teardown(cls, name, args):
-    if args.get(IMPALAD_GRACEFUL_SHUTDOWN, False):
-      for impalad in cls.cluster.impalads:
-        impalad.kill(SIGRTMIN)
-      for impalad in cls.cluster.impalads:
-        impalad.wait_for_exit()
+    if args.get(WORKLOAD_MGMT, False):
+      cls.cluster.graceful_shutdown_impalads()
 
     cls.clear_tmp_dirs()
 
@@ -437,6 +430,20 @@ class CustomClusterTestSuite(ImpalaTestSuite):
             backoff=1), "Did not find table '{}' in local catalog of 
coordinator " \
             "'{}:{}'.".format(tbl, coord.hostname, coord.get_webserver_port())
 
+  def wait_for_wm_idle(self, coordinators=[], timeout_s=370):
+    """Wait until workload management worker in each coordinator becomes idle.
+    The 'timeout_s' is applied on each coordinator wait. Default 'timeout_s' 
is:
+      query_log_dml_exec_timeout_s * query_log_max_insert_attempts + 10 = 370
+    It is intentionally set high to avoid HMS deadlock in the event of slow 
insert
+    followed by ungraceful shutdown (IMPALA-13842)."""
+    if not coordinators:
+      # Refresh cluster in case cluster has changed.
+      self.cluster.refresh()
+      coordinators = self.cluster.get_all_coordinators()
+    for coord in coordinators:
+      coord.service.wait_for_metric_value(
+        "impala-server.completed-queries.queued", 0, timeout=timeout_s, 
interval=1)
+
   @classmethod
   def _stop_impala_cluster(cls):
     # TODO: Figure out a better way to handle case where processes are just 
starting
@@ -532,8 +539,7 @@ class CustomClusterTestSuite(ImpalaTestSuite):
                             impalad_timeout_s=60,
                             ignore_pid_on_log_rotation=False,
                             wait_for_backends=True,
-                            log_symlinks=False,
-                            workload_mgmt=False):
+                            log_symlinks=False):
     cls.impala_log_dir = impala_log_dir
     # We ignore TEST_START_CLUSTER_ARGS here. Custom cluster tests 
specifically test that
     # certain custom startup arguments work and we want to keep them 
independent of dev
@@ -563,10 +569,6 @@ class CustomClusterTestSuite(ImpalaTestSuite):
       cmd.append("--impalad_args=--use_local_catalog=1")
       cmd.append("--catalogd_args=--catalog_topic_mode=minimal")
 
-    if workload_mgmt:
-      cmd.append("--impalad_args=--enable_workload_mgmt=true")
-      cmd.append("--catalogd_args=--enable_workload_mgmt=true")
-
     default_query_option_kvs = []
     # Put any defaults first, then any arguments after that so they can 
override defaults.
     if default_query_options is not None:
diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py
index f08c9fcff..7979c4754 100644
--- a/tests/common/impala_cluster.py
+++ b/tests/common/impala_cluster.py
@@ -29,7 +29,7 @@ import time
 import requests
 from getpass import getuser
 from random import choice
-from signal import SIGKILL
+from signal import SIGKILL, SIGRTMIN
 from subprocess import check_call, check_output
 from time import sleep
 
@@ -279,6 +279,14 @@ class ImpalaCluster(object):
     if msg:
       raise RuntimeError(msg)
 
+  def graceful_shutdown_impalads(self):
+    # Refresh cluster in case cluster has changed.
+    self.refresh()
+    for impalad in self.impalads:
+      impalad.kill(SIGRTMIN)
+    for impalad in self.impalads:
+      impalad.wait_for_exit()
+
   def __build_impala_process_lists(self):
     """
     Gets all the running Impala procs (with start arguments) on the machine.
@@ -500,9 +508,9 @@ class Process(object):
       LOG.info("Starting container: {0}".format(self.container_id))
       check_call(["docker", "container", "start", self.container_id])
 
-  def restart(self):
+  def restart(self, signal=SIGKILL):
     """Kills and restarts the process"""
-    self.kill()
+    self.kill(signal=signal)
     self.wait_for_exit()
     self.start()
 
@@ -632,8 +640,8 @@ class ImpaladProcess(BaseImpalaProcess):
     """Waits for client ports to be opened. Assumes that the webservice ports 
are open."""
     start_time = time.time()
     LOG.info(
-        "Waiting for coordinator client services " +
-        "- hs2 port: %d hs2-http port: %d beeswax port: %d",
+        "Waiting for coordinator client services "
+        + "- hs2 port: %d hs2-http port: %d beeswax port: %d",
         self.service.hs2_port, self.service.hs2_http_port, 
self.service.beeswax_port)
     while time.time() - start_time < CLUSTER_WAIT_TIMEOUT_IN_SECONDS:
       beeswax_port_is_open = self.service.beeswax_port_is_open()
@@ -752,7 +760,7 @@ def find_user_processes(binaries):
     except KeyError as e:
       if "uid not found" not in str(e):
         raise
-    except psutil.NoSuchProcess as e:
+    except psutil.NoSuchProcess:
       # Ignore the case when a process no longer exists.
       pass
 
diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py
index 65f0ea0f7..49c059c5b 100644
--- a/tests/common/impala_service.py
+++ b/tests/common/impala_service.py
@@ -116,7 +116,8 @@ class BaseImpalaService(object):
   def wait_for_metric_value(self, metric_name, expected_value, timeout=10, 
interval=1,
       allow_greater=False):
     start_time = time()
-    while (time() - start_time < timeout):
+    total_wait = 0
+    while (total_wait < timeout):
       LOG.info("Getting metric: %s from %s:%s" %
           (metric_name, self.hostname, self.webserver_port))
       value = None
@@ -127,21 +128,22 @@ class BaseImpalaService(object):
 
       # if allow_greater is True we wait until the metric value becomes >= the 
expected
       # value.
-      if allow_greater:
-        if value >= expected_value:
-          LOG.info("Metric '%s' has reached desired value: %s" % (metric_name, 
value))
-          return value
-      elif value == expected_value:
-        LOG.info("Metric '%s' has reached desired value: %s" % (metric_name, 
value))
+      if (value == expected_value) or (allow_greater and value >= 
expected_value):
+        LOG.info("Metric '{0}' has reached desired value: {1}. total_wait: 
{2}s".format(
+          metric_name, value, total_wait))
         return value
       else:
-        LOG.info("Waiting for metric value '%s'%s%s. Current value: %s" %
-            (metric_name, '>=' if allow_greater else '=', expected_value, 
value))
+        LOG.info("Waiting for metric value '{0}'{1}{2}. Current value: {3}. "
+                 "total_wait: {4}s".format(
+                   metric_name, ('>=' if allow_greater else '='), 
expected_value, value,
+                   total_wait))
       LOG.info("Sleeping %ds before next retry." % interval)
       sleep(interval)
+      total_wait = time() - start_time
 
     LOG.info("Metric {0} did not reach value {1} in {2}s. Actual value was 
'{3}'. "
-        "Failing...".format(metric_name, expected_value, timeout, value))
+             "total_wait: {4}s. Failing...".format(
+               metric_name, expected_value, timeout, value, total_wait))
     self.__metric_timeout_assert(metric_name, expected_value, timeout, value)
 
   def __request_minidump(self, pid):
diff --git a/tests/common/wm_test_suite.py b/tests/common/wm_test_suite.py
new file mode 100644
index 000000000..e2152ece5
--- /dev/null
+++ b/tests/common/wm_test_suite.py
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import, division, print_function
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+
+class WorkloadManagementTestSuite(CustomClusterTestSuite):
+  """Base class for tests that exercise workload management behavior.
+  The setup_method waits for workload management init completion, while the
+  teardown_method waits until impala-server.completed-queries.queued metric
+  reaches 0."""
+
+  def setup_method(self, method):
+    super(WorkloadManagementTestSuite, self).setup_method(method)
+    self.wait_for_wm_init_complete()
+
+  def teardown_method(self, method):
+    self.wait_for_wm_idle()
+    super(WorkloadManagementTestSuite, self).teardown_method(method)
+
+  def get_client(self, protocol):
+    """Retrieves the default Impala client for the specified protocol. This 
client is
+       automatically closed after the test completes."""
+    return self.default_impala_client(protocol=protocol)
diff --git a/tests/custom_cluster/test_admission_controller.py 
b/tests/custom_cluster/test_admission_controller.py
index dff30367c..d7673e580 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -38,6 +38,7 @@ from tests.common.custom_cluster_test_suite import (
     ADMISSIOND_ARGS,
     IMPALAD_ARGS,
     START_ARGS,
+    WORKLOAD_MGMT_IMPALAD_FLAGS,
     CustomClusterTestSuite)
 from tests.common.environ import build_flavor_timeout, 
ImpalaTestClusterProperties
 from tests.common.impala_connection import (
@@ -1901,7 +1902,7 @@ class 
TestAdmissionController(TestAdmissionControllerBase):
       workload_mgmt=True, impalad_args=impalad_admission_ctrl_config_args(
         fs_allocation_file="fair-scheduler-onlycoords.xml",
         llama_site_file="llama-site-onlycoords.xml"),
-        statestored_args=_STATESTORED_ARGS)
+      statestored_args=_STATESTORED_ARGS)
   def test_coord_only_pool_happy_path(self, vector):
     """Asserts queries set to use an only coordinators request pool run all 
the fragment
        instances on all coordinators and no executors even if the query 
includes
@@ -1926,7 +1927,7 @@ class 
TestAdmissionController(TestAdmissionControllerBase):
       workload_mgmt=True, impalad_args=impalad_admission_ctrl_config_args(
         fs_allocation_file="fair-scheduler-onlycoords.xml",
         llama_site_file="llama-site-onlycoords.xml"),
-        statestored_args=_STATESTORED_ARGS)
+      statestored_args=_STATESTORED_ARGS)
   def test_coord_only_pool_no_executors(self, vector):
     """Asserts queries that only select from the active queries table run even 
if no
        executors are running."""
@@ -1938,7 +1939,7 @@ class 
TestAdmissionController(TestAdmissionControllerBase):
       workload_mgmt=True, impalad_args=impalad_admission_ctrl_config_args(
         fs_allocation_file="fair-scheduler-onlycoords.xml",
         llama_site_file="llama-site-onlycoords.xml"),
-        statestored_args=_STATESTORED_ARGS)
+      statestored_args=_STATESTORED_ARGS)
   def test_coord_only_pool_one_quiescing_coord(self, vector):
     """Asserts quiescing coordinators do not run fragment instances for 
queries that only
        select from the active queries table."""
@@ -1960,7 +1961,7 @@ class 
TestAdmissionController(TestAdmissionControllerBase):
       workload_mgmt=True, impalad_args=impalad_admission_ctrl_config_args(
         fs_allocation_file="fair-scheduler-onlycoords.xml",
         llama_site_file="llama-site-onlycoords.xml"),
-        statestored_args=_STATESTORED_ARGS)
+      statestored_args=_STATESTORED_ARGS)
   def test_coord_only_pool_one_coord_terminate(self, vector):
     """Asserts a force terminated coordinator is eventually removed from the 
list of
        active coordinators."""
@@ -2003,7 +2004,7 @@ class 
TestAdmissionController(TestAdmissionControllerBase):
       workload_mgmt=True, impalad_args=impalad_admission_ctrl_config_args(
         fs_allocation_file="fair-scheduler-onlycoords.xml",
         llama_site_file="llama-site-onlycoords.xml"),
-        statestored_args=_STATESTORED_ARGS)
+      statestored_args=_STATESTORED_ARGS)
   def test_coord_only_pool_add_coord(self, vector):
     self.wait_for_wm_init_complete()
 
@@ -2011,15 +2012,16 @@ class 
TestAdmissionController(TestAdmissionControllerBase):
     cluster_size = len(self.cluster.impalads)
     self._start_impala_cluster(
         options=[
-            "--impalad_args=s{}".format(impalad_admission_ctrl_config_args(
+            "--impalad_args={0} {1}".format(
+              WORKLOAD_MGMT_IMPALAD_FLAGS,
+              impalad_admission_ctrl_config_args(
                 fs_allocation_file="fair-scheduler-onlycoords.xml",
                 llama_site_file="llama-site-onlycoords.xml"))],
         add_impalads=True,
         cluster_size=6,
         num_coordinators=1,
         use_exclusive_coordinators=True,
-        wait_for_backends=False,
-        workload_mgmt=True)
+        wait_for_backends=False)
 
     self.assert_log_contains("impalad_node" + str(cluster_size), "INFO",
         "join Impala Service pool")
@@ -2035,8 +2037,7 @@ class 
TestAdmissionController(TestAdmissionControllerBase):
         
additional_args="--expected_executor_group_sets=root.group-set-small:1,"
                         "root.group-set-large:2 "
                         "--num_expected_executors=2 
--executor_groups=coordinator"),
-        impalad_graceful_shutdown=True,
-        statestored_args=_STATESTORED_ARGS)
+      statestored_args=_STATESTORED_ARGS)
   def test_coord_only_pool_exec_groups(self, vector):
     """Asserts queries using only coordinators request pools can run 
successfully when
        executor groups are configured."""
@@ -2075,8 +2076,6 @@ class 
TestAdmissionController(TestAdmissionControllerBase):
         expected_subscribers=expected_subscribers,
         expected_num_impalads=expected_num_impalads)
     self.__run_assert_systables_query(vector)
-    # Refresh cluster to include those two new impalad for graceful shutdown.
-    self.cluster.refresh()
 
 
 class TestAdmissionControllerWithACService(TestAdmissionController):
diff --git a/tests/custom_cluster/test_query_live.py 
b/tests/custom_cluster/test_query_live.py
index 8b3892aac..5f72337f0 100644
--- a/tests/custom_cluster/test_query_live.py
+++ b/tests/custom_cluster/test_query_live.py
@@ -23,13 +23,20 @@ from getpass import getuser
 from signal import SIGRTMIN
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.impala_cluster import DEFAULT_KRPC_PORT
-from tests.common.impala_connection import FINISHED, PENDING
+from tests.common.impala_connection import ERROR, FINISHED, PENDING
+from tests.common.test_vector import HS2
 from tests.util.workload_management import assert_query, redaction_rules_file
 from time import sleep
 
 
 class TestQueryLive(CustomClusterTestSuite):
-  """Tests to assert the query live table is correctly populated."""
+  """Tests to assert the query live table is correctly populated.
+  This test class does not extend from WorkloadManagementTestSuite due to long
+  --query_log_write_interval_s requirement in most test method."""
+
+  @classmethod
+  def default_test_protocol(cls):
+    return HS2
 
   def setup_method(self, method):
     super(TestQueryLive, self).setup_method(method)
@@ -74,9 +81,9 @@ class TestQueryLive(CustomClusterTestSuite):
         assert False, "did not find host {}".format(host)
     assert len(actual_hosts) == 0, "did not find all expected hosts"
 
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
+  
@CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300
 "
                                                  
"--cluster_id=test_query_live",
-                                    catalogd_args="--enable_workload_mgmt",
+                                    workload_mgmt=True,
                                     disable_log_buffering=True)
   def test_query_live_hs2(self):
     """Asserts the query live table shows and allows filtering queries. Uses 
the hs2
@@ -88,9 +95,9 @@ class TestQueryLive(CustomClusterTestSuite):
     assert_query('sys.impala_query_live', self.hs2_client, 'test_query_live',
                  result1.runtime_profile)
 
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
+  
@CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300
 "
                                                  
"--cluster_id=test_query_live",
-                                    catalogd_args="--enable_workload_mgmt",
+                                    workload_mgmt=True,
                                     disable_log_buffering=True)
   def test_query_live(self):
     """Asserts the query live table shows and allows filtering queries. Uses 
the default
@@ -186,11 +193,11 @@ class TestQueryLive(CustomClusterTestSuite):
     self.execute_query_expect_success(self.client, 'drop table 
sys.impala_query_live')
 
   # Must come directly after "drop table sys.impala_query_live"
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
+  
@CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300
 "
                                                  "--cluster_id=test_query_live 
"
                                                  "--use_local_catalog=true",
-                                    catalogd_args="--enable_workload_mgmt "
-                                                  
"--catalog_topic_mode=minimal",
+                                    
catalogd_args="--catalog_topic_mode=minimal",
+                                    workload_mgmt=True,
                                     default_query_options=[
                                       ('default_transactional_type', 
'insert_only')],
                                     disable_log_buffering=True)
@@ -203,11 +210,11 @@ class TestQueryLive(CustomClusterTestSuite):
                  result.runtime_profile)
     self.assert_describe_extended()
 
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
+  
@CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300
 "
                                                  "--cluster_id=test_query_live 
"
                                                  "--use_local_catalog=true",
-                                    catalogd_args="--enable_workload_mgmt "
-                                                  
"--catalog_topic_mode=minimal",
+                                    
catalogd_args="--catalog_topic_mode=minimal",
+                                    workload_mgmt=True,
                                     disable_log_buffering=True)
   def test_local_catalog(self):
     """Asserts the query live table works with local catalog mode."""
@@ -216,12 +223,11 @@ class TestQueryLive(CustomClusterTestSuite):
     assert_query('sys.impala_query_live', self.client, 'test_query_live',
                  result.runtime_profile)
 
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
+  
@CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300
 "
                                                  "--cluster_id=test_query_live 
"
                                                  "--redaction_rules_file={}"
                                                  
.format(redaction_rules_file()),
-                                    catalogd_args="--enable_workload_mgmt",
-                                    impalad_graceful_shutdown=True,
+                                    workload_mgmt=True,
                                     disable_log_buffering=True)
   def test_redaction(self):
     """Asserts the query live table table redacts the statement."""
@@ -231,9 +237,9 @@ class TestQueryLive(CustomClusterTestSuite):
     assert_query('sys.impala_query_live', self.client, 'test_query_live',
                  result.runtime_profile)
 
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
+  
@CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300
 "
                                                  
"--cluster_id=test_query_live",
-                                    catalogd_args="--enable_workload_mgmt",
+                                    workload_mgmt=True,
                                     disable_log_buffering=True)
   def test_alter(self):
     """Asserts alter works on query live table."""
@@ -267,9 +273,9 @@ class TestQueryLive(CustomClusterTestSuite):
     select_column2 = self.execute_query('select * from sys.impala_query_live')
     assert len(select_column2.data) > 1
 
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
+  
@CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300
 "
                                                  
"--cluster_id=test_query_live",
-                                    catalogd_args="--enable_workload_mgmt",
+                                    workload_mgmt=True,
                                     cluster_size=3,
                                     num_exclusive_coordinators=2,
                                     disable_log_buffering=True)
@@ -301,17 +307,21 @@ class TestQueryLive(CustomClusterTestSuite):
     client2.close_query(handle2)
     client2.close()
 
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
+  
@CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300
 "
                                                  
"--cluster_id=test_query_live",
-                                    catalogd_args="--enable_workload_mgmt",
+                                    workload_mgmt=True,
                                     cluster_size=3,
                                     num_exclusive_coordinators=2,
                                     disable_log_buffering=True)
   def test_executor_groups(self):
     """Asserts scans are performed only on coordinators with multiple executor 
groups."""
     # Add a (non-dedicated) coordinator and executor in a different executor 
group.
-    
self._start_impala_cluster(options=['--impalad_args=--executor_groups=extra',
-                                        
'--impalad_args=--cluster_id=test_query_live'],
+    
self._start_impala_cluster(options=['--impalad_args=--executor_groups=extra '
+                                        '--enable_workload_mgmt '
+                                        '--query_log_write_interval_s=300 '
+                                        '--shutdown_grace_period_s=0 '
+                                        '--shutdown_deadline_s=15 '
+                                        '--cluster_id=test_query_live '],
                                cluster_size=1,
                                add_executors=True,
                                expected_num_impalads=4)
@@ -322,9 +332,9 @@ class TestQueryLive(CustomClusterTestSuite):
     assert len(result.data) == 1
     self.assert_only_coordinators(result.runtime_profile, coords=[0, 1], 
execs=[2, 3])
 
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
+  
@CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300
 "
                                                  
"--cluster_id=test_query_live",
-                                    catalogd_args="--enable_workload_mgmt",
+                                    workload_mgmt=True,
                                     disable_log_buffering=True)
   def test_query_entries_are_unique(self):
     """Asserts queries in the query live table are unique."""
@@ -364,9 +374,9 @@ class TestQueryLive(CustomClusterTestSuite):
     client2.close_query(handle2)
     client2.close()
 
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
-                                                 
"--cluster_id=test_query_live",
-                                    catalogd_args="--enable_workload_mgmt",
+  
@CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300
 "
+                                                 "--cluster_id=test_query_live 
",
+                                    workload_mgmt=True,
                                     cluster_size=3,
                                     num_exclusive_coordinators=2,
                                     disable_log_buffering=True)
@@ -377,10 +387,13 @@ class TestQueryLive(CustomClusterTestSuite):
     handle = self.execute_query_async(query, query_options={
         'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@3000'})
 
-    # Wait for query to compile and assign ranges, then kill impalad during 
debug delay.
+    # Wait for query to compile and assign ranges, then kill impalad
+    # in a non-graceful manner during debug delay.
     self.client.wait_for_impala_state(handle, PENDING, 3)
     self.cluster.impalads[1].kill()
 
+    # Wait for query to pass admission control before fetching.
+    self.client.wait_for_any_impala_state(handle, [FINISHED, ERROR], 60)
     result = self.client.fetch(query, handle)
     assert len(result.data) == 1
     expected_message = 'is no longer available for system table scan 
assignment'
@@ -389,9 +402,9 @@ class TestQueryLive(CustomClusterTestSuite):
 
     self.close_query(handle)
 
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
+  
@CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300
 "
                                                  
"--cluster_id=test_query_live",
-                                    catalogd_args="--enable_workload_mgmt",
+                                    workload_mgmt=True,
                                     disable_log_buffering=True)
   def test_shutdown_coordinator(self):
     """Asserts query fails if a coordinator disappears after scheduling. 
Depends on
@@ -404,19 +417,24 @@ class TestQueryLive(CustomClusterTestSuite):
     self.client.wait_for_impala_state(handle, PENDING, 3)
     # Ensure enough time for scheduling to assign ranges.
     sleep(1)
-    # Kill impalad during debug delay.
+    # Kill impalad in a non-graceful manner during debug delay.
     self.cluster.impalads[1].kill()
 
     try:
+      # Wait for query to pass admission control before fetching.
+      self.client.wait_for_any_impala_state(handle, [FINISHED, ERROR], 60)
       self.client.fetch(query, handle)
       assert False, "fetch should fail"
     except Exception as e:
       assert "Network error: Client connection negotiation failed" in str(e)
-    # Beeswax client closes the query on failure.
+    # The client closes the query on failure.
 
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
-                                                 
"--cluster_id=test_query_live",
-                                    catalogd_args="--enable_workload_mgmt",
+  
@CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300
 "
+                                                 "--cluster_id=test_query_live 
"
+                                                 # Override flag set by
+                                                 # workload_mgmt arg.
+                                                 
"--shutdown_grace_period_s=120",
+                                    workload_mgmt=True,
                                     disable_log_buffering=True)
   def test_graceful_shutdown_coordinator(self):
     """Asserts query succeeds if another coordinator is shutdown gracefully 
after
@@ -445,11 +463,8 @@ class TestQueryLive(CustomClusterTestSuite):
     assert len(shutdown.data) == 2
     self.assert_impalads(shutdown.runtime_profile, present=[0, 2], absent=[1])
 
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
-                                                 
"--query_log_write_interval_s=1 "
-                                                 
"--cluster_id=test_query_live",
-                                    catalogd_args="--enable_workload_mgmt",
-                                    impalad_graceful_shutdown=True,
+  @CustomClusterTestSuite.with_args(impalad_args="--cluster_id=test_query_live 
",
+                                    workload_mgmt=True,
                                     cluster_size=3,
                                     num_exclusive_coordinators=2,
                                     disable_log_buffering=True)
@@ -482,9 +497,9 @@ class TestQueryLive(CustomClusterTestSuite):
     # in scans.
     self.assert_fragment_instances(result.runtime_profile, [3, 2, 2])
 
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
-                                                 
"--cluster_id=test_query_live",
-                                    catalogd_args="--enable_workload_mgmt",
+  
@CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=300
 "
+                                                 "--cluster_id=test_query_live 
",
+                                    workload_mgmt=True,
                                     cluster_size=3,
                                     num_exclusive_coordinators=2,
                                     disable_log_buffering=True)
diff --git a/tests/custom_cluster/test_query_log.py 
b/tests/custom_cluster/test_query_log.py
index c5d2a9ffa..afe1a6501 100644
--- a/tests/custom_cluster/test_query_log.py
+++ b/tests/custom_cluster/test_query_log.py
@@ -33,7 +33,9 @@ from tests.common.cluster_config import 
impalad_admission_ctrl_config_args
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.impala_connection import FINISHED
 from tests.common.impala_test_suite import IMPALAD_HS2_HOST_PORT
+from tests.common.test_dimensions import hs2_client_protocol_dimension
 from tests.common.test_vector import ImpalaTestDimension
+from tests.common.wm_test_suite import WorkloadManagementTestSuite
 from tests.util.retry import retry
 from tests.util.workload_management import (
     assert_query,
@@ -42,42 +44,14 @@ from tests.util.workload_management import (
     redaction_rules_file)
 
 
-class TestQueryLogTableBase(CustomClusterTestSuite):
-  """Base class for all query log tests. Sets up the tests to use the Beeswax 
and HS2
-     client protocols."""
-
-  PROTOCOL_BEESWAX = "beeswax"
-  PROTOCOL_HS2 = "hs2"
-
-  @classmethod
-  def add_test_dimensions(cls):
-    super(TestQueryLogTableBase, cls).add_test_dimensions()
-    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('protocol',
-        cls.PROTOCOL_BEESWAX, cls.PROTOCOL_HS2))
-
-  def setup_method(self, method):
-    super(TestQueryLogTableBase, self).setup_method(method)
-    self.wait_for_wm_init_complete()
-
-  def get_client(self, protocol):
-    """Retrieves the default Impala client for the specified protocol. This 
client is
-       automatically closed after the test completes."""
-    if protocol == self.PROTOCOL_BEESWAX:
-      return self.client
-    elif protocol == self.PROTOCOL_HS2:
-      return self.hs2_client
-    raise Exception("unknown protocol: {0}".format(protocol))
-
-
-class TestQueryLogTableBeeswax(TestQueryLogTableBase):
+class TestQueryLogTableBasic(WorkloadManagementTestSuite):
   """Tests to assert the query log table is correctly populated when using the 
Beeswax
      client protocol."""
 
   @classmethod
   def add_test_dimensions(cls):
-    super(TestQueryLogTableBeeswax, cls).add_test_dimensions()
-    cls.ImpalaTestMatrix.add_constraint(lambda v:
-        v.get_value('protocol') == 'beeswax')
+    super(TestQueryLogTableBasic, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(hs2_client_protocol_dimension())
 
   @classmethod
   def get_workload(self):
@@ -85,17 +59,12 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
 
   MAX_SQL_PLAN_LEN = 2000
   LOG_DIR_MAX_WRITES = 'max_attempts_exceeded'
-  FLUSH_MAX_RECORDS_CLUSTER_ID = "test_query_log_max_records_" + 
str(int(time()))
-  FLUSH_MAX_RECORDS_QUERY_COUNT = 30
 
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
-                                                 
"--query_log_write_interval_s=1 "
-                                                 "--cluster_id=test_max_select 
"
+  @CustomClusterTestSuite.with_args(impalad_args="--cluster_id=test_max_select 
"
                                                  
"--query_log_max_sql_length={0} "
                                                  
"--query_log_max_plan_length={0}"
                                                  .format(MAX_SQL_PLAN_LEN),
-                                    catalogd_args="--enable_workload_mgmt",
-                                    impalad_graceful_shutdown=True,
+                                    workload_mgmt=True,
                                     disable_log_buffering=True)
   def test_lower_max_sql_plan(self, vector):
     """Asserts that length limits on the sql and plan columns in the completed 
queries
@@ -128,11 +97,8 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
     assert len(data[1]) == self.MAX_SQL_PLAN_LEN - data[1].count("\n") - 1, \
         "incorrect plan length"
 
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
-                                                 
"--query_log_write_interval_s=1 "
-                                                 
"--cluster_id=test_max_select",
-                                    catalogd_args="--enable_workload_mgmt",
-                                    impalad_graceful_shutdown=True,
+  
@CustomClusterTestSuite.with_args(impalad_args="--cluster_id=test_max_select",
+                                    workload_mgmt=True,
                                     disable_log_buffering=True)
   def test_sql_plan_too_long(self, vector):
     """Asserts that very long queries have their corresponding plan and sql 
columns
@@ -164,13 +130,10 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
     # Newline characters are not counted by Impala's length function.
     assert len(data[1]) == 16777216 - data[1].count("\n") - 1
 
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
-                                                 
"--query_log_write_interval_s=1 "
-                                                 
"--cluster_id=test_query_hist_1 "
+  
@CustomClusterTestSuite.with_args(impalad_args="--cluster_id=test_query_hist_1 "
                                                  "--query_log_size=0 "
                                                  "--query_log_size_in_bytes=0",
-                                    catalogd_args="--enable_workload_mgmt",
-                                    impalad_graceful_shutdown=True,
+                                    workload_mgmt=True,
                                     disable_log_buffering=True)
   def test_no_query_log(self, vector):
     """Asserts queries are written to the completed queries table when the 
in-memory
@@ -194,13 +157,10 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
     assert len(actual.data) == 1
     assert actual.data[0] == select_sql
 
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
-                                                 
"--query_log_write_interval_s=1 "
-                                                 
"--cluster_id=test_query_hist_2 "
+  
@CustomClusterTestSuite.with_args(impalad_args="--cluster_id=test_query_hist_2 "
                                                  "--always_use_data_cache "
                                                  
"--data_cache={query_data_cache}:5GB",
-                                    catalogd_args="--enable_workload_mgmt",
-                                    impalad_graceful_shutdown=True,
+                                    workload_mgmt=True,
                                     cluster_size=1,
                                     tmp_dir_placeholders=['query_data_cache'],
                                     disable_log_buffering=True)
@@ -240,11 +200,9 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
     assert data["BYTES_READ_CACHE_TOTAL"] != "0", "bytes read from cache total 
was " \
         "zero, test did not assert anything"
 
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
-                                                 
"--query_log_write_interval_s=5",
+  
@CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=5",
                                     impala_log_dir=("{" + LOG_DIR_MAX_WRITES + 
"}"),
-                                    catalogd_args="--enable_workload_mgmt",
-                                    impalad_graceful_shutdown=True,
+                                    workload_mgmt=True,
                                     tmp_dir_placeholders=[LOG_DIR_MAX_WRITES],
                                     disable_log_buffering=True)
   def test_max_attempts_exceeded(self, vector):
@@ -290,80 +248,9 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
     assert impalad.service.get_metric_value(
       "impala-server.completed-queries.written") == 0
 
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
-                                                 "--query_log_max_queued={0} "
-                                                 
"--query_log_write_interval_s=9999 "
-                                                 "--cluster_id={1} "
-                                                 
"--query_log_expression_limit=5000"
-                                                 
.format(FLUSH_MAX_RECORDS_QUERY_COUNT,
-                                                 FLUSH_MAX_RECORDS_CLUSTER_ID),
-                                    catalogd_args="--enable_workload_mgmt",
-                                    default_query_options=[
-                                      ('statement_expression_limit', 1024)],
-                                    impalad_graceful_shutdown=True,
-                                    disable_log_buffering=True)
-  def test_flush_on_queued_count_exceeded(self, vector):
-    """Asserts that queries that have completed are written to the query log 
table when
-       the maximum number of queued records is reached. Also verifies that 
writing
-       completed queries is not limited by default 
statement_expression_limit."""
-
-    impalad = self.cluster.get_first_impalad()
-    client = self.get_client(vector.get_value('protocol'))
-
-    rand_str = "{0}-{1}".format(vector.get_value('protocol'), time())
-
-    test_sql = "select '{0}','{1}'".format(rand_str,
-        self.FLUSH_MAX_RECORDS_CLUSTER_ID)
-    test_sql_assert = "select '{0}', count(*) from {1} where sql='{2}'".format(
-        rand_str, QUERY_TBL_LOG, test_sql.replace("'", r"\'"))
-
-    for _ in range(0, self.FLUSH_MAX_RECORDS_QUERY_COUNT):
-      res = client.execute(test_sql)
-      assert res.success
-
-    # Running this query results in the number of queued completed queries to 
exceed
-    # the max and thus all completed queries will be written to the query log 
table.
-    res = client.execute(test_sql_assert)
-    assert res.success
-    assert 1 == len(res.data)
-    assert "0" == res.data[0].split("\t")[1]
-
-    # Wait until the completed queries have all been written out because the 
max queued
-    # count was exceeded.
-    impalad.service.wait_for_metric_value(
-        "impala-server.completed-queries.max-records-writes", 1, 60)
-    self.cluster.get_first_impalad().service.wait_for_metric_value(
-        "impala-server.completed-queries.written",
-        self.FLUSH_MAX_RECORDS_QUERY_COUNT + 1, 60)
-
-    # Force Impala to process the inserts to the completed queries table.
-    client.execute("refresh " + QUERY_TBL_LOG)
-
-    # This query will remain queued due to the long write interval and max 
queued
-    # records limit not being reached.
-    res = client.execute(r"select count(*) from {0} where sql like 'select 
\'{1}\'%'"
-        .format(QUERY_TBL_LOG, rand_str))
-    assert res.success
-    assert 1 == len(res.data)
-    assert str(self.FLUSH_MAX_RECORDS_QUERY_COUNT + 1) == res.data[0]
-    impalad.service.wait_for_metric_value(
-        "impala-server.completed-queries.queued", 2, 60)
-
-    assert impalad.service.get_metric_value(
-        "impala-server.completed-queries.max-records-writes") == 1
-    assert impalad.service.get_metric_value(
-        "impala-server.completed-queries.scheduled-writes") == 0
-    assert 
impalad.service.get_metric_value("impala-server.completed-queries.written") \
-        == self.FLUSH_MAX_RECORDS_QUERY_COUNT + 1
-    assert impalad.service.get_metric_value(
-        "impala-server.completed-queries.queued") == 2
-
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
-                                                 
"--query_log_write_interval_s=1",
-                                    cluster_size=3,
+  @CustomClusterTestSuite.with_args(cluster_size=3,
                                     num_exclusive_coordinators=2,
-                                    catalogd_args="--enable_workload_mgmt",
-                                    impalad_graceful_shutdown=True,
+                                    workload_mgmt=True,
                                     disable_log_buffering=True)
   def test_dedicated_coordinator_no_mt_dop(self, vector):
     """Asserts the values written to the query log table match the values from 
the
@@ -386,12 +273,9 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
     finally:
       client2.close()
 
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
-                                                 
"--query_log_write_interval_s=1",
-                                    cluster_size=3,
+  @CustomClusterTestSuite.with_args(cluster_size=3,
                                     num_exclusive_coordinators=2,
-                                    catalogd_args="--enable_workload_mgmt",
-                                    impalad_graceful_shutdown=True,
+                                    workload_mgmt=True,
                                     disable_log_buffering=True)
   def test_dedicated_coordinator_with_mt_dop(self, vector):
     """Asserts the values written to the query log table match the values from 
the
@@ -416,26 +300,24 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
     finally:
       client2.close()
 
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
-                                                 
"--query_log_write_interval_s=1 "
-                                                 "--redaction_rules_file={}"
+  @CustomClusterTestSuite.with_args(impalad_args="--redaction_rules_file={}"
                                                  
.format(redaction_rules_file()),
-                                    catalogd_args="--enable_workload_mgmt",
-                                    impalad_graceful_shutdown=True,
+                                    workload_mgmt=True,
                                     disable_log_buffering=True)
-  def test_redaction(self):
+  def test_redaction(self, vector):
     """Asserts the query log table redacts the statement."""
-    result = self.client.execute(
+    client = self.get_client(vector.get_value('protocol'))
+    result = client.execute(
         "select *, 'supercalifragilisticexpialidocious' from 
functional.alltypes",
         fetch_profile_after_close=True)
     assert result.success
 
     self.cluster.get_first_impalad().service.wait_for_metric_value(
         "impala-server.completed-queries.written", 1, 60)
-    assert_query(QUERY_TBL_LOG, self.client, 
raw_profile=result.runtime_profile)
+    assert_query(QUERY_TBL_LOG, client, raw_profile=result.runtime_profile)
 
 
-class TestQueryLogOtherTable(TestQueryLogTableBase):
+class TestQueryLogOtherTable(WorkloadManagementTestSuite):
   """Tests to assert that query_log_table_name works with non-default value."""
 
   OTHER_TBL = "completed_queries_table_{0}".format(int(time()))
@@ -443,19 +325,15 @@ class TestQueryLogOtherTable(TestQueryLogTableBase):
   @classmethod
   def add_test_dimensions(cls):
     super(TestQueryLogOtherTable, cls).add_test_dimensions()
-    cls.ImpalaTestMatrix.add_constraint(lambda v:
-        v.get_value('protocol') == 'beeswax')
+    cls.ImpalaTestMatrix.add_dimension(hs2_client_protocol_dimension())
 
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
-                                                 
"--query_log_write_interval_s=1 "
-                                                 
"--blacklisted_dbs=information_schema "
+  
@CustomClusterTestSuite.with_args(impalad_args="--blacklisted_dbs=information_schema
 "
                                                  "--query_log_table_name={0}"
                                                  .format(OTHER_TBL),
-                                    catalogd_args="--enable_workload_mgmt "
-                                                  
"--blacklisted_dbs=information_schema "
+                                    
catalogd_args="--blacklisted_dbs=information_schema "
                                                   "--query_log_table_name={0}"
                                                   .format(OTHER_TBL),
-                                    impalad_graceful_shutdown=True,
+                                    workload_mgmt=True,
                                     disable_log_buffering=True)
   def test_renamed_log_table(self, vector):
     """Asserts that the completed queries table can be renamed."""
@@ -479,7 +357,7 @@ class TestQueryLogOtherTable(TestQueryLogTableBase):
       client.execute("drop table {0}.{1} purge".format(WM_DB, self.OTHER_TBL))
 
 
-class TestQueryLogTableHS2(TestQueryLogTableBase):
+class TestQueryLogTableHS2(WorkloadManagementTestSuite):
   """Tests to assert the query log table is correctly populated when using the 
HS2
      client protocol."""
 
@@ -488,16 +366,12 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
   @classmethod
   def add_test_dimensions(cls):
     super(TestQueryLogTableHS2, cls).add_test_dimensions()
-    cls.ImpalaTestMatrix.add_constraint(lambda v:
-        v.get_value('protocol') == 'hs2')
+    cls.ImpalaTestMatrix.add_dimension(hs2_client_protocol_dimension())
 
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
-                                                 
"--query_log_write_interval_s=1 "
-                                                 "--cluster_id={}"
+  @CustomClusterTestSuite.with_args(impalad_args="--cluster_id={}"
                                                  
.format(HS2_OPERATIONS_CLUSTER_ID),
-                                    catalogd_args="--enable_workload_mgmt",
                                     cluster_size=2,
-                                    impalad_graceful_shutdown=True,
+                                    workload_mgmt=True,
                                     disable_log_buffering=True)
   def test_hs2_metadata_operations(self, vector):
     """Certain HS2 operations appear to Impala as a special kind of query. 
Specifically,
@@ -620,12 +494,9 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
     assert assert_results.success
     assert assert_results.data[0] == "1"
 
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
-                                                 
"--query_log_write_interval_s=1 "
-                                                 
"--cluster_id=test_query_hist_mult",
-                                    catalogd_args="--enable_workload_mgmt",
+  
@CustomClusterTestSuite.with_args(impalad_args="--cluster_id=test_query_hist_mult",
                                     cluster_size=2,
-                                    impalad_graceful_shutdown=True,
+                                    workload_mgmt=True,
                                     disable_log_buffering=True)
   def test_query_multiple_tables(self, vector):
     """Asserts the values written to the query log table match the values from 
the
@@ -650,11 +521,8 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
     finally:
       client2.close()
 
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
-                                                 
"--query_log_write_interval_s=1 "
-                                                 
"--cluster_id=test_query_hist_3",
-                                    catalogd_args="--enable_workload_mgmt",
-                                    impalad_graceful_shutdown=True,
+  
@CustomClusterTestSuite.with_args(impalad_args="--cluster_id=test_query_hist_3",
+                                    workload_mgmt=True,
                                     disable_log_buffering=True)
   def test_insert_select(self, vector, unique_database,
       unique_name):
@@ -684,117 +552,12 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
     finally:
       client2.close()
 
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
-                                                 
"--query_log_write_interval_s=15",
-                                    catalogd_args="--enable_workload_mgmt",
-                                    impalad_graceful_shutdown=True,
-                                    disable_log_buffering=True)
-  def test_flush_on_interval(self, vector):
-    """Asserts that queries that have completed are written to the query log 
table
-       after the specified write interval elapses."""
-
-    client = self.get_client(vector.get_value('protocol'))
-
-    query_count = 10
-
-    for i in range(query_count):
-      res = client.execute("select sleep(1000)")
-      assert res.success
-
-    # Wait for at least one iteration of the workload management processing 
loop to write
-    # to the completed queries table.
-    self.cluster.get_first_impalad().service.wait_for_metric_value(
-      "impala-server.completed-queries.written", query_count, 20)
-
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
-                                                 
"--query_log_write_interval_s=9999 "
-                                                 "--shutdown_grace_period_s=0 "
-                                                 "--shutdown_deadline_s=15 "
-                                                 "--debug_actions="
-                                                 
"WM_SHUTDOWN_DELAY:SLEEP@5000",
-                                    catalogd_args="--enable_workload_mgmt",
-                                    disable_log_buffering=True)
-  def test_flush_on_shutdown(self, vector):
-    """Asserts that queries that have completed but are not yet written to the 
query
-       log table are flushed to the table before the coordinator exits. 
Graceful shutdown
-       for 2nd coordinator not needed because query_log_write_interval_s is 
very long."""
-
-    impalad = self.cluster.get_first_impalad()
-    client = self.get_client(vector.get_value('protocol'))
-
-    # Execute sql statements to ensure all get written to the query log table.
-    sql1 = client.execute("select 1")
-    assert sql1.success
-
-    sql2 = client.execute("select 2")
-    assert sql2.success
-
-    sql3 = client.execute("select 3")
-    assert sql3.success
-
-    
impalad.service.wait_for_metric_value("impala-server.completed-queries.queued", 
3,
-        60)
-
-    impalad.kill_and_wait_for_exit(SIGRTMIN)
-    self.assert_impalad_log_contains("INFO", r'Workload management shutdown 
successful',
-      timeout_s=60)
-
-    client2 = self.create_client_for_nth_impalad(1, 
vector.get_value('protocol'))
-
-    try:
-      def assert_func():
-        results = client2.execute("select query_id,sql from {0} where query_id 
in "
-                                  "('{1}','{2}','{3}')".format(QUERY_TBL_LOG,
-                                  sql1.query_id, sql2.query_id, sql3.query_id))
-
-        return len(results.data) == 3
-
-      assert retry(func=assert_func, max_attempts=5, sleep_time_s=3)
-    finally:
-      client2.close()
-
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
-                                                 
"--query_log_write_interval_s=9999 "
-                                                 "--shutdown_grace_period_s=0 "
-                                                 
"--query_log_shutdown_timeout_s=3 "
-                                                 "--shutdown_deadline_s=15 "
-                                                 "--debug_actions="
-                                                 
"WM_SHUTDOWN_DELAY:SLEEP@10000",
-                                    catalogd_args="--enable_workload_mgmt",
-                                    disable_log_buffering=True)
-  def test_shutdown_flush_timed_out(self, vector):
-    """Asserts that queries that have completed but are not yet written to the 
query
-       log table are lost if the completed queries queue drain takes too long 
and that
-       the coordinator logs the estimated number of queries lost."""
-
-    impalad = self.cluster.get_first_impalad()
-    client = self.get_client(vector.get_value('protocol'))
-
-    # Execute sql statements to ensure all get written to the query log table.
-    sql1 = client.execute("select 1")
-    assert sql1.success
-
-    sql2 = client.execute("select 2")
-    assert sql2.success
-
-    sql3 = client.execute("select 3")
-    assert sql3.success
-
-    
impalad.service.wait_for_metric_value("impala-server.completed-queries.queued", 
3,
-        60)
-
-    impalad.kill_and_wait_for_exit(SIGRTMIN)
-    self.assert_impalad_log_contains("INFO", r"Workload management shutdown 
timed out. "
-      r"Up to '3' queries may have been lost",
-      timeout_s=60)
-
   @CustomClusterTestSuite.with_args(
-      impalad_args="--enable_workload_mgmt "
-      "--query_log_write_interval_s=3 "
+      impalad_args="--query_log_write_interval_s=3 "
       "--query_log_dml_exec_timeout_s=1 "
       "--debug_actions=INTERNAL_SERVER_AFTER_SUBMIT:SLEEP@2000",
-      catalogd_args="--enable_workload_mgmt",
-      impalad_graceful_shutdown=True, cluster_size=1, 
disable_log_buffering=True)
+      workload_mgmt=True,
+      cluster_size=1, disable_log_buffering=True)
   def test_exec_timeout(self, vector):
     """Asserts the --query_log_dml_exec_timeout_s startup flag is added to the 
workload
        management insert DML and the DML will be cancelled when its execution 
time exceeds
@@ -820,15 +583,17 @@ class TestQueryLogTableHS2(TestQueryLogTableBase):
                                      "limit of 
1s000ms".format(self.insert_query_id))
 
 
-class TestQueryLogTableAll(TestQueryLogTableBase):
+class TestQueryLogTableAll(WorkloadManagementTestSuite):
   """Tests to assert the query log table is correctly populated when using all 
the
      client protocols."""
 
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
-                                                 
"--query_log_write_interval_s=1 "
-                                                 
"--cluster_id=test_query_hist_2",
-                                    catalogd_args="--enable_workload_mgmt",
-                                    impalad_graceful_shutdown=True,
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestQueryLogTableAll, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(hs2_client_protocol_dimension())
+
+  
@CustomClusterTestSuite.with_args(impalad_args="--cluster_id=test_query_hist_2",
+                                    workload_mgmt=True,
                                     disable_log_buffering=True)
   def test_ddl(self, vector, unique_database, unique_name):
     """Asserts the values written to the query log table match the values from 
the
@@ -851,11 +616,8 @@ class TestQueryLogTableAll(TestQueryLogTableBase):
     finally:
       client2.close()
 
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
-                                                 
"--query_log_write_interval_s=1 "
-                                                 
"--cluster_id=test_query_hist_3",
-                                    catalogd_args="--enable_workload_mgmt",
-                                    impalad_graceful_shutdown=True,
+  
@CustomClusterTestSuite.with_args(impalad_args="--cluster_id=test_query_hist_3",
+                                    workload_mgmt=True,
                                     disable_log_buffering=True)
   def test_dml(self, vector, unique_database, unique_name):
     """Asserts the values written to the query log table match the values from 
the
@@ -885,11 +647,8 @@ class TestQueryLogTableAll(TestQueryLogTableBase):
     finally:
       client2.close()
 
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
-                                                 
"--query_log_write_interval_s=1 "
-                                                 
"--cluster_id=test_query_hist_2",
-                                    catalogd_args="--enable_workload_mgmt",
-                                    impalad_graceful_shutdown=True,
+  
@CustomClusterTestSuite.with_args(impalad_args="--cluster_id=test_query_hist_2",
+                                    workload_mgmt=True,
                                     disable_log_buffering=True)
   def test_invalid_query(self, vector):
     """Asserts correct values are written to the completed queries table for a 
failed
@@ -918,10 +677,7 @@ class TestQueryLogTableAll(TestQueryLogTableBase):
     assert_query(query_tbl=QUERY_TBL_LOG, client=client,
         expected_cluster_id="test_query_hist_2", impalad=impalad, 
query_id=result.data[0])
 
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
-                                                 
"--query_log_write_interval_s=1",
-                                    catalogd_args="--enable_workload_mgmt",
-                                    impalad_graceful_shutdown=True,
+  @CustomClusterTestSuite.with_args(workload_mgmt=True,
                                     disable_log_buffering=True)
   def test_ignored_sqls_not_written(self, vector):
     """Asserts that expected queries are not written to the query log table."""
@@ -1013,10 +769,7 @@ class TestQueryLogTableAll(TestQueryLogTableBase):
     assert self.cluster.get_first_impalad().service.get_metric_value(
         "impala-server.completed-queries.failure") == 0
 
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
-                                                 
"--query_log_write_interval_s=1",
-                                    catalogd_args="--enable_workload_mgmt",
-                                    impalad_graceful_shutdown=True,
+  @CustomClusterTestSuite.with_args(workload_mgmt=True,
                                     disable_log_buffering=True)
   def test_sql_injection_attempts(self, vector):
     client = self.get_client(vector.get_value('protocol'))
@@ -1089,21 +842,19 @@ class TestQueryLogTableAll(TestQueryLogTableBase):
                                         .format(esc_sql, test_case)
 
 
-class TestQueryLogTableBufferPool(TestQueryLogTableBase):
+class TestQueryLogTableBufferPool(WorkloadManagementTestSuite):
   """Base class for all query log tests that set the buffer pool query 
option."""
 
   @classmethod
   def add_test_dimensions(cls):
     super(TestQueryLogTableBufferPool, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(hs2_client_protocol_dimension())
     cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('buffer_pool_limit',
         None, "14.97MB"))
 
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
-                                                 
"--query_log_write_interval_s=1 "
-                                                 
"--cluster_id=test_query_hist_1 "
+  
@CustomClusterTestSuite.with_args(impalad_args="--cluster_id=test_query_hist_1 "
                                                  
"--scratch_dirs={scratch_dir}:5G",
-                                    catalogd_args="--enable_workload_mgmt",
-                                    impalad_graceful_shutdown=True,
+                                    workload_mgmt=True,
                                     tmp_dir_placeholders=['scratch_dir'],
                                     disable_log_buffering=True)
   def test_select(self, vector):
@@ -1152,16 +903,21 @@ class TestQueryLogTableBufferPool(TestQueryLogTableBase):
           "was zero, test did not assert anything"
 
 
-class TestQueryLogQueuedQueries(CustomClusterTestSuite):
+class TestQueryLogQueuedQueries(WorkloadManagementTestSuite):
   """Simulates a cluster that is under load and has queries that are queueing 
in
      admission control."""
 
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestQueryLogQueuedQueries, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(hs2_client_protocol_dimension())
+
   @CustomClusterTestSuite.with_args(
       impalad_args=impalad_admission_ctrl_config_args(
           fs_allocation_file="fair-scheduler-one-query.xml",
           llama_site_file="llama-site-one-query.xml",
           additional_args="--query_log_write_interval_s=5"),
-      impalad_graceful_shutdown=True, workload_mgmt=True,
+      workload_mgmt=True,
       num_exclusive_coordinators=1, cluster_size=2,
       default_query_options=[('fetch_rows_timeout_ms', '1000')])
   def test_query_queued(self):
@@ -1233,6 +989,183 @@ class TestQueryLogQueuedQueries(CustomClusterTestSuite):
                  backoff=1)
 
 
+class TestQueryLogTableFlush(CustomClusterTestSuite):
+  """Tests asserting that the query log table is flushed correctly in various
+  flush and shutdown scenarios. Kept separate because a test may stop an
+  individual impalad. Does not extend WorkloadManagementTestSuite because it
+  must not wait for workload management to be idle before graceful shutdown."""
+
+  FLUSH_MAX_RECORDS_CLUSTER_ID = "test_query_log_flush_max_records_" + 
str(int(time()))
+  FLUSH_MAX_RECORDS_QUERY_COUNT = 30
+
+  def setup_method(self, method):
+    super(TestQueryLogTableFlush, self).setup_method(method)
+    self.wait_for_wm_init_complete()
+
+  @CustomClusterTestSuite.with_args(impalad_args="--query_log_max_queued={0} "
+                                                 "--query_log_write_interval_s=9999 "
+                                                 "--cluster_id={1} "
+                                                 "--query_log_expression_limit=5000"
+                                                 .format(FLUSH_MAX_RECORDS_QUERY_COUNT,
+                                                 FLUSH_MAX_RECORDS_CLUSTER_ID),
+                                    default_query_options=[
+                                      ('statement_expression_limit', 1024)],
+                                    workload_mgmt=True,
+                                    disable_log_buffering=True)
+  def test_flush_on_queued_count_exceeded(self):
+    """Asserts that completed queries are written to the query log table once the
+       maximum number of queued records is reached. Also verifies that writing
+       completed queries is not limited by the default statement_expression_limit."""
+
+    impalad = self.cluster.get_first_impalad()
+    expected_written = self.FLUSH_MAX_RECORDS_QUERY_COUNT + 1
+
+    # The context manager closes the HS2 client even when an assertion fails.
+    with impalad.service.create_hs2_client() as client:
+      rand_str = "{0}-{1}".format(client.get_test_protocol(), time())
+      test_sql = "select '{0}','{1}'".format(rand_str,
+          self.FLUSH_MAX_RECORDS_CLUSTER_ID)
+      test_sql_assert = "select '{0}', count(*) from {1} where sql='{2}'".format(
+          rand_str, QUERY_TBL_LOG, test_sql.replace("'", r"\'"))
+
+      for _ in range(self.FLUSH_MAX_RECORDS_QUERY_COUNT):
+        res = client.execute(test_sql)
+        assert res.success
+
+      # Running this query pushes the number of queued completed queries past the max,
+      # so all completed queries will be written to the query log table.
+      res = client.execute(test_sql_assert)
+      assert res.success
+      assert 1 == len(res.data)
+      assert "0" == res.data[0].split("\t")[1]
+
+      # Wait until the completed queries have all been written out because the max
+      # queued count was exceeded.
+      impalad.service.wait_for_metric_value(
+          "impala-server.completed-queries.max-records-writes", 1, 60)
+      impalad.service.wait_for_metric_value(
+          "impala-server.completed-queries.written", expected_written, 60)
+
+      # Force Impala to process the inserts to the completed queries table.
+      client.execute("refresh " + QUERY_TBL_LOG)
+
+      # This query will remain queued due to the long write interval and max queued
+      # records limit not being reached.
+      res = client.execute(r"select count(*) from {0} where sql like 'select \'{1}\'%'"
+          .format(QUERY_TBL_LOG, rand_str))
+      assert res.success
+      assert 1 == len(res.data)
+      assert str(expected_written) == res.data[0]
+      impalad.service.wait_for_metric_value(
+          "impala-server.completed-queries.queued", 2, 60)
+
+      assert impalad.service.get_metric_value(
+          "impala-server.completed-queries.max-records-writes") == 1
+      assert impalad.service.get_metric_value(
+          "impala-server.completed-queries.scheduled-writes") == 0
+      assert impalad.service.get_metric_value(
+          "impala-server.completed-queries.written") == expected_written
+      assert impalad.service.get_metric_value(
+          "impala-server.completed-queries.queued") == 2
+
+  @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=15",
+                                    workload_mgmt=True,
+                                    disable_log_buffering=True)
+  def test_flush_on_interval(self):
+    """Asserts that completed queries are written to the query log table after the
+       specified write interval elapses."""
+
+    impalad = self.cluster.get_first_impalad()
+    query_count = 10
+
+    # The context manager closes the HS2 client even when an assertion fails.
+    with impalad.service.create_hs2_client() as client:
+      for _ in range(query_count):
+        res = client.execute("select sleep(1000)")
+        assert res.success
+
+    # Wait for at least one iteration of the workload management processing loop to
+    # write to the completed queries table.
+    impalad.service.wait_for_metric_value(
+      "impala-server.completed-queries.written", query_count, 20)
+
+  @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=9999 "
+                                                 "--shutdown_grace_period_s=0 "
+                                                 "--shutdown_deadline_s=15 "
+                                                 "--debug_actions="
+                                                 "WM_SHUTDOWN_DELAY:SLEEP@5000",
+                                    workload_mgmt=True,
+                                    disable_log_buffering=True)
+  def test_flush_on_shutdown(self):
+    """Asserts that queries that have completed but are not yet written to the query
+       log table are flushed to the table before the coordinator exits. Graceful shutdown
+       for 2nd coordinator not needed because query_log_write_interval_s is very long."""
+
+    impalad = self.cluster.get_first_impalad()
+    # Execute sql statements to ensure all get written to the query log table. The
+    # context manager closes the client before the shutdown signal is sent.
+    with impalad.service.create_hs2_client() as client:
+      sql1 = client.execute("select 1")
+      assert sql1.success
+
+      sql2 = client.execute("select 2")
+      assert sql2.success
+
+      sql3 = client.execute("select 3")
+      assert sql3.success
+
+      impalad.service.wait_for_metric_value(
+          "impala-server.completed-queries.queued", 3, 60)
+
+    impalad.kill_and_wait_for_exit(SIGRTMIN)
+    self.assert_impalad_log_contains("INFO", r'Workload management shutdown successful',
+      timeout_s=60)
+
+    with self.cluster.impalads[1].service.create_hs2_client() as client2:
+      def assert_func():
+        results = client2.execute("select query_id,sql from {0} where query_id in "
+                                  "('{1}','{2}','{3}')".format(QUERY_TBL_LOG,
+                                  sql1.query_id, sql2.query_id, sql3.query_id))
+
+        return len(results.data) == 3
+
+      assert retry(func=assert_func, max_attempts=5, sleep_time_s=3)
+
+  @CustomClusterTestSuite.with_args(impalad_args="--query_log_write_interval_s=9999 "
+                                                 "--shutdown_grace_period_s=0 "
+                                                 "--query_log_shutdown_timeout_s=3 "
+                                                 "--shutdown_deadline_s=15 "
+                                                 "--debug_actions="
+                                                 "WM_SHUTDOWN_DELAY:SLEEP@10000",
+                                    workload_mgmt=True,
+                                    disable_log_buffering=True)
+  def test_shutdown_flush_timed_out(self):
+    """Asserts that queries that have completed but are not yet written to the query
+       log table are lost if the completed queries queue drain takes too long and that
+       the coordinator logs the estimated number of queries lost."""
+
+    impalad = self.cluster.get_first_impalad()
+    # Execute sql statements that will still be queued when the coordinator shuts down.
+    # The context manager closes the client before the shutdown signal is sent.
+    with impalad.service.create_hs2_client() as client:
+      sql1 = client.execute("select 1")
+      assert sql1.success
+
+      sql2 = client.execute("select 2")
+      assert sql2.success
+
+      sql3 = client.execute("select 3")
+      assert sql3.success
+
+      impalad.service.wait_for_metric_value(
+          "impala-server.completed-queries.queued", 3, 60)
+
+    impalad.kill_and_wait_for_exit(SIGRTMIN)
+    self.assert_impalad_log_contains("INFO", r"Workload management shutdown timed out. "
+      r"Up to '3' queries may have been lost",
+      timeout_s=60)
+
+
 # Helper function to determine if a query from the debug UI is a workload 
management
 # insert DML.
 def _is_insert_query(query):
diff --git a/tests/custom_cluster/test_workload_mgmt_init.py 
b/tests/custom_cluster/test_workload_mgmt_init.py
index 76c16d7f2..45ef09bd5 100644
--- a/tests/custom_cluster/test_workload_mgmt_init.py
+++ b/tests/custom_cluster/test_workload_mgmt_init.py
@@ -24,7 +24,11 @@ from subprocess import CalledProcessError
 from logging import getLogger
 
 from SystemTables.ttypes import TQueryTableColumn
-from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.custom_cluster_test_suite import (
+    WORKLOAD_MGMT_IMPALAD_FLAGS,
+    CustomClusterTestSuite)
+from tests.common.test_dimensions import hs2_client_protocol_dimension
+from tests.common.test_vector import HS2
 from tests.util.workload_management import (
     assert_query,
     WM_DB,
@@ -35,27 +39,33 @@ from tests.util.workload_management import (
 
 LOG = getLogger(__name__)
 QUERY_TBL_ALL = "{},{}".format(QUERY_TBL_LOG_NAME, QUERY_TBL_LIVE_NAME)
+# Latest workload_mgmt_schema_version.
+LATEST_SCHEMA = "1.2.0"
 
 
 class TestWorkloadManagementInitBase(CustomClusterTestSuite):
 
-  """Defines common setup and methods for all workload management init 
tests."""
-
-  LATEST_SCHEMA = "1.2.0"
+  """Defines common setup and methods for all workload management init tests.
+  This test class does not extend WorkloadManagementTestSuite because its subclasses
+  define their own setup_method and teardown_method."""
 
   @classmethod
   def get_workload(self):
     return 'functional-query'
 
+  @classmethod
+  def default_test_protocol(cls):
+    return HS2
+
   @classmethod
   def add_test_dimensions(cls):
     super(TestWorkloadManagementInitBase, cls).add_test_dimensions()
-    cls.ImpalaTestMatrix.add_constraint(lambda v: v.get_value('protocol') == 
'beeswax')
+    cls.ImpalaTestMatrix.add_dimension(hs2_client_protocol_dimension())
 
   def setup_method(self, method):
     super(TestWorkloadManagementInitBase, self).setup_method(method)
 
-  def restart_cluster(self, vector, schema_version="", 
wait_for_init_complete=True,
+  def restart_cluster(self, schema_version="", wait_for_init_complete=True,
       cluster_size=3, additional_impalad_opts="", wait_for_backends=True,
       additional_catalogd_opts="", expect_startup_err=False, 
log_symlinks=False):
     """Wraps the existing custom cluster _start_impala_cluster function to 
restart the
@@ -64,7 +74,8 @@ class TestWorkloadManagementInitBase(CustomClusterTestSuite):
        function blocks until the workload management init process completes. If
        additional_impalad_opts is specified, that string is appended to the 
impala_args
        startup flag."""
-    coord_opts = "--impalad_args=--enable_workload_mgmt --logbuflevel=-1 "
+    coord_opts = "--impalad_args=--logbuflevel=-1 {} ".format(
+      WORKLOAD_MGMT_IMPALAD_FLAGS)
     coord_opts += additional_impalad_opts
 
     catalog_opts = "--catalogd_args=--enable_workload_mgmt --logbuflevel=-1 "
@@ -123,7 +134,7 @@ class 
TestWorkloadManagementInitBase(CustomClusterTestSuite):
       table. The regex is passed the fully qualified table name using python 
string
       substitution."""
     for table in (QUERY_TBL_LOG, QUERY_TBL_LIVE):
-      self.assert_catalogd_log_contains("INFO", line_regex.format(table))
+      self.assert_catalogd_log_contains(level, line_regex.format(table))
 
   def check_schema(self, schema_ver, vector, multiple_impalad=False):
     """Asserts that all workload management tables have the correct columns 
and are at the
@@ -143,55 +154,57 @@ class 
TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
     super(TestWorkloadManagementInitWait, self).setup_method(method)
     self.wait_for_wm_init_complete()
 
+  def teardown_method(self, method):
+    self.wait_for_wm_idle()
+    super(TestWorkloadManagementInitWait, self).teardown_method(method)
+
   @CustomClusterTestSuite.with_args(
-      impalad_args="--enable_workload_mgmt",
-      catalogd_args="--enable_workload_mgmt 
--workload_mgmt_schema_version=1.1.0",
+      workload_mgmt=True,
       disable_log_buffering=True)
   def test_no_upgrade(self, vector):
     """Tests that no upgrade happens when starting a cluster where the 
workload management
        tables are already at the latest version."""
-    self.restart_cluster(vector, schema_version=self.LATEST_SCHEMA, 
log_symlinks=True)
-    self.check_schema(self.LATEST_SCHEMA, vector)
+    self.restart_cluster(schema_version=LATEST_SCHEMA, log_symlinks=True)
+    self.check_schema(LATEST_SCHEMA, vector)
 
     self.assert_catalogd_log_contains("INFO", r"Workload management table .*? 
will be "
         r"upgraded", expected_count=0)
 
-  @CustomClusterTestSuite.with_args(cluster_size=10, 
disable_log_buffering=True,
-      log_symlinks=True,
-      impalad_args="--enable_workload_mgmt 
--workload_mgmt_schema_version=1.0.0",
-      catalogd_args="--enable_workload_mgmt "
-                    "--workload_mgmt_schema_version=1.0.0 "
+  @CustomClusterTestSuite.with_args(
+      cluster_size=10, disable_log_buffering=True,
+      log_symlinks=True, workload_mgmt=True,
+      impalad_args="--workload_mgmt_schema_version=1.0.0",
+      catalogd_args="--workload_mgmt_schema_version=1.0.0 "
                     "--workload_mgmt_drop_tables={}".format(QUERY_TBL_ALL))
   def test_create_on_version_1_0_0(self, vector):
     """Asserts that workload management tables are properly created on version 
1.0.0 using
        a 10 node cluster when no tables exist."""
     self.check_schema("1.0.0", vector, multiple_impalad=True)
 
-  @CustomClusterTestSuite.with_args(cluster_size=10, 
disable_log_buffering=True,
-      log_symlinks=True,
-      impalad_args="--enable_workload_mgmt 
--workload_mgmt_schema_version=1.1.0",
-      catalogd_args="--enable_workload_mgmt "
-                    "--workload_mgmt_schema_version=1.1.0 "
+  @CustomClusterTestSuite.with_args(
+      cluster_size=10, disable_log_buffering=True,
+      log_symlinks=True, workload_mgmt=True,
+      impalad_args="--workload_mgmt_schema_version=1.1.0",
+      catalogd_args="--workload_mgmt_schema_version=1.1.0 "
                     "--workload_mgmt_drop_tables={}".format(QUERY_TBL_ALL))
   def test_create_on_version_1_1_0(self, vector):
     """Asserts that workload management tables are properly created on version 
1.1.0 using
        a 10 node cluster when no tables exist."""
     self.check_schema("1.1.0", vector, multiple_impalad=True)
 
-  @CustomClusterTestSuite.with_args(cluster_size=10, 
disable_log_buffering=True,
-      log_symlinks=True,
-      impalad_args="--enable_workload_mgmt",
-      catalogd_args="--enable_workload_mgmt "
-                    "--workload_mgmt_drop_tables={}".format(QUERY_TBL_ALL))
+  @CustomClusterTestSuite.with_args(
+      cluster_size=10, disable_log_buffering=True,
+      log_symlinks=True, workload_mgmt=True,
+      catalogd_args="--workload_mgmt_drop_tables={}".format(QUERY_TBL_ALL))
   def test_create_on_version_1_2_0(self, vector):
     """Asserts that workload management tables are properly created on the 
latest version
        using a 10 node cluster when no tables exist."""
     self.check_schema("1.2.0", vector, multiple_impalad=True)
 
-  @CustomClusterTestSuite.with_args(cluster_size=1,
-      impalad_args="--enable_workload_mgmt 
--workload_mgmt_schema_version=1.0.0",
-      catalogd_args="--enable_workload_mgmt "
-                    "--workload_mgmt_schema_version=1.0.0 "
+  @CustomClusterTestSuite.with_args(
+      cluster_size=1, workload_mgmt=True,
+      impalad_args="--workload_mgmt_schema_version=1.0.0",
+      catalogd_args="--workload_mgmt_schema_version=1.0.0 "
                     "--workload_mgmt_drop_tables={}".format(QUERY_TBL_ALL),
       disable_log_buffering=True)
   def test_upgrade_1_0_0_to_1_1_0(self, vector):
@@ -203,8 +216,7 @@ class 
TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
     self.assert_log_contains("catalogd", "WARNING", r"Target schema version 
'1.0.0' is "
         r"not the latest schema version '\d+\.\d+\.\d+'")
 
-    self.restart_cluster(vector, schema_version="1.1.0", cluster_size=1,
-        log_symlinks=True)
+    self.restart_cluster(schema_version="1.1.0", cluster_size=1, 
log_symlinks=True)
 
     # Assert the upgrade process ran.
     self.assert_catalogd_all_tables(r"Workload management table '{}' is at 
version "
@@ -212,10 +224,10 @@ class 
TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
 
     self.check_schema("1.1.0", vector)
 
-  @CustomClusterTestSuite.with_args(cluster_size=1,
-      impalad_args="--enable_workload_mgmt 
--workload_mgmt_schema_version=1.1.0",
-      catalogd_args="--enable_workload_mgmt "
-                    "--workload_mgmt_schema_version=1.1.0 "
+  @CustomClusterTestSuite.with_args(
+      cluster_size=1, workload_mgmt=True,
+      impalad_args="--workload_mgmt_schema_version=1.1.0",
+      catalogd_args="--workload_mgmt_schema_version=1.1.0 "
                     "--workload_mgmt_drop_tables={}".format(QUERY_TBL_ALL),
       disable_log_buffering=True)
   def test_upgrade_1_1_0_to_1_2_0(self, vector):
@@ -227,8 +239,7 @@ class 
TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
     self.assert_log_contains("catalogd", "WARNING", r"Target schema version 
'1.1.0' is "
         r"not the latest schema version '\d+\.\d+\.\d+'")
 
-    self.restart_cluster(vector, schema_version="1.2.0", cluster_size=1,
-        log_symlinks=True)
+    self.restart_cluster(schema_version="1.2.0", cluster_size=1, 
log_symlinks=True)
 
     # Assert the upgrade process ran.
     self.assert_catalogd_all_tables(r"Workload management table '{}' is at 
version "
@@ -236,10 +247,10 @@ class 
TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
 
     self.check_schema("1.2.0", vector)
 
-  @CustomClusterTestSuite.with_args(cluster_size=1,
-      impalad_args="--enable_workload_mgmt 
--workload_mgmt_schema_version=1.0.0",
-      catalogd_args="--enable_workload_mgmt "
-                    "--workload_mgmt_schema_version=1.0.0 "
+  @CustomClusterTestSuite.with_args(
+      cluster_size=1, workload_mgmt=True,
+      impalad_args="--workload_mgmt_schema_version=1.0.0",
+      catalogd_args="--workload_mgmt_schema_version=1.0.0 "
                     "--workload_mgmt_drop_tables={}".format(QUERY_TBL_ALL),
       disable_log_buffering=True)
   def test_upgrade_1_0_0_to_1_2_0(self, vector):
@@ -251,8 +262,7 @@ class 
TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
     self.assert_log_contains("catalogd", "WARNING", r"Target schema version 
'1.0.0' is "
         r"not the latest schema version '\d+\.\d+\.\d+'")
 
-    self.restart_cluster(vector, schema_version="1.2.0", cluster_size=1,
-        log_symlinks=True)
+    self.restart_cluster(schema_version="1.2.0", cluster_size=1, 
log_symlinks=True)
 
     # Assert the upgrade process ran.
     self.assert_catalogd_all_tables(r"Workload management table '{}' is at 
version "
@@ -260,20 +270,21 @@ class 
TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
 
     self.check_schema("1.2.0", vector)
 
-  @CustomClusterTestSuite.with_args(cluster_size=1, 
impalad_args="--enable_workload_mgmt",
-      catalogd_args="--enable_workload_mgmt", disable_log_buffering=True)
+  @CustomClusterTestSuite.with_args(
+      cluster_size=1, workload_mgmt=True, disable_log_buffering=True)
   def test_log_table_newer_schema_version(self, vector):
     """Asserts a catalog startup flag version that is older than the workload
        management table schema version will write only the fields associated 
with the
        startup flag version."""
-    self.restart_cluster(vector, schema_version="1.0.0", cluster_size=1,
-        log_symlinks=True, 
additional_impalad_opts="--query_log_write_interval_s=15")
+    self.restart_cluster(
+      schema_version="1.0.0", cluster_size=1, log_symlinks=True,
+      additional_impalad_opts="--query_log_write_interval_s=15")
 
     self.assert_catalogd_log_contains("WARNING", "Target schema version 
'1.0.0' is not "
-        "the latest schema version '{}'".format(self.LATEST_SCHEMA))
+        "the latest schema version '{}'".format(LATEST_SCHEMA))
 
     # The workload management tables will be on the latest schema version.
-    self.check_schema(self.LATEST_SCHEMA, vector)
+    self.check_schema(LATEST_SCHEMA, vector)
 
     # The workload management processing will be running on schema version 
1.0.0.
     self.assert_catalogd_all_tables(r"Target schema version '1.0.0' of the 
'{}' table is "
@@ -309,11 +320,10 @@ class 
TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
           TQueryTableColumn.COORDINATOR_SLOTS: "NULL",
           TQueryTableColumn.EXECUTOR_SLOTS: "NULL"})
 
-  @CustomClusterTestSuite.with_args(cluster_size=1, disable_log_buffering=True,
-      log_symlinks=True,
-      impalad_args="--enable_workload_mgmt",
-      catalogd_args="--enable_workload_mgmt "
-                    "--query_log_table_props=\"foo=bar,foo1=bar1\" "
+  @CustomClusterTestSuite.with_args(
+      cluster_size=1, disable_log_buffering=True,
+      log_symlinks=True, workload_mgmt=True,
+      catalogd_args="--query_log_table_props=\"foo=bar,foo1=bar1\" "
                     "--workload_mgmt_drop_tables={}".format(QUERY_TBL_ALL))
   def test_create_table_with_custom_props(self):
     """Asserts that creating workload management tables with additional 
properties
@@ -322,21 +332,20 @@ class 
TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
     self.assert_table_prop(QUERY_TBL_LOG, "foo", "bar")
     self.assert_table_prop(QUERY_TBL_LIVE, "foo", "bar")
 
-  @CustomClusterTestSuite.with_args(cluster_size=1, disable_log_buffering=True,
-      log_symlinks=True,
-      impalad_args="--enable_workload_mgmt",
-      catalogd_args="--enable_workload_mgmt "
-                    "--workload_mgmt_drop_tables={}".format(QUERY_TBL_ALL))
+  @CustomClusterTestSuite.with_args(
+      cluster_size=1, disable_log_buffering=True,
+      log_symlinks=True, workload_mgmt=True,
+      catalogd_args="--workload_mgmt_drop_tables={}".format(QUERY_TBL_ALL))
   def test_create_from_scratch(self, vector):
     """Tests the conditions that exist when workload management is first 
started by
        deleteing the workload management tables and the sys db and 
restarting."""
     assert self.client.execute("drop database {} cascade"
         .format(WM_DB)).success
 
-    self.restart_cluster(vector, log_symlinks=True)
-    self.check_schema(self.LATEST_SCHEMA, vector)
+    self.restart_cluster(log_symlinks=True)
+    self.check_schema(LATEST_SCHEMA, vector)
 
-  def _run_invalid_table_prop_test(self, table, prop_name, vector, 
expect_success=False):
+  def _run_invalid_table_prop_test(self, table, prop_name, 
expect_success=False):
     """Runs a test where one of the workload management schema version table 
properties on
        a workload management table has been reset to an invalid value."""
     try:
@@ -347,7 +356,8 @@ class 
TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
           "{}".format(table))
 
       tmp_dir = self.get_tmp_dir('invalid_schema')
-      self.restart_cluster(vector, wait_for_init_complete=False, 
cluster_size=1,
+      self.restart_cluster(
+          wait_for_init_complete=False, cluster_size=1,
           wait_for_backends=False, expect_startup_err=True, log_symlinks=True,
           additional_catalogd_opts="--minidump_path={}".format(tmp_dir),
           additional_impalad_opts="--minidump_path={}".format(tmp_dir))
@@ -362,54 +372,59 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
         assert len(os.listdir("{}/catalogd".format(tmp_dir))) == 0, \
             "Found minidumps but none should exist."
     finally:
-      self.restart_cluster(vector, cluster_size=1,
-          additional_catalogd_opts="--workload_mgmt_drop_tables={}".format(QUERY_TBL_ALL))
+      self.restart_cluster(
+          cluster_size=1,
+          additional_catalogd_opts="--workload_mgmt_drop_tables={}".format(
+              QUERY_TBL_ALL))
 
-  @CustomClusterTestSuite.with_args(cluster_size=1, log_symlinks=True,
-      impalad_args="--enable_workload_mgmt --minidump_path={invalid_schema}",
-      catalogd_args="--enable_workload_mgmt --minidump_path={invalid_schema}",
+  @CustomClusterTestSuite.with_args(
+      cluster_size=1, log_symlinks=True, workload_mgmt=True,
+      impalad_args="--minidump_path={invalid_schema}",
+      catalogd_args="--minidump_path={invalid_schema}",
       tmp_dir_placeholders=['invalid_schema'],
       disable_log_buffering=True)
-  def test_invalid_schema_version_log_table_prop(self, vector):
+  def test_invalid_schema_version_log_table_prop(self):
     """Tests that startup succeeds when the 'schema_version' table property on the
        sys.impala_query_log table contains an invalid value but the wm_schema_version
        table property contains a valid value."""
-    self._run_invalid_table_prop_test(QUERY_TBL_LOG, "schema_version", vector, True)
+    self._run_invalid_table_prop_test(QUERY_TBL_LOG, "schema_version", True)
 
-  @CustomClusterTestSuite.with_args(cluster_size=1, log_symlinks=True,
-      impalad_args="--enable_workload_mgmt --minidump_path={invalid_schema}",
-      catalogd_args="--enable_workload_mgmt --minidump_path={invalid_schema}",
+  @CustomClusterTestSuite.with_args(
+      cluster_size=1, log_symlinks=True, workload_mgmt=True,
+      impalad_args="--minidump_path={invalid_schema}",
+      catalogd_args="--minidump_path={invalid_schema}",
       tmp_dir_placeholders=['invalid_schema'],
       disable_log_buffering=True)
-  def test_invalid_wm_schema_version_log_table_prop(self, vector):
+  def test_invalid_wm_schema_version_log_table_prop(self):
     """Tests that startup fails when the 'wm_schema_version' table property on the
        sys.impala_query_log table contains an invalid value."""
-    self._run_invalid_table_prop_test(QUERY_TBL_LOG, "wm_schema_version", vector)
+    self._run_invalid_table_prop_test(QUERY_TBL_LOG, "wm_schema_version")
 
-  @CustomClusterTestSuite.with_args(cluster_size=1, log_symlinks=True,
-      impalad_args="--enable_workload_mgmt --minidump_path={invalid_schema}",
-      catalogd_args="--enable_workload_mgmt --minidump_path={invalid_schema}",
+  @CustomClusterTestSuite.with_args(
+      cluster_size=1, log_symlinks=True, workload_mgmt=True,
+      impalad_args="--minidump_path={invalid_schema}",
+      catalogd_args="--minidump_path={invalid_schema}",
       tmp_dir_placeholders=['invalid_schema'],
       disable_log_buffering=True)
-  def test_invalid_schema_version_live_table_prop(self, vector):
+  def test_invalid_schema_version_live_table_prop(self):
     """Tests that startup succeeds when the 'schema_version' table property on the
        sys.impala_query_live table contains an invalid value but the wm_schema_version
        table property contains a valid value."""
-    self._run_invalid_table_prop_test(QUERY_TBL_LIVE, "schema_version", vector, True)
+    self._run_invalid_table_prop_test(QUERY_TBL_LIVE, "schema_version", True)
 
-  @CustomClusterTestSuite.with_args(cluster_size=1, log_symlinks=True,
-      impalad_args="--enable_workload_mgmt --minidump_path={invalid_schema}",
-      catalogd_args="--enable_workload_mgmt --minidump_path={invalid_schema}",
+  @CustomClusterTestSuite.with_args(
+      cluster_size=1, log_symlinks=True, workload_mgmt=True,
+      impalad_args="--minidump_path={invalid_schema}",
+      catalogd_args="--minidump_path={invalid_schema}",
       tmp_dir_placeholders=['invalid_schema'],
       disable_log_buffering=True)
-  def test_invalid_wm_schema_version_live_table_prop(self, vector):
+  def test_invalid_wm_schema_version_live_table_prop(self):
     """Tests that startup fails when the 'wm_schema_version' table property on the
        sys.impala_query_live table contains an invalid value."""
-    self._run_invalid_table_prop_test(QUERY_TBL_LIVE, "wm_schema_version", vector)
+    self._run_invalid_table_prop_test(QUERY_TBL_LIVE, "wm_schema_version")
 
-  @CustomClusterTestSuite.with_args(cluster_size=1, disable_log_buffering=True,
-      impalad_args="--enable_workload_mgmt",
-      catalogd_args="--enable_workload_mgmt")
+  @CustomClusterTestSuite.with_args(
+      cluster_size=1, disable_log_buffering=True, workload_mgmt=True)
   def test_upgrade_to_latest_from_previous_binary(self, vector):
     """Simulated an upgrade situation from workload management tables created by previous
        builds of Impala."""
@@ -423,9 +438,10 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
         create_sql = f.read()
         assert self.client.execute(create_sql).success
 
-    self.restart_cluster(vector, cluster_size=1, log_symlinks=True,
-        additional_impalad_opts="--query_log_write_interval_s=30")
-    self.check_schema(self.LATEST_SCHEMA, vector)
+    self.restart_cluster(
+      cluster_size=1, log_symlinks=True,
+      additional_impalad_opts="--query_log_write_interval_s=30")
+    self.check_schema(LATEST_SCHEMA, vector)
 
     # Run a query and ensure it does not populate fields from the latest schema.
     res = self.client.execute("select * from functional.alltypes")
@@ -441,10 +457,9 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
         "impala-server.completed-queries.written", 2, 60)
     assert_query(QUERY_TBL_LOG, self.client, impalad=impalad, query_id=res.query_id)
 
-  @CustomClusterTestSuite.with_args(cluster_size=1, disable_log_buffering=True,
-      impalad_args="--enable_workload_mgmt",
-      catalogd_args="--enable_workload_mgmt")
-  def test_start_at_1_0_0(self, vector):
+  @CustomClusterTestSuite.with_args(
+      cluster_size=1, disable_log_buffering=True, workload_mgmt=True)
+  def test_start_at_1_0_0(self):
     """Tests the situation where workload management tables were created by the original
        workload management code, and the current code is started at workload management
        schema version 1.0.0 (even though that version is not the latest)."""
@@ -458,8 +473,9 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
         create_sql = f.read()
         assert self.client.execute(create_sql).success
 
-    self.restart_cluster(vector, schema_version="1.0.0", log_symlinks=True,
-        additional_impalad_opts="--query_log_write_interval_s=15")
+    self.restart_cluster(
+      schema_version="1.0.0", log_symlinks=True,
+      additional_impalad_opts="--query_log_write_interval_s=15")
 
     for table in (QUERY_TBL_LOG, QUERY_TBL_LIVE):
       self.assert_table_prop(table, "schema_version", "1.0.0")
@@ -491,11 +507,10 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
     data = log_results.data[0].split("\t")
     assert len(data) == len(log_results.column_labels)
 
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt",
-      catalogd_args="--enable_workload_mgmt",
+  @CustomClusterTestSuite.with_args(
       statestored_args="--use_subscriber_id_as_catalogd_priority=true",
       start_args="--enable_statestored_ha",
-      disable_log_buffering=True, log_symlinks=True)
+      disable_log_buffering=True, log_symlinks=True, workload_mgmt=True)
   def test_statestore_ha(self):
     """Asserts workload management initialization completes successfully when statestore
        ha is enabled."""
@@ -514,12 +529,17 @@ class TestWorkloadManagementInitNoWait(TestWorkloadManagementInitBase):
   def setup_method(self, method):
     super(TestWorkloadManagementInitNoWait, self).setup_method(method)
 
-  @CustomClusterTestSuite.with_args(cluster_size=1, log_symlinks=True,
-      impalad_args="--enable_workload_mgmt --query_log_write_interval_s=3",
-      catalogd_args="--enable_workload_mgmt "
-                    "--workload_mgmt_drop_tables={} "
+  def teardown_method(self, method):
+    self.wait_for_wm_idle()
+    super(TestWorkloadManagementInitNoWait, self).teardown_method(method)
+
+  @CustomClusterTestSuite.with_args(
+      cluster_size=1, log_symlinks=True,
+      impalad_args="--query_log_write_interval_s=3",
+      catalogd_args="--workload_mgmt_drop_tables={} "
                     "--debug_actions=CATALOG_WORKLOADMGMT_STARTUP:SLEEP@15000"
                     .format(QUERY_TBL_ALL),
+      workload_mgmt=True,
       disable_log_buffering=True)
   def test_catalog_init_delay(self):
     # Workload management init is slightly delayed after catalogd startup, wait for the
@@ -543,12 +563,14 @@ class TestWorkloadManagementInitNoWait(TestWorkloadManagementInitBase):
     assert res.success
     impalad.wait_for_metric_value("impala-server.completed-queries.written", 1, 15)
 
-  @CustomClusterTestSuite.with_args(cluster_size=1, expect_startup_fail=True,
+  @CustomClusterTestSuite.with_args(
+      cluster_size=1, expect_startup_fail=True,
       impalad_timeout_s=60, log_symlinks=True,
-      impalad_args="--enable_workload_mgmt --workload_mgmt_schema_version=foo "
+      impalad_args="--workload_mgmt_schema_version=foo "
                    "--minidump_path={minidumps}",
-      catalogd_args="--enable_workload_mgmt --workload_mgmt_schema_version=foo "
+      catalogd_args="--workload_mgmt_schema_version=foo "
                     "--minidump_path={minidumps}", tmp_dir_placeholders=['minidumps'],
+      workload_mgmt=True,
       disable_log_buffering=True)
   def test_start_invalid_version(self):
     """Asserts that starting a cluster with an invalid workload management version
@@ -563,10 +585,11 @@ class TestWorkloadManagementInitNoWait(TestWorkloadManagementInitBase):
 
   @CustomClusterTestSuite.with_args(cluster_size=1, expect_startup_fail=True,
       impalad_timeout_s=60, log_symlinks=True,
-      impalad_args="--enable_workload_mgmt --workload_mgmt_schema_version=0.0.1 "
+      impalad_args="--workload_mgmt_schema_version=0.0.1 "
                    "--minidump_path={minidumps}",
-      catalogd_args="--enable_workload_mgmt --workload_mgmt_schema_version=0.0.1 "
+      catalogd_args="--workload_mgmt_schema_version=0.0.1 "
                     "--minidump_path={minidumps}", tmp_dir_placeholders=['minidumps'],
+      workload_mgmt=True,
       disable_log_buffering=True)
   def test_start_unknown_version(self):
     """Asserts that starting a cluster with an unknown workload management version errors.
@@ -579,7 +602,8 @@ class TestWorkloadManagementInitNoWait(TestWorkloadManagementInitBase):
     self.assert_catalogd_log_contains("FATAL", r"Workload management schema version "
         r"'0.0.1' is not one of the known versions: '1.0.0', '1.1.0', '1.2.0'$")
 
-  @CustomClusterTestSuite.with_args(start_args="--enable_catalogd_ha",
+  @CustomClusterTestSuite.with_args(
+      start_args="--enable_catalogd_ha",
       statestored_args="--use_subscriber_id_as_catalogd_priority=true",
       disable_log_buffering=True, log_symlinks=True)
   def test_catalog_ha_no_workload_mgmt(self):
@@ -628,10 +652,14 @@ class TestWorkloadManagementCatalogHA(TestWorkloadManagementInitBase):
         r"Skipping workload management initialization since catalogd HA is enabled and "
         r"this catalogd is not active", node_index=self.standby_catalog)
 
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt",
-      catalogd_args="--enable_workload_mgmt", start_args="--enable_catalogd_ha",
+  def teardown_method(self, method):
+    self.wait_for_wm_idle()
+    super(TestWorkloadManagementCatalogHA, self).teardown_method(method)
+
+  @CustomClusterTestSuite.with_args(
+      start_args="--enable_catalogd_ha",
       statestored_args="--use_subscriber_id_as_catalogd_priority=true",
-      disable_log_buffering=True, log_symlinks=True)
+      disable_log_buffering=True, log_symlinks=True, workload_mgmt=True)
   def test_catalog_ha_failover(self):
     """Asserts workload management initialization is not run a second time when catalogd
        failover happens."""
@@ -652,11 +680,10 @@ class TestWorkloadManagementCatalogHA(TestWorkloadManagementInitBase):
     self.assert_catalogd_log_contains("INFO", r"Starting workload management "
         r"initialization", expected_count=0, node_index=self.standby_catalog)
 
-  @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt",
-      catalogd_args="--enable_workload_mgmt",
+  @CustomClusterTestSuite.with_args(
       statestored_args="--use_subscriber_id_as_catalogd_priority=true",
       start_args="--enable_catalogd_ha --enable_statestored_ha",
-      disable_log_buffering=True, log_symlinks=True)
+      disable_log_buffering=True, log_symlinks=True, workload_mgmt=True)
   def test_catalog_statestore_ha(self):
     """Asserts workload management initialization is only done on the active catalogd
        when both catalog and statestore ha is enabled."""
diff --git a/tests/custom_cluster/test_workload_mgmt_sql_details.py b/tests/custom_cluster/test_workload_mgmt_sql_details.py
index cddb18fd5..7938fc03c 100644
--- a/tests/custom_cluster/test_workload_mgmt_sql_details.py
+++ b/tests/custom_cluster/test_workload_mgmt_sql_details.py
@@ -21,11 +21,15 @@ import os
 
 from SystemTables.ttypes import TQueryTableColumn
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
-from test_query_log import TestQueryLogTableBase
+from tests.common.test_dimensions import hs2_client_protocol_dimension
+from tests.common.wm_test_suite import WorkloadManagementTestSuite
 from tests.util.workload_management import assert_csv_col, QUERY_TBL_LOG
 
 
-class TestWorkloadManagementSQLDetails(TestQueryLogTableBase):
+@CustomClusterTestSuite.with_args(
+  cluster_size=1, disable_log_buffering=True, workload_mgmt=True,
+  impalad_args="--query_log_max_queued=1")
+class TestWorkloadManagementSQLDetails(WorkloadManagementTestSuite):
   """Tests that ensure the workload management data describing the details of the sql
       statement values (columns tables_queried, select_columns, where_columns,
       join_columns, aggregate_columns, and orderby_columns) in the completed
@@ -37,12 +41,7 @@ class TestWorkloadManagementSQLDetails(TestQueryLogTableBase):
   @classmethod
   def add_test_dimensions(cls):
     super(TestWorkloadManagementSQLDetails, cls).add_test_dimensions()
-    cls.ImpalaTestMatrix.add_constraint(lambda v:
-        v.get_value("protocol") == "beeswax")
-
-  def setup_method(self, method):
-    super(TestWorkloadManagementSQLDetails, self).setup_method(method)
-    self.wait_for_wm_init_complete()
+    cls.ImpalaTestMatrix.add_dimension(hs2_client_protocol_dimension())
 
     # Assert tables queried.
   def _assert_all(self, vector, query, expected_tables_queried, expected_select_cols,
@@ -60,7 +59,7 @@ class TestWorkloadManagementSQLDetails(TestQueryLogTableBase):
 
     # Wait for the query to be written to the completed queries table.
     self.cluster.get_first_impalad().service.wait_for_metric_value(
-        "impala-server.completed-queries.written", 1, 60)
+        "impala-server.completed-queries.queued", 0, 60)
 
     # Assert tables queried.
     assert_csv_col(client, QUERY_TBL_LOG, TQueryTableColumn.TABLES_QUERIED, res.query_id,
@@ -101,10 +100,6 @@ class TestWorkloadManagementSQLDetails(TestQueryLogTableBase):
     self._assert_all(vector, query_text, expected_tables_queried, expected_select,
         expected_where, expected_join, expected_aggregate, expected_orderby)
 
-  @CustomClusterTestSuite.with_args(
-      cluster_size=1, impalad_graceful_shutdown=True, disable_log_buffering=True,
-      impalad_args="--enable_workload_mgmt --query_log_write_interval_s=1",
-      catalogd_args="--enable_workload_mgmt")
   def test_tpcds_1(self, vector):
     """Asserts the values inserted into the workload management table columns match
        what is expected for TPCDS query 1.
@@ -134,10 +129,6 @@ class TestWorkloadManagementSQLDetails(TestQueryLogTableBase):
         # Expected order by columns.
         ["customer.c_customer_id"])
 
-  @CustomClusterTestSuite.with_args(
-      cluster_size=1, impalad_graceful_shutdown=True, disable_log_buffering=True,
-      impalad_args="--enable_workload_mgmt --query_log_write_interval_s=1",
-      catalogd_args="--enable_workload_mgmt")
   def test_tpcds_3(self, vector):
     """Asserts the values inserted into the workload management table columns match
        what is expected for TPCDS query 3.
@@ -165,10 +156,6 @@ class TestWorkloadManagementSQLDetails(TestQueryLogTableBase):
         # Expected order by columns.
         ["date_dim.d_year", "store_sales.ss_ext_sales_price", "item.i_brand_id"])
 
-  @CustomClusterTestSuite.with_args(
-      cluster_size=1, impalad_graceful_shutdown=True, disable_log_buffering=True,
-      impalad_args="--enable_workload_mgmt --query_log_write_interval_s=1",
-      catalogd_args="--enable_workload_mgmt")
   def test_tpcds_51(self, vector):
     """Asserts the values inserted into the workload management table columns match
        what is expected for TPCDS query 51.
@@ -198,10 +185,6 @@ class TestWorkloadManagementSQLDetails(TestQueryLogTableBase):
         # Expected order by columns.
         ["web_sales.ws_item_sk", "store_sales.ss_item_sk", "date_dim.d_date"])
 
-  @CustomClusterTestSuite.with_args(
-      cluster_size=1, impalad_graceful_shutdown=True, disable_log_buffering=True,
-      impalad_args="--enable_workload_mgmt --query_log_write_interval_s=1",
-      catalogd_args="--enable_workload_mgmt")
   def test_tpcds_62(self, vector):
     """Asserts the values inserted into the workload management table columns match
        what is expected for TPCDS query 62.
@@ -232,10 +215,6 @@ class TestWorkloadManagementSQLDetails(TestQueryLogTableBase):
         # Expected order by columns.
         ["warehouse.w_warehouse_name", "ship_mode.sm_type", "web_site.web_name"])
 
-  @CustomClusterTestSuite.with_args(
-      cluster_size=1, impalad_graceful_shutdown=True, disable_log_buffering=True,
-      impalad_args="--enable_workload_mgmt --query_log_write_interval_s=1",
-      catalogd_args="--enable_workload_mgmt")
   def test_tpcds_64(self, vector):
     """Asserts the values inserted into the workload management table columns match
        what is expected for TPCDS query 64.
@@ -303,10 +282,6 @@ class TestWorkloadManagementSQLDetails(TestQueryLogTableBase):
         # Expected order by columns.
         ["item.i_product_name", "store.s_store_name", "store_sales.ss_wholesale_cost"])
 
-  @CustomClusterTestSuite.with_args(
-      cluster_size=1, impalad_graceful_shutdown=True, disable_log_buffering=True,
-      impalad_args="--enable_workload_mgmt --query_log_write_interval_s=1",
-      catalogd_args="--enable_workload_mgmt")
   def test_tpcds_66(self, vector):
     """Asserts the values inserted into the workload management table columns match
        what is expected for TPCDS query 66.
@@ -349,10 +324,6 @@ class TestWorkloadManagementSQLDetails(TestQueryLogTableBase):
         # Expected order by columns.
         ["warehouse.w_warehouse_name"])
 
-  @CustomClusterTestSuite.with_args(
-      cluster_size=1, impalad_graceful_shutdown=True, disable_log_buffering=True,
-      impalad_args="--enable_workload_mgmt --query_log_write_interval_s=1",
-      catalogd_args="--enable_workload_mgmt")
   def test_tpcds_75(self, vector):
     """Asserts the values inserted into the workload management table columns match
        what is expected for TPCDS query 75.
@@ -404,10 +375,6 @@ class TestWorkloadManagementSQLDetails(TestQueryLogTableBase):
          "web_returns.wr_return_amt", "web_returns.wr_return_quantity",
          "web_sales.ws_ext_sales_price", "web_sales.ws_quantity"])
 
-  @CustomClusterTestSuite.with_args(
-      cluster_size=1, impalad_graceful_shutdown=True, disable_log_buffering=True,
-      impalad_args="--enable_workload_mgmt --query_log_write_interval_s=1",
-      catalogd_args="--enable_workload_mgmt")
   def test_sql_having(self, vector):
     """Asserts the values inserted into the workload management table columns match
        what is expected when the sql contains both a group by and a having clause that
@@ -422,10 +389,6 @@ class TestWorkloadManagementSQLDetails(TestQueryLogTableBase):
         ["store_sales.ss_item_sk", "store_sales.ss_quantity"],
         ["store_sales.ss_quantity"])
 
-  @CustomClusterTestSuite.with_args(
-      cluster_size=1, impalad_graceful_shutdown=True, disable_log_buffering=True,
-      impalad_args="--enable_workload_mgmt --query_log_write_interval_s=1",
-      catalogd_args="--enable_workload_mgmt")
   def test_complex_types(self, vector):
     """Asserts that queries using subtypes of complex type columns only report the column
        name and do not also report the subtype."""
@@ -439,10 +402,6 @@ class TestWorkloadManagementSQLDetails(TestQueryLogTableBase):
         [],
         "functional")
 
-  @CustomClusterTestSuite.with_args(
-      cluster_size=1, impalad_graceful_shutdown=True, disable_log_buffering=True,
-      impalad_args="--enable_workload_mgmt --query_log_write_interval_s=1",
-      catalogd_args="--enable_workload_mgmt")
   def test_arithmetic(self, vector):
     """Asserts that arithmetic operations record the columns used in them."""
     self._assert_all(vector, "select (tinyint_col + smallint_col), sum(float_col) from "
@@ -456,10 +415,17 @@ class TestWorkloadManagementSQLDetails(TestQueryLogTableBase):
         [],
         "functional")
 
+
+class TestWorkloadManagementSQLDetailsCalcite(WorkloadManagementTestSuite):
+  """Variant of TestWorkloadManagementSQLDetails using calcite planner."""
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestWorkloadManagementSQLDetailsCalcite, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(hs2_client_protocol_dimension())
+
   @CustomClusterTestSuite.with_args(start_args="--use_calcite_planner=true",
-      cluster_size=1, impalad_graceful_shutdown=True,
-      impalad_args="--enable_workload_mgmt --query_log_write_interval_s=1",
-      catalogd_args="--enable_workload_mgmt")
+                                    cluster_size=1, workload_mgmt=True)
   def test_tpcds_8_decimal(self, vector):
     """Runs the tpcds-decimal_v2-q8 query using the calcite planner and asserts the query
        completes successfully. See IMPALA-13505 for details on why this query in

Reply via email to