This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e00e6c83 IMPALA-14771: Fix DCHECK hit due to dangling reference in admission queue
2e00e6c83 is described below

commit 2e00e6c839fcf2d2cd814eac4192480d9fa3d265
Author: Yida Wu <[email protected]>
AuthorDate: Thu Feb 19 08:27:16 2026 -0800

    IMPALA-14771: Fix DCHECK hit due to dangling reference in admission queue
    
    This patch fixes two related issues. First, tests using
    admission_control_rpc_compress_threshold_bytes were not applying the
    flag at cluster startup (it was only declared as a test dimension),
    so the compressed path was never exercised. This is fixed by adding
    a test helper that injects the flag into the impalad startup
    arguments.
    
    Second, once compression was correctly enabled, the tests hit a
    DCHECK in DequeueLoop when it evaluated queued queries with
    compressed execution requests. This happened because
    SubmitForAdmission() calls ClearDecompressedCache() to free the
    decompressed TQueryExecRequest while the query is queued, but
    the group states (ScheduleState objects) still held references to
    that freed request. When TryDequeue() later evaluated the query,
    it accessed these dangling references and hit the DCHECK.
    
    The fix clears the group states immediately after clearing the
    decompressed cache, and updates the schedule recompute logic so that
    empty group states are rebuilt the next time the queued query is
    evaluated for admission.
    
    Tests:
    Passed test_admission_controller.py exhaustive tests.
    
    Change-Id: I969e4f32b6838d305c317d0a75f17211f75eed57
    Reviewed-on: http://gerrit.cloudera.org:8080/24024
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/scheduling/admission-controller.cc         |  9 +++-
 be/src/scheduling/admission-controller.h          | 12 ++++--
 tests/custom_cluster/test_admission_controller.py | 52 ++++++++++++++++++++---
 3 files changed, 61 insertions(+), 12 deletions(-)

diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 3f9f1c1e7..4f8b0dc01 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -1790,7 +1790,11 @@ Status AdmissionController::SubmitForAdmission(const AdmissionRequest& request,
     }
     queue->Enqueue(queue_node);
     // Clear the decompressed cache to save the memory after enqueue.
-    queue_node->admission_request.request.ClearDecompressedCache();
+    if (queue_node->admission_request.request.ClearDecompressedCache()) {
+      // If the request is compressed and cache being cleared, also clear
+      // the group states to avoid dangling references.
+      queue_node->group_states.clear();
+    }
 
     // Must be done while we still hold 'admission_ctrl_lock_' as the dequeue loop thread
     // can modify 'not_admitted_reason'.
