This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 0c9fe293c317f9135427f6a68f64c92abdfe0231
Author: Yida Wu <[email protected]>
AuthorDate: Tue Dec 9 00:51:28 2025 -0800

    IMPALA-14612: Add global metrics for admission state map size
    
    We need better observability for the admission state map to warn
    about potential memory leaks.
    
    The admission state map tracks queries currently being processed or
    queued. An entry is added when a query is submitted for admission.
    The entry is removed when the query finishes execution, is rejected
    by admission control, times out while queuing, or is cancelled. If
    a bug causes any of these paths to skip the removal, the map grows
    without bound, causing a memory leak. We have observed cases where
    admission state entries were not released, causing memory leaks in
    admissiond.
    
    Adds the metric admission-control-service.num-queries and its high
    water mark to track the number of active entries. This patch updates
    GenericShardedQueryMap to support an optional
    AtomicHighWaterMarkGauge. When set, the map automatically increments
    or decrements the gauge during Add and Delete operations. This
    ensures the metric accurately reflects the map size without requiring
    manual updates at every call site.
    
    Tests:
    Updated test_admission_state_map_mem_leak to verify the new
    metrics; the updated test passes.
    
    Change-Id: Ie803aabf8d91b6381c5d0d7534cd9c9fc2166a73
    Reviewed-on: http://gerrit.cloudera.org:8080/23760
    Reviewed-by: Riza Suminto <[email protected]>
    Reviewed-by: Jason Fehr <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/scheduling/admission-control-service.cc    | 13 ++++++++++---
 be/src/util/sharded-query-map-util.cc             |  3 +++
 be/src/util/sharded-query-map-util.h              | 10 ++++++++++
 common/thrift/metrics.json                        | 20 ++++++++++++++++++++
 tests/custom_cluster/test_admission_controller.py | 16 +++++++++++-----
 5 files changed, 54 insertions(+), 8 deletions(-)

diff --git a/be/src/scheduling/admission-control-service.cc 
b/be/src/scheduling/admission-control-service.cc
index 4489f7f36..127231507 100644
--- a/be/src/scheduling/admission-control-service.cc
+++ b/be/src/scheduling/admission-control-service.cc
@@ -18,6 +18,7 @@
 #include "scheduling/admission-control-service.h"
 
 #include "common/constant-strings.h"
+#include "common/names.h"
 #include "gen-cpp/admission_control_service.pb.h"
 #include "gutil/strings/substitute.h"
 #include "kudu/rpc/rpc_context.h"
@@ -35,13 +36,16 @@
 #include "util/parse-util.h"
 #include "util/promise.h"
 
-#include "common/names.h"
-
 using kudu::rpc::RpcContext;
 
 static const string QUEUE_LIMIT_MSG = "(Advanced) Limit on RPC payloads 
consumption for "
                                       "AdmissionControlService. "
     + Substitute(MEM_UNITS_HELP_MSG, "the process memory limit");
+static const string ADMISSION_MAP_SIZE_METRIC_KEY =
+    "admission-control-service.num-queries";
+static const string ADMISSION_MAP_SIZE_HWM_METRIC_KEY =
+    "admission-control-service.num-queries-high-water-mark";
+
 DEFINE_string(admission_control_service_queue_mem_limit, "50MB", 
QUEUE_LIMIT_MSG.c_str());
 DEFINE_int32(admission_control_service_num_svc_threads, 0,
     "Number of threads for processing admission control service's RPCs. if 
left at "
@@ -73,7 +77,7 @@ namespace impala {
 
 AdmissionControlService::AdmissionControlService(MetricGroup* metric_group)
   : 
AdmissionControlServiceIf(AdmissiondEnv::GetInstance()->rpc_mgr()->metric_entity(),
-        AdmissiondEnv::GetInstance()->rpc_mgr()->result_tracker()) {
+      AdmissiondEnv::GetInstance()->rpc_mgr()->result_tracker()) {
   MemTracker* process_mem_tracker = 
AdmissiondEnv::GetInstance()->process_mem_tracker();
   bool is_percent; // not used
   int64_t bytes_limit =
@@ -89,6 +93,9 @@ AdmissionControlService::AdmissionControlService(MetricGroup* 
metric_group)
       bytes_limit, "Admission Control Service Queue", process_mem_tracker));
   MemTrackerMetric::CreateMetrics(
       metric_group, mem_tracker_.get(), "AdmissionControlService");
+  AtomicHighWaterMarkGauge* map_size_metric = metric_group->AddHWMGauge(
+      ADMISSION_MAP_SIZE_HWM_METRIC_KEY, ADMISSION_MAP_SIZE_METRIC_KEY, 0);
+  admission_state_map_.SetSizeMetric(map_size_metric);
 }
 
 Status AdmissionControlService::Init() {
diff --git a/be/src/util/sharded-query-map-util.cc 
b/be/src/util/sharded-query-map-util.cc
index a723b27e5..ded7e9d1d 100644
--- a/be/src/util/sharded-query-map-util.cc
+++ b/be/src/util/sharded-query-map-util.cc
@@ -20,6 +20,7 @@
 #include "runtime/query-driver.h"
 #include "scheduling/admission-control-service.h"
 #include "util/debug-util.h"
+#include "util/metrics.h"
 
 namespace impala {
 
@@ -36,6 +37,7 @@ Status GenericShardedQueryMap<K, V>::Add(const K& query_id, 
const V& obj) {
         strings::Substitute("query id $0 already exists", PrintId(query_id))));
   }
   map_ref->insert(make_pair(query_id, obj));
