This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 3d68286bbbfc25bb3154d2ac06587a5490f817ce
Author: Yida Wu <[email protected]>
AuthorDate: Mon Dec 8 01:31:33 2025 -0800

    IMPALA-14605: Fix memory leak in global admissiond for cancelled queued 
queries
    
    This patch fixes a memory leak where queries that were dequeued and
    then cancelled due to admission service backpressure errors left
    their corresponding entries in admission_state_map_ uncleared. The
    entry could not be removed synchronously because doing so risks
    lock inversion between the admission service and admission
    controller locks.
    
    The change introduces:
    1. Async Cleanup: A background thread in AdmissionControlService
    to safely remove map entries without holding conflicting locks.
    2. AdmissionController does cleanup via a callback function
    admission_map_cleanup_cb_ registered by AdmissiondEnv.
    3. Added INBOUND_GETQUERYSTATUS_REJECT debug action to simulate
    backpressure and force cancellations during queuing.
    
    Tests:
    Added regression test test_admission_state_map_leak_in_dequeue.
    
    Change-Id: I7f16f3cf986e2038d6486d3ec687135d561e2cbf
    Reviewed-on: http://gerrit.cloudera.org:8080/23763
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/rpc/impala-service-pool.cc                 |  9 ++++
 be/src/scheduling/admission-control-service.cc    | 38 +++++++++++++++
 be/src/scheduling/admission-control-service.h     | 22 +++++++++
 be/src/scheduling/admission-controller.cc         |  3 ++
 be/src/scheduling/admission-controller.h          | 11 +++++
 be/src/scheduling/admissiond-env.cc               |  6 +++
 tests/custom_cluster/test_admission_controller.py | 58 +++++++++++++++++++++++
 7 files changed, 147 insertions(+)

diff --git a/be/src/rpc/impala-service-pool.cc 
b/be/src/rpc/impala-service-pool.cc
index bf3800a3a..43dd24e28 100644
--- a/be/src/rpc/impala-service-pool.cc
+++ b/be/src/rpc/impala-service-pool.cc
@@ -209,6 +209,15 @@ kudu::Status ImpalaServicePool::QueueInboundCall(
         return kudu::Status::OK();
       }
     }
+    if (UNLIKELY(!FLAGS_debug_actions.empty())) {
+      Status status = DebugAction(FLAGS_debug_actions, 
"INBOUND_GETQUERYSTATUS_REJECT");
+      if (!status.ok() && c->remote_method().method_name() == 
"GetQueryStatus") {
+        mem_tracker_lock.unlock();
+        c->DiscardTransfer();
+        RejectTooBusy(c);
+        return kudu::Status::OK();
+      }
+    }
     service_mem_tracker_->Consume(transfer_size);
   }
 
diff --git a/be/src/scheduling/admission-control-service.cc 
b/be/src/scheduling/admission-control-service.cc
index 127231507..a94eb56de 100644
--- a/be/src/scheduling/admission-control-service.cc
+++ b/be/src/scheduling/admission-control-service.cc
@@ -114,11 +114,24 @@ Status AdmissionControlService::Init() {
           bind<void>(&AdmissionControlService::AdmitFromThreadPool, this, 
_2)));
   ABORT_IF_ERROR(admission_thread_pool_->Init());
 
