IMPALA-5644,IMPALA-5810: Min reservation improvements. Rejects queries during admission control if: * the largest (across all backends) min buffer reservation is greater than the query mem_limit or buffer_pool_limit; * the sum of the min buffer reservations across the cluster is larger than the pool max mem resources.
There are some other interesting cases to consider later: * every per-backend min buffer reservation is less than the associated backend's process mem_limit; the current admission control code doesn't know about other backend's proc mem_limits. Also reduces minimum non-reservation memory (IMPALA-5810). See the JIRA for experimental results that show this slightly improves min memory requirements for small queries. One reason to tweak this is to compensate for the fact that BufferedBlockMgr didn't count small buffers against the BlockMgr limit, but BufferPool counts all buffers against it. Testing: * Adds new test cases in test_admission_controller.py * Adds BE tests in reservation-tracker-test for the reservation-util code. Change-Id: Iabe87ce8f460356cfe4d1be4d7092c5900f9d79b Reviewed-on: http://gerrit.cloudera.org:8080/7678 Reviewed-by: Matthew Jacobs <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/7264c547 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/7264c547 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/7264c547 Branch: refs/heads/master Commit: 7264c547516569b6c1152bcb61724ee8c2ee79d0 Parents: 5ec4046 Author: Matthew Jacobs <[email protected]> Authored: Tue Aug 15 10:06:25 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Tue Aug 22 08:27:12 2017 +0000 ---------------------------------------------------------------------- be/src/runtime/bufferpool/CMakeLists.txt | 1 + .../bufferpool/reservation-tracker-test.cc | 23 +++ be/src/runtime/bufferpool/reservation-util.cc | 41 ++++++ be/src/runtime/bufferpool/reservation-util.h | 74 ++++++++++ be/src/runtime/query-state.cc | 19 +-- be/src/scheduling/admission-controller.cc | 141 ++++++++++++++----- be/src/scheduling/admission-controller.h | 8 +- be/src/scheduling/scheduler.h | 3 +- 
common/thrift/generate_error_codes.py | 5 + .../queries/QueryTest/analytic-fns.test | 14 +- .../queries/QueryTest/spilling-aggs.test | 2 +- .../custom_cluster/test_admission_controller.py | 32 ++++- tests/query_test/test_mem_usage_scaling.py | 29 +--- 13 files changed, 293 insertions(+), 99 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7264c547/be/src/runtime/bufferpool/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/CMakeLists.txt b/be/src/runtime/bufferpool/CMakeLists.txt index 231230b..4b1778b 100644 --- a/be/src/runtime/bufferpool/CMakeLists.txt +++ b/be/src/runtime/bufferpool/CMakeLists.txt @@ -25,6 +25,7 @@ add_library(BufferPool buffer-allocator.cc buffer-pool.cc reservation-tracker.cc + reservation-util.cc suballocator.cc system-allocator.cc ) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7264c547/be/src/runtime/bufferpool/reservation-tracker-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/reservation-tracker-test.cc b/be/src/runtime/bufferpool/reservation-tracker-test.cc index 5d22eb9..0d57488 100644 --- a/be/src/runtime/bufferpool/reservation-tracker-test.cc +++ b/be/src/runtime/bufferpool/reservation-tracker-test.cc @@ -20,6 +20,7 @@ #include <vector> #include "runtime/bufferpool/reservation-tracker.h" +#include "runtime/bufferpool/reservation-util.h" #include "common/init.h" #include "common/object-pool.h" #include "runtime/mem-tracker.h" @@ -476,6 +477,28 @@ TEST_F(ReservationTrackerTest, TransferReservation) { grandparent->Close(); grandparent_mem_tracker->Close(); } + +TEST_F(ReservationTrackerTest, ReservationUtil) { + const int64_t MEG = 1024 * 1024; + const int64_t GIG = 1024 * 1024 * 1024; + EXPECT_EQ(75 * MEG, ReservationUtil::RESERVATION_MEM_MIN_REMAINING); + + EXPECT_EQ(0, 
ReservationUtil::GetReservationLimitFromMemLimit(0)); + EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit(-1)); + EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit(75 * MEG)); + EXPECT_EQ(8 * GIG, ReservationUtil::GetReservationLimitFromMemLimit(10 * GIG)); + + EXPECT_EQ(75 * MEG, ReservationUtil::GetMinMemLimitFromReservation(0)); + EXPECT_EQ(75 * MEG, ReservationUtil::GetMinMemLimitFromReservation(-1)); + EXPECT_EQ(500 * MEG, ReservationUtil::GetMinMemLimitFromReservation(400 * MEG)); + EXPECT_EQ(5 * GIG, ReservationUtil::GetMinMemLimitFromReservation(4 * GIG)); + + EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit( + ReservationUtil::GetMinMemLimitFromReservation(0))); + EXPECT_EQ(4 * GIG, ReservationUtil::GetReservationLimitFromMemLimit( + ReservationUtil::GetMinMemLimitFromReservation(4 * GIG))); +} + } IMPALA_TEST_MAIN(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7264c547/be/src/runtime/bufferpool/reservation-util.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/reservation-util.cc b/be/src/runtime/bufferpool/reservation-util.cc new file mode 100644 index 0000000..85718ab --- /dev/null +++ b/be/src/runtime/bufferpool/reservation-util.cc @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/bufferpool/reservation-util.h" + +#include <algorithm> + +namespace impala { + +// Most operators that accumulate memory use reservations, so the majority of memory +// should be allocated to buffer reservations, as a heuristic. +const double ReservationUtil::RESERVATION_MEM_FRACTION = 0.8; +const int64_t ReservationUtil::RESERVATION_MEM_MIN_REMAINING = 75 * 1024 * 1024; + +int64_t ReservationUtil::GetReservationLimitFromMemLimit(int64_t mem_limit) { + int64_t max_reservation = std::min<int64_t>( + RESERVATION_MEM_FRACTION * mem_limit, mem_limit - RESERVATION_MEM_MIN_REMAINING); + return std::max<int64_t>(0, max_reservation); +} + +int64_t ReservationUtil::GetMinMemLimitFromReservation(int64_t buffer_reservation) { + buffer_reservation = std::max<int64_t>(0, buffer_reservation); + return std::max<int64_t>( + buffer_reservation * (1.0 / ReservationUtil::RESERVATION_MEM_FRACTION), + buffer_reservation + ReservationUtil::RESERVATION_MEM_MIN_REMAINING); +} +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7264c547/be/src/runtime/bufferpool/reservation-util.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/reservation-util.h b/be/src/runtime/bufferpool/reservation-util.h new file mode 100644 index 0000000..cd54c65 --- /dev/null +++ b/be/src/runtime/bufferpool/reservation-util.h @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_RUNTIME_BUFFERPOOL_RESERVATION_UTIL_H_ +#define IMPALA_RUNTIME_BUFFERPOOL_RESERVATION_UTIL_H_ + +#include <stdint.h> + +namespace impala { + +/// Utility code related to buffer reservations. +class ReservationUtil { + public: + /// There are currently two classes of memory: reserved memory (i.e. memory that is + /// reserved with reservation trackers/allocated by the buffer pool), and unreserved + /// memory (i.e. everything else; code that hasn't yet been updated to use reserved + /// memory). Eventually, all memory should be in the former category, but each operator + /// must be converted to use reserved memory and that work is ongoing. See IMPALA-4834. + /// In the meantime, the system memory must be shared between these two classes of + /// memory. RESERVATION_MEM_FRACTION and RESERVATION_MEM_MIN_REMAINING are used to + /// determine an upper bound on reserved memory for a query. Operators operate reliably + /// when they are using bounded reserved memory (e.g. staying under a limit by + /// spilling), but will generally fail if they hit a limit when trying to allocate + /// unreserved memory. Thus we need to ensure there is always space left in the query + /// memory limit for unreserved memory. 
+ + /// The fraction of the query mem limit that is used as the maximum buffer reservation + /// limit, i.e. the bound on reserved memory. It is expected that unreserved memory + /// (i.e. not accounted by buffer reservation trackers) stays within + /// (1 - RESERVATION_MEM_FRACTION). + /// TODO: remove once all operators use buffer reservations. + static const double RESERVATION_MEM_FRACTION; + + /// The minimum amount of memory that should be left after buffer reservations, i.e. + /// this is the minimum amount of memory that should be left for unreserved memory. + /// TODO: remove once all operators use buffer reservations. + static const int64_t RESERVATION_MEM_MIN_REMAINING; + + /// Helper function to get the query buffer reservation limit (in bytes) given a query + /// mem_limit. In other words, this determines the maximum portion of the mem_limit + /// that should go to reserved memory. The limit on reservations is computed as: + /// min(query_limit * RESERVATION_MEM_FRACTION, + /// query_limit - RESERVATION_MEM_MIN_REMAINING) + /// TODO: remove once all operators use buffer reservations. + static int64_t GetReservationLimitFromMemLimit(int64_t mem_limit); + + /// Helper function to get the minimum query mem_limit (in bytes) that will be large + /// enough for a buffer reservation of size 'buffer_reservation' bytes. In other words, + /// this determines the minimum mem_limit that will be large enough to accomidate + /// 'buffer_reservation' reserved memory, as well as some amount of unreserved memory + /// (determined by a heuristic). + /// The returned mem_limit X satisfies: + /// buffer_reservation <= GetReservationLimitFromMemLimit(X) + /// TODO: remove once all operators use buffer reservations. 
+ static int64_t GetMinMemLimitFromReservation(int64_t buffer_reservation); +}; + +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7264c547/be/src/runtime/query-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc index 1c9b03b..5ac4998 100644 --- a/be/src/runtime/query-state.cc +++ b/be/src/runtime/query-state.cc @@ -24,6 +24,7 @@ #include "runtime/backend-client.h" #include "runtime/bufferpool/buffer-pool.h" #include "runtime/bufferpool/reservation-tracker.h" +#include "runtime/bufferpool/reservation-util.h" #include "runtime/exec-env.h" #include "runtime/fragment-instance-state.h" #include "runtime/initial-reservations.h" @@ -38,20 +39,6 @@ using namespace impala; -// The fraction of the query mem limit that is used for buffer reservations. Most -// operators that accumulate memory use reservations, so the majority of memory should -// be allocated to buffer reservations, as a heuristic. -// TODO: this will go away once all operators use buffer reservations. -static const double RESERVATION_MEM_FRACTION = 0.8; - -// The minimum amount of memory that should be left after buffer reservations. -// The limit on reservations is computed as: -// min(query_limit * RESERVATION_MEM_FRACTION, -// query_limit - RESERVATION_MEM_MIN_REMAINING) -// TODO: this will go away once all operators use buffer reservations and we have accurate -// minimum requirements. 
-static const int64_t RESERVATION_MEM_MIN_REMAINING = 100 * 1024 * 1024; - QueryState::ScopedRef::ScopedRef(const TUniqueId& query_id) { DCHECK(ExecEnv::GetInstance()->query_exec_mgr() != nullptr); query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->GetQueryState(query_id); @@ -172,9 +159,7 @@ Status QueryState::InitBufferPoolState() { max_reservation = numeric_limits<int64_t>::max(); } else { DCHECK_GE(mem_limit, 0); - max_reservation = min<int64_t>( - mem_limit * RESERVATION_MEM_FRACTION, mem_limit - RESERVATION_MEM_MIN_REMAINING); - max_reservation = max<int64_t>(0, max_reservation); + max_reservation = ReservationUtil::GetReservationLimitFromMemLimit(mem_limit); } VLOG_QUERY << "Buffer pool limit for " << PrintId(query_id()) << ": " << max_reservation; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7264c547/be/src/scheduling/admission-controller.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc index 07f473f..339935d 100644 --- a/be/src/scheduling/admission-controller.cc +++ b/be/src/scheduling/admission-controller.cc @@ -23,6 +23,7 @@ #include <gutil/strings/substitute.h> #include "common/logging.h" +#include "runtime/bufferpool/reservation-util.h" #include "runtime/exec-env.h" #include "runtime/mem-tracker.h" #include "scheduling/scheduler.h" @@ -111,9 +112,21 @@ const string PROFILE_INFO_VAL_TIME_OUT = "Timed out (queued)"; const string PROFILE_INFO_KEY_QUEUE_DETAIL = "Admission queue details"; const string PROFILE_INFO_VAL_QUEUE_DETAIL = "waited $0 ms, reason: $1"; -// Error status string formats -// $0 = pool, $1 = rejection reason (see REASON_XXX below) -const string STATUS_REJECTED = "Rejected query from pool $0 : $1"; +// Error status string details +const string REASON_MEM_LIMIT_TOO_LOW_FOR_RESERVATION = + "minimum memory reservation is greater than memory available to the query " + "for buffer reservations. 
Mem available for buffer reservations based on mem_limit: " + "$0, memory reservation needed: $1. Set mem_limit to at least $2. See the query " + "profile for more information."; +const string REASON_BUFFER_LIMIT_TOO_LOW_FOR_RESERVATION = + "minimum memory reservation is greater than memory available to the query " + "for buffer reservations. Mem available for buffer reservations based on " + "buffer_pool_limit: $0, memory reservation needed: $1. See the query profile for " + "more information."; +const string REASON_MIN_RESERVATION_OVER_POOL_MEM = + "minimum memory reservation needed is greater than pool max mem resources. pool " + "max mem resources: $0, cluster-wide memory reservation needed: $1. See the query " + "profile for more information."; const string REASON_DISABLED_MAX_MEM_RESOURCES = "disabled by pool max mem resources set to 0"; const string REASON_DISABLED_REQUESTS_LIMIT = "disabled by requests limit set to 0"; @@ -133,9 +146,6 @@ const string QUEUED_NUM_RUNNING = "number of running queries $0 is over limit $1 // $0 = queue size const string QUEUED_QUEUE_NOT_EMPTY = "queue is not empty (size $0); queued queries are " "executed first"; -// $0 = timeout in milliseconds, $1 = queue detail -const string STATUS_TIME_OUT = "Admission for query exceeded timeout $0ms. Queued " - "reason: $1"; // $0 = pool name, $1 = pool max memory, $2 = pool mem needed, $3 = pool mem available const string POOL_MEM_NOT_AVAILABLE = "Not enough aggregate memory available in pool $0 " "with max mem resources $1. 
Needed $2 but only $3 was available."; @@ -383,35 +393,90 @@ bool AdmissionController::CanAdmitRequest(const QuerySchedule& schedule, return true; } -Status AdmissionController::RejectImmediately(QuerySchedule* schedule, - const TPoolConfig& pool_cfg) { - const string& pool_name = schedule->request_pool(); - PoolStats* stats = GetPoolStats(pool_name); - const int64_t cluster_mem_needed = schedule->GetClusterMemoryEstimate(); - string reject_reason; +bool AdmissionController::RejectImmediately(QuerySchedule* schedule, + const TPoolConfig& pool_cfg, string* rejection_reason) { + DCHECK(rejection_reason != nullptr && rejection_reason->empty()); + // This function checks for a number of cases where the query can be rejected + // immediately. The first check that fails is the error that is reported. The order of + // the checks isn't particularly important, though some thought was given to ordering + // them in a way that might make the sense for a user. + + // Compute the max (over all backends) min_reservation_bytes and the cluster total + // (across all backends) min_reservation_bytes. 
+ int64_t max_min_reservation_bytes = -1; + int64_t cluster_min_reservation_bytes = 0; + for (const auto& e: schedule->per_backend_exec_params()) { + cluster_min_reservation_bytes += e.second.min_reservation_bytes; + if (e.second.min_reservation_bytes > max_min_reservation_bytes) { + max_min_reservation_bytes = e.second.min_reservation_bytes; + } + } + + // Checks related to the min buffer reservation against configured query memory limits: + if (schedule->query_options().__isset.buffer_pool_limit && + schedule->query_options().buffer_pool_limit > 0) { + const int64_t buffer_pool_limit = schedule->query_options().buffer_pool_limit; + if (max_min_reservation_bytes > buffer_pool_limit) { + *rejection_reason = Substitute(REASON_BUFFER_LIMIT_TOO_LOW_FOR_RESERVATION, + PrintBytes(buffer_pool_limit), PrintBytes(max_min_reservation_bytes)); + return true; + } + } else if (schedule->query_options().__isset.mem_limit && + schedule->query_options().mem_limit > 0) { + const int64_t mem_limit = schedule->query_options().mem_limit; + const int64_t max_reservation = + ReservationUtil::GetReservationLimitFromMemLimit(mem_limit); + if (max_min_reservation_bytes > max_reservation) { + const int64_t required_mem_limit = + ReservationUtil::GetMinMemLimitFromReservation(max_min_reservation_bytes); + *rejection_reason = Substitute(REASON_MEM_LIMIT_TOO_LOW_FOR_RESERVATION, + PrintBytes(mem_limit), PrintBytes(max_min_reservation_bytes), + PrintBytes(required_mem_limit)); + return true; + } + } + + // Checks related to pool max_requests: if (pool_cfg.max_requests == 0) { - reject_reason = REASON_DISABLED_REQUESTS_LIMIT; - } else if (pool_cfg.max_mem_resources == 0) { - reject_reason = REASON_DISABLED_MAX_MEM_RESOURCES; - } else if (pool_cfg.max_mem_resources > 0 && - cluster_mem_needed > pool_cfg.max_mem_resources) { - reject_reason = Substitute(REASON_REQ_OVER_POOL_MEM, PrintBytes(cluster_mem_needed), + *rejection_reason = REASON_DISABLED_REQUESTS_LIMIT; + return true; + } + + // Checks 
related to pool max_mem_resources: + if (pool_cfg.max_mem_resources == 0) { + *rejection_reason = REASON_DISABLED_MAX_MEM_RESOURCES; + return true; + } + if (pool_cfg.max_mem_resources > 0 && + cluster_min_reservation_bytes > pool_cfg.max_mem_resources) { + *rejection_reason = Substitute(REASON_MIN_RESERVATION_OVER_POOL_MEM, + PrintBytes(pool_cfg.max_mem_resources), + PrintBytes(cluster_min_reservation_bytes)); + return true; + } + if (pool_cfg.max_mem_resources > 0 && + schedule->GetClusterMemoryEstimate() > pool_cfg.max_mem_resources) { + *rejection_reason = Substitute(REASON_REQ_OVER_POOL_MEM, + PrintBytes(schedule->GetClusterMemoryEstimate()), PrintBytes(pool_cfg.max_mem_resources)); - } else if (pool_cfg.max_mem_resources > 0 && + return true; + } + if (pool_cfg.max_mem_resources > 0 && schedule->GetPerHostMemoryEstimate() > GetProcMemLimit()) { - reject_reason = Substitute(REASON_REQ_OVER_NODE_MEM, + *rejection_reason = Substitute(REASON_REQ_OVER_NODE_MEM, PrintBytes(schedule->GetPerHostMemoryEstimate()), PrintBytes(GetProcMemLimit())); - } else if (stats->agg_num_queued() >= pool_cfg.max_queued) { - reject_reason = Substitute(REASON_QUEUE_FULL, pool_cfg.max_queued, + return true; + } + + // Checks related to the pool queue size: + PoolStats* stats = GetPoolStats(schedule->request_pool()); + if (stats->agg_num_queued() >= pool_cfg.max_queued) { + *rejection_reason = Substitute(REASON_QUEUE_FULL, pool_cfg.max_queued, stats->agg_num_queued()); - } else { - return Status::OK(); // Not rejected + return true; } - schedule->set_is_admitted(false); - schedule->summary_profile()->AddInfoString(PROFILE_INFO_KEY_ADMISSION_RESULT, - PROFILE_INFO_VAL_REJECTED); - stats->metrics()->total_rejected->Increment(1); - return Status(Substitute(STATUS_REJECTED, pool_name, reject_reason)); + + return false; } void AdmissionController::PoolStats::UpdateConfigMetrics(const TPoolConfig& pool_cfg) { @@ -446,7 +511,17 @@ Status AdmissionController::AdmitQuery(QuerySchedule* schedule) 
{ << " PoolConfig: max_requests=" << max_requests << " max_queued=" << max_queued << " max_mem=" << PrintBytes(max_mem); VLOG_QUERY << "Stats: " << stats->DebugString(); - RETURN_IF_ERROR(RejectImmediately(schedule, pool_cfg)); + string rejection_reason; + if (RejectImmediately(schedule, pool_cfg, &rejection_reason)) { + schedule->set_is_admitted(false); + schedule->summary_profile()->AddInfoString(PROFILE_INFO_KEY_ADMISSION_RESULT, + PROFILE_INFO_VAL_REJECTED); + stats->metrics()->total_rejected->Increment(1); + const ErrorMsg& rejected_msg = ErrorMsg(TErrorCode::ADMISSION_REJECTED, + pool_name, rejection_reason); + VLOG_QUERY << rejected_msg.msg(); + return Status::Expected(rejected_msg); + } pools_for_updates_.insert(pool_name); if (CanAdmitRequest(*schedule, pool_cfg, false, &not_admitted_reason)) { @@ -513,8 +588,10 @@ Status AdmissionController::AdmitQuery(QuerySchedule* schedule) { schedule->summary_profile()->AddInfoString(PROFILE_INFO_KEY_ADMISSION_RESULT, PROFILE_INFO_VAL_TIME_OUT); stats->Dequeue(*schedule, true); - return Status(Substitute(STATUS_TIME_OUT, queue_wait_timeout_ms, - not_admitted_reason)); + const ErrorMsg& rejected_msg = ErrorMsg(TErrorCode::ADMISSION_TIMED_OUT, + queue_wait_timeout_ms, pool_name, not_admitted_reason); + VLOG_QUERY << rejected_msg.msg(); + return Status::Expected(rejected_msg); } // The dequeue thread updates the stats (to avoid a race condition) so we do // not change them here. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7264c547/be/src/scheduling/admission-controller.h ---------------------------------------------------------------------- diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h index d3ec49d..3e49cfb 100644 --- a/be/src/scheduling/admission-controller.h +++ b/be/src/scheduling/admission-controller.h @@ -460,10 +460,12 @@ class AdmissionController { /// admission_ctrl_lock_.
void UpdateHostMemAdmitted(const QuerySchedule& schedule, int64_t per_node_mem); - /// Returns an error status if this request must be rejected immediately, e.g. requires - /// more memory than possible to reserve or the queue is already full. + /// Returns true if this request must be rejected immediately, e.g. requires more + /// memory than possible to reserve or the queue is already full. If true, + /// rejection_reason is set to a explanation of why the request was rejected. /// Must hold admission_ctrl_lock_. - Status RejectImmediately(QuerySchedule* schedule, const TPoolConfig& pool_cfg); + bool RejectImmediately(QuerySchedule* schedule, const TPoolConfig& pool_cfg, + std::string* rejection_reason); /// Gets or creates the PoolStats for pool_name. Must hold admission_ctrl_lock_. PoolStats* GetPoolStats(const std::string& pool_name); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7264c547/be/src/scheduling/scheduler.h ---------------------------------------------------------------------- diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h index 3126436..07e0cfc 100644 --- a/be/src/scheduling/scheduler.h +++ b/be/src/scheduling/scheduler.h @@ -89,8 +89,7 @@ class Scheduler { impala::Status Init(); /// Populates given query schedule and assigns fragments to hosts based on scan - /// ranges in the query exec request. Submits schedule to admission control before - /// returning. + /// ranges in the query exec request. 
Status Schedule(QuerySchedule* schedule); private: http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7264c547/common/thrift/generate_error_codes.py ---------------------------------------------------------------------- diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py index ad0e342..5a72d80 100755 --- a/common/thrift/generate_error_codes.py +++ b/common/thrift/generate_error_codes.py @@ -328,6 +328,11 @@ error_codes = ( ("MINIMUM_RESERVATION_UNAVAILABLE", 106, "Failed to get minimum memory reservation of " "$0 on daemon $1:$2 for query $3 because it would exceed an applicable query, " "request pool or process memory limit. Memory usage:\\n$4"), + + ("ADMISSION_REJECTED", 107, "Rejected query from pool $0: $1"), + + ("ADMISSION_TIMED_OUT", 108, "Admission for query exceeded timeout $0ms in pool $1. " + "Queued reason: $2"), ) import sys http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7264c547/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test b/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test index 27459ef..f8a024a 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test +++ b/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test @@ -1583,18 +1583,8 @@ from functional.alltypestiny order by id int, bigint, bigint, double ==== ---- QUERY -# Regression test for IMPALA-2265, IMPALA-2559. The buffer_pool_limit is tuned to -# reproduce the issue when running this query against functional_parquet. 
-SET default_spillable_buffer_size=8m; -SET buffer_pool_limit=16m; -SELECT lag(-180, 13) over (ORDER BY t1.int_col ASC, t2.int_col ASC) AS int_col -FROM functional_parquet.alltypes t1 CROSS JOIN functional_parquet.alltypes t2 LIMIT 10; ----- CATCH -Failed to get minimum memory reservation -==== ----- QUERY -# Check that the above query can succeed with the minimum buffers (3 buffers for sort, -# 2 buffer for analytic). +# Check that the a large analytic query can succeed with the minimum buffers (3 buffers +# for sort, 2 buffer for analytic). SET default_spillable_buffer_size=8m; SET buffer_pool_limit=40m; SELECT lag(-180, 13) over (ORDER BY t1.int_col ASC, t2.int_col ASC) AS int_col http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7264c547/testdata/workloads/functional-query/queries/QueryTest/spilling-aggs.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling-aggs.test b/testdata/workloads/functional-query/queries/QueryTest/spilling-aggs.test index d9f60cc..34628c0 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/spilling-aggs.test +++ b/testdata/workloads/functional-query/queries/QueryTest/spilling-aggs.test @@ -183,7 +183,7 @@ row_regex: .*SpilledPartitions: .* \([1-9][0-9]*\) # IMPALA-3304: test that avg() can spill with a query mem limit. # This test only covers that use FIXED_UDA_INTERMEDIATE, not functions that allocate # strings for intermediate values. 
-set mem_limit=200m; +set mem_limit=150m; select l_orderkey, avg(l_tax), avg(l_quantity), avg(l_discount), avg(l_extendedprice) from tpch_parquet.lineitem group by 1 http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7264c547/tests/custom_cluster/test_admission_controller.py ---------------------------------------------------------------------- diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py index 250298b..ff4f4c6 100644 --- a/tests/custom_cluster/test_admission_controller.py +++ b/tests/custom_cluster/test_admission_controller.py @@ -313,7 +313,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): "select * from functional.alltypestiny"] for query in non_trivial_queries: ex = self.execute_query_expect_failure(self.client, query) - assert re.search("Rejected query from pool default-pool : request memory needed " + assert re.search("Rejected query from pool default-pool: request memory needed " ".* is greater than pool max mem resources 10.00 MB", str(ex)) @pytest.mark.execute_serially @@ -321,15 +321,35 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=1, pool_max_mem=10 * 1024 * 1024, proc_mem_limit=1024 * 1024 * 1024), statestored_args=_STATESTORED_ARGS) - def test_initial_reservation(self): - """Test behaviour with admission control enabled if the initial reservation cannot be - acquired. The query options are set so that the query will be admitted, but acquiring - the initial reservation will fail because it is larger than mem_limit. 
+ def test_reject_min_reservation(self): + """Test that the query will be rejected by admission control if: + a) the largest per-backend min buffer reservation is larger than the query mem + limit + b) the largest per-backend min buffer reservation is larger than the + buffer_pool_limit query option + c) the cluster-wide min-buffer reservation size is larger than the pool memory + resources. """ query = "select distinct * from functional_parquet.alltypesagg" opts = {'mem_limit': '10MB', 'num_nodes': '1'} ex = self.execute_query_expect_failure(self.client, query, opts) - assert "Failed to get minimum memory reservation" in str(ex) + assert ("minimum memory reservation is greater than memory available to the query " + "for buffer reservations. Mem available for buffer reservations based on " + "mem_limit: 10.00 MB, memory reservation needed: 34.00 MB. " + "Set mem_limit to at least 109.00 MB.") in str(ex) + + query = "select distinct * from functional_parquet.alltypesagg" + opts = {'buffer_pool_limit': '10MB', 'num_nodes': '1'} + ex = self.execute_query_expect_failure(self.client, query, opts) + assert ("minimum memory reservation is greater than memory available to the query " + "for buffer reservations. Mem available for buffer reservations based on " + "buffer_pool_limit: 10.00 MB, memory reservation needed: 34.00 MB.") in str(ex) + + opts = {'mem_limit': '150MB', 'num_nodes': '1'} + ex = self.execute_query_expect_failure(self.client, query, opts) + assert ("minimum memory reservation needed is greater than pool max mem resources. 
" + "pool max mem resources: 10.00 MB, cluster-wide memory reservation needed: " + "34.00 MB") in str(ex) # Process mem_limit used in test_mem_limit_upper_bound PROC_MEM_TEST_LIMIT = 1024 * 1024 * 1024 http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7264c547/tests/query_test/test_mem_usage_scaling.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_mem_usage_scaling.py b/tests/query_test/test_mem_usage_scaling.py index eac95e6..efc500b 100644 --- a/tests/query_test/test_mem_usage_scaling.py +++ b/tests/query_test/test_mem_usage_scaling.py @@ -26,8 +26,9 @@ from tests.common.test_vector import ImpalaTestDimension # Substrings of the expected error messages when the mem limit is too low MEM_LIMIT_EXCEEDED_MSG = "Memory limit exceeded" -INITIAL_RESERVATION_MSG = "Failed to get minimum memory reservation" -MEM_LIMIT_ERROR_MSGS = [MEM_LIMIT_EXCEEDED_MSG, INITIAL_RESERVATION_MSG] +MEM_LIMIT_TOO_LOW_FOR_RESERVATION = ("minimum memory reservation is greater than memory " + "available to the query for buffer reservations") +MEM_LIMIT_ERROR_MSGS = [MEM_LIMIT_EXCEEDED_MSG, MEM_LIMIT_TOO_LOW_FOR_RESERVATION] class TestQueryMemLimitScaling(ImpalaTestSuite): """Test class to do functional validation of per query memory limits. """ @@ -90,30 +91,6 @@ class TestExprMemUsage(ImpalaTestSuite): table_format=vector.get_value('table_format')) -class TestInitialReservation(ImpalaTestSuite): - @classmethod - def get_workload(self): - # Note: this workload doesn't run exhaustively. See IMPALA-3947 before trying to move - # this test to exhaustive. 
- return 'tpch' - - @classmethod - def add_test_dimensions(cls): - super(TestInitialReservation, cls).add_test_dimensions() - cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension()) - cls.ImpalaTestMatrix.add_constraint(lambda v:\ - v.get_value('table_format').file_format in ['parquet']) - - def test_initial_reservation(self, vector): - """Test failure to get the initial reservation.""" - exec_options = copy(vector.get_value('exec_option')) - exec_options['mem_limit'] = '20m' - query = """select * from tpch_parquet.lineitem l1 - join tpch_parquet.lineitem l2 on l1.l_orderkey = l2.l_orderkey""" - result = self.execute_query_expect_failure(self.client, query, exec_options) - assert (INITIAL_RESERVATION_MSG in str(result)) - - class TestLowMemoryLimits(ImpalaTestSuite): '''Super class for the memory limit tests with the TPC-H and TPC-DS queries'''
