This is an automated email from the ASF dual-hosted git repository.

prozsa pushed a commit to branch branch-4.5.0
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b47830268949e227b1663158988f291a9b4ed277
Author: jasonmfehr <jf...@cloudera.com>
AuthorDate: Tue Jan 14 15:24:48 2025 -0800

    IMPALA-13201: System Table Queries Execute When Admission Queues are Full
    
    Queries that run only against in-memory system tables are currently
    subject to the same admission control process as all other queries.
    Since these queries do not use any resources on executors, admission
    control does not need to consider the state of the executors when
    deciding whether to admit them.
    
    This change adds a boolean configuration option 'onlyCoordinators'
    to the fair-scheduler.xml file that specifies a request pool
    applies only to the coordinators. When a query is submitted to a
    coordinator-only request pool, no executors are required to be
    running; instead, all fragment instances are executed exclusively
    on the coordinators.
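    
    For illustration, a coordinator-only pool could be declared in
    fair-scheduler.xml roughly as follows. This is a minimal sketch:
    the pool name "coords" and the resource limits are placeholders,
    and only the <onlyCoordinators> element is the new piece introduced
    by this change.
    
      <allocations>
        <queue name="root">
          <queue name="coords">
            <maxResources>1000000 mb, 0 vcores</maxResources>
            <aclSubmitApps>*</aclSubmitApps>
            <!-- New in this change: schedule this pool's fragment
                 instances on coordinators only -->
            <onlyCoordinators>true</onlyCoordinators>
          </queue>
        </queue>
      </allocations>
    
    A query is routed to such a pool by setting the REQUEST_POOL query
    option to the pool name (for example, root.coords).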
    
    A new member was added to the ClusterMembershipMgr::Snapshot struct
    to hold the ExecutorGroup of all coordinators. This object is kept up
    to date by processing statestore messages and is used when executing
    queries that either require the coordinators (such as queries against
    sys.impala_query_live) or that use a coordinator-only request pool.
    
    Testing was accomplished by:
    1. Cluster membership manager ctests added to assert the cluster
       membership manager correctly builds the list of non-quiescing
       coordinators.
    2. RequestPoolService JUnit tests to assert the new optional
       <onlyCoordinators> config in the fair-scheduler.xml file is
       correctly parsed.
    3. ExecutorGroup ctests modified to assert the new
       LookUpBackendDesc() by-backend-id overload.
    4. Custom cluster admission controller tests to assert queries in a
       coordinator-only request pool run only on the active
       coordinators.
    
    Change-Id: I5e0e64db92bdbf80f8b5bd85d001ffe4c8c9ffda
    Reviewed-on: http://gerrit.cloudera.org:8080/22249
    Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 be/src/scheduling/admission-controller.cc          |  12 +-
 be/src/scheduling/cluster-membership-mgr-test.cc   |  52 ++++-
 be/src/scheduling/cluster-membership-mgr.cc        |  88 +++++---
 be/src/scheduling/cluster-membership-mgr.h         |   8 +-
 be/src/scheduling/cluster-membership-test-util.cc  |  13 ++
 be/src/scheduling/cluster-membership-test-util.h   |   5 +
 be/src/scheduling/executor-group-test.cc           |  32 ++-
 be/src/scheduling/executor-group.cc                |  13 ++
 be/src/scheduling/executor-group.h                 |   6 +
 be/src/service/client-request-state.cc             |   2 +-
 be/src/util/webserver.cc                           |   4 +-
 bin/start-impala-cluster.py                        |   4 +-
 common/thrift/ImpalaInternalService.thrift         |   6 +
 docs/topics/impala_admission_config.xml            |  43 +++-
 .../java/org/apache/impala/service/Frontend.java   |  29 ++-
 .../org/apache/impala/util/RequestPoolService.java |   2 +
 .../apache/impala/util/TestRequestPoolService.java |  12 +-
 .../test/resources/fair-scheduler-onlycoords.xml   |  25 +++
 fe/src/test/resources/fair-scheduler-test.xml      |   2 +
 fe/src/test/resources/llama-site-onlycoords.xml    |  66 ++++++
 .../scheduler/fair/AllocationConfiguration.java    |  10 +
 .../fair/AllocationFileLoaderService.java          |  58 ++---
 tests/common/custom_cluster_test_suite.py          |  57 +++--
 tests/common/impala_cluster.py                     |   8 +
 tests/common/impala_test_suite.py                  |  31 +++
 tests/custom_cluster/test_admission_controller.py  | 234 +++++++++++++++++++++
 26 files changed, 719 insertions(+), 103 deletions(-)

diff --git a/be/src/scheduling/admission-controller.cc 
b/be/src/scheduling/admission-controller.cc
index 4a75ef382..c25ebbfdc 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -2250,8 +2250,14 @@ Status AdmissionController::ComputeGroupScheduleStates(
   }
   const BackendDescriptorPB& coord_desc = it->second;
 
-  vector<const ExecutorGroup*> executor_groups =
-      GetExecutorGroupsForQuery(membership_snapshot->executor_groups, request);
+  vector<const ExecutorGroup*> executor_groups;
+
+  if (UNLIKELY(queue_node->pool_cfg.only_coordinators)) {
+    executor_groups = {&membership_snapshot->all_coordinators};
+  } else {
+    executor_groups =
+        GetExecutorGroupsForQuery(membership_snapshot->executor_groups, 
request);
+  }
 
   if (executor_groups.empty()) {
     queue_node->not_admitted_reason = REASON_NO_EXECUTOR_GROUPS;
@@ -2261,7 +2267,7 @@ Status AdmissionController::ComputeGroupScheduleStates(
 
   // Collect all coordinators if needed for the request.
   ExecutorGroup coords = request.request.include_all_coordinators ?
-      membership_snapshot->GetCoordinators() : 
ExecutorGroup("all-coordinators");
+      membership_snapshot->all_coordinators : 
ExecutorGroup("all-coordinators");
 
   // We loop over the executor groups in a deterministic order. If
   // --balance_queries_across_executor_groups set to true, executor groups 
with more
diff --git a/be/src/scheduling/cluster-membership-mgr-test.cc 
b/be/src/scheduling/cluster-membership-mgr-test.cc
index 625c9e8f0..f67d508d3 100644
--- a/be/src/scheduling/cluster-membership-mgr-test.cc
+++ b/be/src/scheduling/cluster-membership-mgr-test.cc
@@ -15,8 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <memory>
 #include <deque>
+#include <memory>
+#include <sstream>
+#include <vector>
 
 #include "common/logging.h"
 #include "common/names.h"
@@ -272,9 +274,38 @@ class ClusterMembershipMgrTest : public testing::Test {
   }
 };
 
+/// Asserts a provided list of expected hostnames are in the all_coordinators
+/// ExecutorGroup of the provided ClusterMembershipMgr.
+void _assertCoords(
+    ClusterMembershipMgr& cmm, const std::vector<const char*> 
expected_hostnames) {
+  ExecutorGroup::Executors actual_coords =
+    cmm.GetSnapshot()->all_coordinators.GetAllExecutorDescriptors();
+
+  std::ostringstream actual_hostnames;
+  for (const auto& actual : actual_coords) {
+    actual_hostnames << " " << actual.address().hostname();
+  }
+
+  ASSERT_EQ(expected_hostnames.size(), actual_coords.size())
+      << "Actual hostnames:" << actual_hostnames.str();
+
+  for (auto expected : expected_hostnames) {
+    bool found = false;
+    for (const auto& actual : actual_coords) {
+      if (actual.address().hostname() == expected) {
+        found = true;
+        break;
+      }
+    }
+
+    EXPECT_TRUE(found) << "did not find expected coordinator '" << expected
+                       << "' in actual coordinators:" << 
actual_hostnames.str();
+  }
+}
+
 /// This test takes two instances of the ClusterMembershipMgr through a common 
lifecycle.
 /// It also serves as an example for how to craft statestore messages and pass 
them to
-/// UpdaUpdateMembership().
+/// UpdateMembership().
 TEST_F(ClusterMembershipMgrTest, TwoInstances) {
   auto b1 = make_shared<BackendDescriptorPB>(MakeBackendDescriptor(1));
   auto b2 = make_shared<BackendDescriptorPB>(MakeBackendDescriptor(2));
@@ -306,6 +337,8 @@ TEST_F(ClusterMembershipMgrTest, TwoInstances) {
 
   // First manager now has one BE
   ASSERT_EQ(1, cmm1.GetSnapshot()->current_backends.size());
+  _assertCoords(cmm1, {"host_1"});
+  _assertCoords(cmm2, {});
 
   // Hook up second callback and iterate with the result of the first manager
   cmm2.SetLocalBeDescFn([b2]() { return b2; });
@@ -314,6 +347,8 @@ TEST_F(ClusterMembershipMgrTest, TwoInstances) {
   cmm2.UpdateMembership(topic_delta_map, &returned_topic_deltas);
   ASSERT_EQ(1, returned_topic_deltas.size());
   ASSERT_EQ(2, cmm2.GetSnapshot()->current_backends.size());
+  _assertCoords(cmm1, {"host_1"});
+  _assertCoords(cmm2, {"host_1", "host_2"});
 
   // Send the returned update to the first manager, this time no deltas will 
be returned
   *ss_topic_delta = returned_topic_deltas[0];
@@ -321,6 +356,8 @@ TEST_F(ClusterMembershipMgrTest, TwoInstances) {
   cmm1.UpdateMembership(topic_delta_map, &returned_topic_deltas);
   ASSERT_EQ(0, returned_topic_deltas.size());
   ASSERT_EQ(2, cmm1.GetSnapshot()->current_backends.size());
+  _assertCoords(cmm1, {"host_1", "host_2"});
+  _assertCoords(cmm2, {"host_1", "host_2"});
 
   // Both managers now have the same state. Shutdown one of them and step 
through
   // propagating the update.
@@ -334,6 +371,8 @@ TEST_F(ClusterMembershipMgrTest, TwoInstances) {
   // It will also remove itself from the executor group (but not the current 
backends).
   ASSERT_EQ(1, GetDefaultGroupSize(cmm1));
   ASSERT_EQ(2, cmm1.GetSnapshot()->current_backends.size());
+  _assertCoords(cmm1, {"host_1", "host_2"});
+  _assertCoords(cmm2, {"host_1", "host_2"});
 
   // Propagate the quiescing to the 2nd mgr
   *ss_topic_delta = returned_topic_deltas[0];
@@ -343,6 +382,8 @@ TEST_F(ClusterMembershipMgrTest, TwoInstances) {
   ASSERT_EQ(0, returned_topic_deltas.size());
   ASSERT_EQ(2, cmm2.GetSnapshot()->current_backends.size());
   ASSERT_EQ(1, GetDefaultGroupSize(cmm2));
+  _assertCoords(cmm1, {"host_1", "host_2"});
+  _assertCoords(cmm2, {"host_2"});
 
   // Delete the 1st backend from the 2nd one
   ASSERT_EQ(1, ss_topic_delta->topic_entries.size());
@@ -351,6 +392,8 @@ TEST_F(ClusterMembershipMgrTest, TwoInstances) {
   ASSERT_EQ(0, returned_topic_deltas.size());
   ASSERT_EQ(1, cmm2.GetSnapshot()->current_backends.size());
   ASSERT_EQ(1, GetDefaultGroupSize(cmm2));
+  _assertCoords(cmm1, {"host_1", "host_2"});
+  _assertCoords(cmm2, {"host_2"});
 }
 
 TEST_F(ClusterMembershipMgrTest, IsBlacklisted) {
@@ -447,6 +490,11 @@ TEST_F(ClusterMembershipMgrTest, ExecutorBlacklist) {
   DeleteBackend(backends_[2].get());
   EXPECT_EQ(NUM_BACKENDS - 1, 
backends_[0]->cmm->GetSnapshot()->current_backends.size());
   EXPECT_EQ(NUM_BACKENDS - 2, GetDefaultGroupSize(*backends_[0]->cmm));
+
+  // Assert blacklisting executors does not impact the all_coordinators 
ExecutorGroup.
+  // host_1 was quiesced and host_2 was deleted, both actions do impact 
all_coordinators.
+  _assertCoords(*backends_[0]->cmm, {"host_0"});
+  _assertCoords(*backends_[1]->cmm, {"host_0", "host_1"});
 }
 
 // This test runs a group of 20 backends through their full lifecycle, 
validating that
diff --git a/be/src/scheduling/cluster-membership-mgr.cc 
b/be/src/scheduling/cluster-membership-mgr.cc
index e48ea98c3..ab8bf9a61 100644
--- a/be/src/scheduling/cluster-membership-mgr.cc
+++ b/be/src/scheduling/cluster-membership-mgr.cc
@@ -170,21 +170,11 @@ ClusterMembershipMgr::SnapshotPtr 
ClusterMembershipMgr::GetSnapshot() const {
   return state;
 }
 
-static bool is_active_coordinator(const BackendDescriptorPB& be) {
+static inline bool is_active_coordinator(const BackendDescriptorPB& be) {
   return be.has_is_coordinator() && be.is_coordinator() &&
       !(be.has_is_quiescing() && be.is_quiescing());
 }
 
-ExecutorGroup ClusterMembershipMgr::Snapshot::GetCoordinators() const {
-  ExecutorGroup coordinators("all-coordinators");
-  for (const auto& it : current_backends) {
-    if (is_active_coordinator(it.second)) {
-      coordinators.AddExecutor(it.second);
-    }
-  }
-  return coordinators;
-}
-
 vector<TNetworkAddress> 
ClusterMembershipMgr::Snapshot::GetCoordinatorAddresses() const {
   vector<TNetworkAddress> coordinators;
   for (const auto& it : current_backends) {
@@ -197,6 +187,19 @@ vector<TNetworkAddress> 
ClusterMembershipMgr::Snapshot::GetCoordinatorAddresses(
   return coordinators;
 }
 
+static inline void _removeCoordIfExists(
+    const std::shared_ptr<ClusterMembershipMgr::Snapshot>& state,
+    const BackendDescriptorPB& be) {
+
+  // The BackendDescriptorPB may be incomplete. Use the backend id to retrieve 
the actual
+  // backend descriptor so the backend can be removed.
+  const BackendDescriptorPB* actual_be =
+      state->all_coordinators.LookUpBackendDesc(be.backend_id());
+  if (actual_be != nullptr) {
+    state->all_coordinators.RemoveExecutor(*actual_be);
+  }
+}
+
 void ClusterMembershipMgr::UpdateMembership(
     const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
     vector<TTopicDelta>* subscriber_topic_updates) {
@@ -298,6 +301,11 @@ void ClusterMembershipMgr::UpdateMembership(
           }
         }
         new_backend_map->erase(item.key);
+
+        // If a coordinator is not shutdown gracefully, then it will be 
deleted here.
+        if (be_desc.is_coordinator()) {
+          _removeCoordIfExists(new_state, be_desc);
+        }
       }
       continue;
     }
@@ -346,32 +354,45 @@ void ClusterMembershipMgr::UpdateMembership(
       if (existing.is_quiescing()) DCHECK(be_desc.is_quiescing());
 
       // If the node starts quiescing
-      if (be_desc.is_quiescing() && !existing.is_quiescing() && 
existing.is_executor()) {
-        // If the backend starts quiescing and it is present in the blacklist, 
remove it
-        // from the blacklist. If the backend is present in the blacklist, 
there is no
-        // need to remove it from the executor group because it has already 
been removed
-        bool blacklisted = new_blacklist->FindAndRemove(be_desc)
-            == ExecutorBlacklist::State::BLACKLISTED;
-        if (blacklisted) {
-          VLOG(1) << "Removing backend " << item.key << " from blacklist 
(quiescing)";
-          DCHECK(!IsBackendInExecutorGroups(be_desc, *new_executor_groups));
-        } else {
-          // Executor needs to be removed from its groups
-          for (const auto& group : be_desc.executor_groups()) {
-            VLOG(1) << "Removing backend " << item.key << " from group "
-                    << group.DebugString() << " (quiescing)";
-            RemoveExecutorAndGroup(be_desc, group, new_executor_groups);
+      if (be_desc.is_quiescing() && !existing.is_quiescing()) {
+        if (existing.is_executor()) {
+          // If the backend starts quiescing and it is present in the 
blacklist, remove it
+          // from the blacklist. If the backend is present in the blacklist, 
there is no
+          // need to remove it from the executor group because it has already 
been removed
+          bool blacklisted = new_blacklist->FindAndRemove(be_desc)
+              == ExecutorBlacklist::State::BLACKLISTED;
+          if (blacklisted) {
+            VLOG(1) << "Removing backend " << item.key << " from blacklist 
(quiescing)";
+            DCHECK(!IsBackendInExecutorGroups(be_desc, *new_executor_groups));
+          } else {
+            // Executor needs to be removed from its groups
+            for (const auto& group : be_desc.executor_groups()) {
+              VLOG(1) << "Removing backend " << item.key << " from group "
+                      << group.DebugString() << " (quiescing)";
+              RemoveExecutorAndGroup(be_desc, group, new_executor_groups);
+            }
           }
         }
+
+        if (existing.is_coordinator()) {
+          _removeCoordIfExists(new_state, be_desc);
+        }
       }
       existing = be_desc;
     } else {
       // Create
       new_backend_map->insert(make_pair(item.key, be_desc));
-      if (!be_desc.is_quiescing() && be_desc.is_executor()) {
-        for (const auto& group : be_desc.executor_groups()) {
-          VLOG(1) << "Adding backend " << item.key << " to group " << 
group.DebugString();
-          FindOrInsertExecutorGroup(group, 
new_executor_groups)->AddExecutor(be_desc);
+      if (!be_desc.is_quiescing()) {
+        if (be_desc.is_executor()) {
+          for (const auto& group : be_desc.executor_groups()) {
+            VLOG(1) << "Adding backend " << item.key << " to group " <<
+                group.DebugString();
+            FindOrInsertExecutorGroup(group, 
new_executor_groups)->AddExecutor(be_desc);
+          }
+        }
+
+        if (is_active_coordinator(be_desc)) {
+          new_state->all_coordinators.AddExecutor(be_desc);
         }
       }
       // Since this backend is new, it cannot already be on the blacklist or 
probation.
@@ -415,6 +436,13 @@ void ClusterMembershipMgr::UpdateMembership(
         }
       }
     }
+
+    // Add ourself to the list of all coordinators.
+    if (is_active_coordinator(*local_be_desc.get())) {
+      _removeCoordIfExists(new_state, *local_be_desc);
+      new_state->all_coordinators.AddExecutor(*local_be_desc);
+    }
+
     AddLocalBackendToStatestore(*local_be_desc, subscriber_topic_updates);
     DCHECK(CheckConsistency(*new_backend_map, *new_executor_groups, 
*new_blacklist));
   }
diff --git a/be/src/scheduling/cluster-membership-mgr.h 
b/be/src/scheduling/cluster-membership-mgr.h
index 37e42eced..848067b47 100644
--- a/be/src/scheduling/cluster-membership-mgr.h
+++ b/be/src/scheduling/cluster-membership-mgr.h
@@ -86,10 +86,8 @@ class ClusterMembershipMgr {
   // Clients can obtain an immutable copy. Class instances can be created 
through the
   // implicitly-defined default and copy constructors.
   struct Snapshot {
-    Snapshot() = default;
+    Snapshot() : all_coordinators("all-coordinators") {};
     Snapshot(const Snapshot&) = default;
-    /// Returns an executor group of all non-quiescing coordinators in the 
cluster.
-    ExecutorGroup GetCoordinators() const;
     /// Returns the addresses of all non-quiescing coordinators in the cluster.
     std::vector<TNetworkAddress> GetCoordinatorAddresses() const;
     /// The current backend descriptor of the local backend.
@@ -111,6 +109,10 @@ class ClusterMembershipMgr {
     /// The version of this Snapshot. It is incremented every time the cluster 
membership
     /// changes.
     int64_t version = 0;
+
+    // Executor group of all non-quiescing coordinators in the cluster. Set 
during the
+    // SetState() function.
+    ExecutorGroup all_coordinators;
   };
 
   /// An immutable shared membership snapshot.
diff --git a/be/src/scheduling/cluster-membership-test-util.cc 
b/be/src/scheduling/cluster-membership-test-util.cc
index b34d03301..f3b04bb9b 100644
--- a/be/src/scheduling/cluster-membership-test-util.cc
+++ b/be/src/scheduling/cluster-membership-test-util.cc
@@ -81,5 +81,18 @@ BackendDescriptorPB MakeBackendDescriptor(int idx, int 
port_offset,
   return MakeBackendDescriptor(idx, group_desc, port_offset, admit_mem_limit);
 }
 
+void AssertLookupById(const BackendDescriptorPB& exec1, const 
BackendDescriptorPB& exec2,
+    const ExecutorGroup& group) {
+  const BackendDescriptorPB* actual_exec1 = 
group.LookUpBackendDesc(exec1.backend_id());
+  ASSERT_NE(nullptr, actual_exec1);
+  ASSERT_EQ(exec1.address().hostname(), actual_exec1->address().hostname());
+  ASSERT_EQ(exec1.address().port(), actual_exec1->address().port());
+
+  const BackendDescriptorPB* actual_exec2 = 
group.LookUpBackendDesc(exec2.backend_id());
+  ASSERT_NE(nullptr, actual_exec2);
+  ASSERT_EQ(exec2.address().hostname(), actual_exec2->address().hostname());
+  ASSERT_EQ(exec2.address().port(), actual_exec2->address().port());
+}
+
 }  // end namespace test
 }  // end namespace impala
diff --git a/be/src/scheduling/cluster-membership-test-util.h 
b/be/src/scheduling/cluster-membership-test-util.h
index 63a016bda..a8dfdfe19 100644
--- a/be/src/scheduling/cluster-membership-test-util.h
+++ b/be/src/scheduling/cluster-membership-test-util.h
@@ -55,5 +55,10 @@ BackendDescriptorPB MakeBackendDescriptor(
 BackendDescriptorPB MakeBackendDescriptor(int idx, int port_offset = 0,
     int64_t admit_mem_limit = 4L * MEGABYTE);
 
+/// Assert the LookupBackendDesc() function returns correct results when 
passed a
+/// backend id.
+void AssertLookupById(const BackendDescriptorPB& exec1, const 
BackendDescriptorPB& exec2,
+    const ExecutorGroup& group);
+
 }  // end namespace test
 }  // end namespace impala
diff --git a/be/src/scheduling/executor-group-test.cc 
b/be/src/scheduling/executor-group-test.cc
index e2f926c48..9a6f9cadc 100644
--- a/be/src/scheduling/executor-group-test.cc
+++ b/be/src/scheduling/executor-group-test.cc
@@ -31,12 +31,17 @@ using namespace impala::test;
 TEST(ExecutorGroupTest, AddExecutors) {
   ExecutorGroup group1("group1");
   ASSERT_EQ(0, group1.GetPerExecutorMemLimitForAdmission());
+
   int64_t mem_limit_admission1 = 100L * MEGABYTE;
-  group1.AddExecutor(MakeBackendDescriptor(1, group1, /* port_offset=*/0,
-      mem_limit_admission1));
+  BackendDescriptorPB exec1 = MakeBackendDescriptor(1, group1, /* 
port_offset=*/0,
+      mem_limit_admission1);
+  group1.AddExecutor(exec1);
+
   int64_t mem_limit_admission2 = 120L * MEGABYTE;
-  group1.AddExecutor(MakeBackendDescriptor(2, group1, /* port_offset=*/0,
-      mem_limit_admission2));
+  BackendDescriptorPB exec2 = MakeBackendDescriptor(2, group1, /* 
port_offset=*/0,
+      mem_limit_admission2);
+  group1.AddExecutor(exec2);
+
   ASSERT_EQ(mem_limit_admission1, group1.GetPerExecutorMemLimitForAdmission());
   ASSERT_EQ(2, group1.NumExecutors());
   IpAddr backend_ip;
@@ -44,23 +49,32 @@ TEST(ExecutorGroupTest, AddExecutors) {
   EXPECT_EQ("10.0.0.1", backend_ip);
   ASSERT_TRUE(group1.LookUpExecutorIp("host_2", &backend_ip));
   EXPECT_EQ("10.0.0.2", backend_ip);
+
+  AssertLookupById(exec1, exec2, group1);
 }
 
 /// Test adding multiple backends on the same host.
 TEST(ExecutorGroupTest, MultipleExecutorsOnSameHost) {
   ExecutorGroup group1("group1");
+
   int64_t mem_limit_admission1 = 120L * MEGABYTE;
-  group1.AddExecutor(MakeBackendDescriptor(1, group1, /* port_offset=*/0,
-      mem_limit_admission1));
+  const BackendDescriptorPB exec1 = MakeBackendDescriptor(1, group1, /* 
port_offset=*/0,
+      mem_limit_admission1);
+  group1.AddExecutor(exec1);
+
   int64_t mem_limit_admission2 = 100L * MEGABYTE;
-  group1.AddExecutor(MakeBackendDescriptor(1, group1, /* port_offset=*/1,
-      mem_limit_admission2));
+  const BackendDescriptorPB exec2 = MakeBackendDescriptor(1, group1, /* 
port_offset=*/1,
+      mem_limit_admission2);
+  group1.AddExecutor(exec2);
+
   ASSERT_EQ(mem_limit_admission2, group1.GetPerExecutorMemLimitForAdmission());
   IpAddr backend_ip;
   ASSERT_TRUE(group1.LookUpExecutorIp("host_1", &backend_ip));
   EXPECT_EQ("10.0.0.1", backend_ip);
   const ExecutorGroup::Executors& backend_list = 
group1.GetExecutorsForHost("10.0.0.1");
   EXPECT_EQ(2, backend_list.size());
+
+  AssertLookupById(exec1, exec2, group1);
 }
 
 /// Test removing a backend.
@@ -112,6 +126,8 @@ TEST(ExecutorGroupTest, RemoveExecutorOnSameHost) {
   EXPECT_EQ(1, backend_list.size());
   group1.RemoveExecutor(executor1);
   ASSERT_EQ(0, group1.GetPerExecutorMemLimitForAdmission());
+
+  ASSERT_EQ(nullptr, group1.LookUpBackendDesc(executor2.backend_id()));
 }
 
 /// Test that exercises the size-based group health check.
diff --git a/be/src/scheduling/executor-group.cc 
b/be/src/scheduling/executor-group.cc
index b99eeed86..d50ea1344 100644
--- a/be/src/scheduling/executor-group.cc
+++ b/be/src/scheduling/executor-group.cc
@@ -196,6 +196,19 @@ const BackendDescriptorPB* 
ExecutorGroup::LookUpBackendDesc(
   return nullptr;
 }
 
+const BackendDescriptorPB* ExecutorGroup::LookUpBackendDesc(
+    const UniqueIdPB& be_id) const {
+  for (const auto& executor_list : executor_map_) {
+    for (const auto& backend : executor_list.second){
+      if (backend.backend_id().hi() == be_id.hi()
+          && backend.backend_id().lo() == be_id.lo()) {
+        return &backend;
+      }
+    }
+  }
+  return nullptr;
+}
+
 int ExecutorGroup::NumExecutors() const {
   int count = 0;
   for (const auto& executor_list : executor_map_) count += 
executor_list.second.size();
diff --git a/be/src/scheduling/executor-group.h 
b/be/src/scheduling/executor-group.h
index 1e390e020..f3fde1d19 100644
--- a/be/src/scheduling/executor-group.h
+++ b/be/src/scheduling/executor-group.h
@@ -96,6 +96,12 @@ class ExecutorGroup {
   /// change while it holds the pointer.
   const BackendDescriptorPB* LookUpBackendDesc(const NetworkAddressPB& host) 
const;
 
+  /// Looks up the backend descriptor for the executor with an id matching the 
provided
+  /// executor id. Returns nullptr if no executor is found. The returned 
descriptor should
+  /// not be retained beyond the lifetime of this ExecutorGroup and the caller 
must make
+  /// sure that the group does not change while it holds the pointer.
+  const BackendDescriptorPB* LookUpBackendDesc(const UniqueIdPB& be_id) const;
+
   /// Returns the hash ring associated with this executor group. It's owned by 
the group
   /// and the caller must not hold a reference beyond the groups lifetime.
   const HashRing* GetHashRing() const { return &executor_ip_hash_ring_; }
diff --git a/be/src/service/client-request-state.cc 
b/be/src/service/client-request-state.cc
index 20cc7eca7..22bc65b40 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -2504,7 +2504,7 @@ Status ClientRequestState::TryKillQueryRemotely(
   // coordinator in the cluster, it will be the status to return.
   Status status = Status::Expected(TErrorCode::INVALID_QUERY_HANDLE, 
PrintId(query_id));
   ExecutorGroup all_coordinators =
-      
ExecEnv::GetInstance()->cluster_membership_mgr()->GetSnapshot()->GetCoordinators();
+      
ExecEnv::GetInstance()->cluster_membership_mgr()->GetSnapshot()->all_coordinators;
   // Skipping the current impalad.
   unique_ptr<ExecutorGroup> 
other_coordinators{ExecutorGroup::GetFilteredExecutorGroup(
       &all_coordinators, {ExecEnv::GetInstance()->krpc_address()})};
diff --git a/be/src/util/webserver.cc b/be/src/util/webserver.cc
index 8471cb545..6c9e4abf8 100644
--- a/be/src/util/webserver.cc
+++ b/be/src/util/webserver.cc
@@ -714,11 +714,11 @@ sq_callback_result_t 
Webserver::BeginRequestCallback(struct sq_connection* conne
         }
         if (!authenticated) {
           if (use_jwt_) {
-            LOG(INFO) << "Invalid JWT token provided: " << bearer_token;
+            LOG(INFO) << "Invalid JWT token provided";
             total_jwt_token_auth_failure_->Increment(1);
           }
           if (use_oauth_) {
-            LOG(INFO) << "Invalid OAuth token provided: " << bearer_token;
+            LOG(INFO) << "Invalid OAuth token provided";
             total_oauth_token_auth_failure_->Increment(1);
           }
         }
diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py
index cbab2c9bc..9f56eeeb2 100755
--- a/bin/start-impala-cluster.py
+++ b/bin/start-impala-cluster.py
@@ -1147,7 +1147,7 @@ if __name__ == "__main__":
                                  use_exclusive_coordinators, 
existing_cluster_size)
       expected_cluster_size += existing_cluster_size
     elif options.add_impalads:
-      cluster_ops.start_impalads(options.cluster_size, 
options.num_coordinators,
+      cluster_ops.start_impalads(options.num_coordinators, 
options.num_coordinators,
                                  options.use_exclusive_coordinators,
                                  existing_cluster_size)
       expected_cluster_size += existing_cluster_size
@@ -1171,7 +1171,9 @@ if __name__ == "__main__":
     if options.add_impalads:
       # TODO: This is a hack to make the waiting logic work. We'd better add a 
dedicated
       # option for adding a new cluster using the existing catalogd and 
statestore.
+      # https://issues.apache.org/jira/browse/IMPALA-13755
       expected_num_ready_impalads = options.cluster_size
+      expected_cluster_size = options.cluster_size
     impala_cluster.wait_until_ready(expected_cluster_size, 
expected_num_ready_impalads)
   except Exception as e:
     LOG.exception("Error starting cluster")
diff --git a/common/thrift/ImpalaInternalService.thrift 
b/common/thrift/ImpalaInternalService.thrift
index 77941eb7a..c01e537a4 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -232,6 +232,12 @@ struct TPoolConfig {
   // If a rule for the user is not present in user_query_limits, then these 
rules
   // are evaluated, if the user is a member of a group.
   13: required map<string, i32> group_query_limits
+
+  // Specifies the state of the onlyCoordinators configuration element for a 
request pool.
+  // When request pools are configured, they can be specified as being only 
coordinators
+  // which means that pool only considers resources from the coordinator nodes 
and queries
+  // for that pool schedule fragment instances on coordinator nodes only.
+  14: optional bool only_coordinators
 }
 
 struct TParseDateStringResult {
diff --git a/docs/topics/impala_admission_config.xml 
b/docs/topics/impala_admission_config.xml
index a1b179c13..c3c686d85 100644
--- a/docs/topics/impala_admission_config.xml
+++ b/docs/topics/impala_admission_config.xml
@@ -144,10 +144,10 @@ 
impala.admission-control.pool-queue-timeout-ms.<varname>queue_name</varname></ph
         <p>
           Here are sample <filepath>fair-scheduler.xml</filepath> and
           <filepath>llama-site.xml</filepath> files that define resource pools
-          <codeph>root.default</codeph>, <codeph>root.development</codeph>, and
-          <codeph>root.production</codeph>. These files define resource pools 
for Impala
-          admission control and are separate from the similar
-          <codeph>fair-scheduler.xml</codeph>that defines resource pools for 
YARN.
+          <codeph>root.default</codeph>, <codeph>root.development</codeph>,
+          <codeph>root.production</codeph>, and <codeph>root.coords</codeph>.
+          These files define resource pools for Impala admission control and 
are separate
+          from the similar <codeph>fair-scheduler.xml</codeph> that defines 
resource pools for YARN.
         </p>
 
         <p>
@@ -173,6 +173,36 @@ 
impala.admission-control.pool-queue-timeout-ms.<varname>queue_name</varname></ph
           to those pools.
         </p>
 
+        <p>
+          The <codeph>&lt;onlyCoordinators&gt;</codeph> element is a boolean 
that defaults
+          to <codeph>false</codeph>. If this value is set to 
<codeph>true</codeph>, the
+          named request pool will contain only the coordinators and none of 
the executors.
+          The main purpose of this setting is to enable running queries 
against the
+          <codeph>sys.impala_query_live</codeph> table from workload 
management. Since the
+          data for this table is stored in the memory of the coordinators, the 
executors
+          do not need to be involved if the query only selects from this table.
+        </p>
+
+        <p>
+          To use an <codeph>&lt;onlyCoordinators&gt;</codeph> request pool, 
set the
+          <codeph>REQUEST_POOL</codeph> query option to the name of the
+          <codeph>&lt;onlyCoordinators&gt;</codeph> request pool. 
<b>Caution:</b> even though
+          these request pools do not contain executors, they can still run any 
query.
+          Thus, while the <codeph>REQUEST_POOL</codeph> query option is set to 
an only
+          coordinators request pool, queries have the potential to run the 
coordinators
+          out of resources.
+        </p>
+
+        <p>
+          Caution: care must be taken when naming the 
<codeph>&lt;onlyCoordinators&gt;</codeph>
+          request pool. If the name has the same prefix as a named executor 
group set, then
+          queries may be automatically routed to the request pool.  For 
example, if the
+          coordinator is configured with
+          <codeph>--expected_executor_group_sets=prefix1:10</codeph>, then an 
only coordinators
+          request pool named <codeph>prefix1-onlycoords</codeph> will 
potentially have
+          queries routed to it.
+        </p>
+
 <codeblock>&lt;allocations>
 
     &lt;queue name="root">
@@ -189,6 +219,11 @@ 
impala.admission-control.pool-queue-timeout-ms.<varname>queue_name</varname></ph
             &lt;maxResources>1000000 mb, 0 vcores&lt;/maxResources>
             &lt;aclSubmitApps> ops,admin&lt;/aclSubmitApps>
         &lt;/queue>
+        &lt;queue name="coords">
+            &lt;maxResources>1000000 mb, 0 vcores&lt;/maxResources>
+            &lt;aclSubmitApps>ops,admin&lt;/aclSubmitApps>
+            &lt;onlyCoordinators>true&lt;/onlyCoordinators>
+        &lt;/queue>
     &lt;/queue>
     &lt;queuePlacementPolicy>
         &lt;rule name="specified" create="false"/>
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java 
b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 4f5146dfc..e677580d3 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -230,6 +230,8 @@ import org.apache.impala.util.PatternMatcher;
 import org.apache.impala.util.RequestPoolService;
 import org.apache.impala.util.TResultRowBuilder;
 import org.apache.impala.util.TSessionStateUtil;
+import static org.apache.impala.yarn.server.resourcemanager.scheduler.fair.
+    AllocationFileLoaderService.ROOT_POOL_NAME;
 import org.apache.kudu.client.KuduClient;
 import org.apache.kudu.client.KuduException;
 import org.apache.kudu.client.KuduTransaction;
@@ -2196,8 +2198,28 @@ public class Frontend {
     Preconditions.checkState(
         !clientSetRequestPool || !queryOptions.getRequest_pool().isEmpty());
 
-    List<TExecutorGroupSet> originalExecutorGroupSets =
-        ExecutorMembershipSnapshot.getAllExecutorGroupSets();
+    boolean coordOnlyRequestPool = false;
+
+    if (clientSetRequestPool && RequestPoolService.getInstance() != null) {
+      final String pool_name = StringUtils.prependIfMissing(
+          queryOptions.getRequest_pool(), ROOT_POOL_NAME + ".");
+
+      coordOnlyRequestPool =
+          
RequestPoolService.getInstance().getPoolConfig(pool_name).only_coordinators;
+    }
+
+    List<TExecutorGroupSet> originalExecutorGroupSets;
+    if (coordOnlyRequestPool) {
+      // The query is set to use an only coordinators request pool which means 
that no
+      // executors will be involved in query execution. Thus, the planner must 
ignore
+      // all executor groups and select the default group instead. The backend 
will ensure
+      // the query is scheduled on the coordinators.
+      originalExecutorGroupSets = new ArrayList<>(1);
+      TExecutorGroupSet all_coords = new TExecutorGroupSet();
+      originalExecutorGroupSets.add(all_coords);
+    } else {
+      originalExecutorGroupSets = 
ExecutorMembershipSnapshot.getAllExecutorGroupSets();
+    }
 
     LOG.info("The original executor group sets from executor membership 
snapshot: "
         + originalExecutorGroupSets);
@@ -2314,6 +2336,9 @@ public class Frontend {
       } else if (!Frontend.canStmtBeAutoScaled(req)) {
         reason = "query is not auto-scalable";
         notScalable = true;
+      } else if (coordOnlyRequestPool) {
+        reason = "only coordinators request pool specified";
+        notScalable = true;
       }
 
       if (notScalable) {
diff --git a/fe/src/main/java/org/apache/impala/util/RequestPoolService.java 
b/fe/src/main/java/org/apache/impala/util/RequestPoolService.java
index 291c14243..9d4fc2ac2 100644
--- a/fe/src/main/java/org/apache/impala/util/RequestPoolService.java
+++ b/fe/src/main/java/org/apache/impala/util/RequestPoolService.java
@@ -378,6 +378,7 @@ public class RequestPoolService {
       result.setMax_requests(MAX_PLACED_RESERVATIONS_DEFAULT);
       result.setMax_queued(MAX_QUEUED_RESERVATIONS_DEFAULT);
       result.setDefault_query_options("");
+      result.setOnly_coordinators(false);
     } else {
       // Capture the current conf_ in case it changes while we're using it.
       Configuration currentConf = conf_;
@@ -403,6 +404,7 @@ public class RequestPoolService {
           getPoolConfigValue(currentConf, pool, 
MAX_QUERY_CPU_CORE_PER_NODE_LIMIT, 0L));
       result.setMax_query_cpu_core_coordinator_limit(getPoolConfigValue(
           currentConf, pool, MAX_QUERY_CPU_CORE_COORDINATOR_LIMIT, 0L));
+      
result.setOnly_coordinators(allocationConf_.get().isOnlyCoordinators(pool));
     }
     if (LOG.isTraceEnabled()) {
       LOG.debug("getPoolConfig(pool={}): max_mem_resources={}, 
max_requests={},"
diff --git 
a/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java 
b/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java
index ca748a3d0..f31a9c108 100644
--- a/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java
+++ b/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java
@@ -356,7 +356,7 @@ public class TestRequestPoolService {
         10000L, "mem_limit=1024m,query_timeout_s=10");
     checkPoolConfigResult("root.queueB", 5, 10, -1, 30000L, "mem_limit=1024m");
     checkPoolConfigResult("root.queueC", 5, 10, 1024 * ByteUnits.MEGABYTE, 
30000L,
-        "mem_limit=1024m", 1000, 10, false, 8, 8, null, null);
+        "mem_limit=1024m", 1000, 10, false, 8, 8, null, null, true);
 
     Map<String, Integer> queueDUserQueryLimits = new HashMap<>();
     queueDUserQueryLimits.put("userA", 2);
@@ -377,7 +377,8 @@ public class TestRequestPoolService {
     createPoolService(ALLOCATION_FILE_EMPTY, LLAMA_CONFIG_FILE_EMPTY);
     Assert.assertEquals("root.userA", poolService_.assignToPool("", "userA"));
     checkPoolAcls("root.userA", asList("userA", "userB", "userZ"), EMPTY_LIST);
-    checkPoolConfigResult("root", -1, 200, -1, null, "", 0, 0, true, 0, 0, 
null, null);
+    checkPoolConfigResult("root", -1, 200, -1, null, "", 0, 0, true, 0, 0, 
null, null,
+        false);
   }
 
   @Ignore("IMPALA-4868") @Test
@@ -684,7 +685,7 @@ public class TestRequestPoolService {
       String expectedQueryOptions, long max_query_mem_limit, long 
min_query_mem_limit,
       boolean clamp_mem_limit_query_option, long 
max_query_cpu_core_per_node_limit,
       long max_query_cpu_core_coordinator_limit, Map<String, Integer> 
userQueryLimits,
-      Map<String, Integer> groupQueryLimits) {
+      Map<String, Integer> groupQueryLimits, boolean onlyCoordinators) {
     TPoolConfig expectedResult = new TPoolConfig();
     expectedResult.setMax_requests(expectedMaxRequests);
     expectedResult.setMax_queued(expectedMaxQueued);
@@ -706,6 +707,9 @@ public class TestRequestPoolService {
         userQueryLimits != null ? userQueryLimits : Collections.emptyMap());
     expectedResult.setGroup_query_limits(
         groupQueryLimits != null ? groupQueryLimits : Collections.emptyMap());
+
+    expectedResult.setOnly_coordinators(onlyCoordinators);
+
     TPoolConfig poolConfig = poolService_.getPoolConfig(pool);
     Assert.assertEquals(
         "Unexpected config values for pool " + pool, expectedResult, 
poolConfig);
@@ -725,7 +729,7 @@ public class TestRequestPoolService {
       Map<String, Integer> groupQueryLimits) {
     checkPoolConfigResult(pool, expectedMaxRequests, expectedMaxQueued, 
expectedMaxMem,
         expectedQueueTimeoutMs, expectedQueryOptions, 0, 0, true, 0, 0,
-        userQueryLimits, groupQueryLimits);
+        userQueryLimits, groupQueryLimits, false);
   }
 
   private void checkPoolConfigResult(String pool, long expectedMaxRequests,
diff --git a/fe/src/test/resources/fair-scheduler-onlycoords.xml 
b/fe/src/test/resources/fair-scheduler-onlycoords.xml
new file mode 100644
index 000000000..087d6b770
--- /dev/null
+++ b/fe/src/test/resources/fair-scheduler-onlycoords.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<allocations>
+    <queue name="group-set-small">
+      <weight>1.0</weight>
+      <schedulingPolicy>fair</schedulingPolicy>
+      <!-- Set a huge amount of memory to enable memory based admission 
control -->
+      <maxResources>50000000 mb, 0 vcores</maxResources>
+    </queue>
+    <queue name="group-set-large">
+      <weight>1.0</weight>
+      <schedulingPolicy>fair</schedulingPolicy>
+      <!-- Set a huge amount of memory to enable memory based admission 
control -->
+      <maxResources>50000000 mb, 0 vcores</maxResources>
+    </queue>
+    <queue name="onlycoords">
+        <maxResources>3000 mb, 0 vcores</maxResources>
+        <aclSubmitApps>* </aclSubmitApps>
+        <onlyCoordinators>true</onlyCoordinators>
+    </queue>
+    <queuePlacementPolicy>
+        <rule name="specified" create="false"/>
+        <rule name="reject" />
+    </queuePlacementPolicy>
+</allocations>
\ No newline at end of file
diff --git a/fe/src/test/resources/fair-scheduler-test.xml 
b/fe/src/test/resources/fair-scheduler-test.xml
index 9753c3f56..fe8f1eb9f 100644
--- a/fe/src/test/resources/fair-scheduler-test.xml
+++ b/fe/src/test/resources/fair-scheduler-test.xml
@@ -23,9 +23,11 @@
     <queue name="queueC">
       <aclSubmitApps>* </aclSubmitApps>
       <maxResources>1024 mb, 0 vcores</maxResources>
+      <onlyCoordinators>true</onlyCoordinators>
     </queue>
     <queue name="queueD">
       <aclSubmitApps>userA,userB </aclSubmitApps>
+      <onlyCoordinators>false</onlyCoordinators>
       <userQueryLimit>
         <user>*</user>
         <totalCount>3</totalCount>
diff --git a/fe/src/test/resources/llama-site-onlycoords.xml 
b/fe/src/test/resources/llama-site-onlycoords.xml
new file mode 100644
index 000000000..a8b20b427
--- /dev/null
+++ b/fe/src/test/resources/llama-site-onlycoords.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<configuration>
+  <property>
+    
<name>impala.admission-control.max-query-cpu-core-per-node-limit.root.group-set-small</name>
+    <value>12</value>
+  </property>
+  <property>
+    
<name>impala.admission-control.max-query-mem-limit.root.group-set-small</name>
+    <value>53687091200</value>
+  </property>
+  <property>
+    <name>impala.admission-control.max-mt-dop.root.group-set-small</name>
+    <value>16</value>
+  </property>
+  <property>
+    
<name>impala.admission-control.min-query-mem-limit.root.group-set-small</name>
+    <value>2147483648</value>
+  </property>
+  <property>
+    
<name>impala.admission-control.pool-queue-timeout-ms.root.group-set-small</name>
+    <value>600000</value>
+  </property>
+
+  <property>
+    
<name>impala.admission-control.max-query-cpu-core-per-node-limit.root.group-set-large</name>
+    <value>12</value>
+  </property>
+  <property>
+    
<name>impala.admission-control.max-query-mem-limit.root.group-set-large</name>
+    <value>53687091200</value>
+  </property>
+  <property>
+    <name>impala.admission-control.max-mt-dop.root.group-set-large</name>
+    <value>16</value>
+  </property>
+  <property>
+    
<name>impala.admission-control.min-query-mem-limit.root.group-set-large</name>
+    <value>2147483648</value>
+  </property>
+  <property>
+    
<name>impala.admission-control.pool-queue-timeout-ms.root.group-set-large</name>
+    <value>600000</value>
+  </property>
+
+  <property>
+    <name>llama.am.throttling.maximum.placed.reservations.onlycoords</name>
+    <value>1</value>
+  </property>
+  <property>
+    <name>llama.am.throttling.maximum.queued.reservations.onlycoords</name>
+    <value>5</value>
+  </property>
+  <property>
+    <name>impala.admission-control.pool-queue-timeout-ms.onlycoords</name>
+    <value>30000</value>
+  </property>
+  <property>
+    <name>impala.admission-control.max-query-mem-limit.onlycoords</name>
+    <value>1610612736</value><!--1.5GB-->
+  </property>
+  <property>
+    <name>impala.admission-control.min-query-mem-limit.onlycoords</name>
+    <value>52428800</value><!--50MB-->
+  </property>
+</configuration>
diff --git 
a/java/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
 
b/java/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
index 067b1ab9d..519fa661d 100644
--- 
a/java/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
+++ 
b/java/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
@@ -56,6 +56,9 @@ public class AllocationConfiguration {
   private final Map<String, Map<String, Integer>> userQueryLimits;
   private final Map<String, Map<String, Integer>> groupQueryLimits;
 
+  // Specifies if each queue contains all nodes or only coordinators.
+  private final Map<String, Boolean> onlyCoordinators;
+
   // Policy for mapping apps to queues
   @VisibleForTesting
   QueuePlacementPolicy placementPolicy;
@@ -80,6 +83,7 @@ public class AllocationConfiguration {
       Map<String, Map<QueueACL, AccessControlList>> queueAcls,
       Map<String, Map<String, Integer>> userQueryLimits,
       Map<String, Map<String, Integer>> groupQueryLimits,
+      Map<String, Boolean> onlyCoordinators,
       QueuePlacementPolicy placementPolicy,
       Map<FSQueueType, Set<String>> configuredQueues,
       Set<String> nonPreemptableQueues) {
@@ -89,6 +93,7 @@ public class AllocationConfiguration {
     this.queueAcls = queueAcls;
     this.userQueryLimits = userQueryLimits;
     this.groupQueryLimits = groupQueryLimits;
+    this.onlyCoordinators = onlyCoordinators;
     this.placementPolicy = placementPolicy;
     this.configuredQueues = configuredQueues;
   }
@@ -100,6 +105,7 @@ public class AllocationConfiguration {
     queueAcls = new HashMap<>();
     userQueryLimits = new HashMap<>();
     groupQueryLimits = new HashMap<>();
+    onlyCoordinators = new HashMap<>();
     configuredQueues = new HashMap<>();
     for (FSQueueType queueType : FSQueueType.values()) {
       configuredQueues.put(queueType, new HashSet<String>());
@@ -202,4 +208,8 @@ public class AllocationConfiguration {
     Map<String, Integer> limits = groupQueryLimits.get(queueName);
     return limits != null ? limits : Collections.emptyMap();
   }
+
+  public boolean isOnlyCoordinators(String queueName) {
+    return onlyCoordinators.getOrDefault(queueName, 
Boolean.FALSE).booleanValue();
+  }
 }
\ No newline at end of file
diff --git 
a/java/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
 
b/java/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
index cd1bb30eb..2e1d411b7 100644
--- 
a/java/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
+++ 
b/java/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
@@ -59,10 +59,10 @@ import com.google.common.annotations.VisibleForTesting;
 @Public
 @Unstable
 public class AllocationFileLoaderService extends AbstractService {
-  
+
   public static final Log LOG = LogFactory.getLog(
       AllocationFileLoaderService.class.getName());
-  
+
   /** Time to wait between checks of the allocation file */
   public static final long ALLOC_RELOAD_INTERVAL_MS = 10 * 1000;
 
@@ -74,26 +74,28 @@ public class AllocationFileLoaderService extends 
AbstractService {
 
   public static final long THREAD_JOIN_TIMEOUT_MS = 1000;
 
+  public static final String ROOT_POOL_NAME = "root";
+
   private final Clock clock;
 
   private long lastSuccessfulReload; // Last time we successfully reloaded 
queues
   private boolean lastReloadAttemptFailed = false;
-  
-  // Path to XML file containing allocations. 
+
+  // Path to XML file containing allocations.
   private File allocFile;
-  
+
   private Listener reloadListener;
-  
+
   @VisibleForTesting
   long reloadIntervalMs = ALLOC_RELOAD_INTERVAL_MS;
-  
+
   private Thread reloadThread;
   private volatile boolean running = true;
-  
+
   public AllocationFileLoaderService() {
     this(new SystemClock());
   }
-  
+
   public AllocationFileLoaderService(Clock clock) {
     super(AllocationFileLoaderService.class.getName());
     this.clock = clock;
@@ -288,6 +290,7 @@ public class AllocationFileLoaderService extends 
AbstractService {
     Map<String, Map<QueueACL, AccessControlList>> queueAcls = new HashMap<>();
     Map<String, Map<String, Integer>> userQueryLimits = new HashMap<>();
     Map<String, Map<String, Integer>> groupQueryLimits = new HashMap<>();
+    Map<String, Boolean> onlyCoordinators = new HashMap<>();
     Set<String> nonPreemptableQueues = new HashSet<>();
     int userMaxAppsDefault = Integer.MAX_VALUE;
     int queueMaxAppsDefault = Integer.MAX_VALUE;
@@ -404,8 +407,8 @@ public class AllocationFileLoaderService extends 
AbstractService {
     // Load queue elements.  A root queue can either be included or omitted.  
If
     // it's included, all other queues must be inside it.
     for (Element element : queueElements) {
-      String parent = "root";
-      if (element.getAttribute("name").equalsIgnoreCase("root")) {
+      String parent = ROOT_POOL_NAME;
+      if (element.getAttribute("name").equalsIgnoreCase(ROOT_POOL_NAME)) {
         if (queueElements.size() > 1) {
           throw new AllocationConfigurationException("If configuring root 
queue,"
               + " no other queues can be placed alongside it.");
@@ -416,7 +419,8 @@ public class AllocationFileLoaderService extends 
AbstractService {
           maxChildQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares,
           queueWeights, queuePolicies, minSharePreemptionTimeouts,
           fairSharePreemptionTimeouts, fairSharePreemptionThresholds, 
queueAcls,
-          userQueryLimits, groupQueryLimits, configuredQueues, 
nonPreemptableQueues);
+          userQueryLimits, groupQueryLimits, configuredQueues, 
onlyCoordinators,
+          nonPreemptableQueues);
     }
 
     // Load placement policy and pass it configured queues
@@ -430,18 +434,18 @@ public class AllocationFileLoaderService extends 
AbstractService {
     }
 
     // Set the min/fair share preemption timeout for the root queue
-    if (!minSharePreemptionTimeouts.containsKey("root")){
-      minSharePreemptionTimeouts.put("root",
+    if (!minSharePreemptionTimeouts.containsKey(ROOT_POOL_NAME)){
+      minSharePreemptionTimeouts.put(ROOT_POOL_NAME,
           defaultMinSharePreemptionTimeout);
     }
-    if (!fairSharePreemptionTimeouts.containsKey("root")) {
-      fairSharePreemptionTimeouts.put("root",
+    if (!fairSharePreemptionTimeouts.containsKey(ROOT_POOL_NAME)) {
+      fairSharePreemptionTimeouts.put(ROOT_POOL_NAME,
           defaultFairSharePreemptionTimeout);
     }
 
     // Set the fair share preemption threshold for the root queue
-    if (!fairSharePreemptionThresholds.containsKey("root")) {
-      fairSharePreemptionThresholds.put("root",
+    if (!fairSharePreemptionThresholds.containsKey(ROOT_POOL_NAME)) {
+      fairSharePreemptionThresholds.put(ROOT_POOL_NAME,
           defaultFairSharePreemptionThreshold);
     }
 
@@ -451,7 +455,7 @@ public class AllocationFileLoaderService extends 
AbstractService {
         queueMaxResourcesDefault, queueMaxAMShareDefault, queuePolicies,
         defaultSchedPolicy, minSharePreemptionTimeouts, 
fairSharePreemptionTimeouts,
         fairSharePreemptionThresholds, queueAcls, userQueryLimits, 
groupQueryLimits,
-        newPlacementPolicy, configuredQueues, nonPreemptableQueues);
+        onlyCoordinators, newPlacementPolicy, configuredQueues, 
nonPreemptableQueues);
     lastSuccessfulReload = clock.getTime();
     lastReloadAttemptFailed = false;
 
@@ -481,6 +485,7 @@ public class AllocationFileLoaderService extends 
AbstractService {
       Map<String, Map<String, Integer>> userQueryLimits,
       Map<String, Map<String, Integer>> groupQueryLimits,
       Map<FSQueueType, Set<String>> configuredQueues,
+      Map<String, Boolean> onlyCoordinators,
       Set<String> nonPreemptableQueues)
       throws AllocationConfigurationException {
     String queueName = 
CharMatcher.whitespace().trimFrom(element.getAttribute("name"));
@@ -576,9 +581,13 @@ public class AllocationFileLoaderService extends 
AbstractService {
             maxChildQueueResources, queueMaxApps, userMaxApps, 
queueMaxAMShares,
             queueWeights, queuePolicies, minSharePreemptionTimeouts,
             fairSharePreemptionTimeouts, fairSharePreemptionThresholds, 
queueAcls,
-            userQueryLimits, groupQueryLimits, configuredQueues, 
nonPreemptableQueues);
+            userQueryLimits, groupQueryLimits, configuredQueues, 
onlyCoordinators,
+            nonPreemptableQueues);
         configuredQueues.get(FSQueueType.PARENT).add(queueName);
         isLeaf = false;
+      } else if ("onlyCoordinators".equals(field.getTagName())) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        onlyCoordinators.put(queueName, Boolean.parseBoolean(text));
       }
     }
     if (isLeaf) {
@@ -611,14 +620,13 @@ public class AllocationFileLoaderService extends 
AbstractService {
         allocationConfiguration.getConfiguredQueues();
     Set<String> parentQueues = configuredQueues.get(FSQueueType.PARENT);
     Set<String> leafQueues = configuredQueues.get(FSQueueType.LEAF);
-    String root = "root";
-    if (parentQueues.size() == 1 && parentQueues.contains(root)) {
+    if (parentQueues.size() == 1 && parentQueues.contains(ROOT_POOL_NAME)) {
       Map<String, Integer> rootUserQueryLimits =
-          allocationConfiguration.getUserQueryLimits(root);
+          allocationConfiguration.getUserQueryLimits(ROOT_POOL_NAME);
       Map<String, Integer> rootGroupQueryLimits =
-          allocationConfiguration.getGroupQueryLimits(root);
+          allocationConfiguration.getGroupQueryLimits(ROOT_POOL_NAME);
       for (String leafQueue : leafQueues) {
-        if (leafQueue.startsWith(root)) {
+        if (leafQueue.startsWith(ROOT_POOL_NAME)) {
           Map<String, Integer> groupQueryLimits =
               allocationConfiguration.getGroupQueryLimits(leafQueue);
           Map<String, Integer> userQueryLimits =
diff --git a/tests/common/custom_cluster_test_suite.py 
b/tests/common/custom_cluster_test_suite.py
index c7fd3774e..ae01adfb6 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -84,6 +84,7 @@ DISABLE_LOG_BUFFERING = 'disable_log_buffering'
 # If True, resolves the actual files for all the log symlinks and outputs the 
resolved
 # paths to stderr.
 LOG_SYMLINKS = 'log_symlinks'
+WORKLOAD_MGMT = 'workload_mgmt'
 
 # Args that accept additional formatting to supply temporary dir path.
 ACCEPT_FORMATTING = set([IMPALAD_ARGS, CATALOGD_ARGS, IMPALA_LOG_DIR])
@@ -154,7 +155,8 @@ class CustomClusterTestSuite(ImpalaTestSuite):
       num_exclusive_coordinators=None, kudu_args=None, 
statestored_timeout_s=None,
       impalad_timeout_s=None, expect_cores=None, reset_ranger=False,
       impalad_graceful_shutdown=False, tmp_dir_placeholders=[],
-      expect_startup_fail=False, disable_log_buffering=False, 
log_symlinks=False):
+      expect_startup_fail=False, disable_log_buffering=False, 
log_symlinks=False,
+      workload_mgmt=False):
     """Records arguments to be passed to a cluster by adding them to the 
decorated
     method's func_dict"""
     args = dict()
@@ -198,6 +200,8 @@ class CustomClusterTestSuite(ImpalaTestSuite):
       args[DISABLE_LOG_BUFFERING] = True
     if log_symlinks:
       args[LOG_SYMLINKS] = True
+    if workload_mgmt:
+      args[WORKLOAD_MGMT] = True
 
     def merge_args(args_first, args_last):
       result = args_first.copy()
@@ -328,6 +332,10 @@ class CustomClusterTestSuite(ImpalaTestSuite):
     if IMPALAD_TIMEOUT_S in args:
       kwargs[IMPALAD_TIMEOUT_S] = args[IMPALAD_TIMEOUT_S]
 
+    if args.get(WORKLOAD_MGMT, False):
+      if IMPALAD_ARGS in args or CATALOGD_ARGS in args:
+        kwargs[WORKLOAD_MGMT] = True
+
     if args.get(EXPECT_CORES, False):
       # Make a note of any core files that already exist
       possible_cores = find_all_files('*core*')
@@ -399,25 +407,33 @@ class CustomClusterTestSuite(ImpalaTestSuite):
     if not self.SHARED_CLUSTER_ARGS:
       self.cluster_teardown(method.__name__, method.__dict__)
 
-  def wait_for_wm_init_complete(self, timeout_s=120):
+  def wait_for_wm_init_complete(self, timeout_s=60):
     """
     Waits for the catalog to report the workload management initialization process has
-    completed and for the catalog updates to be received by the coordinators.
+    completed and for the catalog updates to be received by the coordinators. The input
+    timeout_s is applied separately to the catalogd log check and to the check for each
+    coordinator, so the total wait can be several multiples of timeout_s.
     """
-    self.assert_catalogd_log_contains("INFO", r'Completed workload management '
-        r'initialization', timeout_s=timeout_s)
-
-    catalog_log = self.assert_catalogd_log_contains("INFO", r'A catalog update with \d+ '
-        r'entries is assembled. Catalog version: (\d+)', timeout_s=10, expected_count=-1)
-
-    def assert_func():
-      coord_log = self.assert_impalad_log_contains("INFO", r'Catalog topic update '
-          r'applied with version: (\d+)', timeout_s=5, expected_count=-1)
-      return int(coord_log.group(1)) >= int(catalog_log.group(1))
-
-    assert retry(func=assert_func, max_attempts=10, sleep_time_s=3, backoff=1), \
-        "Expected a catalog topic update with version '{}' or later, but no such " \
-        "update was found.".format(catalog_log.group(1))
+    catalog_log = self.assert_log_contains_multiline("catalogd", "INFO", r'Completed '
+        r'workload management initialization.*?A catalog update with \d+ entries is '
+        r'assembled\. Catalog version: (\d+)', timeout_s)
+
+    # Assert each coordinator has received a catalog update that was assembled after
+    # workload management completed.
+    for idx, _ in enumerate(self.cluster.get_all_coordinators()):
+      node_name = "impalad"
+      if idx > 0:
+        node_name += "_node" + str(idx)
+
+      def assert_func():
+        coord_log = self.assert_log_contains(node_name, "INFO", r'Catalog topic update '
+            r'applied with version: (\d+)', timeout_s=timeout_s, expected_count=-1)
+        return int(coord_log.group(1)) >= int(catalog_log.group(1))
+
+      max_attempts = timeout_s // 3
+      assert retry(func=assert_func, max_attempts=max_attempts, sleep_time_s=3,
+          backoff=1), "Expected a catalog topic update with version '{}' or later, but " \
+          "no such update was found.".format(catalog_log.group(1))
 
   @classmethod
   def _stop_impala_cluster(cls):
@@ -514,7 +530,8 @@ class CustomClusterTestSuite(ImpalaTestSuite):
                             impalad_timeout_s=60,
                             ignore_pid_on_log_rotation=False,
                             wait_for_backends=True,
-                            log_symlinks=False):
+                            log_symlinks=False,
+                            workload_mgmt=False):
     cls.impala_log_dir = impala_log_dir
     # We ignore TEST_START_CLUSTER_ARGS here. Custom cluster tests specifically test that
     # certain custom startup arguments work and we want to keep them independent of dev
@@ -544,6 +561,10 @@ class CustomClusterTestSuite(ImpalaTestSuite):
       cmd.append("--impalad_args=--use_local_catalog=1")
       cmd.append("--catalogd_args=--catalog_topic_mode=minimal")
 
+    if workload_mgmt:
+      cmd.append("--impalad_args=--enable_workload_mgmt=true")
+      cmd.append("--catalogd_args=--enable_workload_mgmt=true")
+
     default_query_option_kvs = []
     # Put any defaults first, then any arguments after that so they can override defaults.
     if default_query_options is not None:
diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py
index 4f83cf909..f08c9fcff 100644
--- a/tests/common/impala_cluster.py
+++ b/tests/common/impala_cluster.py
@@ -180,6 +180,14 @@ class ImpalaCluster(object):
     LOG.info("Cluster: " + str(self.impalads))
     return choice([impalad for impalad in self.impalads if impalad != other_impalad])
 
+  def get_all_coordinators(self):
+    """Returns a list of all impalads where is_coordinator returns True. If no
+       coordinators are found, returns an empty list. The returned list is sorted by
+       krpc port ascending."""
+
+    return sorted([imp for imp in self.impalads if imp.is_coordinator()],
+        key=lambda x: x.service.krpc_port)
+
   def num_responsive_coordinators(self):
     """Find the number of impalad coordinators that can evaluate a test 
query."""
     n = 0
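
A note on the new get_all_coordinators() helper: sorting by krpc port is what lets callers
map each coordinator back to its log file name. A minimal sketch of that mapping, assuming
the start-impala-cluster convention that krpc ports are assigned in node order and with
`cluster` standing in for an ImpalaCluster instance (this is the convention the reworked
wait_for_wm_init_complete() above relies on):

    coords = cluster.get_all_coordinators()
    for idx, coord in enumerate(coords):
      # The first impalad logs to "impalad", later ones to "impalad_node<idx>".
      node_name = "impalad" if idx == 0 else "impalad_node" + str(idx)
      print(node_name, coord.service.krpc_port)
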
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index e865a696e..d08803e14 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -1459,6 +1459,37 @@ class ImpalaTestSuite(BaseTestSuite):
     return self.assert_log_contains(
         daemon, level, line_regex, expected_count, timeout_s, dry_run)
 
+  def assert_log_contains_multiline(self, daemon, level, line_regex, timeout_s=6):
+    """
+    Asserts that the daemon log with specified level (e.g. ERROR, WARNING, INFO) contains
+    at least one match of the provided regular expression. Unlike assert_log_contains(),
+    the regular expression is compiled with the DOTALL flag, which causes the dot
+    operator to also match newlines. Thus, the provided line_regex can match across
+    multiple lines.
+
+    Returns the result of the regular expression search() function or fails an assertion
+    if the regular expression is not matched in the given timeframe.
+    """
+    if (self._warn_assert_log):
+      LOG.warning(
+          "{} calls assert_log_contains_multiline() with timeout_s={}. Make sure that "
+          "glog buffering has been disabled (--logbuflevel=-1), or "
+          "CustomClusterTestSuite.with_args is set with disable_log_buffering=True, "
+          "or timeout_s is sufficient.".format(self.__class__.__name__, timeout_s))
+
+    pattern = re.compile(line_regex, re.DOTALL)
+
+    for i in range(0, timeout_s):
+      log_file_path = self.__build_log_path(daemon, level)
+      with open(log_file_path) as log:
+        ret = pattern.search(log.read())
+        if ret is not None:
+          return ret
+      time.sleep(1)
+
+    assert False, "did not find any logfile " \
+        "contents matching the regex '{}'".format(line_regex)
+
  def assert_log_contains(self, daemon, level, line_regex, expected_count=1, timeout_s=6,
       dry_run=False):
     """
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index 83c6b09ed..0b07749f3 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -143,9 +143,17 @@ INITIAL_QUEUE_REASON_REGEX = \
 # The path to resources directory which contains the admission control config files.
 RESOURCES_DIR = os.path.join(os.environ['IMPALA_HOME'], "fe", "src", "test", "resources")
 
+# SQL statement that selects all records from the active queries table.
+ACTIVE_SQL = "select * from sys.impala_query_live"
+
 
 def impalad_admission_ctrl_config_args(fs_allocation_file, llama_site_file,
                                         additional_args="", make_copy=False):
+  """Generates impalad startup flags configuring the fair scheduler and llama 
site path
+     options and setting logging for admission control to VLOG_ROW.
+
+     The specified fair scheduler and llama site files are copied first, and 
the copies
+     are used as the value for the relevant startup flags."""
   fs_allocation_path = os.path.join(RESOURCES_DIR, fs_allocation_file)
   llama_site_path = os.path.join(RESOURCES_DIR, llama_site_file)
   if make_copy:
@@ -1774,6 +1782,232 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       for executor_mem_admitted in mem_admitted['executor']:
         assert executor_mem_admitted == 0
 
+  def __assert_systables_query(self, profile, expected_coords=None,
+      expected_frag_counts=None):
+    """Asserts the per-host fragment instances are correct in the provided 
profile."""
+
+    if expected_coords is None:
+      expected_coords = self.cluster.get_all_coordinators()
+
+    populate_frag_count = False
+    if expected_frag_counts is None:
+      populate_frag_count = True
+      expected_frag_counts = []
+
+    expected = []
+    for i, val in enumerate(expected_coords):
+      if populate_frag_count:
+        if i == 0:
+          expected_frag_counts.append(2)
+        else:
+          expected_frag_counts.append(1)
+
+      expected.append("{0}:{1}({2})".format(val.service.hostname, 
val.service.krpc_port,
+          expected_frag_counts[i]))
+
+    # Assert the correct request pool was used.
+    req_pool = re.search(r'\n\s+Request Pool:\s+(.*?)\n', profile)
+    assert req_pool, "Did not find request pool in query profile"
+    assert req_pool.group(1) == "root.onlycoords"
+
+    # Assert the fragment instances only ran on the coordinators.
+    perhost_frags = re.search(r'\n\s+Per Host Number of Fragment Instances:\s+(.*?)\n',
+        profile)
+    assert perhost_frags
+    sorted_hosts = " ".join(sorted(perhost_frags.group(1).split(" ")))
+    assert sorted_hosts
+    assert sorted_hosts == " ".join(expected)
+
+    # Assert the frontend selected the first executor group.
+    expected_verdict = "Assign to first group because only coordinators request pool " \
+        "specified"
+    fe_verdict = re.search(r'\n\s+Executor group 1:\n\s+Verdict: (.*?)\n', profile)
+    assert fe_verdict, "No frontend executor group verdict found."
+    assert fe_verdict.group(1) == expected_verdict, "Incorrect verdict found"
+
+  def __run_assert_systables_query(self, vector, expected_coords=None,
+        expected_frag_counts=None, query=ACTIVE_SQL):
+    """Runs a query using an only coordinators request pool and asserts the 
per-host
+        fragment instances are correct. This function can only be called from 
tests that
+        configured the cluster to use 'fair-scheduler-onlycoords.xml' and
+        'llama-site-onlycoords.xml'."""
+
+    vector.set_exec_option('request_pool', 'onlycoords')
+    result = self.execute_query_using_vector(query, vector)
+    assert result.success
+
+    self.__assert_systables_query(result.runtime_profile, expected_coords,
+      expected_frag_counts)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(num_exclusive_coordinators=3, cluster_size=5,
+      workload_mgmt=True, impalad_args=impalad_admission_ctrl_config_args(
+        fs_allocation_file="fair-scheduler-onlycoords.xml",
+        llama_site_file="llama-site-onlycoords.xml"),
+        statestored_args=_STATESTORED_ARGS)
+  def test_coord_only_pool_happy_path(self, vector):
+    """Asserts queries set to use an only coordinators request pool run all 
the fragment
+       instances on all coordinators and no executors even if the query 
includes
+       non-system tables."""
+    self.wait_for_wm_init_complete()
+
+    # Execute a query that only selects from a system table using a request pool that is
+    # only coordinators.
+    self.__run_assert_systables_query(vector)
+
+    # Execute a query that joins a non-system table with a system table using a request
+    # pool that is only coordinators. All fragment instances will run on the coordinators
+    # without running any on the executors.
+    self.__run_assert_systables_query(
+        vector=vector,
+        expected_frag_counts=[4, 2, 2],
+        query="select a.test_name, b.db_user from functional.jointbl a inner join "
+              "sys.impala_query_live b on a.test_name = b.db_name")
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(num_exclusive_coordinators=3, cluster_size=3,
+      workload_mgmt=True, impalad_args=impalad_admission_ctrl_config_args(
+        fs_allocation_file="fair-scheduler-onlycoords.xml",
+        llama_site_file="llama-site-onlycoords.xml"),
+        statestored_args=_STATESTORED_ARGS)
+  def test_coord_only_pool_no_executors(self, vector):
+    """Asserts queries that only select from the active queries table run even 
if no
+       executors are running."""
+    self.wait_for_wm_init_complete()
+    self.__run_assert_systables_query(vector)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(num_exclusive_coordinators=3, cluster_size=5,
+      workload_mgmt=True, impalad_args=impalad_admission_ctrl_config_args(
+        fs_allocation_file="fair-scheduler-onlycoords.xml",
+        llama_site_file="llama-site-onlycoords.xml"),
+        statestored_args=_STATESTORED_ARGS)
+  def test_coord_only_pool_one_quiescing_coord(self, vector):
+    """Asserts quiescing coordinators do not run fragment instances for 
queries that only
+       select from the active queries table."""
+    self.wait_for_wm_init_complete()
+
+    # Quiesce the second coordinator.
+    all_coords = self.cluster.get_all_coordinators()
+    coord_to_quiesce = all_coords[1]
+    self.execute_query_expect_success(self.client, ": shutdown('{}:{}')".format(
+        coord_to_quiesce.service.hostname, coord_to_quiesce.service.krpc_port))
+
+    # Ensure only two coordinators process a system tables query.
+    self.__run_assert_systables_query(
+        vector=vector,
+        expected_coords=[all_coords[0], all_coords[2]])
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(num_exclusive_coordinators=3, cluster_size=5,
+      workload_mgmt=True, impalad_args=impalad_admission_ctrl_config_args(
+        fs_allocation_file="fair-scheduler-onlycoords.xml",
+        llama_site_file="llama-site-onlycoords.xml"),
+        statestored_args=_STATESTORED_ARGS)
+  def test_coord_only_pool_one_coord_terminate(self, vector):
+    """Asserts a force terminated coordinator is eventually removed from the 
list of
+       active coordinators."""
+    self.wait_for_wm_init_complete()
+
+    # Abruptly end the third coordinator.
+    all_coords = self.cluster.get_all_coordinators()
+    coord_to_term = all_coords[2]
+    coord_to_term.kill()
+
+    vector.set_exec_option('request_pool', 'onlycoords')
+    client = self.default_impala_client(vector.get_value('protocol'))
+
+    done_waiting = False
+    iterations = 0
+    while not done_waiting and iterations < 20:
+      try:
+        result = self.execute_query_using_client(client, ACTIVE_SQL, vector)
+        assert result.success
+        done_waiting = True
+      except Exception as e:
+        # Since the coordinator was not gracefully shut down, it never had a chance to
+        # send a quiescing message. Thus, the statestore will take some time to detect
+        # that the coordinator is gone. During that time, queries against system tables
+        # will fail because the now-terminated coordinator will still be sent RPCs.
+        if re.search(r"Exec\(\) rpc failed: Network error: "
+            r"Client connection negotiation failed: client connection to .*?:{}: "
+            r"connect: Connection refused".format(coord_to_term.service.krpc_port),
+            str(e)):
+          # Expected error, coordinator down not yet detected.
+          iterations += 1
+          sleep(3)
+        else:
+          raise e
+
+    assert done_waiting
+    self.__assert_systables_query(result.runtime_profile, [all_coords[0], all_coords[1]])
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(num_exclusive_coordinators=3, cluster_size=5,
+      workload_mgmt=True, impalad_args=impalad_admission_ctrl_config_args(
+        fs_allocation_file="fair-scheduler-onlycoords.xml",
+        llama_site_file="llama-site-onlycoords.xml"),
+        statestored_args=_STATESTORED_ARGS)
+  def test_coord_only_pool_add_coord(self, vector):
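+    """Asserts a coordinator added to a running cluster is detected and runs fragment
+       instances for queries submitted to an only coordinators request pool."""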
+    self.wait_for_wm_init_complete()
+
+    # Add a coordinator to the cluster.
+    cluster_size = len(self.cluster.impalads)
+    self._start_impala_cluster(
+        options=[
+            "--impalad_args=s{}".format(impalad_admission_ctrl_config_args(
+                fs_allocation_file="fair-scheduler-onlycoords.xml",
+                llama_site_file="llama-site-onlycoords.xml"))],
+        add_impalads=True,
+        cluster_size=6,
+        num_coordinators=1,
+        use_exclusive_coordinators=True,
+        wait_for_backends=False,
+        workload_mgmt=True)
+
+    self.assert_log_contains("impalad_node" + str(cluster_size), "INFO",
+        "join Impala Service pool")
+
+    # Assert the new coordinator ran a fragment instance.
+    self.__run_assert_systables_query(vector)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(num_exclusive_coordinators=1, cluster_size=1,
+      workload_mgmt=True, impalad_args=impalad_admission_ctrl_config_args(
+        fs_allocation_file="fair-scheduler-onlycoords.xml",
+        llama_site_file="llama-site-onlycoords.xml",
+        additional_args="--expected_executor_group_sets=root.group-set-small:1,"
+                        "root.group-set-large:2 "
+                        "--num_expected_executors=2 --executor_groups=coordinator"),
+        statestored_args=_STATESTORED_ARGS)
+  def test_coord_only_pool_exec_groups(self, vector):
+    """Asserts queries using only coordinators request pools can run 
successfully when
+       executor groups are configured."""
+    self.wait_for_wm_init_complete()
+
+    # Assert queries can be run when no executors are started.
+    self.__run_assert_systables_query(vector)
+
+    # Add a single executor for the small executor group set.
+    self._start_impala_cluster(
+        options=[
+            "--impalad_args=--executor_groups=root.group-set-small-group-000:1"],
+        add_executors=True,
+        cluster_size=1,
+        wait_for_backends=False)
+    self.cluster.statestored.service.wait_for_live_subscribers(3, timeout=30)
+    self.__run_assert_systables_query(vector)
+
+    # Add two executors for the large executor group set.
+    self._start_impala_cluster(
+        options=[
+            "--impalad_args=--executor_groups=root.group-set-large-group-000:2"],
+        add_executors=True,
+        cluster_size=2,
+        wait_for_backends=False)
+    self.cluster.statestored.service.wait_for_live_subscribers(5, timeout=30)
+    self.__run_assert_systables_query(vector)
+
 
 class TestAdmissionControllerWithACService(TestAdmissionController):
   """Runs all of the tests from TestAdmissionController but with the second 
impalad in the
