This is an automated email from the ASF dual-hosted git repository.

prozsa pushed a commit to branch branch-4.5.0
in repository https://gitbox.apache.org/repos/asf/impala.git
commit b47830268949e227b1663158988f291a9b4ed277
Author: jasonmfehr <jf...@cloudera.com>
AuthorDate: Tue Jan 14 15:24:48 2025 -0800

    IMPALA-13201: System Table Queries Execute When Admission Queues are Full

    Queries that run only against in-memory system tables are currently
    subject to the same admission control process as all other queries.
    Since these queries do not use any resources on executors, admission
    control does not need to consider the state of executors when deciding
    to admit them.

    This change adds a boolean configuration option 'onlyCoordinators' to
    the fair-scheduler.xml file for specifying that a request pool applies
    only to the coordinators. When a query is submitted to a
    coordinator-only request pool, no executors are required to be running.
    Instead, all fragment instances are executed exclusively on the
    coordinators.

    A new member was added to the ClusterMembershipMgr::Snapshot struct to
    hold the ExecutorGroup of all coordinators. This object is kept up to
    date by processing statestore messages and is used when executing
    queries that either require the coordinators (such as queries against
    sys.impala_query_live) or that use a coordinator-only request pool.

    Testing was accomplished by:
      1. Adding cluster membership manager ctests to assert the cluster
         membership manager correctly builds the list of non-quiescing
         coordinators.
      2. RequestPoolService JUnit tests to assert the new optional
         <onlyCoordinators> config in the fair scheduler xml file is
         correctly parsed.
      3. ExecutorGroup ctests modified to assert the new function.
      4. Custom cluster admission controller tests to assert queries with
         a coordinator-only request pool run only on the active
         coordinators.

    Change-Id: I5e0e64db92bdbf80f8b5bd85d001ffe4c8c9ffda
    Reviewed-on: http://gerrit.cloudera.org:8080/22249
    Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 be/src/scheduling/admission-controller.cc          |  12 +-
 be/src/scheduling/cluster-membership-mgr-test.cc   |  52 ++++-
 be/src/scheduling/cluster-membership-mgr.cc        |  88 +++++---
 be/src/scheduling/cluster-membership-mgr.h         |   8 +-
 be/src/scheduling/cluster-membership-test-util.cc  |  13 ++
 be/src/scheduling/cluster-membership-test-util.h   |   5 +
 be/src/scheduling/executor-group-test.cc           |  32 ++-
 be/src/scheduling/executor-group.cc                |  13 ++
 be/src/scheduling/executor-group.h                 |   6 +
 be/src/service/client-request-state.cc             |   2 +-
 be/src/util/webserver.cc                           |   4 +-
 bin/start-impala-cluster.py                        |   4 +-
 common/thrift/ImpalaInternalService.thrift         |   6 +
 docs/topics/impala_admission_config.xml            |  43 +++-
 .../java/org/apache/impala/service/Frontend.java   |  29 ++-
 .../org/apache/impala/util/RequestPoolService.java |   2 +
 .../apache/impala/util/TestRequestPoolService.java |  12 +-
 .../test/resources/fair-scheduler-onlycoords.xml   |  25 +++
 fe/src/test/resources/fair-scheduler-test.xml      |   2 +
 fe/src/test/resources/llama-site-onlycoords.xml    |  66 ++++++
 .../scheduler/fair/AllocationConfiguration.java    |  10 +
 .../fair/AllocationFileLoaderService.java          |  58 ++---
 tests/common/custom_cluster_test_suite.py          |  57 ++---
 tests/common/impala_cluster.py                     |   8 +
 tests/common/impala_test_suite.py                  |  31 +++
 tests/custom_cluster/test_admission_controller.py  | 234 +++++++++++++++++++++
 26 files changed, 719 insertions(+), 103 deletions(-)

diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 4a75ef382..c25ebbfdc 100644
--- a/be/src/scheduling/admission-controller.cc
+++ 
b/be/src/scheduling/admission-controller.cc @@ -2250,8 +2250,14 @@ Status AdmissionController::ComputeGroupScheduleStates( } const BackendDescriptorPB& coord_desc = it->second; - vector<const ExecutorGroup*> executor_groups = - GetExecutorGroupsForQuery(membership_snapshot->executor_groups, request); + vector<const ExecutorGroup*> executor_groups; + + if (UNLIKELY(queue_node->pool_cfg.only_coordinators)) { + executor_groups = {&membership_snapshot->all_coordinators}; + } else { + executor_groups = + GetExecutorGroupsForQuery(membership_snapshot->executor_groups, request); + } if (executor_groups.empty()) { queue_node->not_admitted_reason = REASON_NO_EXECUTOR_GROUPS; @@ -2261,7 +2267,7 @@ Status AdmissionController::ComputeGroupScheduleStates( // Collect all coordinators if needed for the request. ExecutorGroup coords = request.request.include_all_coordinators ? - membership_snapshot->GetCoordinators() : ExecutorGroup("all-coordinators"); + membership_snapshot->all_coordinators : ExecutorGroup("all-coordinators"); // We loop over the executor groups in a deterministic order. If // --balance_queries_across_executor_groups set to true, executor groups with more diff --git a/be/src/scheduling/cluster-membership-mgr-test.cc b/be/src/scheduling/cluster-membership-mgr-test.cc index 625c9e8f0..f67d508d3 100644 --- a/be/src/scheduling/cluster-membership-mgr-test.cc +++ b/be/src/scheduling/cluster-membership-mgr-test.cc @@ -15,8 +15,10 @@ // specific language governing permissions and limitations // under the License. -#include <memory> #include <deque> +#include <memory> +#include <sstream> +#include <vector> #include "common/logging.h" #include "common/names.h" @@ -272,9 +274,38 @@ class ClusterMembershipMgrTest : public testing::Test { } }; +/// Asserts a provided list of expected hostnames are in the all_coordinators +/// ExecutorGroup of the provided ClusterMembershipMgr. +void _assertCoords( + ClusterMembershipMgr& cmm, const std::vector<const char*> expected_hostnames) { + ExecutorGroup::Executors actual_coords = + cmm.GetSnapshot()->all_coordinators.GetAllExecutorDescriptors(); + + std::ostringstream actual_hostnames; + for (const auto& actual : actual_coords) { + actual_hostnames << " " << actual.address().hostname(); + } + + ASSERT_EQ(expected_hostnames.size(), actual_coords.size()) + << "Actual hostnames:" << actual_hostnames.str(); + + for (auto expected : expected_hostnames) { + bool found = false; + for (const auto& actual : actual_coords) { + if (actual.address().hostname() == expected) { + found = true; + break; + } + } + + EXPECT_TRUE(found) << "did not find expected coordinator '" << expected + << "' in actual coordinators:" << actual_hostnames.str(); + } +} + /// This test takes two instances of the ClusterMembershipMgr through a common lifecycle. /// It also serves as an example for how to craft statestore messages and pass them to -/// UpdaUpdateMembership(). +/// UpdateMembership(). 
TEST_F(ClusterMembershipMgrTest, TwoInstances) { auto b1 = make_shared<BackendDescriptorPB>(MakeBackendDescriptor(1)); auto b2 = make_shared<BackendDescriptorPB>(MakeBackendDescriptor(2)); @@ -306,6 +337,8 @@ TEST_F(ClusterMembershipMgrTest, TwoInstances) { // First manager now has one BE ASSERT_EQ(1, cmm1.GetSnapshot()->current_backends.size()); + _assertCoords(cmm1, {"host_1"}); + _assertCoords(cmm2, {}); // Hook up second callback and iterate with the result of the first manager cmm2.SetLocalBeDescFn([b2]() { return b2; }); @@ -314,6 +347,8 @@ TEST_F(ClusterMembershipMgrTest, TwoInstances) { cmm2.UpdateMembership(topic_delta_map, &returned_topic_deltas); ASSERT_EQ(1, returned_topic_deltas.size()); ASSERT_EQ(2, cmm2.GetSnapshot()->current_backends.size()); + _assertCoords(cmm1, {"host_1"}); + _assertCoords(cmm2, {"host_1", "host_2"}); // Send the returned update to the first manager, this time no deltas will be returned *ss_topic_delta = returned_topic_deltas[0]; @@ -321,6 +356,8 @@ TEST_F(ClusterMembershipMgrTest, TwoInstances) { cmm1.UpdateMembership(topic_delta_map, &returned_topic_deltas); ASSERT_EQ(0, returned_topic_deltas.size()); ASSERT_EQ(2, cmm1.GetSnapshot()->current_backends.size()); + _assertCoords(cmm1, {"host_1", "host_2"}); + _assertCoords(cmm2, {"host_1", "host_2"}); // Both managers now have the same state. Shutdown one of them and step through // propagating the update. @@ -334,6 +371,8 @@ TEST_F(ClusterMembershipMgrTest, TwoInstances) { // It will also remove itself from the executor group (but not the current backends). ASSERT_EQ(1, GetDefaultGroupSize(cmm1)); ASSERT_EQ(2, cmm1.GetSnapshot()->current_backends.size()); + _assertCoords(cmm1, {"host_1", "host_2"}); + _assertCoords(cmm2, {"host_1", "host_2"}); // Propagate the quiescing to the 2nd mgr *ss_topic_delta = returned_topic_deltas[0]; @@ -343,6 +382,8 @@ TEST_F(ClusterMembershipMgrTest, TwoInstances) { ASSERT_EQ(0, returned_topic_deltas.size()); ASSERT_EQ(2, cmm2.GetSnapshot()->current_backends.size()); ASSERT_EQ(1, GetDefaultGroupSize(cmm2)); + _assertCoords(cmm1, {"host_1", "host_2"}); + _assertCoords(cmm2, {"host_2"}); // Delete the 1st backend from the 2nd one ASSERT_EQ(1, ss_topic_delta->topic_entries.size()); @@ -351,6 +392,8 @@ TEST_F(ClusterMembershipMgrTest, TwoInstances) { ASSERT_EQ(0, returned_topic_deltas.size()); ASSERT_EQ(1, cmm2.GetSnapshot()->current_backends.size()); ASSERT_EQ(1, GetDefaultGroupSize(cmm2)); + _assertCoords(cmm1, {"host_1", "host_2"}); + _assertCoords(cmm2, {"host_2"}); } TEST_F(ClusterMembershipMgrTest, IsBlacklisted) { @@ -447,6 +490,11 @@ TEST_F(ClusterMembershipMgrTest, ExecutorBlacklist) { DeleteBackend(backends_[2].get()); EXPECT_EQ(NUM_BACKENDS - 1, backends_[0]->cmm->GetSnapshot()->current_backends.size()); EXPECT_EQ(NUM_BACKENDS - 2, GetDefaultGroupSize(*backends_[0]->cmm)); + + // Assert blacklisting executors does not impact the all_coordinators ExecutorGroup. + // host_1 was quiesced and host_2 was deleted, both actions do impact all_coordinators. 
+ _assertCoords(*backends_[0]->cmm, {"host_0"}); + _assertCoords(*backends_[1]->cmm, {"host_0", "host_1"}); } // This test runs a group of 20 backends through their full lifecycle, validating that diff --git a/be/src/scheduling/cluster-membership-mgr.cc b/be/src/scheduling/cluster-membership-mgr.cc index e48ea98c3..ab8bf9a61 100644 --- a/be/src/scheduling/cluster-membership-mgr.cc +++ b/be/src/scheduling/cluster-membership-mgr.cc @@ -170,21 +170,11 @@ ClusterMembershipMgr::SnapshotPtr ClusterMembershipMgr::GetSnapshot() const { return state; } -static bool is_active_coordinator(const BackendDescriptorPB& be) { +static inline bool is_active_coordinator(const BackendDescriptorPB& be) { return be.has_is_coordinator() && be.is_coordinator() && !(be.has_is_quiescing() && be.is_quiescing()); } -ExecutorGroup ClusterMembershipMgr::Snapshot::GetCoordinators() const { - ExecutorGroup coordinators("all-coordinators"); - for (const auto& it : current_backends) { - if (is_active_coordinator(it.second)) { - coordinators.AddExecutor(it.second); - } - } - return coordinators; -} - vector<TNetworkAddress> ClusterMembershipMgr::Snapshot::GetCoordinatorAddresses() const { vector<TNetworkAddress> coordinators; for (const auto& it : current_backends) { @@ -197,6 +187,19 @@ vector<TNetworkAddress> ClusterMembershipMgr::Snapshot::GetCoordinatorAddresses( return coordinators; } +static inline void _removeCoordIfExists( + const std::shared_ptr<ClusterMembershipMgr::Snapshot>& state, + const BackendDescriptorPB& be) { + + // The BackendDescriptorPB may be incomplete. Use the backend id to retrieve the actual + // backend descriptor so the backend can be removed. + const BackendDescriptorPB* actual_be = + state->all_coordinators.LookUpBackendDesc(be.backend_id()); + if (actual_be != nullptr) { + state->all_coordinators.RemoveExecutor(*actual_be); + } +} + void ClusterMembershipMgr::UpdateMembership( const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas, vector<TTopicDelta>* subscriber_topic_updates) { @@ -298,6 +301,11 @@ void ClusterMembershipMgr::UpdateMembership( } } new_backend_map->erase(item.key); + + // If a coordinator is not shutdown gracefully, then it will be deleted here. + if (be_desc.is_coordinator()) { + _removeCoordIfExists(new_state, be_desc); + } } continue; } @@ -346,32 +354,45 @@ void ClusterMembershipMgr::UpdateMembership( if (existing.is_quiescing()) DCHECK(be_desc.is_quiescing()); // If the node starts quiescing - if (be_desc.is_quiescing() && !existing.is_quiescing() && existing.is_executor()) { - // If the backend starts quiescing and it is present in the blacklist, remove it - // from the blacklist. 
If the backend is present in the blacklist, there is no - // need to remove it from the executor group because it has already been removed - bool blacklisted = new_blacklist->FindAndRemove(be_desc) - == ExecutorBlacklist::State::BLACKLISTED; - if (blacklisted) { - VLOG(1) << "Removing backend " << item.key << " from blacklist (quiescing)"; - DCHECK(!IsBackendInExecutorGroups(be_desc, *new_executor_groups)); - } else { - // Executor needs to be removed from its groups - for (const auto& group : be_desc.executor_groups()) { - VLOG(1) << "Removing backend " << item.key << " from group " - << group.DebugString() << " (quiescing)"; - RemoveExecutorAndGroup(be_desc, group, new_executor_groups); + if (be_desc.is_quiescing() && !existing.is_quiescing()) { + if (existing.is_executor()) { + // If the backend starts quiescing and it is present in the blacklist, remove it + // from the blacklist. If the backend is present in the blacklist, there is no + // need to remove it from the executor group because it has already been removed + bool blacklisted = new_blacklist->FindAndRemove(be_desc) + == ExecutorBlacklist::State::BLACKLISTED; + if (blacklisted) { + VLOG(1) << "Removing backend " << item.key << " from blacklist (quiescing)"; + DCHECK(!IsBackendInExecutorGroups(be_desc, *new_executor_groups)); + } else { + // Executor needs to be removed from its groups + for (const auto& group : be_desc.executor_groups()) { + VLOG(1) << "Removing backend " << item.key << " from group " + << group.DebugString() << " (quiescing)"; + RemoveExecutorAndGroup(be_desc, group, new_executor_groups); + } } } + + if (existing.is_coordinator()) { + _removeCoordIfExists(new_state, be_desc); + } } existing = be_desc; } else { // Create new_backend_map->insert(make_pair(item.key, be_desc)); - if (!be_desc.is_quiescing() && be_desc.is_executor()) { - for (const auto& group : be_desc.executor_groups()) { - VLOG(1) << "Adding backend " << item.key << " to group " << group.DebugString(); - FindOrInsertExecutorGroup(group, new_executor_groups)->AddExecutor(be_desc); + if (!be_desc.is_quiescing()) { + if (be_desc.is_executor()) { + for (const auto& group : be_desc.executor_groups()) { + VLOG(1) << "Adding backend " << item.key << " to group " << + group.DebugString(); + FindOrInsertExecutorGroup(group, new_executor_groups)->AddExecutor(be_desc); + } + } + + if (is_active_coordinator(be_desc)) { + new_state->all_coordinators.AddExecutor(be_desc); } } // Since this backend is new, it cannot already be on the blacklist or probation. @@ -415,6 +436,13 @@ void ClusterMembershipMgr::UpdateMembership( } } } + + // Add ourself to the list of all coordinators. + if (is_active_coordinator(*local_be_desc.get())) { + _removeCoordIfExists(new_state, *local_be_desc); + new_state->all_coordinators.AddExecutor(*local_be_desc); + } + AddLocalBackendToStatestore(*local_be_desc, subscriber_topic_updates); DCHECK(CheckConsistency(*new_backend_map, *new_executor_groups, *new_blacklist)); } diff --git a/be/src/scheduling/cluster-membership-mgr.h b/be/src/scheduling/cluster-membership-mgr.h index 37e42eced..848067b47 100644 --- a/be/src/scheduling/cluster-membership-mgr.h +++ b/be/src/scheduling/cluster-membership-mgr.h @@ -86,10 +86,8 @@ class ClusterMembershipMgr { // Clients can obtain an immutable copy. Class instances can be created through the // implicitly-defined default and copy constructors. 
struct Snapshot { - Snapshot() = default; + Snapshot() : all_coordinators("all-coordinators") {}; Snapshot(const Snapshot&) = default; - /// Returns an executor group of all non-quiescing coordinators in the cluster. - ExecutorGroup GetCoordinators() const; /// Returns the addresses of all non-quiescing coordinators in the cluster. std::vector<TNetworkAddress> GetCoordinatorAddresses() const; /// The current backend descriptor of the local backend. @@ -111,6 +109,10 @@ class ClusterMembershipMgr { /// The version of this Snapshot. It is incremented every time the cluster membership /// changes. int64_t version = 0; + + // Executor group of all non-quiescing coordinators in the cluster. Set during the + // SetState() function. + ExecutorGroup all_coordinators; }; /// An immutable shared membership snapshot. diff --git a/be/src/scheduling/cluster-membership-test-util.cc b/be/src/scheduling/cluster-membership-test-util.cc index b34d03301..f3b04bb9b 100644 --- a/be/src/scheduling/cluster-membership-test-util.cc +++ b/be/src/scheduling/cluster-membership-test-util.cc @@ -81,5 +81,18 @@ BackendDescriptorPB MakeBackendDescriptor(int idx, int port_offset, return MakeBackendDescriptor(idx, group_desc, port_offset, admit_mem_limit); } +void AssertLookupById(const BackendDescriptorPB& exec1, const BackendDescriptorPB& exec2, + const ExecutorGroup& group) { + const BackendDescriptorPB* actual_exec1 = group.LookUpBackendDesc(exec1.backend_id()); + ASSERT_NE(nullptr, actual_exec1); + ASSERT_EQ(exec1.address().hostname(), actual_exec1->address().hostname()); + ASSERT_EQ(exec1.address().port(), actual_exec1->address().port()); + + const BackendDescriptorPB* actual_exec2 = group.LookUpBackendDesc(exec2.backend_id()); + ASSERT_NE(nullptr, actual_exec2); + ASSERT_EQ(exec2.address().hostname(), actual_exec2->address().hostname()); + ASSERT_EQ(exec2.address().port(), actual_exec2->address().port()); +} + } // end namespace test } // end namespace impala diff --git a/be/src/scheduling/cluster-membership-test-util.h b/be/src/scheduling/cluster-membership-test-util.h index 63a016bda..a8dfdfe19 100644 --- a/be/src/scheduling/cluster-membership-test-util.h +++ b/be/src/scheduling/cluster-membership-test-util.h @@ -55,5 +55,10 @@ BackendDescriptorPB MakeBackendDescriptor( BackendDescriptorPB MakeBackendDescriptor(int idx, int port_offset = 0, int64_t admit_mem_limit = 4L * MEGABYTE); +/// Assert the LookupBackendDesc() function returns correct results when passed a +/// backend id. 
+void AssertLookupById(const BackendDescriptorPB& exec1, const BackendDescriptorPB& exec2, + const ExecutorGroup& group); + } // end namespace test } // end namespace impala diff --git a/be/src/scheduling/executor-group-test.cc b/be/src/scheduling/executor-group-test.cc index e2f926c48..9a6f9cadc 100644 --- a/be/src/scheduling/executor-group-test.cc +++ b/be/src/scheduling/executor-group-test.cc @@ -31,12 +31,17 @@ using namespace impala::test; TEST(ExecutorGroupTest, AddExecutors) { ExecutorGroup group1("group1"); ASSERT_EQ(0, group1.GetPerExecutorMemLimitForAdmission()); + int64_t mem_limit_admission1 = 100L * MEGABYTE; - group1.AddExecutor(MakeBackendDescriptor(1, group1, /* port_offset=*/0, - mem_limit_admission1)); + BackendDescriptorPB exec1 = MakeBackendDescriptor(1, group1, /* port_offset=*/0, + mem_limit_admission1); + group1.AddExecutor(exec1); + int64_t mem_limit_admission2 = 120L * MEGABYTE; - group1.AddExecutor(MakeBackendDescriptor(2, group1, /* port_offset=*/0, - mem_limit_admission2)); + BackendDescriptorPB exec2 = MakeBackendDescriptor(2, group1, /* port_offset=*/0, + mem_limit_admission2); + group1.AddExecutor(exec2); + ASSERT_EQ(mem_limit_admission1, group1.GetPerExecutorMemLimitForAdmission()); ASSERT_EQ(2, group1.NumExecutors()); IpAddr backend_ip; @@ -44,23 +49,32 @@ TEST(ExecutorGroupTest, AddExecutors) { EXPECT_EQ("10.0.0.1", backend_ip); ASSERT_TRUE(group1.LookUpExecutorIp("host_2", &backend_ip)); EXPECT_EQ("10.0.0.2", backend_ip); + + AssertLookupById(exec1, exec2, group1); } /// Test adding multiple backends on the same host. TEST(ExecutorGroupTest, MultipleExecutorsOnSameHost) { ExecutorGroup group1("group1"); + int64_t mem_limit_admission1 = 120L * MEGABYTE; - group1.AddExecutor(MakeBackendDescriptor(1, group1, /* port_offset=*/0, - mem_limit_admission1)); + const BackendDescriptorPB exec1 = MakeBackendDescriptor(1, group1, /* port_offset=*/0, + mem_limit_admission1); + group1.AddExecutor(exec1); + int64_t mem_limit_admission2 = 100L * MEGABYTE; - group1.AddExecutor(MakeBackendDescriptor(1, group1, /* port_offset=*/1, - mem_limit_admission2)); + const BackendDescriptorPB exec2 = MakeBackendDescriptor(1, group1, /* port_offset=*/1, + mem_limit_admission2); + group1.AddExecutor(exec2); + ASSERT_EQ(mem_limit_admission2, group1.GetPerExecutorMemLimitForAdmission()); IpAddr backend_ip; ASSERT_TRUE(group1.LookUpExecutorIp("host_1", &backend_ip)); EXPECT_EQ("10.0.0.1", backend_ip); const ExecutorGroup::Executors& backend_list = group1.GetExecutorsForHost("10.0.0.1"); EXPECT_EQ(2, backend_list.size()); + + AssertLookupById(exec1, exec2, group1); } /// Test removing a backend. @@ -112,6 +126,8 @@ TEST(ExecutorGroupTest, RemoveExecutorOnSameHost) { EXPECT_EQ(1, backend_list.size()); group1.RemoveExecutor(executor1); ASSERT_EQ(0, group1.GetPerExecutorMemLimitForAdmission()); + + ASSERT_EQ(nullptr, group1.LookUpBackendDesc(executor2.backend_id())); } /// Test that exercises the size-based group health check. 
diff --git a/be/src/scheduling/executor-group.cc b/be/src/scheduling/executor-group.cc index b99eeed86..d50ea1344 100644 --- a/be/src/scheduling/executor-group.cc +++ b/be/src/scheduling/executor-group.cc @@ -196,6 +196,19 @@ const BackendDescriptorPB* ExecutorGroup::LookUpBackendDesc( return nullptr; } +const BackendDescriptorPB* ExecutorGroup::LookUpBackendDesc( + const UniqueIdPB& be_id) const { + for (const auto& executor_list : executor_map_) { + for (const auto& backend : executor_list.second){ + if (backend.backend_id().hi() == be_id.hi() + && backend.backend_id().lo() == be_id.lo()) { + return &backend; + } + } + } + return nullptr; +} + int ExecutorGroup::NumExecutors() const { int count = 0; for (const auto& executor_list : executor_map_) count += executor_list.second.size(); diff --git a/be/src/scheduling/executor-group.h b/be/src/scheduling/executor-group.h index 1e390e020..f3fde1d19 100644 --- a/be/src/scheduling/executor-group.h +++ b/be/src/scheduling/executor-group.h @@ -96,6 +96,12 @@ class ExecutorGroup { /// change while it holds the pointer. const BackendDescriptorPB* LookUpBackendDesc(const NetworkAddressPB& host) const; + /// Looks up the backend descriptor for the executor with an id matching the provided + /// executor id. Returns nullptr if no executor is found. The returned descriptor should + /// not be retained beyond the lifetime of this ExecutorGroup and the caller must make + /// sure that the group does not change while it holds the pointer. + const BackendDescriptorPB* LookUpBackendDesc(const UniqueIdPB& be_id) const; + /// Returns the hash ring associated with this executor group. It's owned by the group /// and the caller must not hold a reference beyond the groups lifetime. const HashRing* GetHashRing() const { return &executor_ip_hash_ring_; } diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc index 20cc7eca7..22bc65b40 100644 --- a/be/src/service/client-request-state.cc +++ b/be/src/service/client-request-state.cc @@ -2504,7 +2504,7 @@ Status ClientRequestState::TryKillQueryRemotely( // coordinator in the cluster, it will be the status to return. Status status = Status::Expected(TErrorCode::INVALID_QUERY_HANDLE, PrintId(query_id)); ExecutorGroup all_coordinators = - ExecEnv::GetInstance()->cluster_membership_mgr()->GetSnapshot()->GetCoordinators(); + ExecEnv::GetInstance()->cluster_membership_mgr()->GetSnapshot()->all_coordinators; // Skipping the current impalad. 
unique_ptr<ExecutorGroup> other_coordinators{ExecutorGroup::GetFilteredExecutorGroup( &all_coordinators, {ExecEnv::GetInstance()->krpc_address()})}; diff --git a/be/src/util/webserver.cc b/be/src/util/webserver.cc index 8471cb545..6c9e4abf8 100644 --- a/be/src/util/webserver.cc +++ b/be/src/util/webserver.cc @@ -714,11 +714,11 @@ sq_callback_result_t Webserver::BeginRequestCallback(struct sq_connection* conne } if (!authenticated) { if (use_jwt_) { - LOG(INFO) << "Invalid JWT token provided: " << bearer_token; + LOG(INFO) << "Invalid JWT token provided"; total_jwt_token_auth_failure_->Increment(1); } if (use_oauth_) { - LOG(INFO) << "Invalid OAuth token provided: " << bearer_token; + LOG(INFO) << "Invalid OAuth token provided"; total_oauth_token_auth_failure_->Increment(1); } } diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py index cbab2c9bc..9f56eeeb2 100755 --- a/bin/start-impala-cluster.py +++ b/bin/start-impala-cluster.py @@ -1147,7 +1147,7 @@ if __name__ == "__main__": use_exclusive_coordinators, existing_cluster_size) expected_cluster_size += existing_cluster_size elif options.add_impalads: - cluster_ops.start_impalads(options.cluster_size, options.num_coordinators, + cluster_ops.start_impalads(options.num_coordinators, options.num_coordinators, options.use_exclusive_coordinators, existing_cluster_size) expected_cluster_size += existing_cluster_size @@ -1171,7 +1171,9 @@ if __name__ == "__main__": if options.add_impalads: # TODO: This is a hack to make the waiting logic work. We'd better add a dedicated # option for adding a new cluster using the existing catalogd and statestore. + # https://issues.apache.org/jira/browse/IMPALA-13755 expected_num_ready_impalads = options.cluster_size + expected_cluster_size = options.cluster_size impala_cluster.wait_until_ready(expected_cluster_size, expected_num_ready_impalads) except Exception as e: LOG.exception("Error starting cluster") diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift index 77941eb7a..c01e537a4 100644 --- a/common/thrift/ImpalaInternalService.thrift +++ b/common/thrift/ImpalaInternalService.thrift @@ -232,6 +232,12 @@ struct TPoolConfig { // If a rule for the user is not present in user_query_limits, then these rules // are evaluated, if the user is a member of a group. 13: required map<string, i32> group_query_limits + + // Specifies the state of the onlyCoordinators configuration element for a request pool. + // When request pools are configured, they can be specified as being only coordinators + // which means that pool only considers resources from the coordinator nodes and queries + // for that pool schedule fragment instances on coordinator nodes only. + 14: optional bool only_coordinators } struct TParseDateStringResult { diff --git a/docs/topics/impala_admission_config.xml b/docs/topics/impala_admission_config.xml index a1b179c13..c3c686d85 100644 --- a/docs/topics/impala_admission_config.xml +++ b/docs/topics/impala_admission_config.xml @@ -144,10 +144,10 @@ impala.admission-control.pool-queue-timeout-ms.<varname>queue_name</varname></ph <p> Here are sample <filepath>fair-scheduler.xml</filepath> and <filepath>llama-site.xml</filepath> files that define resource pools - <codeph>root.default</codeph>, <codeph>root.development</codeph>, and - <codeph>root.production</codeph>. These files define resource pools for Impala - admission control and are separate from the similar - <codeph>fair-scheduler.xml</codeph>that defines resource pools for YARN. 
+ <codeph>root.default</codeph>, <codeph>root.development</codeph>, + <codeph>root.production</codeph>, and <codeph>root.coords</codeph>. + These files define resource pools for Impala admission control and are separate + from the similar <codeph>fair-scheduler.xml</codeph>that defines resource pools for YARN. </p> <p> @@ -173,6 +173,36 @@ impala.admission-control.pool-queue-timeout-ms.<varname>queue_name</varname></ph to those pools. </p> + <p> + The <codeph><onlyCoordinators></codeph> element is a boolean that defaults + to <codeph>false</codeph>. If this value is set to <codeph>true</codeph>, the + named request pool will contain only the coordinators and none of the executors. + The main purpose of this setting is to enable running queries against the + <codeph>sys.impala_query_live</codeph> table from workload management. Since the + data for this table is stored in the memory of the coordinators, the executors + do not need to be involved if the query only selects from this table. + </p> + + <p> + To use an <codeph><onlyCoordinators></codeph> request pool, set the + <codeph>REQUEST_POOL</codeph> query option to the name of the + <codeph><onlyCoordinators></codeph> request pool. <b>Caution</b> even though + these request pools do not contain executors, they can still run any query. + Thus, while the <codeph>REQUEST_POOL</codeph> query option is set to an only + coordinators request pool, queries have the potential to run the coordinators + out of resources. + </p> + + <p> + Caution: care must be taken when naming the <codeph><onlyCoordinators></codeph> + request pool. If the name has the same prefix as a named executor group set, then + queries may be automatically routed to the request pool. For example, if the + coordinator is configured with + <codeph>--expected_executor_group_sets=prefix1:10</codeph>, then an only coordinators + request pool named <codeph>prefix1-onlycoords<codeph> will potentially have + queries routed to it. + </p> + <codeblock><allocations> <queue name="root"> @@ -189,6 +219,11 @@ impala.admission-control.pool-queue-timeout-ms.<varname>queue_name</varname></ph <maxResources>1000000 mb, 0 vcores</maxResources> <aclSubmitApps> ops,admin</aclSubmitApps> </queue> + <queue name="coords"> + <maxResources>1000000 mb, 0 vcores</maxResources> + <aclSubmitApps>ops,admin</aclSubmitApps> + <onlyCoordinators>true</onlyCoordinators> + </queue> </queue> <queuePlacementPolicy> <rule name="specified" create="false"/> diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index 4f5146dfc..e677580d3 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -230,6 +230,8 @@ import org.apache.impala.util.PatternMatcher; import org.apache.impala.util.RequestPoolService; import org.apache.impala.util.TResultRowBuilder; import org.apache.impala.util.TSessionStateUtil; +import static org.apache.impala.yarn.server.resourcemanager.scheduler.fair. 
+ AllocationFileLoaderService.ROOT_POOL_NAME; import org.apache.kudu.client.KuduClient; import org.apache.kudu.client.KuduException; import org.apache.kudu.client.KuduTransaction; @@ -2196,8 +2198,28 @@ public class Frontend { Preconditions.checkState( !clientSetRequestPool || !queryOptions.getRequest_pool().isEmpty()); - List<TExecutorGroupSet> originalExecutorGroupSets = - ExecutorMembershipSnapshot.getAllExecutorGroupSets(); + boolean coordOnlyRequestPool = false; + + if (clientSetRequestPool && RequestPoolService.getInstance() != null) { + final String pool_name = StringUtils.prependIfMissing( + queryOptions.getRequest_pool(), ROOT_POOL_NAME + "."); + + coordOnlyRequestPool = + RequestPoolService.getInstance().getPoolConfig(pool_name).only_coordinators; + } + + List<TExecutorGroupSet> originalExecutorGroupSets; + if (coordOnlyRequestPool) { + // The query is set to use an only coordinators request pool which means that no + // executors will be involved in query execution. Thus, the planner must ignore + // all executor groups and select the default group instead. The backend will ensure + // the query is scheduled on the coordinators. + originalExecutorGroupSets = new ArrayList<>(1); + TExecutorGroupSet all_coords = new TExecutorGroupSet(); + originalExecutorGroupSets.add(all_coords); + } else { + originalExecutorGroupSets = ExecutorMembershipSnapshot.getAllExecutorGroupSets(); + } LOG.info("The original executor group sets from executor membership snapshot: " + originalExecutorGroupSets); @@ -2314,6 +2336,9 @@ public class Frontend { } else if (!Frontend.canStmtBeAutoScaled(req)) { reason = "query is not auto-scalable"; notScalable = true; + } else if (coordOnlyRequestPool) { + reason = "only coordinators request pool specified"; + notScalable = true; } if (notScalable) { diff --git a/fe/src/main/java/org/apache/impala/util/RequestPoolService.java b/fe/src/main/java/org/apache/impala/util/RequestPoolService.java index 291c14243..9d4fc2ac2 100644 --- a/fe/src/main/java/org/apache/impala/util/RequestPoolService.java +++ b/fe/src/main/java/org/apache/impala/util/RequestPoolService.java @@ -378,6 +378,7 @@ public class RequestPoolService { result.setMax_requests(MAX_PLACED_RESERVATIONS_DEFAULT); result.setMax_queued(MAX_QUEUED_RESERVATIONS_DEFAULT); result.setDefault_query_options(""); + result.setOnly_coordinators(false); } else { // Capture the current conf_ in case it changes while we're using it. 
Configuration currentConf = conf_; @@ -403,6 +404,7 @@ public class RequestPoolService { getPoolConfigValue(currentConf, pool, MAX_QUERY_CPU_CORE_PER_NODE_LIMIT, 0L)); result.setMax_query_cpu_core_coordinator_limit(getPoolConfigValue( currentConf, pool, MAX_QUERY_CPU_CORE_COORDINATOR_LIMIT, 0L)); + result.setOnly_coordinators(allocationConf_.get().isOnlyCoordinators(pool)); } if (LOG.isTraceEnabled()) { LOG.debug("getPoolConfig(pool={}): max_mem_resources={}, max_requests={}," diff --git a/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java b/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java index ca748a3d0..f31a9c108 100644 --- a/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java +++ b/fe/src/test/java/org/apache/impala/util/TestRequestPoolService.java @@ -356,7 +356,7 @@ public class TestRequestPoolService { 10000L, "mem_limit=1024m,query_timeout_s=10"); checkPoolConfigResult("root.queueB", 5, 10, -1, 30000L, "mem_limit=1024m"); checkPoolConfigResult("root.queueC", 5, 10, 1024 * ByteUnits.MEGABYTE, 30000L, - "mem_limit=1024m", 1000, 10, false, 8, 8, null, null); + "mem_limit=1024m", 1000, 10, false, 8, 8, null, null, true); Map<String, Integer> queueDUserQueryLimits = new HashMap<>(); queueDUserQueryLimits.put("userA", 2); @@ -377,7 +377,8 @@ public class TestRequestPoolService { createPoolService(ALLOCATION_FILE_EMPTY, LLAMA_CONFIG_FILE_EMPTY); Assert.assertEquals("root.userA", poolService_.assignToPool("", "userA")); checkPoolAcls("root.userA", asList("userA", "userB", "userZ"), EMPTY_LIST); - checkPoolConfigResult("root", -1, 200, -1, null, "", 0, 0, true, 0, 0, null, null); + checkPoolConfigResult("root", -1, 200, -1, null, "", 0, 0, true, 0, 0, null, null, + false); } @Ignore("IMPALA-4868") @Test @@ -684,7 +685,7 @@ public class TestRequestPoolService { String expectedQueryOptions, long max_query_mem_limit, long min_query_mem_limit, boolean clamp_mem_limit_query_option, long max_query_cpu_core_per_node_limit, long max_query_cpu_core_coordinator_limit, Map<String, Integer> userQueryLimits, - Map<String, Integer> groupQueryLimits) { + Map<String, Integer> groupQueryLimits, boolean onlyCoordinators) { TPoolConfig expectedResult = new TPoolConfig(); expectedResult.setMax_requests(expectedMaxRequests); expectedResult.setMax_queued(expectedMaxQueued); @@ -706,6 +707,9 @@ public class TestRequestPoolService { userQueryLimits != null ? userQueryLimits : Collections.emptyMap()); expectedResult.setGroup_query_limits( groupQueryLimits != null ? 
groupQueryLimits : Collections.emptyMap()); + + expectedResult.setOnly_coordinators(onlyCoordinators); + TPoolConfig poolConfig = poolService_.getPoolConfig(pool); Assert.assertEquals( "Unexpected config values for pool " + pool, expectedResult, poolConfig); @@ -725,7 +729,7 @@ public class TestRequestPoolService { Map<String, Integer> groupQueryLimits) { checkPoolConfigResult(pool, expectedMaxRequests, expectedMaxQueued, expectedMaxMem, expectedQueueTimeoutMs, expectedQueryOptions, 0, 0, true, 0, 0, - userQueryLimits, groupQueryLimits); + userQueryLimits, groupQueryLimits, false); } private void checkPoolConfigResult(String pool, long expectedMaxRequests, diff --git a/fe/src/test/resources/fair-scheduler-onlycoords.xml b/fe/src/test/resources/fair-scheduler-onlycoords.xml new file mode 100644 index 000000000..087d6b770 --- /dev/null +++ b/fe/src/test/resources/fair-scheduler-onlycoords.xml @@ -0,0 +1,25 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<allocations> + <queue name="group-set-small"> + <weight>1.0</weight> + <schedulingPolicy>fair</schedulingPolicy> + <!-- Set a huge amount of memory to enable memory based admission control --> + <maxResources>50000000 mb, 0 vcores</maxResources> + </queue> + <queue name="group-set-large"> + <weight>1.0</weight> + <schedulingPolicy>fair</schedulingPolicy> + <!-- Set a huge amount of memory to enable memory based admission control --> + <maxResources>50000000 mb, 0 vcores</maxResources> + </queue> + <queue name="onlycoords"> + <maxResources>3000 mb, 0 vcores</maxResources> + <aclSubmitApps>* </aclSubmitApps> + <onlyCoordinators>true</onlyCoordinators> + </queue> + <queuePlacementPolicy> + <rule name="specified" create="false"/> + <rule name="reject" /> + </queuePlacementPolicy> +</allocations> \ No newline at end of file diff --git a/fe/src/test/resources/fair-scheduler-test.xml b/fe/src/test/resources/fair-scheduler-test.xml index 9753c3f56..fe8f1eb9f 100644 --- a/fe/src/test/resources/fair-scheduler-test.xml +++ b/fe/src/test/resources/fair-scheduler-test.xml @@ -23,9 +23,11 @@ <queue name="queueC"> <aclSubmitApps>* </aclSubmitApps> <maxResources>1024 mb, 0 vcores</maxResources> + <onlyCoordinators>true</onlyCoordinators> </queue> <queue name="queueD"> <aclSubmitApps>userA,userB </aclSubmitApps> + <onlyCoordinators>false</onlyCoordinators> <userQueryLimit> <user>*</user> <totalCount>3</totalCount> diff --git a/fe/src/test/resources/llama-site-onlycoords.xml b/fe/src/test/resources/llama-site-onlycoords.xml new file mode 100644 index 000000000..a8b20b427 --- /dev/null +++ b/fe/src/test/resources/llama-site-onlycoords.xml @@ -0,0 +1,66 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<configuration> + <property> + <name>impala.admission-control.max-query-cpu-core-per-node-limit.root.group-set-small</name> + <value>12</value> + </property> + <property> + <name>impala.admission-control.max-query-mem-limit.root.group-set-small</name> + <value>53687091200</value> + </property> + <property> + <name>impala.admission-control.max-mt-dop.root.group-set-small</name> + <value>16</value> + </property> + <property> + <name>impala.admission-control.min-query-mem-limit.root.group-set-small</name> + <value>2147483648</value> + </property> + <property> + <name>impala.admission-control.pool-queue-timeout-ms.root.group-set-small</name> + <value>600000</value> + </property> + + <property> + <name>impala.admission-control.max-query-cpu-core-per-node-limit.root.group-set-large</name> + <value>12</value> + </property> + <property> + 
<name>impala.admission-control.max-query-mem-limit.root.group-set-large</name> + <value>53687091200</value> + </property> + <property> + <name>impala.admission-control.max-mt-dop.root.group-set-large</name> + <value>16</value> + </property> + <property> + <name>impala.admission-control.min-query-mem-limit.root.group-set-large</name> + <value>2147483648</value> + </property> + <property> + <name>impala.admission-control.pool-queue-timeout-ms.root.group-set-large</name> + <value>600000</value> + </property> + + <property> + <name>llama.am.throttling.maximum.placed.reservations.onlycoords</name> + <value>1</value> + </property> + <property> + <name>llama.am.throttling.maximum.queued.reservations.onlycoords</name> + <value>5</value> + </property> + <property> + <name>impala.admission-control.pool-queue-timeout-ms.onlycoords</name> + <value>30000</value> + </property> + <property> + <name>impala.admission-control.max-query-mem-limit.onlycoords</name> + <value>1610612736</value><!--1.5GB--> + </property> + <property> + <name>impala.admission-control.min-query-mem-limit.onlycoords</name> + <value>52428800</value><!--50MB--> + </property> +</configuration> diff --git a/java/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java b/java/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java index 067b1ab9d..519fa661d 100644 --- a/java/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java +++ b/java/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java @@ -56,6 +56,9 @@ public class AllocationConfiguration { private final Map<String, Map<String, Integer>> userQueryLimits; private final Map<String, Map<String, Integer>> groupQueryLimits; + // Specifies if each queue contains all nodes or only coordinators. + private final Map<String, Boolean> onlyCoordinators; + // Policy for mapping apps to queues @VisibleForTesting QueuePlacementPolicy placementPolicy; @@ -80,6 +83,7 @@ public class AllocationConfiguration { Map<String, Map<QueueACL, AccessControlList>> queueAcls, Map<String, Map<String, Integer>> userQueryLimits, Map<String, Map<String, Integer>> groupQueryLimits, + Map<String, Boolean> onlyCoordinators, QueuePlacementPolicy placementPolicy, Map<FSQueueType, Set<String>> configuredQueues, Set<String> nonPreemptableQueues) { @@ -89,6 +93,7 @@ public class AllocationConfiguration { this.queueAcls = queueAcls; this.userQueryLimits = userQueryLimits; this.groupQueryLimits = groupQueryLimits; + this.onlyCoordinators = onlyCoordinators; this.placementPolicy = placementPolicy; this.configuredQueues = configuredQueues; } @@ -100,6 +105,7 @@ public class AllocationConfiguration { queueAcls = new HashMap<>(); userQueryLimits = new HashMap<>(); groupQueryLimits = new HashMap<>(); + onlyCoordinators = new HashMap<>(); configuredQueues = new HashMap<>(); for (FSQueueType queueType : FSQueueType.values()) { configuredQueues.put(queueType, new HashSet<String>()); @@ -202,4 +208,8 @@ public class AllocationConfiguration { Map<String, Integer> limits = groupQueryLimits.get(queueName); return limits != null ? 
limits : Collections.emptyMap(); } + + public boolean isOnlyCoordinators(String queueName) { + return onlyCoordinators.getOrDefault(queueName, Boolean.FALSE).booleanValue(); + } } \ No newline at end of file diff --git a/java/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java b/java/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index cd1bb30eb..2e1d411b7 100644 --- a/java/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ b/java/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -59,10 +59,10 @@ import com.google.common.annotations.VisibleForTesting; @Public @Unstable public class AllocationFileLoaderService extends AbstractService { - + public static final Log LOG = LogFactory.getLog( AllocationFileLoaderService.class.getName()); - + /** Time to wait between checks of the allocation file */ public static final long ALLOC_RELOAD_INTERVAL_MS = 10 * 1000; @@ -74,26 +74,28 @@ public class AllocationFileLoaderService extends AbstractService { public static final long THREAD_JOIN_TIMEOUT_MS = 1000; + public static final String ROOT_POOL_NAME = "root"; + private final Clock clock; private long lastSuccessfulReload; // Last time we successfully reloaded queues private boolean lastReloadAttemptFailed = false; - - // Path to XML file containing allocations. + + // Path to XML file containing allocations. private File allocFile; - + private Listener reloadListener; - + @VisibleForTesting long reloadIntervalMs = ALLOC_RELOAD_INTERVAL_MS; - + private Thread reloadThread; private volatile boolean running = true; - + public AllocationFileLoaderService() { this(new SystemClock()); } - + public AllocationFileLoaderService(Clock clock) { super(AllocationFileLoaderService.class.getName()); this.clock = clock; @@ -288,6 +290,7 @@ public class AllocationFileLoaderService extends AbstractService { Map<String, Map<QueueACL, AccessControlList>> queueAcls = new HashMap<>(); Map<String, Map<String, Integer>> userQueryLimits = new HashMap<>(); Map<String, Map<String, Integer>> groupQueryLimits = new HashMap<>(); + Map<String, Boolean> onlyCoordinators = new HashMap<>(); Set<String> nonPreemptableQueues = new HashSet<>(); int userMaxAppsDefault = Integer.MAX_VALUE; int queueMaxAppsDefault = Integer.MAX_VALUE; @@ -404,8 +407,8 @@ public class AllocationFileLoaderService extends AbstractService { // Load queue elements. A root queue can either be included or omitted. If // it's included, all other queues must be inside it. 
for (Element element : queueElements) { - String parent = "root"; - if (element.getAttribute("name").equalsIgnoreCase("root")) { + String parent = ROOT_POOL_NAME; + if (element.getAttribute("name").equalsIgnoreCase(ROOT_POOL_NAME)) { if (queueElements.size() > 1) { throw new AllocationConfigurationException("If configuring root queue," + " no other queues can be placed alongside it."); @@ -416,7 +419,8 @@ public class AllocationFileLoaderService extends AbstractService { maxChildQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls, - userQueryLimits, groupQueryLimits, configuredQueues, nonPreemptableQueues); + userQueryLimits, groupQueryLimits, configuredQueues, onlyCoordinators, + nonPreemptableQueues); } // Load placement policy and pass it configured queues @@ -430,18 +434,18 @@ public class AllocationFileLoaderService extends AbstractService { } // Set the min/fair share preemption timeout for the root queue - if (!minSharePreemptionTimeouts.containsKey("root")){ - minSharePreemptionTimeouts.put("root", + if (!minSharePreemptionTimeouts.containsKey(ROOT_POOL_NAME)){ + minSharePreemptionTimeouts.put(ROOT_POOL_NAME, defaultMinSharePreemptionTimeout); } - if (!fairSharePreemptionTimeouts.containsKey("root")) { - fairSharePreemptionTimeouts.put("root", + if (!fairSharePreemptionTimeouts.containsKey(ROOT_POOL_NAME)) { + fairSharePreemptionTimeouts.put(ROOT_POOL_NAME, defaultFairSharePreemptionTimeout); } // Set the fair share preemption threshold for the root queue - if (!fairSharePreemptionThresholds.containsKey("root")) { - fairSharePreemptionThresholds.put("root", + if (!fairSharePreemptionThresholds.containsKey(ROOT_POOL_NAME)) { + fairSharePreemptionThresholds.put(ROOT_POOL_NAME, defaultFairSharePreemptionThreshold); } @@ -451,7 +455,7 @@ public class AllocationFileLoaderService extends AbstractService { queueMaxResourcesDefault, queueMaxAMShareDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls, userQueryLimits, groupQueryLimits, - newPlacementPolicy, configuredQueues, nonPreemptableQueues); + onlyCoordinators, newPlacementPolicy, configuredQueues, nonPreemptableQueues); lastSuccessfulReload = clock.getTime(); lastReloadAttemptFailed = false; @@ -481,6 +485,7 @@ public class AllocationFileLoaderService extends AbstractService { Map<String, Map<String, Integer>> userQueryLimits, Map<String, Map<String, Integer>> groupQueryLimits, Map<FSQueueType, Set<String>> configuredQueues, + Map<String, Boolean> onlyCoordinators, Set<String> nonPreemptableQueues) throws AllocationConfigurationException { String queueName = CharMatcher.whitespace().trimFrom(element.getAttribute("name")); @@ -576,9 +581,13 @@ public class AllocationFileLoaderService extends AbstractService { maxChildQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls, - userQueryLimits, groupQueryLimits, configuredQueues, nonPreemptableQueues); + userQueryLimits, groupQueryLimits, configuredQueues, onlyCoordinators, + nonPreemptableQueues); configuredQueues.get(FSQueueType.PARENT).add(queueName); isLeaf = false; + } else if ("onlyCoordinators".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + onlyCoordinators.put(queueName, 
Boolean.parseBoolean(text)); } } if (isLeaf) { @@ -611,14 +620,13 @@ public class AllocationFileLoaderService extends AbstractService { allocationConfiguration.getConfiguredQueues(); Set<String> parentQueues = configuredQueues.get(FSQueueType.PARENT); Set<String> leafQueues = configuredQueues.get(FSQueueType.LEAF); - String root = "root"; - if (parentQueues.size() == 1 && parentQueues.contains(root)) { + if (parentQueues.size() == 1 && parentQueues.contains(ROOT_POOL_NAME)) { Map<String, Integer> rootUserQueryLimits = - allocationConfiguration.getUserQueryLimits(root); + allocationConfiguration.getUserQueryLimits(ROOT_POOL_NAME); Map<String, Integer> rootGroupQueryLimits = - allocationConfiguration.getGroupQueryLimits(root); + allocationConfiguration.getGroupQueryLimits(ROOT_POOL_NAME); for (String leafQueue : leafQueues) { - if (leafQueue.startsWith(root)) { + if (leafQueue.startsWith(ROOT_POOL_NAME)) { Map<String, Integer> groupQueryLimits = allocationConfiguration.getGroupQueryLimits(leafQueue); Map<String, Integer> userQueryLimits = diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py index c7fd3774e..ae01adfb6 100644 --- a/tests/common/custom_cluster_test_suite.py +++ b/tests/common/custom_cluster_test_suite.py @@ -84,6 +84,7 @@ DISABLE_LOG_BUFFERING = 'disable_log_buffering' # If True, resolves the actual files for all the log symlinks and outputs the resolved # paths to stderr. LOG_SYMLINKS = 'log_symlinks' +WORKLOAD_MGMT = 'workload_mgmt' # Args that accept additional formatting to supply temporary dir path. ACCEPT_FORMATTING = set([IMPALAD_ARGS, CATALOGD_ARGS, IMPALA_LOG_DIR]) @@ -154,7 +155,8 @@ class CustomClusterTestSuite(ImpalaTestSuite): num_exclusive_coordinators=None, kudu_args=None, statestored_timeout_s=None, impalad_timeout_s=None, expect_cores=None, reset_ranger=False, impalad_graceful_shutdown=False, tmp_dir_placeholders=[], - expect_startup_fail=False, disable_log_buffering=False, log_symlinks=False): + expect_startup_fail=False, disable_log_buffering=False, log_symlinks=False, + workload_mgmt=False): """Records arguments to be passed to a cluster by adding them to the decorated method's func_dict""" args = dict() @@ -198,6 +200,8 @@ class CustomClusterTestSuite(ImpalaTestSuite): args[DISABLE_LOG_BUFFERING] = True if log_symlinks: args[LOG_SYMLINKS] = True + if workload_mgmt: + args[WORKLOAD_MGMT] = True def merge_args(args_first, args_last): result = args_first.copy() @@ -328,6 +332,10 @@ class CustomClusterTestSuite(ImpalaTestSuite): if IMPALAD_TIMEOUT_S in args: kwargs[IMPALAD_TIMEOUT_S] = args[IMPALAD_TIMEOUT_S] + if args.get(WORKLOAD_MGMT, False): + if IMPALAD_ARGS or CATALOGD_ARGS in args: + kwargs[WORKLOAD_MGMT] = True + if args.get(EXPECT_CORES, False): # Make a note of any core files that already exist possible_cores = find_all_files('*core*') @@ -399,25 +407,33 @@ class CustomClusterTestSuite(ImpalaTestSuite): if not self.SHARED_CLUSTER_ARGS: self.cluster_teardown(method.__name__, method.__dict__) - def wait_for_wm_init_complete(self, timeout_s=120): + def wait_for_wm_init_complete(self, timeout_s=60): """ Waits for the catalog to report the workload management initialization process has - completed and for the catalog updates to be received by the coordinators. + completed and for the catalog updates to be received by the coordinators. The input + timeout_s is used as the timeout for three separate function calls. Thus, the + theoretical max amount of time this function could wait is (timeout_s * 3). 
""" - self.assert_catalogd_log_contains("INFO", r'Completed workload management ' - r'initialization', timeout_s=timeout_s) - - catalog_log = self.assert_catalogd_log_contains("INFO", r'A catalog update with \d+ ' - r'entries is assembled. Catalog version: (\d+)', timeout_s=10, expected_count=-1) - - def assert_func(): - coord_log = self.assert_impalad_log_contains("INFO", r'Catalog topic update ' - r'applied with version: (\d+)', timeout_s=5, expected_count=-1) - return int(coord_log.group(1)) >= int(catalog_log.group(1)) - - assert retry(func=assert_func, max_attempts=10, sleep_time_s=3, backoff=1), \ - "Expected a catalog topic update with version '{}' or later, but no such " \ - "update was found.".format(catalog_log.group(1)) + catalog_log = self.assert_log_contains_multiline("catalogd", "INFO", r'Completed ' + r'workload management initialization.*?A catalog update with \d+ entries is ' + r'assembled\. Catalog version: (\d+)', timeout_s) + + # Assert each coordinator has received a catalog update that was assembled after + # workload management completed. + for idx, _ in enumerate(self.cluster.get_all_coordinators()): + node_name = "impalad" + if idx > 0: + node_name += "_node" + str(idx) + + def assert_func(): + coord_log = self.assert_log_contains(node_name, "INFO", r'Catalog topic update ' + r'applied with version: (\d+)', timeout_s=timeout_s, expected_count=-1) + return int(coord_log.group(1)) >= int(catalog_log.group(1)) + + max_attempts = timeout_s / 3 + assert retry(func=assert_func, max_attempts=max_attempts, sleep_time_s=3, + backoff=1), "Expected a catalog topic update with version '{}' or later, but " \ + "no such update was found.".format(catalog_log.group(1)) @classmethod def _stop_impala_cluster(cls): @@ -514,7 +530,8 @@ class CustomClusterTestSuite(ImpalaTestSuite): impalad_timeout_s=60, ignore_pid_on_log_rotation=False, wait_for_backends=True, - log_symlinks=False): + log_symlinks=False, + workload_mgmt=False): cls.impala_log_dir = impala_log_dir # We ignore TEST_START_CLUSTER_ARGS here. Custom cluster tests specifically test that # certain custom startup arguments work and we want to keep them independent of dev @@ -544,6 +561,10 @@ class CustomClusterTestSuite(ImpalaTestSuite): cmd.append("--impalad_args=--use_local_catalog=1") cmd.append("--catalogd_args=--catalog_topic_mode=minimal") + if workload_mgmt: + cmd.append("--impalad_args=--enable_workload_mgmt=true") + cmd.append("--catalogd_args=--enable_workload_mgmt=true") + default_query_option_kvs = [] # Put any defaults first, then any arguments after that so they can override defaults. if default_query_options is not None: diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py index 4f83cf909..f08c9fcff 100644 --- a/tests/common/impala_cluster.py +++ b/tests/common/impala_cluster.py @@ -180,6 +180,14 @@ class ImpalaCluster(object): LOG.info("Cluster: " + str(self.impalads)) return choice([impalad for impalad in self.impalads if impalad != other_impalad]) + def get_all_coordinators(self): + """Returns a list of all impalads where is_coordinator returns True. If no + coordinators are found, returns an empty list. 
The returned list is sorted by krpc + port ascending.""" + + return sorted([imp for imp in self.impalads if imp.is_coordinator()], + key=lambda x: x.service.krpc_port) + def num_responsive_coordinators(self): """Find the number of impalad coordinators that can evaluate a test query.""" n = 0 diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py index e865a696e..d08803e14 100644 --- a/tests/common/impala_test_suite.py +++ b/tests/common/impala_test_suite.py @@ -1459,6 +1459,37 @@ class ImpalaTestSuite(BaseTestSuite): return self.assert_log_contains( daemon, level, line_regex, expected_count, timeout_s, dry_run) + def assert_log_contains_multiline(self, daemon, level, line_regex, timeout_s=6): + """ + Asserts the the daemon log with specified level (e.g. ERROR, WARNING, INFO) contains + at least one match of the provided regular expression. The difference with this + function is the regular expression is compiled with the DOTALL flag which causes the + dot operator to also match newlines. Thus, the provided line_regex can match over + multiple lines. + + Returns the result of the regular expression search() function or fails an assertion + if the regular expression is not matched in the given timeframe. + """ + if (self._warn_assert_log): + LOG.warning( + "{} calls assert_log_contains() with timeout_s={}. Make sure that glog " + "buffering has been disabled (--logbuflevel=-1), or " + "CustomClusterTestSuite.with_args is set with disable_log_buffering=True, " + "or timeout_s is sufficient.".format(self.__class__.__name__, timeout_s)) + + pattern = re.compile(line_regex, re.DOTALL) + + for i in range(0, timeout_s): + log_file_path = self.__build_log_path(daemon, level) + with open(log_file_path) as log: + ret = pattern.search(log.read()) + if ret is not None: + return ret + time.sleep(1) + + assert False, "did not find any logfile " \ + "contents matching the regex '{}'".format(line_regex) + def assert_log_contains(self, daemon, level, line_regex, expected_count=1, timeout_s=6, dry_run=False): """ diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py index 83c6b09ed..0b07749f3 100644 --- a/tests/custom_cluster/test_admission_controller.py +++ b/tests/custom_cluster/test_admission_controller.py @@ -143,9 +143,17 @@ INITIAL_QUEUE_REASON_REGEX = \ # The path to resources directory which contains the admission control config files. RESOURCES_DIR = os.path.join(os.environ['IMPALA_HOME'], "fe", "src", "test", "resources") +# SQL statement that selects all records for the active queries table. +ACTIVE_SQL = "select * from sys.impala_query_live" + def impalad_admission_ctrl_config_args(fs_allocation_file, llama_site_file, additional_args="", make_copy=False): + """Generates impalad startup flags configuring the fair scheduler and llama site path + options and setting logging for admission control to VLOG_ROW. 
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py index 83c6b09ed..0b07749f3 100644 --- a/tests/custom_cluster/test_admission_controller.py +++ b/tests/custom_cluster/test_admission_controller.py @@ -143,9 +143,17 @@ INITIAL_QUEUE_REASON_REGEX = \ # The path to resources directory which contains the admission control config files. RESOURCES_DIR = os.path.join(os.environ['IMPALA_HOME'], "fe", "src", "test", "resources") +# SQL statement that selects all records from the active queries table. +ACTIVE_SQL = "select * from sys.impala_query_live" + def impalad_admission_ctrl_config_args(fs_allocation_file, llama_site_file, additional_args="", make_copy=False): + """Generates impalad startup flags configuring the fair scheduler and llama site path + options and setting logging for admission control to VLOG_ROW. + + If make_copy is True, the specified fair scheduler and llama site files are copied + first, and the copies are used as the values for the relevant startup flags.""" fs_allocation_path = os.path.join(RESOURCES_DIR, fs_allocation_file) llama_site_path = os.path.join(RESOURCES_DIR, llama_site_file) if make_copy: @@ -1774,6 +1782,232 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): for executor_mem_admitted in mem_admitted['executor']: assert executor_mem_admitted == 0 + def __assert_systables_query(self, profile, expected_coords=None, + expected_frag_counts=None): + """Asserts the per-host fragment instances are correct in the provided profile.""" + + if expected_coords is None: + expected_coords = self.cluster.get_all_coordinators() + + populate_frag_count = False + if expected_frag_counts is None: + populate_frag_count = True + expected_frag_counts = [] + + expected = [] + for i, val in enumerate(expected_coords): + if populate_frag_count: + if i == 0: + expected_frag_counts.append(2) + else: + expected_frag_counts.append(1) + + expected.append("{0}:{1}({2})".format(val.service.hostname, val.service.krpc_port, + expected_frag_counts[i])) + + # Assert the correct request pool was used. + req_pool = re.search(r'\n\s+Request Pool:\s+(.*?)\n', profile) + assert req_pool, "Did not find request pool in query profile" + assert req_pool.group(1) == "root.onlycoords" + + # Assert the fragment instances only ran on the coordinators. + perhost_frags = re.search(r'\n\s+Per Host Number of Fragment Instances:\s+(.*?)\n', + profile) + assert perhost_frags + sorted_hosts = " ".join(sorted(perhost_frags.group(1).split(" "))) + assert sorted_hosts + assert sorted_hosts == " ".join(expected) + + # Assert the frontend selected the first executor group. + expected_verdict = "Assign to first group because only coordinators request pool " \ + "specified" + fe_verdict = re.search(r'\n\s+Executor group 1:\n\s+Verdict: (.*?)\n', profile) + assert fe_verdict, "No frontend executor group verdict found." + assert fe_verdict.group(1) == expected_verdict, "Incorrect verdict found" + + def __run_assert_systables_query(self, vector, expected_coords=None, + expected_frag_counts=None, query=ACTIVE_SQL): + """Runs a query using an only coordinators request pool and asserts the per-host + fragment instances are correct. This function can only be called from tests that + have configured the cluster to use 'fair-scheduler-onlycoords.xml' and + 'llama-site-onlycoords.xml'.""" + + vector.set_exec_option('request_pool', 'onlycoords') + result = self.execute_query_using_vector(query, vector) + assert result.success + + self.__assert_systables_query(result.runtime_profile, expected_coords, + expected_frag_counts) + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args(num_exclusive_coordinators=3, cluster_size=5, + workload_mgmt=True, impalad_args=impalad_admission_ctrl_config_args( + fs_allocation_file="fair-scheduler-onlycoords.xml", + llama_site_file="llama-site-onlycoords.xml"), + statestored_args=_STATESTORED_ARGS) + def test_coord_only_pool_happy_path(self, vector): + """Asserts queries set to use an only coordinators request pool run all of their + fragment instances on the coordinators and none on the executors, even if the query + includes non-system tables.""" + self.wait_for_wm_init_complete() + + # Execute a query that only selects from a system table using a request pool that is + # only coordinators. + self.__run_assert_systables_query(vector) + + # Execute a query that joins a non-system table with a system table using a request + # pool that is only coordinators. All fragment instances will run on the coordinators + # without running any on the executors. + self.__run_assert_systables_query( + vector=vector, + expected_frag_counts=[4, 2, 2,], + query="select a.test_name, b.db_user from functional.jointbl a inner join " + "sys.impala_query_live b on a.test_name = b.db_name"), +
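The assertions above hinge on the 'Per Host Number of Fragment Instances' entry of the query profile, which lists each host as hostname:krpc_port(num_instances). A minimal sketch of how that entry can be pulled apart (illustrative only; the profile text, hostnames, and ports are fabricated):

import re

profile = ("    Request Pool: root.onlycoords\n"
           "    Per Host Number of Fragment Instances: "
           "host-0:27010(2) host-1:27011(1) host-2:27012(1)\n")

per_host = re.search(r'\n\s+Per Host Number of Fragment Instances:\s+(.*?)\n', profile)
entries = per_host.group(1).split(" ")
counts = {e[:e.index("(")]: int(e[e.index("(") + 1:-1]) for e in entries}

# Matches the default expectation in __assert_systables_query: the first coordinator
# (lowest krpc port) runs two fragment instances, every other coordinator runs one,
# and no executor hosts appear at all.
assert counts == {"host-0:27010": 2, "host-1:27011": 1, "host-2:27012": 1}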
+ @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args(num_exclusive_coordinators=3, cluster_size=3, + workload_mgmt=True, impalad_args=impalad_admission_ctrl_config_args( + fs_allocation_file="fair-scheduler-onlycoords.xml", + llama_site_file="llama-site-onlycoords.xml"), + statestored_args=_STATESTORED_ARGS) + def test_coord_only_pool_no_executors(self, vector): + """Asserts queries that only select from the active queries table run even if no + executors are running.""" + self.wait_for_wm_init_complete() + self.__run_assert_systables_query(vector) + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args(num_exclusive_coordinators=3, cluster_size=5, + workload_mgmt=True, impalad_args=impalad_admission_ctrl_config_args( + fs_allocation_file="fair-scheduler-onlycoords.xml", + llama_site_file="llama-site-onlycoords.xml"), + statestored_args=_STATESTORED_ARGS) + def test_coord_only_pool_one_quiescing_coord(self, vector): + """Asserts quiescing coordinators do not run fragment instances for queries that only + select from the active queries table.""" + self.wait_for_wm_init_complete() + + # Quiesce the second coordinator. + all_coords = self.cluster.get_all_coordinators() + coord_to_quiesce = all_coords[1] + self.execute_query_expect_success(self.client, ": shutdown('{}:{}')".format( + coord_to_quiesce.service.hostname, coord_to_quiesce.service.krpc_port)) + + # Ensure only two coordinators process a system tables query. + self.__run_assert_systables_query( + vector=vector, + expected_coords=[all_coords[0], all_coords[2]]) + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args(num_exclusive_coordinators=3, cluster_size=5, + workload_mgmt=True, impalad_args=impalad_admission_ctrl_config_args( + fs_allocation_file="fair-scheduler-onlycoords.xml", + llama_site_file="llama-site-onlycoords.xml"), + statestored_args=_STATESTORED_ARGS) + def test_coord_only_pool_one_coord_terminate(self, vector): + """Asserts a force-terminated coordinator is eventually removed from the list of + active coordinators.""" + self.wait_for_wm_init_complete() + + # Abruptly end the third coordinator. + all_coords = self.cluster.get_all_coordinators() + coord_to_term = all_coords[2] + coord_to_term.kill() + + vector.set_exec_option('request_pool', 'onlycoords') + client = self.default_impala_client(vector.get_value('protocol')) + + done_waiting = False + iterations = 0 + while not done_waiting and iterations < 20: + try: + result = self.execute_query_using_client(client, ACTIVE_SQL, vector) + assert result.success + done_waiting = True + except Exception as e: + # Since the coordinator was not gracefully shut down, it never had a chance to + # send a quiescing message. Thus, the statestore will take some time to detect + # that the coordinator is gone. During that time, queries against system tables + # will fail as the now-terminated coordinator will still be sent rpcs.
+ if re.search(r"Exec\(\) rpc failed: Network error: " + r"Client connection negotiation failed: client connection to .*?:{}: " + r"connect: Connection refused".format(coord_to_term.service.krpc_port), + str(e)): + # Expected error, coordinator down not yet detected. + iterations += 1 + sleep(3) + else: + raise e + + assert done_waiting + self.__assert_systables_query(result.runtime_profile, [all_coords[0], all_coords[1]]) + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args(num_exclusive_coordinators=3, cluster_size=5, + workload_mgmt=True, impalad_args=impalad_admission_ctrl_config_args( + fs_allocation_file="fair-scheduler-onlycoords.xml", + llama_site_file="llama-site-onlycoords.xml"), + statestored_args=_STATESTORED_ARGS) + def test_coord_only_pool_add_coord(self, vector): + self.wait_for_wm_init_complete() + + # Add a coordinator to the cluster. + cluster_size = len(self.cluster.impalads) + self._start_impala_cluster( + options=[ + "--impalad_args=s{}".format(impalad_admission_ctrl_config_args( + fs_allocation_file="fair-scheduler-onlycoords.xml", + llama_site_file="llama-site-onlycoords.xml"))], + add_impalads=True, + cluster_size=6, + num_coordinators=1, + use_exclusive_coordinators=True, + wait_for_backends=False, + workload_mgmt=True) + + self.assert_log_contains("impalad_node" + str(cluster_size), "INFO", + "join Impala Service pool") + + # Assert the new coordinator ran a fragment instance. + self.__run_assert_systables_query(vector) + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args(num_exclusive_coordinators=1, cluster_size=1, + workload_mgmt=True, impalad_args=impalad_admission_ctrl_config_args( + fs_allocation_file="fair-scheduler-onlycoords.xml", + llama_site_file="llama-site-onlycoords.xml", + additional_args="--expected_executor_group_sets=root.group-set-small:1," + "root.group-set-large:2 " + "--num_expected_executors=2 --executor_groups=coordinator"), + statestored_args=_STATESTORED_ARGS) + def test_coord_only_pool_exec_groups(self, vector): + """Asserts queries using only coordinators request pools can run successfully when + executor groups are configured.""" + self.wait_for_wm_init_complete() + + # Assert queries can be run when no executors are started. + self.__run_assert_systables_query(vector) + + # Add a single executor for the small executor group set. + self._start_impala_cluster( + options=[ + "--impalad_args=--executor_groups=root.group-set-small-group-000:1"], + add_executors=True, + cluster_size=1, + wait_for_backends=False) + self.cluster.statestored.service.wait_for_live_subscribers(3, timeout=30) + self.__run_assert_systables_query(vector) + + # Add two executors for the large executor group set. + self._start_impala_cluster( + options=[ + "--impalad_args=--executor_groups=root.group-set-small-group-000:2"], + add_executors=True, + cluster_size=2, + wait_for_backends=False) + self.cluster.statestored.service.wait_for_live_subscribers(5, timeout=30) + self.__run_assert_systables_query(vector) + class TestAdmissionControllerWithACService(TestAdmissionController): """Runs all of the tests from TestAdmissionController but with the second impalad in the