This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 543f0206eb68429c5bc7d69f2623addf71f57f7b Author: Yida Wu <[email protected]> AuthorDate: Thu Dec 18 02:28:47 2025 -0800 IMPALA-14661: Optimize admissiond memory usage by compressing exec requests In global admissiond, the TQueryExecRequest can be very large for complex queries, consuming a large amount of memory while queries are queued. This patch adds support for compressing TQueryExecRequest when sending it to the admission control service through the AdmitQuery RPC. This reduces memory usage in admissiond for large query execution requests. Compression is controlled by the new startup flag admission_control_rpc_compress_threshold_bytes. A value of 0 disables compression, while a positive value enables compression for requests whose serialized size exceeds the threshold. The uncompressed path remains unchanged. Adds a new TQueryExecRequestCompressed thrift struct along with compression and decompression helper functions. The admission controller now handles both compressed and uncompressed requests through a common AdmissionExecRequest abstraction. Compressed requests are decompressed lazily and cached to reduce decompression overhead. Decompression timing is controlled to bound memory usage. Requests are initially decompressed at submission, but if a request is queued, the decompressed request cache is released to reduce memory usage. When a queued request is later considered for dequeue, it is decompressed again and the decompressed cache is retained. Since admission uses FIFO ordering, the request being considered for dequeue is at the head of the queue and may be accessed multiple times if it is not admitted immediately. Retaining the cache in this case avoids repeated decompression. This patch also removes the query_options field from AdmissionRequest to eliminate ambiguity between TExecRequest.query_options and the query options nested in TQueryExecRequest. ClientRequestState is updated to sync the top-level TExecRequest.query_options into the nested TQueryExecRequest before admission. 
As a result, the admission controller now reads query options from TQueryExecRequest, enforcing a single source of truth for admission logic. Adds admissiond metrics to track compressed size, uncompressed size, and compression ratio for query execution requests. Testing: Adds unit tests for Thrift compression and decompression helpers. Updates admission controller tests to cover compressed requests. Passed exhaustive tests. Change-Id: I5a676d1a806451cbf84b0a3f8a706d7c6655e12d Reviewed-on: http://gerrit.cloudera.org:8080/23852 Tested-by: Impala Public Jenkins <[email protected]> Reviewed-by: Yida Wu <[email protected]> --- be/src/rpc/sidecar-util.h | 48 ++++++-- be/src/rpc/thrift-util-test.cc | 32 ++++++ be/src/rpc/thrift-util.h | 108 +++++++++++++++++- be/src/scheduling/admission-control-service.cc | 35 ++++-- be/src/scheduling/admission-control-service.h | 13 +++ be/src/scheduling/admission-controller-test.cc | 8 +- be/src/scheduling/admission-controller.cc | 123 ++++++++++++++++++--- be/src/scheduling/admission-controller.h | 91 ++++++++++++++- .../scheduling/remote-admission-control-client.cc | 20 ++-- be/src/service/client-request-state.cc | 13 +-- be/src/service/client-request-state.h | 4 + common/protobuf/admission_control_service.proto | 3 + common/thrift/Query.thrift | 8 ++ common/thrift/metrics.json | 30 +++++ tests/custom_cluster/test_admission_controller.py | 6 + 15 files changed, 483 insertions(+), 59 deletions(-) diff --git a/be/src/rpc/sidecar-util.h b/be/src/rpc/sidecar-util.h index d1d7956b3..35f022afe 100644 --- a/be/src/rpc/sidecar-util.h +++ b/be/src/rpc/sidecar-util.h @@ -31,27 +31,53 @@ namespace impala { class KrpcSerializer { public: - KrpcSerializer() : serializer_(/* compact */ true) {} + KrpcSerializer(int64_t threshold_bytes = 0) + : serializer_(/* compact */ true), compress_threshold_bytes_(threshold_bytes) {} /// Serialize obj and set it as a sidecar on 'rpc_controller', returning the idx in - /// 'sidecar_idx'. 
The memory for the sidecar is owned by this object and must remain - /// valid until the rpc has completed. - template <class T> - Status SerializeToSidecar( - const T* obj, kudu::rpc::RpcController* rpc_controller, int* sidecar_idx) { - uint8_t* serialized_buf = nullptr; - uint32_t serialized_len = 0; - RETURN_IF_ERROR(serializer_.SerializeToBuffer(obj, &serialized_len, &serialized_buf)); + /// 'sidecar_idx'. + /// + /// If TCompressed is provided and the serialized size exceeds + /// 'compress_threshold_bytes_', the object is compressed and the TCompressed object + /// is sent instead. Compression is disabled when the threshold is 0. + /// + /// The sidecar memory is owned by this serializer and must remain valid until the rpc + /// has completed. If provided, 'is_compressed' indicates whether the sidecar thrift + /// object is compressed. + template <typename TCompressed = void, typename T> + Status SerializeToSidecar(const T* obj, kudu::rpc::RpcController* rpc_controller, + int* sidecar_idx, bool* is_compressed = nullptr) { + uint8_t* buf = nullptr; + uint32_t len = 0; + RETURN_IF_ERROR(serializer_.SerializeToBuffer(obj, &len, &buf)); + + if (is_compressed != nullptr) *is_compressed = false; + + if constexpr (!std::is_same_v<TCompressed, void>) { + // Compression path. + if (compress_threshold_bytes_ > 0 && len > compress_threshold_bytes_) { + // NOTE: 'buf' points to the serializer's internal buffer. + // Ensure 'buf' is fully consumed into a compressed thrift in + // CreateCompressedThrift() before the next call to SerializeToBuffer() + // that could reset or reuse the serializer buffer. 
+ TCompressed compressed_obj; + RETURN_IF_ERROR(CreateCompressedThrift<TCompressed>(buf, len, &compressed_obj)); + RETURN_IF_ERROR(serializer_.SerializeToBuffer(&compressed_obj, &len, &buf)); + if (is_compressed != nullptr) *is_compressed = true; + } + } + std::unique_ptr<kudu::rpc::RpcSidecar> rpc_sidecar = - kudu::rpc::RpcSidecar::FromSlice(kudu::Slice(serialized_buf, serialized_len)); + kudu::rpc::RpcSidecar::FromSlice(kudu::Slice(buf, len)); KUDU_RETURN_IF_ERROR( - rpc_controller->AddOutboundSidecar(move(rpc_sidecar), sidecar_idx), + rpc_controller->AddOutboundSidecar(std::move(rpc_sidecar), sidecar_idx), "Failed to add sidecar"); return Status::OK(); } private: ThriftSerializer serializer_; + int64_t compress_threshold_bytes_; }; // Retrieves the sidecar at 'sidecar_idx' from 'rpc_context' and deserializes it into diff --git a/be/src/rpc/thrift-util-test.cc b/be/src/rpc/thrift-util-test.cc index 427cb3d42..5b90cf241 100644 --- a/be/src/rpc/thrift-util-test.cc +++ b/be/src/rpc/thrift-util-test.cc @@ -129,4 +129,36 @@ TEST(ThriftUtil, SerDeBuffer100MB) { } } +struct MockCompressedThrift { + std::string compressed_data; + int64_t uncompressed_size; +}; + +TEST(ThriftUtil, CompressDecompressQueryOptions) { + TQueryOptions options; + options.__set_mem_limit(1024 * 1024 * 1024LL); + options.__set_query_timeout_s(300); + options.__set_abort_on_error(true); + + // Compress TQueryOptions. + MockCompressedThrift compressed_obj; + Status status = + CompressThrift<TQueryOptions, MockCompressedThrift>(options, &compressed_obj); + + EXPECT_TRUE(status.ok()) << status.GetDetail(); + EXPECT_GT(compressed_obj.uncompressed_size, 0); + EXPECT_GT(compressed_obj.compressed_data.size(), 0); + + // Decompress TQueryOptions. + TQueryOptions decompressed_options; + status = DecompressThrift<MockCompressedThrift, TQueryOptions>( + compressed_obj, &decompressed_options); + + EXPECT_TRUE(status.ok()) << status.GetDetail(); + + // Verify the fields. 
+ EXPECT_EQ(options.mem_limit, decompressed_options.mem_limit); + EXPECT_EQ(options.query_timeout_s, decompressed_options.query_timeout_s); + EXPECT_EQ(options.abort_on_error, decompressed_options.abort_on_error); +} } diff --git a/be/src/rpc/thrift-util.h b/be/src/rpc/thrift-util.h index 311a636a6..fbb77a46f 100644 --- a/be/src/rpc/thrift-util.h +++ b/be/src/rpc/thrift-util.h @@ -19,18 +19,20 @@ #ifndef IMPALA_RPC_THRIFT_UTIL_H #define IMPALA_RPC_THRIFT_UTIL_H -#include <boost/shared_ptr.hpp> -#include <thrift/protocol/TBinaryProtocol.h> #include <sstream> #include <vector> + +#include <boost/shared_ptr.hpp> #include <thrift/TApplicationException.h> #include <thrift/TConfiguration.h> +#include <thrift/protocol/TBinaryProtocol.h> #include <thrift/protocol/TDebugProtocol.h> #include <thrift/transport/TBufferTransports.h> #include <thrift/transport/TSSLSocket.h> #include <thrift/transport/TTransportException.h> #include "common/status.h" +#include "util/codec.h" namespace impala { @@ -267,6 +269,108 @@ bool IsPeekTimeoutTException(const apache::thrift::transport::TTransportExceptio /// Returns true if the exception indicates the other end of the TCP socket was closed. bool IsConnResetTException(const apache::thrift::transport::TTransportException& e); + +// Helper that compresses a raw buffer using LZ4 and creates a TCompressed object. +// +// TCompressed must have: +// - int64_t uncompressed_size; +// - binary compressed_data; +// +// Params: +// serialized_buf: Raw uncompressed data to compress. +// serialized_len: Length of the uncompressed data in bytes. +// dest: Output parameter for the compressed thrift object. +template <typename TCompressed> +Status CreateCompressedThrift( + const uint8_t* serialized_buf, uint32_t serialized_len, TCompressed* dest) { + DCHECK(serialized_buf != nullptr); + DCHECK(dest != nullptr); + + // Prepare compressor. 
+ boost::scoped_ptr<Codec> compressor; + Codec::CodecInfo codec_info(THdfsCompression::LZ4); + RETURN_IF_ERROR(Codec::CreateCompressor(nullptr, false, codec_info, &compressor)); + + // Calculate buffer size and resize string. + int64_t max_out = compressor->MaxOutputLen(serialized_len); + std::string compressed_str(max_out, '\0'); + + // Perform Compression. + uint8_t* out_ptr = reinterpret_cast<uint8_t*>(compressed_str.data()); + int64_t actual_len = max_out; + RETURN_IF_ERROR(compressor->ProcessBlock( + true, serialized_len, serialized_buf, &actual_len, &out_ptr)); + // Validate output length. + if (actual_len <= 0 || actual_len > std::numeric_limits<uint32_t>::max()) { + return Status(strings::Substitute( + "Invalid actual compressed length in compressed request: $0B", actual_len)); + } + compressed_str.resize(actual_len); + + // Write to the TCompressed object. + dest->uncompressed_size = static_cast<int64_t>(serialized_len); + dest->compressed_data = std::move(compressed_str); + + return Status::OK(); +} + +// Serializes a T object 'src', creates a compressed thrift by the serialized bytes, +// and writes to a compressed T object 'dest'. +// +// Params: +// src: The thrift object to serialize and compress. +// dest: Output parameter for the compressed thrift object. +template <typename T, typename TCompressed> +Status CompressThrift(const T& src, TCompressed* dest) { + ThriftSerializer serializer(/* compact */ true); + uint8_t* serialized_buf = nullptr; + uint32_t serialized_len = 0; + RETURN_IF_ERROR(serializer.SerializeToBuffer(&src, &serialized_len, &serialized_buf)); + return CreateCompressedThrift<TCompressed>(serialized_buf, serialized_len, dest); +} + +// Decompresses a compressed Thrift object 'src' and deserializes it back into +// the standard Thrift object 'dest'. +// +// TCompressed must have: +// - int64_t uncompressed_size; +// - binary compressed_data; +// +// Params: +// src: The compressed thrift object to decompress and deserialize. 
+// dest: Output parameter for the deserialized thrift object. +template <typename TCompressed, typename T> +Status DecompressThrift(const TCompressed& src, T* dest) { + DCHECK(dest != nullptr); + int64_t uncompressed_len = src.uncompressed_size; + if (uncompressed_len <= 0 || uncompressed_len > std::numeric_limits<uint32_t>::max()) { + return Status(strings::Substitute( + "Invalid uncompressed size in compressed request: $0B", uncompressed_len)); + } + + // Prepare decompressor and decompress. + boost::scoped_ptr<Codec> decompressor; + RETURN_IF_ERROR( + Codec::CreateDecompressor(nullptr, false, THdfsCompression::LZ4, &decompressor)); + std::vector<uint8_t> uncompressed_buf(uncompressed_len); + uint8_t* out_ptr = uncompressed_buf.data(); + int64_t actual_out = uncompressed_len; + const std::string& input_data = src.compressed_data; + const uint8_t* in_ptr = reinterpret_cast<const uint8_t*>(input_data.data()); + RETURN_IF_ERROR( + decompressor->ProcessBlock(true, input_data.size(), in_ptr, &actual_out, &out_ptr)); + if (actual_out <= 0 || actual_out > std::numeric_limits<uint32_t>::max()) { + return Status(strings::Substitute( + "Invalid actual output size after decompression: $0B", actual_out)); + } + if (actual_out != uncompressed_len) { + return Status("Decompressed size did not match expected size from header"); + } + + // Deserialize and write to the T object. 
+ uint32_t deser_len = static_cast<uint32_t>(actual_out); + return DeserializeThriftMsg(uncompressed_buf.data(), &deser_len, true, dest); +} } #endif diff --git a/be/src/scheduling/admission-control-service.cc b/be/src/scheduling/admission-control-service.cc index c5b7aaa9b..f57287e69 100644 --- a/be/src/scheduling/admission-control-service.cc +++ b/be/src/scheduling/admission-control-service.cc @@ -151,16 +151,21 @@ void AdmissionControlService::AdmitQuery( VLOG(1) << "AdmitQuery: query_id=" << req->query_id() << " coordinator=" << req->coord_id(); - shared_ptr<AdmissionState> admission_state; - admission_state = make_shared<AdmissionState>(req->query_id(), req->coord_id()); - admission_state->query_exec_request = make_unique<TQueryExecRequest>(); - + shared_ptr<AdmissionState> admission_state = + make_shared<AdmissionState>(req->query_id(), req->coord_id()); admission_state->summary_profile = RuntimeProfile::Create(&admission_state->profile_pool, "Summary"); + if (req->has_is_compressed() && req->is_compressed()) { + admission_state->query_exec_request_compressed = + make_unique<TQueryExecRequestCompressed>(); + RESPOND_IF_ERROR(GetSidecar(req->query_exec_request_sidecar_idx(), rpc_context, + admission_state->query_exec_request_compressed.get())); - RESPOND_IF_ERROR(GetSidecar(req->query_exec_request_sidecar_idx(), rpc_context, - admission_state->query_exec_request.get())); - + } else { + admission_state->query_exec_request = make_unique<TQueryExecRequest>(); + RESPOND_IF_ERROR(GetSidecar(req->query_exec_request_sidecar_idx(), rpc_context, + admission_state->query_exec_request.get())); + } for (const NetworkAddressPB& address : req->blacklisted_executor_addresses()) { admission_state->blacklisted_executor_addresses.emplace(address); } @@ -212,7 +217,7 @@ void AdmissionControlService::GetQueryStatus(const GetQueryStatusRequestPB* req, if (admission_state->admit_status.ok()) { *resp->mutable_query_schedule() = *admission_state->schedule.get(); // Free 
TQueryExecRequest since it's not required after admission is done - admission_state->query_exec_request.reset(); + admission_state->ReleaseQueryExecRequest(); } else { status = admission_state->admit_status; } @@ -359,9 +364,19 @@ void AdmissionControlService::AdmitFromThreadPool(const UniqueIdPB& query_id) { { lock_guard<mutex> l(admission_state->lock); bool queued; + if (admission_state->query_exec_request) { + admission_state->admission_exec_request = + std::make_unique<AdmissionExecRequestUncompressed>( + admission_state->query_exec_request.get()); + } else { + admission_state->admission_exec_request = + std::make_unique<AdmissionExecRequestCompressed>( + admission_state->query_exec_request_compressed.get(), + AdmissiondEnv::GetInstance()->admission_controller()); + } + AdmissionController::AdmissionRequest request = {admission_state->query_id, - admission_state->coord_id, *admission_state->query_exec_request, - admission_state->query_exec_request->query_ctx.client_request.query_options, + admission_state->coord_id, *admission_state->admission_exec_request, admission_state->summary_profile, admission_state->blacklisted_executor_addresses}; admission_state->admit_status = diff --git a/be/src/scheduling/admission-control-service.h b/be/src/scheduling/admission-control-service.h index 762bc49a2..365f48fb3 100644 --- a/be/src/scheduling/admission-control-service.h +++ b/be/src/scheduling/admission-control-service.h @@ -92,11 +92,24 @@ class AdmissionControlService : public AdmissionControlServiceIf, AdmissionState(const UniqueIdPB& query_id, const UniqueIdPB& coord_id) : query_id(query_id), coord_id(coord_id) {} + ~AdmissionState() { ReleaseQueryExecRequest(); } + + void ReleaseQueryExecRequest() { + // 'admission_exec_request' holds the raw pointer to the request objects, + // so ensure it is destroyed before the requests become invalid. 
+ admission_exec_request.reset(); + query_exec_request.reset(); + query_exec_request_compressed.reset(); + } + // The following are copied from the AdmitQueryRequestPB for this query and are valid // at any point after this AdmissionState has been added to 'admission_state_map_'. UniqueIdPB query_id; UniqueIdPB coord_id; std::unique_ptr<TQueryExecRequest> query_exec_request; + std::unique_ptr<TQueryExecRequestCompressed> query_exec_request_compressed; + std::unique_ptr<AdmissionExecRequest> admission_exec_request; + std::unordered_set<NetworkAddressPB> blacklisted_executor_addresses; // Protects all of the following members. diff --git a/be/src/scheduling/admission-controller-test.cc b/be/src/scheduling/admission-controller-test.cc index 8dcdbc569..694e6d341 100644 --- a/be/src/scheduling/admission-controller-test.cc +++ b/be/src/scheduling/admission-controller-test.cc @@ -415,10 +415,12 @@ class AdmissionControllerTest : public testing::Test { UniqueIdPB* query_id = pool_.Add(new UniqueIdPB()); RuntimeProfile* summary_profile = pool_.Add(new RuntimeProfile(&pool_, "foo")); std::unordered_set<NetworkAddressPB> blacklisted_executor_addresses; - const TQueryExecRequest& exec_request = schedule_state->request(); + const TQueryExecRequest& t_exec_request = schedule_state->request(); + AdmissionExecRequestUncompressed* exec_request = + pool_.Add(new AdmissionExecRequestUncompressed(&t_exec_request)); TQueryOptions query_options; - AdmissionController::AdmissionRequest request = {*query_id, *coord_id, exec_request, - query_options, summary_profile, blacklisted_executor_addresses}; + AdmissionController::AdmissionRequest request = {*query_id, *coord_id, *exec_request, + summary_profile, blacklisted_executor_addresses}; // Clear queue_nodes_ so we can call this method again, though this means there can // only ever be one queue node. 
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc index 75b9ea3ec..3f9f1c1e7 100644 --- a/be/src/scheduling/admission-controller.cc +++ b/be/src/scheduling/admission-controller.cc @@ -38,6 +38,7 @@ #include "util/bit-util.h" #include "util/collection-metrics.h" #include "util/debug-util.h" +#include "util/histogram-metric.h" #include "util/memory-metrics.h" #include "util/metrics.h" #include "util/pretty-printer.h" @@ -127,6 +128,12 @@ const string EXEC_GROUP_QUERY_LOAD_KEY_FORMAT = const string TOTAL_DEQUEUE_FAILED_COORDINATOR_LIMITED = "admission-controller.total-dequeue-failed-coordinator-limited"; +const string EXEC_REQ_COMPRESSED_SIZE_KEY = + "admission-controller.query-exec-request-compressed-bytes"; +const string EXEC_REQ_UNCOMPRESSED_SIZE_KEY = + "admission-controller.query-exec-request-uncompressed-bytes"; +const string EXEC_REQ_COMPRESSION_RATIO_KEY = + "admission-controller.query-exec-request-compression-ratio"; // Define metric key format strings for metrics in PoolMetrics // '$0' is replaced with the pool name by strings::Substitute @@ -346,6 +353,55 @@ static inline bool ParsePoolTopicKey( return true; } +Status AdmissionExecRequestCompressed::GetQueryExecRequest( + const TQueryExecRequest** out_req) const { + lock_guard<std::mutex> l(lock_); + if (decompressed_req_ != nullptr) { + *out_req = decompressed_req_.get(); + return Status::OK(); + } + DCHECK(req_ != nullptr); + + unique_ptr<TQueryExecRequest> new_req = std::make_unique<TQueryExecRequest>(); + RETURN_IF_ERROR(DecompressThrift(*req_, new_req.get())); + + int64_t comp_size = req_->compressed_data.size(); + int64_t uncomp_size = req_->uncompressed_size; + float ratio = (comp_size > 0) ? 
(static_cast<float>(uncomp_size) / comp_size) : 0.0f; + + LOG(INFO) << "Decompress TQueryExecRequest for query " + << PrintId(new_req->query_ctx.query_id) << ": Compressed size=" << comp_size + << " B" + << ", Uncompressed size=" << uncomp_size << " B" + << ", Ratio=" << ratio << "x"; + + // Update the compression stats only once. + if (first_decompress_ && admission_controller_ != nullptr) { + admission_controller_->UpdateAdmissionRequestCompressionStats( + comp_size, uncomp_size, ratio); + } + + // Store the object in our cache. + decompressed_req_ = std::move(new_req); + *out_req = decompressed_req_.get(); + first_decompress_ = false; + + return Status::OK(); +} + +void AdmissionController::UpdateAdmissionRequestCompressionStats( + int64_t compressed_size, int64_t uncompressed_size, float ratio) const { + if (compressed_size_metric_ != nullptr) { + compressed_size_metric_->Update(compressed_size); + } + if (uncompressed_size_metric_ != nullptr) { + uncompressed_size_metric_->Update(uncompressed_size); + } + if (compression_ratio_metric_ != nullptr) { + compression_ratio_metric_->Update(ratio); + } +} + // Append to ss a debug string for memory consumption part of the pool stats. // Here is one example. 
// topN_query_stats: queries=[554b016cf0f3a37f:9a1bfcfd00000000, @@ -700,6 +756,13 @@ AdmissionController::AdmissionController(ClusterMembershipMgr* cluster_membershi }); total_dequeue_failed_coordinator_limited_ = metrics_group_->AddCounter(TOTAL_DEQUEUE_FAILED_COORDINATOR_LIMITED, 0); + compressed_size_metric_ = metrics_group_->RegisterMetric(new HistogramMetric( + MetricDefs::Get(EXEC_REQ_COMPRESSED_SIZE_KEY), numeric_limits<int64_t>::max(), 3)); + uncompressed_size_metric_ = metrics_group_->RegisterMetric( + new HistogramMetric(MetricDefs::Get(EXEC_REQ_UNCOMPRESSED_SIZE_KEY), + numeric_limits<int64_t>::max(), 3)); + compression_ratio_metric_ = metrics_group_->RegisterMetric( + new HistogramMetric(MetricDefs::Get(EXEC_REQ_COMPRESSION_RATIO_KEY), 10000, 3)); if (FLAGS_cluster_membership_topic_id.empty()) { request_queue_topic_name_ = Statestore::IMPALA_REQUEST_QUEUE_TOPIC; } else { @@ -1584,7 +1647,11 @@ Status AdmissionController::SubmitForAdmission(const AdmissionRequest& request, unique_ptr<QuerySchedulePB>* schedule_result, bool& queued, std::string* request_pool) { queued = false; - DebugActionNoFail(request.query_options, "AC_BEFORE_ADMISSION"); + const TQueryExecRequest* exec_req = nullptr; + RETURN_IF_ERROR(request.request.GetQueryExecRequest(&exec_req)); + DCHECK(exec_req != nullptr); + DebugActionNoFail( + exec_req->query_ctx.client_request.query_options, "AC_BEFORE_ADMISSION"); DCHECK(schedule_result->get() == nullptr); ClusterMembershipMgr::SnapshotPtr membership_snapshot = @@ -1618,8 +1685,8 @@ Status AdmissionController::SubmitForAdmission(const AdmissionRequest& request, // Re-resolve the pool name to propagate any resolution errors now that this request is // known to require a valid pool. All executor groups / schedules will use the same pool // name. 
- RETURN_IF_ERROR(ResolvePoolAndGetConfig(request.request.query_ctx, - &queue_node->pool_name, &queue_node->pool_cfg, &queue_node->root_cfg)); + RETURN_IF_ERROR(ResolvePoolAndGetConfig(exec_req->query_ctx, &queue_node->pool_name, + &queue_node->pool_cfg, &queue_node->root_cfg)); request.summary_profile->AddInfoString("Request Pool", queue_node->pool_name); { @@ -1678,8 +1745,11 @@ Status AdmissionController::SubmitForAdmission(const AdmissionRequest& request, } string user; - RETURN_IF_ERROR(GetEffectiveShortUser( - queue_node->admission_request.request.query_ctx.session, &user)); + const TQueryExecRequest* exec_req_queue_node = nullptr; + RETURN_IF_ERROR( + queue_node->admission_request.request.GetQueryExecRequest(&exec_req_queue_node)); + DCHECK(exec_req_queue_node != nullptr); + RETURN_IF_ERROR(GetEffectiveShortUser(exec_req_queue_node->query_ctx.session, &user)); if (queue_node->admitted_schedule.get() != nullptr) { DCHECK(queue_node->admitted_schedule->query_schedule_pb().get() != nullptr); @@ -1719,6 +1789,8 @@ Status AdmissionController::SubmitForAdmission(const AdmissionRequest& request, stats->IncrementPerUser(user); } queue->Enqueue(queue_node); + // Clear the decompressed cache to save the memory after enqueue. + queue_node->admission_request.request.ClearDecompressedCache(); // Must be done while we still hold 'admission_ctrl_lock_' as the dequeue loop thread // can modify 'not_admitted_reason'. @@ -1786,9 +1858,13 @@ Status AdmissionController::WaitOnQueued(const UniqueIdPB& query_id, queue_nodes_.erase(query_id); }); + const TQueryExecRequest* exec_req_queue_node = nullptr; + RETURN_IF_ERROR( + queue_node->admission_request.request.GetQueryExecRequest(&exec_req_queue_node)); + DCHECK(exec_req_queue_node != nullptr); // Disallow the FAIL action here. It would leave the queue in an inconsistent state. 
- DebugActionNoFail( - queue_node->admission_request.query_options, "AC_AFTER_ADMISSION_OUTCOME"); + DebugActionNoFail(exec_req_queue_node->query_ctx.client_request.query_options, + "AC_AFTER_ADMISSION_OUTCOME"); { lock_guard<mutex> lock(admission_ctrl_lock_); @@ -2270,6 +2346,9 @@ Status AdmissionController::ComputeGroupScheduleStates( return Status::OK(); } const AdmissionRequest& request = queue_node->admission_request; + const TQueryExecRequest* exec_req = nullptr; + RETURN_IF_ERROR(request.request.GetQueryExecRequest(&exec_req)); + DCHECK(exec_req != nullptr); VLOG(3) << "Scheduling query " << PrintId(request.query_id) << " with membership version " << current_membership_version; @@ -2304,7 +2383,7 @@ Status AdmissionController::ComputeGroupScheduleStates( executor_groups = {&membership_snapshot->all_coordinators}; } else { executor_groups = - GetExecutorGroupsForQuery(membership_snapshot->executor_groups, request); + GetExecutorGroupsForQuery(membership_snapshot->executor_groups, *exec_req); } if (executor_groups.empty()) { @@ -2314,8 +2393,9 @@ Status AdmissionController::ComputeGroupScheduleStates( } // Collect all coordinators if needed for the request. - ExecutorGroup coords = request.request.include_all_coordinators ? - membership_snapshot->all_coordinators : ExecutorGroup("all-coordinators"); + ExecutorGroup coords = exec_req->include_all_coordinators ? + membership_snapshot->all_coordinators : + ExecutorGroup("all-coordinators"); // We loop over the executor groups in a deterministic order. 
If // --balance_queries_across_executor_groups set to true, executor groups with more @@ -2342,7 +2422,8 @@ Status AdmissionController::ComputeGroupScheduleStates( } unique_ptr<ScheduleState> group_state = make_unique<ScheduleState>(request.query_id, - request.request, request.query_options, request.summary_profile, false); + *exec_req, exec_req->query_ctx.client_request.query_options, + request.summary_profile, false); const string& group_name = executor_group->name(); VLOG(3) << "Scheduling for executor group: " << group_name << " with " << executor_group->NumExecutors() << " executors"; @@ -2628,6 +2709,17 @@ void AdmissionController::TryDequeue() { VLOG(3) << "Dequeueing from stats for pool " << pool_name; stats->Dequeue(false); const UniqueIdPB& query_id = queue_node->admission_request.query_id; + const TQueryExecRequest* exec_req = nullptr; + Status status = + queue_node->admission_request.request.GetQueryExecRequest(&exec_req); + if (!status.ok()) { + LOG(WARNING) << "Failed to get query execution request for dequeued query " + << PrintId(query_id) << ": " << status.GetDetail(); + // Fall to rejection handling. + queue_node->not_admitted_reason = + Substitute("Failed to get query execution request: $0", status.GetDetail()); + is_rejected = true; + } if (is_rejected) { AdmissionOutcome outcome = queue_node->admit_outcome->Set(AdmissionOutcome::REJECTED); @@ -2667,8 +2759,7 @@ void AdmissionController::TryDequeue() { DCHECK(!is_rejected); DCHECK(queue_node->admitted_schedule != nullptr); string user; - Status status = GetEffectiveShortUser( - queue_node->admission_request.request.query_ctx.session, &user); + status = GetEffectiveShortUser(exec_req->query_ctx.session, &user); DCHECK_OK(status); // Should never happen because user name was checked at query // entry. 
AdmitQuery(queue_node, user, true /* was_queued */, is_trivial); @@ -3097,15 +3188,15 @@ const pair<int64_t, int64_t> AdmissionController::GetAvailableMemAndSlots( vector<const ExecutorGroup*> AdmissionController::GetExecutorGroupsForQuery( const ClusterMembershipMgr::ExecutorGroups& all_groups, - const AdmissionRequest& request) { + const TQueryExecRequest& exec_req) { vector<const ExecutorGroup*> matching_groups; - if (scheduler_->IsCoordinatorOnlyQuery(request.request)) { + if (scheduler_->IsCoordinatorOnlyQuery(exec_req)) { // Coordinator only queries can run regardless of the presence of exec groups. This // empty group works as a proxy to schedule coordinator only queries. matching_groups.push_back(cluster_membership_mgr_->GetEmptyExecutorGroup()); return matching_groups; } - const string& pool_name = request.request.query_ctx.request_pool; + const string& pool_name = exec_req.query_ctx.request_pool; string prefix(pool_name + POOL_GROUP_DELIMITER); // We search for matching groups before the health check so that we don't fall back to // the default group in case there are matching but unhealthy groups. diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h index 6cf60c411..8be3b92be 100644 --- a/be/src/scheduling/admission-controller.h +++ b/be/src/scheduling/admission-controller.h @@ -34,6 +34,7 @@ #include "scheduling/schedule-state.h" #include "statestore/statestore-subscriber.h" #include "util/condition-variable.h" +#include "util/debug-util.h" #include "util/internal-queue.h" #include "util/runtime-profile.h" @@ -54,6 +55,82 @@ enum class AdmissionOutcome { CANCELLED, }; +class AdmissionExecRequest { + public: + virtual ~AdmissionExecRequest() = default; + + /// Retrieves the underlying TQueryExecRequest. + /// For compressed requests, this may trigger decompression. + /// Returns a pointer to the request via the 'out_req' output parameter. 
+ virtual Status GetQueryExecRequest(const TQueryExecRequest** out_req) const = 0; + + /// Fast accessor for uncompressed requests only. + /// Avoids Status checks, DCHECKs if called on compressed requests. + virtual const TQueryExecRequest* request() const = 0; + + /// For compressed only, clears the cached decompressed object to save memory. + virtual void ClearDecompressedCache() const = 0; +}; + +class AdmissionExecRequestUncompressed : public AdmissionExecRequest { + public: + explicit AdmissionExecRequestUncompressed(const TQueryExecRequest* req) : req_(req) { + DCHECK(req_ != nullptr); + } + + Status GetQueryExecRequest(const TQueryExecRequest** out_req) const override { + *out_req = req_; + return Status::OK(); + } + + const TQueryExecRequest* request() const override { + DCHECK(req_ != nullptr); + return req_; + } + + void ClearDecompressedCache() const override { + // we don't have a compressed data, so don't need to do anything. + } + + private: + const TQueryExecRequest* req_; +}; + +class AdmissionController; +class AdmissionExecRequestCompressed : public AdmissionExecRequest { + public: + explicit AdmissionExecRequestCompressed( + const TQueryExecRequestCompressed* req, AdmissionController* admission_controller) + : req_(req), admission_controller_(admission_controller) { + DCHECK(req_ != nullptr); + DCHECK(admission_controller_ != nullptr); + } + + Status GetQueryExecRequest(const TQueryExecRequest** out_req) const override; + + const TQueryExecRequest* request() const override { + DCHECK(false) << "AdmissionExecRequestCompressed does not support request(). 
" + << "Use GetQueryExecRequest() instead."; + return nullptr; + } + + void ClearDecompressedCache() const override { + std::lock_guard<std::mutex> l(lock_); + if (decompressed_req_) { + LOG(INFO) << "Cleared the decompressed request for query " + << PrintId(decompressed_req_->query_ctx.query_id); + decompressed_req_.reset(); + } + } + + private: + const TQueryExecRequestCompressed* req_; + const AdmissionController* admission_controller_; + mutable std::mutex lock_; + mutable std::unique_ptr<TQueryExecRequest> decompressed_req_; + mutable bool first_decompress_ = true; +}; + /// The AdmissionController is used to throttle requests (e.g. queries, DML) based /// on available cluster resources, which are configured in one or more resource pools. A /// request will either be admitted for immediate execution, queued for later execution, @@ -373,8 +450,7 @@ class AdmissionController { struct AdmissionRequest { const UniqueIdPB& query_id; const UniqueIdPB& coord_id; - const TQueryExecRequest& request; - const TQueryOptions& query_options; + const AdmissionExecRequest& request; RuntimeProfile* summary_profile; std::unordered_set<NetworkAddressPB>& blacklisted_executor_addresses; }; @@ -493,6 +569,9 @@ class AdmissionController { admission_map_cleanup_cb_ = std::move(cb); } + void UpdateAdmissionRequestCompressionStats( + int64_t compressed_size, int64_t uncompressed_size, float ratio) const; + private: class PoolStats; friend class PoolStats; @@ -546,6 +625,12 @@ class AdmissionController { /// executor groups). IntCounter* total_dequeue_failed_coordinator_limited_ = nullptr; + /// Histograms for tracking the compressed/uncompressed sizes and compression ratios + /// of the admission requests of TQueryExecRequest. + mutable HistogramMetric* compressed_size_metric_; + mutable HistogramMetric* uncompressed_size_metric_; + mutable HistogramMetric* compression_ratio_metric_; + /// A typedef for a holder of per-user loads. 
/// This matches the thrift-generated type of user_loads in TPoolStats. /// There are a few helper functions for this type. @@ -1328,7 +1413,7 @@ class AdmissionController { /// resource pool associated with the query. std::vector<const ExecutorGroup*> GetExecutorGroupsForQuery( const ClusterMembershipMgr::ExecutorGroups& all_groups, - const AdmissionRequest& request); + const TQueryExecRequest& exec_req); /// Returns the current size of the cluster. int64_t GetClusterSize(const ClusterMembershipMgr::Snapshot& membership_snapshot); diff --git a/be/src/scheduling/remote-admission-control-client.cc b/be/src/scheduling/remote-admission-control-client.cc index c090c89c9..f88d36ca0 100644 --- a/be/src/scheduling/remote-admission-control-client.cc +++ b/be/src/scheduling/remote-admission-control-client.cc @@ -17,6 +17,7 @@ #include "scheduling/remote-admission-control-client.h" +#include "common/names.h" #include "gen-cpp/admission_control_service.pb.h" #include "gen-cpp/admission_control_service.proxy.h" #include "kudu/rpc/rpc_controller.h" @@ -31,15 +32,16 @@ #include "util/time.h" #include "util/uid-util.h" -#include "common/names.h" - - DEFINE_int32(admission_status_retry_time_ms, 10, "(Advanced) The number of milliseconds coordinators will wait before retrying the " "GetQueryStatus rpc."); DEFINE_int32(admission_max_retry_time_s, 60, "(Advanced) The amount of time in seconds the coordinator will spend attempting to " "retry admission if the admissiond is unreachable."); +DEFINE_int64(admission_control_rpc_compress_threshold_bytes, 0, + "The minimum size in bytes of the serialized TQueryExecRequest required to trigger " + "compression in Admission Control RPCs. If set to 0, compression is disabled. 
" + "If set to a positive value, requests larger than this size will be compressed."); using namespace strings; using namespace kudu::rpc; @@ -57,9 +59,12 @@ Status RemoteAdmissionControlClient::TryAdmitQuery(AdmissionControlServiceProxy* AdmitQueryResponsePB resp; RpcController rpc_controller; - KrpcSerializer serializer; + KrpcSerializer serializer(FLAGS_admission_control_rpc_compress_threshold_bytes); int sidecar_idx; - RETURN_IF_ERROR(serializer.SerializeToSidecar(&request, &rpc_controller, &sidecar_idx)); + bool is_compressed; + RETURN_IF_ERROR(serializer.SerializeToSidecar<TQueryExecRequestCompressed>( + &request, &rpc_controller, &sidecar_idx, &is_compressed)); + req->set_is_compressed(is_compressed); req->set_query_exec_request_sidecar_idx(sidecar_idx); Status admit_status = Status::OK(); @@ -118,7 +123,7 @@ Status RemoteAdmissionControlClient::SubmitForAdmission( int64_t admission_start = MonotonicMillis(); kudu::Status admit_rpc_status = kudu::Status::OK(); Status admit_status = - TryAdmitQuery(proxy.get(), request.request, &req, &admit_rpc_status); + TryAdmitQuery(proxy.get(), *request.request.request(), &req, &admit_rpc_status); int32_t num_retries = 0; // Only retry AdmitQuery if the rpc layer reported a network error, indicating that the // admissiond was unreachable. @@ -142,7 +147,8 @@ Status RemoteAdmissionControlClient::SubmitForAdmission( // Re-resolve the admissiond address on each retry to handle cases // where the admissiond has restarted with a new IP. 
RETURN_IF_ERROR(AdmissionControlService::GetProxy(&proxy)); - admit_status = TryAdmitQuery(proxy.get(), request.request, &req, &admit_rpc_status); + admit_status = + TryAdmitQuery(proxy.get(), *request.request.request(), &req, &admit_rpc_status); } KUDU_RETURN_IF_ERROR(admit_rpc_status, "AdmitQuery rpc failed"); diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc index 031a90348..d75ca6ee0 100644 --- a/be/src/service/client-request-state.cc +++ b/be/src/service/client-request-state.cc @@ -677,10 +677,10 @@ void ClientRequestState::FinishExecQueryOrDmlRequest() { otel_span_manager_->StartChildSpanAdmissionControl(); } - const TQueryExecRequest* query_exec_request; TQueryExecRequest req; + req = exec_req.query_exec_request; + req.query_ctx.client_request.query_options = exec_req.query_options; if (ExecEnv::GetInstance()->AdmissionServiceEnabled()) { - req = exec_req.query_exec_request; if (req.__isset.query_plan) { // Use the swap() to ensure the string's memory is deallocated. // Using clear() sets the size to 0 but may not release the capacity. 
@@ -705,14 +705,13 @@ void ClientRequestState::FinishExecQueryOrDmlRequest() { std::string().swap(client_req.redacted_stmt); client_req.__isset.redacted_stmt = false; } - query_exec_request = &req; - } else { - query_exec_request = &exec_req.query_exec_request; } + DCHECK(!admission_exec_request_); + admission_exec_request_ = std::make_unique<AdmissionExecRequestUncompressed>(&req); Status admit_status = admission_control_client_->SubmitForAdmission( - {query_id_pb, ExecEnv::GetInstance()->backend_id(), *query_exec_request, - exec_req.query_options, summary_profile_, blacklisted_executor_addresses_}, + {query_id_pb, ExecEnv::GetInstance()->backend_id(), *admission_exec_request_, + summary_profile_, blacklisted_executor_addresses_}, query_events_, &schedule_, &wait_start_time_ms_, &wait_end_time_ms_, otel_span_manager_.get()); diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h index e327907e3..467e5aa55 100644 --- a/be/src/service/client-request-state.h +++ b/be/src/service/client-request-state.h @@ -39,6 +39,7 @@ namespace impala { class AdmissionControlClient; +class AdmissionExecRequest; class Coordinator; class Expr; class Frontend; @@ -757,6 +758,9 @@ class ClientRequestState { /// is initialized once it's finalized. It's owned by the parent QueryDriver. AtomicPtr<const TExecRequest> exec_request_{&unknown_exec_request_}; + /// Construct only once to be used as a member of AdmissionRequest. + std::unique_ptr<AdmissionExecRequest> admission_exec_request_; + /// If true, effective_user() has access to the runtime profile and execution /// summary. 
AtomicBool user_has_profile_access_{true}; diff --git a/common/protobuf/admission_control_service.proto b/common/protobuf/admission_control_service.proto index 4954ac572..201769ac0 100644 --- a/common/protobuf/admission_control_service.proto +++ b/common/protobuf/admission_control_service.proto @@ -219,6 +219,9 @@ message AdmitQueryRequestPB { // List of backends this query should not be scheduled on. repeated NetworkAddressPB blacklisted_executor_addresses = 4; + + // Indicates whether the TQueryExecRequest sidecar is compressed. + optional bool is_compressed = 5 [default = false]; } message AdmitQueryResponsePB { diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift index 26750f5a1..ff46f940d 100644 --- a/common/thrift/Query.thrift +++ b/common/thrift/Query.thrift @@ -1079,6 +1079,14 @@ struct TFinalizeParams { 9: optional TIcebergDmlFinalizeParams iceberg_params; } +struct TQueryExecRequestCompressed { + // The size in bytes of the serialized TQueryExecRequest before compression. + 1: required i64 uncompressed_size + + // Serialized TQueryExecRequest compressed data. 
+ 2: required binary compressed_data +} + // Result of call to ImpalaPlanService/JniFrontend.createExecRequest() struct TQueryExecRequest { // Exec info for all plans; the first one materializes the query result, and subsequent diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json index b5954b2e3..c358b699b 100644 --- a/common/thrift/metrics.json +++ b/common/thrift/metrics.json @@ -3958,6 +3958,36 @@ "kind": "COUNTER", "key": "admission-controller.total-dequeue-failed-coordinator-limited" }, + { + "description": "Compressed TQueryExecRequest sizes processed by the Admission Controller.", + "contexts": [ + "ADMISSIOND" + ], + "label": "Admission Control Query Execution Request Compressed Size", + "units": "BYTES", + "kind": "HISTOGRAM", + "key": "admission-controller.query-exec-request-compressed-bytes" + }, + { + "description": "Uncompressed TQueryExecRequest sizes processed by the Admission Controller.", + "contexts": [ + "ADMISSIOND" + ], + "label": "Admission Control Query Execution Request Uncompressed Size", + "units": "BYTES", + "kind": "HISTOGRAM", + "key": "admission-controller.query-exec-request-uncompressed-bytes" + }, + { + "description": "Compression ratios (Uncompressed / Compressed) of TQueryExecRequest sizes.", + "contexts": [ + "ADMISSIOND" + ], + "label": "Admission Control Query Execution Request Compression Ratio", + "units": "NONE", + "kind": "HISTOGRAM", + "key": "admission-controller.query-exec-request-compression-ratio" + }, { "key": "admission-control-service.num-queries-high-water-mark", "label": "HWM Num Queries in Admission Control Service", diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py index 24c063a4a..4be8f7a76 100644 --- a/tests/custom_cluster/test_admission_controller.py +++ b/tests/custom_cluster/test_admission_controller.py @@ -2097,6 +2097,12 @@ class TestAdmissionControllerWithACService(TestAdmissionController): """Runs all of the tests from 
TestAdmissionController but with the second impalad in the minicluster configured to perform all admission control.""" + @classmethod + def add_test_dimensions(cls): + super(TestAdmissionControllerWithACService, cls).add_test_dimensions() + cls.ImpalaTestMatrix.add_dimension( + ImpalaTestDimension('admission_control_rpc_compress_threshold_bytes', 0, 1)) + def get_ac_process(self): return self.cluster.admissiond