+  RETURN_IF_ERROR(Thread::Create("admission-control-service",
+      "admission-state-map-cleanup",
+      &AdmissionControlService::AdmissionStateMapCleanupLoop, this, 
&cleanup_thread_));
+
   return Status::OK();
 }
 
 void AdmissionControlService::Join() {
   admission_thread_pool_->Join();
+  shutdown_.store(true);
+  {
+    // Signal the cleanup thread to exit.
+    std::lock_guard<std::mutex> l(cleanup_queue_lock_);
+    cleanup_queue_cv_.notify_all();
+  }
+  DCHECK(cleanup_thread_ != nullptr);
+  // Wait for the cleanup thread to finish clearing the queue.
+  cleanup_thread_->Join();
 }
 
 Status AdmissionControlService::GetProxy(
@@ -391,4 +404,29 @@ bool AdmissionControlService::CheckAndUpdateHeartbeat(
   return false;
 }
 
+void AdmissionControlService::CleanupAdmissionStateMapAsync(const UniqueIdPB& 
query_id) {
+  {
+    std::lock_guard<std::mutex> lock(cleanup_queue_lock_);
+    admission_state_cleanup_queue_.push_back(query_id);
+  }
+  cleanup_queue_cv_.notify_all();
+}
+
+void AdmissionControlService::AdmissionStateMapCleanupLoop() {
+  std::unique_lock<std::mutex> lock(cleanup_queue_lock_);
+  while (true) {
+    cleanup_queue_cv_.wait(lock,
+        [&] { return !admission_state_cleanup_queue_.empty() || 
shutdown_.load(); });
+    if (admission_state_cleanup_queue_.empty() && shutdown_.load()) return;
+    std::deque<UniqueIdPB> local_queue;
+    std::swap(local_queue, admission_state_cleanup_queue_);
+    lock.unlock();
+    for (const UniqueIdPB& query_id : local_queue) {
+      discard_result(admission_state_map_.Delete(query_id));
+      VLOG_QUERY << "Cleaned up admission state map for query=" << 
PrintId(query_id);
+    }
+    lock.lock();
+  }
+}
+
 } // namespace impala
diff --git a/be/src/scheduling/admission-control-service.h 
b/be/src/scheduling/admission-control-service.h
index 5123e735d..57afc90d4 100644
--- a/be/src/scheduling/admission-control-service.h
+++ b/be/src/scheduling/admission-control-service.h
@@ -79,6 +79,10 @@ class AdmissionControlService : public 
AdmissionControlServiceIf,
   /// related RPCs.
   bool IsHealthy() { return service_started_.load(); }
 
+  /// Asyncly queues a request to remove the query from admission_state_map_.
+  /// This is non-blocking, thread-safe, and avoids deadlocks with the caller.
+  void CleanupAdmissionStateMapAsync(const UniqueIdPB& query_id);
+
  private:
   friend class ImpalaHttpHandler;
   friend class AdmissiondEnv;
@@ -161,8 +165,26 @@ class AdmissionControlService : public 
AdmissionControlServiceIf,
   /// was successful.
   bool CheckAndUpdateHeartbeat(const UniqueIdPB& coord_id, int64_t 
update_version);
 
+  /// Background thread loop that removes entries from admission_state_map_.
+  void AdmissionStateMapCleanupLoop();
+
   /// Indicates whether the admission control service is ready.
   std::atomic_bool service_started_{false};
+
+  /// Flag to signal to exit.
+  std::atomic_bool shutdown_{false};
+
+  /// Thread handle for the background cleanup worker.
+  std::unique_ptr<Thread> cleanup_thread_;
+
+  /// Protecting the cleanup queue.
+  std::mutex cleanup_queue_lock_;
+
+  /// Condition variable to wake up the cleanup thread.
+  std::condition_variable cleanup_queue_cv_;
+
+  /// Queue of query ids waiting to be removed from the map.
+  std::deque<UniqueIdPB> admission_state_cleanup_queue_;
 };
 
 } // namespace impala
