This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit c9bfdbb272238a73e95e483c823f6b54f022de0d Author: Yida Wu <[email protected]> AuthorDate: Wed Jan 14 03:07:27 2026 -0800 IMPALA-14682: Use centralized async cleanup for admission state cleanup In IMPALA-14605, we added a mechanism to clean up the admission state asynchronously. This patch refactors all admission state deletions to use this centralized async method, making it easier to reason about when admission state is removed and to detect cases where a query’s admission state is not properly cleared. Additionally, this refactoring is a necessary step for future improvements, such as implementing time-based deletion. Also updated test_admission_state_map_mem_leak to verify the number of admission states using the new global metric admission-control-service.num-queries, which is more stable than checking the log. Tests: Passed core tests. Passed exhaustive custom_cluster/test_admission_controller.py test. Change-Id: I04f46f2e42ec5e50f4dcccb6b73a34a376615ab0 Reviewed-on: http://gerrit.cloudera.org:8080/23873 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/scheduling/admission-control-service.cc | 28 +++++++++++------------ be/src/scheduling/admission-control-service.h | 6 ++--- be/src/scheduling/admission-controller.cc | 2 +- be/src/scheduling/admission-controller.h | 3 ++- be/src/scheduling/admissiond-env.cc | 4 ++-- tests/custom_cluster/test_admission_controller.py | 6 ++--- 6 files changed, 24 insertions(+), 25 deletions(-) diff --git a/be/src/scheduling/admission-control-service.cc b/be/src/scheduling/admission-control-service.cc index a94eb56de..c5b7aaa9b 100644 --- a/be/src/scheduling/admission-control-service.cc +++ b/be/src/scheduling/admission-control-service.cc @@ -241,7 +241,7 @@ void AdmissionControlService::GetQueryStatus(const GetQueryStatusRequestPB* req, // a retry may fail with an "Invalid handle" error because the entry is gone.
// This is okay and doesn't cause any real problem. // To make it more robust, we may delay the removal using a time-based approach. - discard_result(admission_state_map_.Delete(req->query_id())); + CleanupAdmissionStateMapAsync(req->query_id(), __func__); VLOG(3) << "Current admission state map size: " << admission_state_map_.Count(); } RespondAndReleaseRpc(status, resp, rpc_context); @@ -265,7 +265,9 @@ void AdmissionControlService::ReleaseQuery(const ReleaseQueryRequestPB* req, } } - RESPOND_IF_ERROR(admission_state_map_.Delete(req->query_id())); + // Use async cleanup as the centralized way for admission state deletion, the + // client should not need to handle deletion errors of this internal map. + CleanupAdmissionStateMapAsync(req->query_id(), __func__); RespondAndReleaseRpc(Status::OK(), resp, rpc_context); } @@ -326,9 +328,7 @@ void AdmissionControlService::AdmissionHeartbeat(const AdmissionHeartbeatRequest req->host_id(), query_ids); for (const UniqueIdPB& query_id : cleaned_up) { - // ShardedQueryMap::Delete will log an error already if anything goes wrong, so just - // ignore the return value. - discard_result(admission_state_map_.Delete(query_id)); + CleanupAdmissionStateMapAsync(query_id, __func__); } RespondAndReleaseRpc(Status::OK(), resp, rpc_context); @@ -343,9 +343,7 @@ void AdmissionControlService::CancelQueriesOnFailedCoordinators( for (const auto& entry : cleaned_up) { for (const UniqueIdPB& query_id : entry.second) { - // ShardedQueryMap::Delete will log an error already if anything goes wrong, so just - // ignore the return value. 
- discard_result(admission_state_map_.Delete(query_id)); + CleanupAdmissionStateMapAsync(query_id, __func__); } } } @@ -404,10 +402,11 @@ bool AdmissionControlService::CheckAndUpdateHeartbeat( return false; } -void AdmissionControlService::CleanupAdmissionStateMapAsync(const UniqueIdPB& query_id) { +void AdmissionControlService::CleanupAdmissionStateMapAsync( + const UniqueIdPB& query_id, const char* caller_func) { { std::lock_guard<std::mutex> lock(cleanup_queue_lock_); - admission_state_cleanup_queue_.push_back(query_id); + admission_state_cleanup_queue_.emplace_back(query_id, caller_func); } cleanup_queue_cv_.notify_all(); } @@ -418,12 +417,13 @@ void AdmissionControlService::AdmissionStateMapCleanupLoop() { cleanup_queue_cv_.wait(lock, [&] { return !admission_state_cleanup_queue_.empty() || shutdown_.load(); }); if (admission_state_cleanup_queue_.empty() && shutdown_.load()) return; - std::deque<UniqueIdPB> local_queue; + std::deque<std::pair<UniqueIdPB, const char*>> local_queue; std::swap(local_queue, admission_state_cleanup_queue_); lock.unlock(); - for (const UniqueIdPB& query_id : local_queue) { - discard_result(admission_state_map_.Delete(query_id)); - VLOG_QUERY << "Cleaned up admission state map for query=" << PrintId(query_id); + for (const auto& entry : local_queue) { + discard_result(admission_state_map_.Delete(entry.first)); + VLOG_QUERY << "Cleaned up admission state map for query=" << PrintId(entry.first) + << ", triggered by function: " << entry.second; } lock.lock(); } diff --git a/be/src/scheduling/admission-control-service.h b/be/src/scheduling/admission-control-service.h index 57afc90d4..762bc49a2 100644 --- a/be/src/scheduling/admission-control-service.h +++ b/be/src/scheduling/admission-control-service.h @@ -81,7 +81,7 @@ class AdmissionControlService : public AdmissionControlServiceIf, /// Asyncly queues a request to remove the query from admission_state_map_. /// This is non-blocking, thread-safe, and avoids deadlocks with the caller. 
- void CleanupAdmissionStateMapAsync(const UniqueIdPB& query_id); + void CleanupAdmissionStateMapAsync(const UniqueIdPB& query_id, const char* caller_func); private: friend class ImpalaHttpHandler; @@ -183,8 +183,8 @@ class AdmissionControlService : public AdmissionControlServiceIf, /// Condition variable to wake up the cleanup thread. std::condition_variable cleanup_queue_cv_; - /// Queue of query ids waiting to be removed from the map. - std::deque<UniqueIdPB> admission_state_cleanup_queue_; + /// Queue of query ids and the caller function name waiting to be removed from the map. + std::deque<std::pair<UniqueIdPB, const char*>> admission_state_cleanup_queue_; }; } // namespace impala diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc index 4fa8f4f37..75b9ea3ec 100644 --- a/be/src/scheduling/admission-controller.cc +++ b/be/src/scheduling/admission-controller.cc @@ -2656,7 +2656,7 @@ void AdmissionController::TryDequeue() { if (is_cancelled) { VLOG_QUERY << "Dequeued cancelled query=" << PrintId(query_id); if (admission_map_cleanup_cb_) { - admission_map_cleanup_cb_(query_id); + admission_map_cleanup_cb_(query_id, __func__); } return; // next query } diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h index 1efd3750e..6cf60c411 100644 --- a/be/src/scheduling/admission-controller.h +++ b/be/src/scheduling/admission-controller.h @@ -485,7 +485,8 @@ class AdmissionController { }; // Callback type for cleaning up the admission map. - using AdmissionMapCleanupCb = std::function<void(const UniqueIdPB&)>; + using AdmissionMapCleanupCb = + std::function<void(const UniqueIdPB&, const char* caller_func)>; // Register the callback function for admission map cleanup. 
void RegisterAdmissionMapCleanupCallback(AdmissionMapCleanupCb cb) { diff --git a/be/src/scheduling/admissiond-env.cc b/be/src/scheduling/admissiond-env.cc index 895f052f5..c8398bc60 100644 --- a/be/src/scheduling/admissiond-env.cc +++ b/be/src/scheduling/admissiond-env.cc @@ -137,8 +137,8 @@ Status AdmissiondEnv::Init() { DCHECK(admission_control_svc_); DCHECK(admission_controller_); admission_controller_->RegisterAdmissionMapCleanupCallback( - [&](const UniqueIdPB& query_id) { - admission_control_svc_->CleanupAdmissionStateMapAsync(query_id); + [&](const UniqueIdPB& query_id, const char* caller_func) { + admission_control_svc_->CleanupAdmissionStateMapAsync(query_id, caller_func); }); RETURN_IF_ERROR(cluster_membership_mgr_->Init()); diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py index 8b033085d..24c063a4a 100644 --- a/tests/custom_cluster/test_admission_controller.py +++ b/tests/custom_cluster/test_admission_controller.py @@ -2406,10 +2406,8 @@ class TestAdmissionControllerWithACService(TestAdmissionController): assert new_total_bytes < old_total_bytes * 1.1 # Check if the admission state map size stays 1 all the time, which is # the long running query. - admissiond_log = self.get_ac_log_name() - self.assert_log_contains(admissiond_log, 'INFO', - "Current admission state map size: {}".format(1), - expected_count=number_of_iterations) + admission_state_size = ac.get_metric_value("admission-control-service.num-queries") + assert admission_state_size == 1 # Cleanup clients. client1.close()