@@ -2339,7 +2343,8 @@ Status AdmissionController::ComputeGroupScheduleStates(
 
   DCHECK_GT(current_membership_version, 0);
   DCHECK_GE(current_membership_version, previous_membership_version);
-  if (current_membership_version <= previous_membership_version) {
+  if (current_membership_version <= previous_membership_version
+      && !queue_node->group_states.empty()) {
     VLOG(3) << "No rescheduling necessary, previous membership version: "
             << previous_membership_version
             << ", current membership version: " << current_membership_version;
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index 8be3b92be..9a272acd5 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -69,7 +69,9 @@ class AdmissionExecRequest {
   virtual const TQueryExecRequest* request() const = 0;
 
   /// For compressed only, clears the cached decompressed object to save memory.
-  virtual void ClearDecompressedCache() const = 0;
+  /// Returns true if cache is empty, returns false if no compressed data or non-empty
+  /// after operation.
+  virtual bool ClearDecompressedCache() const = 0;
 };
 
 class AdmissionExecRequestUncompressed : public AdmissionExecRequest {
@@ -88,8 +90,9 @@ class AdmissionExecRequestUncompressed : public AdmissionExecRequest {
     return req_;
   }
 
-  void ClearDecompressedCache() const override {
-    // we don't have a compressed data, so don't need to do anything.
+  bool ClearDecompressedCache() const override {
+    // we don't have a compressed data, so return false.
+    return false;
   }
 
  private:
@@ -114,13 +117,14 @@ class AdmissionExecRequestCompressed : public AdmissionExecRequest {
     return nullptr;
   }
 
-  void ClearDecompressedCache() const override {
+  bool ClearDecompressedCache() const override {
     std::lock_guard<std::mutex> l(lock_);
     if (decompressed_req_) {
       LOG(INFO) << "Cleared the decompressed request for query "
                 << PrintId(decompressed_req_->query_ctx.query_id);
       decompressed_req_.reset();
     }
+    return true;
   }
 
  private:
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index fcc97d2ef..24c931957 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -206,6 +206,16 @@ class TestAdmissionControllerBase(CustomClusterTestSuite):
     if IMPALAD_ARGS in method.__dict__:
       method.__dict__[ADMISSIOND_ARGS] = method.__dict__[IMPALAD_ARGS]
 
+  def enable_admission_compress(self, method, threshold):
+    """Inject argument to enable admission request compression.
+    Only works when admission service is enabled.
+    Must be called at setup_method() and before calling setup_method() of superclass."""
+    flag = '--admission_control_rpc_compress_threshold_bytes={0}'.format(threshold)
+    if IMPALAD_ARGS in method.__dict__:
+      method.__dict__[IMPALAD_ARGS] = method.__dict__[IMPALAD_ARGS] + ' ' + flag
+    else:
+      method.__dict__[IMPALAD_ARGS] = flag
+
 
 class TestAdmissionControllerRawHS2(TestAdmissionControllerBase, HS2TestSuite):
 
@@ -2097,12 +2107,6 @@ class TestAdmissionControllerWithACService(TestAdmissionController):
   """Runs all of the tests from TestAdmissionController but with the second impalad in the
   minicluster configured to perform all admission control."""
 
-  @classmethod
-  def add_test_dimensions(cls):
-    super(TestAdmissionControllerWithACService, cls).add_test_dimensions()
-    cls.ImpalaTestMatrix.add_dimension(
-        ImpalaTestDimension('admission_control_rpc_compress_threshold_bytes', 0, 1))
-
   def get_ac_process(self):
     return self.cluster.admissiond
 
@@ -2542,6 +2546,24 @@ class TestAdmissionControllerWithACService(TestAdmissionController):
     client2.wait_for_impala_state(handle2, RUNNING, timeout_s)
 
 
+class TestAdmissionControllerWithACServiceCompress(TestAdmissionController):
+  """Runs all of the tests from TestAdmissionController but with the second impalad in the
+  minicluster configured to perform all admission control with compression."""
+
+  def get_ac_process(self):
+    return self.cluster.admissiond
+
+  def get_ac_log_name(self):
+    return "admissiond"
+
+  def setup_method(self, method):
+    if self.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
+    self.enable_admission_service(method)
+    self.enable_admission_compress(method, 1)
+    super(TestAdmissionControllerWithACServiceCompress, self).setup_method(method)
+
+
 class TestAdmissionControllerStress(TestAdmissionControllerBase):
   """Submits a number of queries (parameterized) with some delay between submissions
   (parameterized) and the ability to submit to one impalad or many in a round-robin
@@ -3225,3 +3247,21 @@ class TestAdmissionControllerStressWithACService(TestAdmissionControllerStress):
       pytest.skip('runs only in exhaustive')
     self.enable_admission_service(method)
     super(TestAdmissionControllerStressWithACService, self).setup_method(method)
+
+
+class TestAdmissionControllerStressWithACServiceCompress(TestAdmissionControllerStress):
+  """Runs all of the tests from TestAdmissionControllerStress but with the second impalad
+  in the minicluster configured to perform all admission control with compression."""
+
+  def get_ac_processes(self):
+    return [self.cluster.admissiond]
+
+  def get_ac_log_name(self):
+    return "admissiond"
+
+  def setup_method(self, method):
+    if self.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
+    self.enable_admission_service(method)
+    self.enable_admission_compress(method, 1)
+    super(TestAdmissionControllerStressWithACServiceCompress, self).setup_method(method)

Reply via email to