diff --git a/be/src/scheduling/admission-controller.cc 
b/be/src/scheduling/admission-controller.cc
index e3bfc695b..a879d85fa 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -2654,6 +2654,9 @@ void AdmissionController::TryDequeue() {
 
       if (is_cancelled) {
         VLOG_QUERY << "Dequeued cancelled query=" << PrintId(query_id);
+        if (admission_map_cleanup_cb_) {
+          admission_map_cleanup_cb_(query_id);
+        }
         return; // next query
       }
 
diff --git a/be/src/scheduling/admission-controller.h 
b/be/src/scheduling/admission-controller.h
index ac6f3178e..1efd3750e 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -484,6 +484,14 @@ class AdmissionController {
     bool track_per_user;
   };
 
+  // Callback type for cleaning up the admission map.
+  using AdmissionMapCleanupCb = std::function<void(const UniqueIdPB&)>;
+
+  // Register the callback function for admission map cleanup.
+  void RegisterAdmissionMapCleanupCallback(AdmissionMapCleanupCb cb) {
+    admission_map_cleanup_cb_ = std::move(cb);
+  }
+
  private:
   class PoolStats;
   friend class PoolStats;
@@ -1039,6 +1047,9 @@ class AdmissionController {
 
   std::string request_queue_topic_name_;
 
+  /// The callback function for admission map cleanup.
+  AdmissionMapCleanupCb admission_map_cleanup_cb_;
+
   /// Resolves the resource pool name in 'query_ctx.request_pool' and stores 
the resulting
   /// name in 'pool_name' the resulting config in 'pool_config', and the
   /// root config in 'root_config'.
diff --git a/be/src/scheduling/admissiond-env.cc 
b/be/src/scheduling/admissiond-env.cc
index bb7ff8669..895f052f5 100644
--- a/be/src/scheduling/admissiond-env.cc
+++ b/be/src/scheduling/admissiond-env.cc
@@ -134,6 +134,12 @@ Status AdmissiondEnv::Init() {
   admission_control_svc_.reset(
       new AdmissionControlService(DaemonEnv::GetInstance()->metrics()));
   RETURN_IF_ERROR(admission_control_svc_->Init());
+  DCHECK(admission_control_svc_);
+  DCHECK(admission_controller_);
+  admission_controller_->RegisterAdmissionMapCleanupCallback(
+      [&](const UniqueIdPB& query_id) {
+        admission_control_svc_->CleanupAdmissionStateMapAsync(query_id);
+      });
 
   RETURN_IF_ERROR(cluster_membership_mgr_->Init());
   cluster_membership_mgr_->RegisterUpdateCallbackFn(
diff --git a/tests/custom_cluster/test_admission_controller.py 
b/tests/custom_cluster/test_admission_controller.py
index e085df6ed..8b033085d 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -2422,6 +2422,64 @@ class 
TestAdmissionControllerWithACService(TestAdmissionController):
       
ac.get_metric_value("admission-control-service.num-queries-high-water-mark")
     assert num_queries_hwm > 1
 
+  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args=("--debug_actions=INBOUND_GETQUERYSTATUS_REJECT:[email protected] "
+                    "--vmodule=admission-control-service=3 "
+                    "--default_pool_max_requests=1 --queue_wait_timeout_ms=1"),
+      disable_log_buffering=True
+  )
+  def test_admission_state_map_leak_in_dequeue(self):
+    """
+    Regression test for IMPALA-14605.
+    We verify that cancellations of dequeued queries when there are dropped 
due to
+    backpressure errors do not leak memory in the admission_state_map.
+    """
+
+    # Log patterns to verify.
+    LOG_PATTERN_LEAK_FIX = "Dequeued cancelled query"
+    LOG_PATTERN_INJECTION = "dropped due to backpressure"
+
+    ac_service = self.cluster.admissiond.service
+
+    # The workload for concurrent clients.
+    def run_client_workload(client_index):
+        impalad = self.cluster.impalads[client_index % 
len(self.cluster.impalads)]
+        client = impalad.service.create_hs2_client()
+        query = "select sleep(1000)"
+        # Run multiple iterations to maximize race condition probability.
+        for i in range(10):
+          try:
+            handle = client.execute_async(query)
+            client.wait_for_finished_timeout(handle, 3000)
+            client.close_query(handle)
+          except Exception:
+            pass
+        try:
+          client.close()
+        except Exception:
+          pass
+    threads = []
+    # Use multiple clients to do the queries concurrently.
+    num_clients = 5
+    for i in range(num_clients):
+        t = threading.Thread(target=run_client_workload, args=(i,))
+        t.start()
+        threads.append(t)
+
+    for t in threads:
+        t.join()
+
+    # Verify the logs and metrics.
+    ac_service.wait_for_metric_value(
+      "admission-control-service.num-queries", 0)
+    num_queries_hwm = \
+      
ac_service.get_metric_value("admission-control-service.num-queries-high-water-mark")
+    assert num_queries_hwm > 1
+    self.assert_log_contains_multiline("admissiond", "INFO", 
LOG_PATTERN_INJECTION)
+    self.assert_log_contains_multiline("admissiond", "INFO", 
LOG_PATTERN_LEAK_FIX)
+
   @SkipIfNotHdfsMinicluster.tuned_for_minicluster
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(

Reply via email to