This is an automated email from the ASF dual-hosted git repository.
michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 2e00e6c83 IMPALA-14771: Fix DCHECK hit due to dangling reference in
admission queue
2e00e6c83 is described below
commit 2e00e6c839fcf2d2cd814eac4192480d9fa3d265
Author: Yida Wu <[email protected]>
AuthorDate: Thu Feb 19 08:27:16 2026 -0800
IMPALA-14771: Fix DCHECK hit due to dangling reference in admission queue
This patch fixes two related issues. First, tests using
admission_control_rpc_compress_threshold_bytes were not applying the
flag correctly at cluster startup, so the compressed path was never
exercised. This is fixed by adding a helper in the tests to properly
inject the flag into impalad arguments.
Second, once compression was correctly enabled in the tests, a
DCHECK was triggered in DequeueLoop when evaluating queued queries
with compressed execution requests. This happened because
SubmitForAdmission() calls ClearDecompressedCache() to free the
decompressed TQueryExecRequest while the query is queued, but
the group states (ScheduleState objects) still held references to
that freed request. When TryDequeue() later evaluated the query,
it accessed these dangling references and hit the DCHECK.
The fix clears the group states immediately after clearing the
decompressed cache and updates the schedule recompute logic so that
empty group states are rebuilt the next time the queued query is
evaluated, even if the membership version has not changed.
Tests:
Passed test_admission_controller.py exhaustive tests.
Change-Id: I969e4f32b6838d305c317d0a75f17211f75eed57
Reviewed-on: http://gerrit.cloudera.org:8080/24024
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/scheduling/admission-controller.cc | 9 +++-
be/src/scheduling/admission-controller.h | 12 ++++--
tests/custom_cluster/test_admission_controller.py | 52 ++++++++++++++++++++---
3 files changed, 61 insertions(+), 12 deletions(-)
diff --git a/be/src/scheduling/admission-controller.cc
b/be/src/scheduling/admission-controller.cc
index 3f9f1c1e7..4f8b0dc01 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -1790,7 +1790,11 @@ Status AdmissionController::SubmitForAdmission(const
AdmissionRequest& request,
}
queue->Enqueue(queue_node);
// Clear the decompressed cache to save the memory after enqueue.
- queue_node->admission_request.request.ClearDecompressedCache();
+ if (queue_node->admission_request.request.ClearDecompressedCache()) {
+ // If the request is compressed and cache being cleared, also clear
+ // the group states to avoid dangling references.
+ queue_node->group_states.clear();
+ }
// Must be done while we still hold 'admission_ctrl_lock_' as the dequeue
loop thread
// can modify 'not_admitted_reason'.
@@ -2339,7 +2343,8 @@ Status AdmissionController::ComputeGroupScheduleStates(
DCHECK_GT(current_membership_version, 0);
DCHECK_GE(current_membership_version, previous_membership_version);
- if (current_membership_version <= previous_membership_version) {
+ if (current_membership_version <= previous_membership_version
+ && !queue_node->group_states.empty()) {
VLOG(3) << "No rescheduling necessary, previous membership version: "
<< previous_membership_version
<< ", current membership version: " << current_membership_version;
diff --git a/be/src/scheduling/admission-controller.h
b/be/src/scheduling/admission-controller.h
index 8be3b92be..9a272acd5 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -69,7 +69,9 @@ class AdmissionExecRequest {
virtual const TQueryExecRequest* request() const = 0;
/// For compressed only, clears the cached decompressed object to save
memory.
- virtual void ClearDecompressedCache() const = 0;
+ /// Returns true if cache is empty, returns false if no compressed data or
non-empty
+ /// after operation.
+ virtual bool ClearDecompressedCache() const = 0;
};
class AdmissionExecRequestUncompressed : public AdmissionExecRequest {
@@ -88,8 +90,9 @@ class AdmissionExecRequestUncompressed : public
AdmissionExecRequest {
return req_;
}
- void ClearDecompressedCache() const override {
- // we don't have a compressed data, so don't need to do anything.
+ bool ClearDecompressedCache() const override {
+ // we don't have a compressed data, so return false.
+ return false;
}
private:
@@ -114,13 +117,14 @@ class AdmissionExecRequestCompressed : public
AdmissionExecRequest {
return nullptr;
}
- void ClearDecompressedCache() const override {
+ bool ClearDecompressedCache() const override {
std::lock_guard<std::mutex> l(lock_);
if (decompressed_req_) {
LOG(INFO) << "Cleared the decompressed request for query "
<< PrintId(decompressed_req_->query_ctx.query_id);
decompressed_req_.reset();
}
+ return true;
}
private:
diff --git a/tests/custom_cluster/test_admission_controller.py
b/tests/custom_cluster/test_admission_controller.py
index fcc97d2ef..24c931957 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -206,6 +206,16 @@ class TestAdmissionControllerBase(CustomClusterTestSuite):
if IMPALAD_ARGS in method.__dict__:
method.__dict__[ADMISSIOND_ARGS] = method.__dict__[IMPALAD_ARGS]
+ def enable_admission_compress(self, method, threshold):
+ """Inject argument to enable admission request compression.
+ Only works when admission service is enabled.
+ Must be called at setup_method() and before calling setup_method() of
superclass."""
+ flag =
'--admission_control_rpc_compress_threshold_bytes={0}'.format(threshold)
+ if IMPALAD_ARGS in method.__dict__:
+ method.__dict__[IMPALAD_ARGS] = method.__dict__[IMPALAD_ARGS] + ' ' +
flag
+ else:
+ method.__dict__[IMPALAD_ARGS] = flag
+
class TestAdmissionControllerRawHS2(TestAdmissionControllerBase, HS2TestSuite):
@@ -2097,12 +2107,6 @@ class
TestAdmissionControllerWithACService(TestAdmissionController):
"""Runs all of the tests from TestAdmissionController but with the second
impalad in the
minicluster configured to perform all admission control."""
- @classmethod
- def add_test_dimensions(cls):
- super(TestAdmissionControllerWithACService, cls).add_test_dimensions()
- cls.ImpalaTestMatrix.add_dimension(
- ImpalaTestDimension('admission_control_rpc_compress_threshold_bytes',
0, 1))
-
def get_ac_process(self):
return self.cluster.admissiond
@@ -2542,6 +2546,24 @@ class
TestAdmissionControllerWithACService(TestAdmissionController):
client2.wait_for_impala_state(handle2, RUNNING, timeout_s)
+class TestAdmissionControllerWithACServiceCompress(TestAdmissionController):
+ """Runs all of the tests from TestAdmissionController but with the second
impalad in the
+ minicluster configured to perform all admission control with compression."""
+
+ def get_ac_process(self):
+ return self.cluster.admissiond
+
+ def get_ac_log_name(self):
+ return "admissiond"
+
+ def setup_method(self, method):
+ if self.exploration_strategy() != 'exhaustive':
+ pytest.skip('runs only in exhaustive')
+ self.enable_admission_service(method)
+ self.enable_admission_compress(method, 1)
+ super(TestAdmissionControllerWithACServiceCompress,
self).setup_method(method)
+
+
class TestAdmissionControllerStress(TestAdmissionControllerBase):
"""Submits a number of queries (parameterized) with some delay between
submissions
(parameterized) and the ability to submit to one impalad or many in a
round-robin
@@ -3225,3 +3247,21 @@ class
TestAdmissionControllerStressWithACService(TestAdmissionControllerStress):
pytest.skip('runs only in exhaustive')
self.enable_admission_service(method)
super(TestAdmissionControllerStressWithACService,
self).setup_method(method)
+
+
+class
TestAdmissionControllerStressWithACServiceCompress(TestAdmissionControllerStress):
+ """Runs all of the tests from TestAdmissionControllerStress but with the
second impalad
+ in the minicluster configured to perform all admission control with
compression."""
+
+ def get_ac_processes(self):
+ return [self.cluster.admissiond]
+
+ def get_ac_log_name(self):
+ return "admissiond"
+
+ def setup_method(self, method):
+ if self.exploration_strategy() != 'exhaustive':
+ pytest.skip('runs only in exhaustive')
+ self.enable_admission_service(method)
+ self.enable_admission_compress(method, 1)
+ super(TestAdmissionControllerStressWithACServiceCompress,
self).setup_method(method)