This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 3d68286bbbfc25bb3154d2ac06587a5490f817ce Author: Yida Wu <[email protected]> AuthorDate: Mon Dec 8 01:31:33 2025 -0800 IMPALA-14605: Fix memory leak in global admissiond for cancelled queued queries This patch fixes a memory leak where dequeued queries are cancelled due to admission service backpressure errors, but the admission_state_map_ hasn't cleared the corresponding admission state entry. Since the removal could be unsafe due to lock inversion risks between the admission service and admission controller locks, the change introduces: 1. Async Cleanup: A background thread in AdmissionControlService to safely remove map entries without holding conflicting locks. 2. AdmissionController does cleanup via a callback function admission_map_cleanup_cb_ registered by AdmissiondEnv. 3. Added INBOUND_GETQUERYSTATUS_REJECT debug action to simulate backpressure and force cancellations during queuing. Tests: Added regression test test_admission_state_map_leak_in_dequeue. Change-Id: I7f16f3cf986e2038d6486d3ec687135d561e2cbf Reviewed-on: http://gerrit.cloudera.org:8080/23763 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/rpc/impala-service-pool.cc | 9 ++++ be/src/scheduling/admission-control-service.cc | 38 +++++++++++++++ be/src/scheduling/admission-control-service.h | 22 +++++++++ be/src/scheduling/admission-controller.cc | 3 ++ be/src/scheduling/admission-controller.h | 11 +++++ be/src/scheduling/admissiond-env.cc | 6 +++ tests/custom_cluster/test_admission_controller.py | 58 +++++++++++++++++++++++ 7 files changed, 147 insertions(+) diff --git a/be/src/rpc/impala-service-pool.cc b/be/src/rpc/impala-service-pool.cc index bf3800a3a..43dd24e28 100644 --- a/be/src/rpc/impala-service-pool.cc +++ b/be/src/rpc/impala-service-pool.cc @@ -209,6 +209,15 @@ kudu::Status ImpalaServicePool::QueueInboundCall( return kudu::Status::OK(); } } + if (UNLIKELY(!FLAGS_debug_actions.empty())) { + Status status 
= DebugAction(FLAGS_debug_actions, "INBOUND_GETQUERYSTATUS_REJECT"); + if (!status.ok() && c->remote_method().method_name() == "GetQueryStatus") { + mem_tracker_lock.unlock(); + c->DiscardTransfer(); + RejectTooBusy(c); + return kudu::Status::OK(); + } + } service_mem_tracker_->Consume(transfer_size); } diff --git a/be/src/scheduling/admission-control-service.cc b/be/src/scheduling/admission-control-service.cc index 127231507..a94eb56de 100644 --- a/be/src/scheduling/admission-control-service.cc +++ b/be/src/scheduling/admission-control-service.cc @@ -114,11 +114,24 @@ Status AdmissionControlService::Init() { bind<void>(&AdmissionControlService::AdmitFromThreadPool, this, _2))); ABORT_IF_ERROR(admission_thread_pool_->Init()); + RETURN_IF_ERROR(Thread::Create("admission-control-service", + "admission-state-map-cleanup", + &AdmissionControlService::AdmissionStateMapCleanupLoop, this, &cleanup_thread_)); + return Status::OK(); } void AdmissionControlService::Join() { admission_thread_pool_->Join(); + shutdown_.store(true); + { + // Signal the cleanup thread to exit. + std::lock_guard<std::mutex> l(cleanup_queue_lock_); + cleanup_queue_cv_.notify_all(); + } + DCHECK(cleanup_thread_ != nullptr); + // Wait for the cleanup thread to finish clearing the queue. 
+ cleanup_thread_->Join(); } Status AdmissionControlService::GetProxy( @@ -391,4 +404,29 @@ bool AdmissionControlService::CheckAndUpdateHeartbeat( return false; } +void AdmissionControlService::CleanupAdmissionStateMapAsync(const UniqueIdPB& query_id) { + { + std::lock_guard<std::mutex> lock(cleanup_queue_lock_); + admission_state_cleanup_queue_.push_back(query_id); + } + cleanup_queue_cv_.notify_all(); +} + +void AdmissionControlService::AdmissionStateMapCleanupLoop() { + std::unique_lock<std::mutex> lock(cleanup_queue_lock_); + while (true) { + cleanup_queue_cv_.wait(lock, + [&] { return !admission_state_cleanup_queue_.empty() || shutdown_.load(); }); + if (admission_state_cleanup_queue_.empty() && shutdown_.load()) return; + std::deque<UniqueIdPB> local_queue; + std::swap(local_queue, admission_state_cleanup_queue_); + lock.unlock(); + for (const UniqueIdPB& query_id : local_queue) { + discard_result(admission_state_map_.Delete(query_id)); + VLOG_QUERY << "Cleaned up admission state map for query=" << PrintId(query_id); + } + lock.lock(); + } +} + } // namespace impala diff --git a/be/src/scheduling/admission-control-service.h b/be/src/scheduling/admission-control-service.h index 5123e735d..57afc90d4 100644 --- a/be/src/scheduling/admission-control-service.h +++ b/be/src/scheduling/admission-control-service.h @@ -79,6 +79,10 @@ class AdmissionControlService : public AdmissionControlServiceIf, /// related RPCs. bool IsHealthy() { return service_started_.load(); } + /// Asyncly queues a request to remove the query from admission_state_map_. + /// This is non-blocking, thread-safe, and avoids deadlocks with the caller. + void CleanupAdmissionStateMapAsync(const UniqueIdPB& query_id); + private: friend class ImpalaHttpHandler; friend class AdmissiondEnv; @@ -161,8 +165,26 @@ class AdmissionControlService : public AdmissionControlServiceIf, /// was successful. 
bool CheckAndUpdateHeartbeat(const UniqueIdPB& coord_id, int64_t update_version); + /// Background thread loop that removes entries from admission_state_map_. + void AdmissionStateMapCleanupLoop(); + /// Indicates whether the admission control service is ready. std::atomic_bool service_started_{false}; + + /// Flag to signal to exit. + std::atomic_bool shutdown_{false}; + + /// Thread handle for the background cleanup worker. + std::unique_ptr<Thread> cleanup_thread_; + + /// Protecting the cleanup queue. + std::mutex cleanup_queue_lock_; + + /// Condition variable to wake up the cleanup thread. + std::condition_variable cleanup_queue_cv_; + + /// Queue of query ids waiting to be removed from the map. + std::deque<UniqueIdPB> admission_state_cleanup_queue_; }; } // namespace impala diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc index e3bfc695b..a879d85fa 100644 --- a/be/src/scheduling/admission-controller.cc +++ b/be/src/scheduling/admission-controller.cc @@ -2654,6 +2654,9 @@ void AdmissionController::TryDequeue() { if (is_cancelled) { VLOG_QUERY << "Dequeued cancelled query=" << PrintId(query_id); + if (admission_map_cleanup_cb_) { + admission_map_cleanup_cb_(query_id); + } return; // next query } diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h index ac6f3178e..1efd3750e 100644 --- a/be/src/scheduling/admission-controller.h +++ b/be/src/scheduling/admission-controller.h @@ -484,6 +484,14 @@ class AdmissionController { bool track_per_user; }; + // Callback type for cleaning up the admission map. + using AdmissionMapCleanupCb = std::function<void(const UniqueIdPB&)>; + + // Register the callback function for admission map cleanup. 
+ void RegisterAdmissionMapCleanupCallback(AdmissionMapCleanupCb cb) { + admission_map_cleanup_cb_ = std::move(cb); + } + private: class PoolStats; friend class PoolStats; @@ -1039,6 +1047,9 @@ class AdmissionController { std::string request_queue_topic_name_; + /// The callback function for admission map cleanup. + AdmissionMapCleanupCb admission_map_cleanup_cb_; + /// Resolves the resource pool name in 'query_ctx.request_pool' and stores the resulting /// name in 'pool_name' the resulting config in 'pool_config', and the /// root config in 'root_config'. diff --git a/be/src/scheduling/admissiond-env.cc b/be/src/scheduling/admissiond-env.cc index bb7ff8669..895f052f5 100644 --- a/be/src/scheduling/admissiond-env.cc +++ b/be/src/scheduling/admissiond-env.cc @@ -134,6 +134,12 @@ Status AdmissiondEnv::Init() { admission_control_svc_.reset( new AdmissionControlService(DaemonEnv::GetInstance()->metrics())); RETURN_IF_ERROR(admission_control_svc_->Init()); + DCHECK(admission_control_svc_); + DCHECK(admission_controller_); + admission_controller_->RegisterAdmissionMapCleanupCallback( + [&](const UniqueIdPB& query_id) { + admission_control_svc_->CleanupAdmissionStateMapAsync(query_id); + }); RETURN_IF_ERROR(cluster_membership_mgr_->Init()); cluster_membership_mgr_->RegisterUpdateCallbackFn( diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py index e085df6ed..8b033085d 100644 --- a/tests/custom_cluster/test_admission_controller.py +++ b/tests/custom_cluster/test_admission_controller.py @@ -2422,6 +2422,64 @@ class TestAdmissionControllerWithACService(TestAdmissionController): ac.get_metric_value("admission-control-service.num-queries-high-water-mark") assert num_queries_hwm > 1 + @SkipIfNotHdfsMinicluster.tuned_for_minicluster + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args=("--debug_actions=INBOUND_GETQUERYSTATUS_REJECT:[email protected] " + 
"--vmodule=admission-control-service=3 " + "--default_pool_max_requests=1 --queue_wait_timeout_ms=1"), + disable_log_buffering=True + ) + def test_admission_state_map_leak_in_dequeue(self): + """ + Regression test for IMPALA-14605. + We verify that cancellations of dequeued queries when there are dropped due to + backpressure errors do not leak memory in the admission_state_map. + """ + + # Log patterns to verify. + LOG_PATTERN_LEAK_FIX = "Dequeued cancelled query" + LOG_PATTERN_INJECTION = "dropped due to backpressure" + + ac_service = self.cluster.admissiond.service + + # The workload for concurrent clients. + def run_client_workload(client_index): + impalad = self.cluster.impalads[client_index % len(self.cluster.impalads)] + client = impalad.service.create_hs2_client() + query = "select sleep(1000)" + # Run multiple iterations to maximize race condition probability. + for i in range(10): + try: + handle = client.execute_async(query) + client.wait_for_finished_timeout(handle, 3000) + client.close_query(handle) + except Exception: + pass + try: + client.close() + except Exception: + pass + threads = [] + # Use multiple clients to do the queries concurrently. + num_clients = 5 + for i in range(num_clients): + t = threading.Thread(target=run_client_workload, args=(i,)) + t.start() + threads.append(t) + + for t in threads: + t.join() + + # Verify the logs and metrics. + ac_service.wait_for_metric_value( + "admission-control-service.num-queries", 0) + num_queries_hwm = \ + ac_service.get_metric_value("admission-control-service.num-queries-high-water-mark") + assert num_queries_hwm > 1 + self.assert_log_contains_multiline("admissiond", "INFO", LOG_PATTERN_INJECTION) + self.assert_log_contains_multiline("admissiond", "INFO", LOG_PATTERN_LEAK_FIX) + @SkipIfNotHdfsMinicluster.tuned_for_minicluster @pytest.mark.execute_serially @CustomClusterTestSuite.with_args(