+  if (size_metric_ != nullptr) size_metric_->Increment(1);
   return Status::OK();
 }
 
@@ -65,6 +67,7 @@ Status GenericShardedQueryMap<K, V>::Delete(const K& 
query_id) {
     return err;
   }
   map_ref->erase(entry);
+  if (size_metric_ != nullptr) size_metric_->Increment(-1);
   return Status::OK();
 }
 
diff --git a/be/src/util/sharded-query-map-util.h 
b/be/src/util/sharded-query-map-util.h
index 217090dc4..1f7d63f9d 100644
--- a/be/src/util/sharded-query-map-util.h
+++ b/be/src/util/sharded-query-map-util.h
@@ -28,6 +28,8 @@
 
 namespace impala {
 
+class AtomicHighWaterMarkGauge;
+
 /// This is a template that can be used for any map that maps from a query ID 
(TUniqueId
 /// or UniqueIdPB) to some object, and that needs to be sharded. It provides a 
SpinLock
 /// per shard to synchronize access to each shard of the map. The underlying 
shard is
@@ -65,6 +67,11 @@ class GenericShardedQueryMap {
     return count;
   }
 
+  void SetSizeMetric(AtomicHighWaterMarkGauge* metric) {
+    DCHECK(size_metric_ == nullptr);
+    size_metric_ = metric;
+  }
+
   // Adds ('key', 'value') to the map, returning an error if 'key' already 
exists.
   Status Add(const K& key, const V& value);
 
@@ -90,6 +97,9 @@ class GenericShardedQueryMap {
     SpinLock map_lock_;
   };
   struct MapShard shards_[NUM_QUERY_BUCKETS];
+
+  // Metric for tracking the map size.
+  AtomicHighWaterMarkGauge* size_metric_ = nullptr;
 };
 
 template <typename T>
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 310735693..182919de8 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -3948,6 +3948,26 @@
     "kind": "COUNTER",
     "key": "admission-controller.total-dequeue-failed-coordinator-limited"
   },
+  {
+    "key": "admission-control-service.num-queries-high-water-mark",
+    "label": "HWM Num Queries in Admission Control Service",
+    "units": "NONE",
+    "kind": "GAUGE",
+    "description": "The high water mark of queries registered in Admission 
Control Service (queuing or running).",
+    "contexts": [
+      "ADMISSIOND"
+    ]
+  },
+  {
+    "key": "admission-control-service.num-queries",
+    "label": "Num Queries in Admission Control Service",
+    "units": "NONE",
+    "kind": "GAUGE",
+    "description": "The number of queries currently registered in Admission 
Control Service (queuing or running) and not yet fully released.",
+    "contexts": [
+      "ADMISSIOND"
+    ]
+  },
   {
     "description": "The full version string of the Admission Control Server.",
     "contexts": [
diff --git a/tests/custom_cluster/test_admission_controller.py 
b/tests/custom_cluster/test_admission_controller.py
index f4254a95b..e085df6ed 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -2371,7 +2371,7 @@ class 
TestAdmissionControllerWithACService(TestAdmissionController):
     # Max timeout for waiting on query state transitions.
     timeout_s = 10
 
-    ac = self.cluster.admissiond
+    ac = self.cluster.admissiond.service
     all_coords = self.cluster.get_all_coordinators()
     assert len(all_coords) >= 2, "Test requires at least two coordinators"
 
@@ -2382,10 +2382,9 @@ class 
TestAdmissionControllerWithACService(TestAdmissionController):
     handle1 = client1.execute_async(long_query)
     client1.wait_for_impala_state(handle1, RUNNING, timeout_s)
 
-    # Allow some time for the system to stabilize.
-    sleep(5)
+    ac.wait_for_metric_value("admission-control-service.num-queries", 1)
     # Capture memory usage before stressing the system.
-    old_total_bytes = ac.service.get_metric_value("tcmalloc.bytes-in-use")
+    old_total_bytes = ac.get_metric_value("tcmalloc.bytes-in-use")
     assert old_total_bytes != 0
 
     # Submit short queries to coord2 which will be queued and time out.
@@ -2401,7 +2400,7 @@ class 
TestAdmissionControllerWithACService(TestAdmissionController):
         client2.close_query(handle2)
 
     # Capture memory usage after the test.
-    new_total_bytes = ac.service.get_metric_value("tcmalloc.bytes-in-use")
+    new_total_bytes = ac.get_metric_value("tcmalloc.bytes-in-use")
 
     # Ensure memory usage has not grown more than 10%, indicating no leak.
     assert new_total_bytes < old_total_bytes * 1.1
@@ -2416,6 +2415,13 @@ class 
TestAdmissionControllerWithACService(TestAdmissionController):
     client1.close()
     client2.close()
 
+    # Verify num queries return to 0.
+    ac.wait_for_metric_value(
+      "admission-control-service.num-queries", 0)
+    num_queries_hwm = \
+      
ac.get_metric_value("admission-control-service.num-queries-high-water-mark")
+    assert num_queries_hwm > 1
+
   @SkipIfNotHdfsMinicluster.tuned_for_minicluster
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(

Reply via email to