IMPALA-3134: Support different proc mem limits among impalads for admission control checks
Currently the admission controller assumes that all backends have the same process mem limit as the impalad it itself is running on. With this patch the proc mem limit for each impalad is available to the admission controller and it uses it for making correct admission decisions. It currently works under the assumption that the per-process memory limit does not change dynamically. Testing: Added an e2e test. IMPALA-5662: Log the queuing reason for a query The queuing reason is now logged both while queuing for the first time and while trying to dequeue. Change-Id: Idb72eee790cc17466bbfa82e30f369a65f2b060e Reviewed-on: http://gerrit.cloudera.org:8080/10396 Reviewed-by: Bikramjeet Vig <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/466188b3 Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/466188b3 Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/466188b3 Branch: refs/heads/2.x Commit: 466188b3970595e2e04d7ecf6a5141a7d3012909 Parents: b07bb27 Author: Bikramjeet Vig <[email protected]> Authored: Fri May 4 14:42:48 2018 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Fri May 25 23:17:16 2018 +0000 ---------------------------------------------------------------------- be/src/scheduling/admission-controller.cc | 70 +++++++++++--------- be/src/scheduling/admission-controller.h | 3 - be/src/scheduling/query-schedule.h | 4 ++ be/src/scheduling/scheduler.cc | 11 ++- be/src/scheduling/scheduler.h | 3 +- be/src/service/impala-server.cc | 3 + bin/start-impala-cluster.py | 12 ++++ common/thrift/StatestoreService.thrift | 3 + .../custom_cluster/test_admission_controller.py | 56 ++++++++++++++++ 9 files changed, 127 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/impala/blob/466188b3/be/src/scheduling/admission-controller.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc index eef18ef..c9a2139 100644 --- a/be/src/scheduling/admission-controller.cc +++ b/be/src/scheduling/admission-controller.cc @@ -46,10 +46,6 @@ std::string PrintBytes(int64_t value) { return PrettyPrinter::Print(value, TUnit::BYTES); } -int64_t GetProcMemLimit() { - return ExecEnv::GetInstance()->process_mem_tracker()->limit(); -} - // Delimiter used for topic keys of the form "<pool_name><delimiter><backend_id>". // "!" is used because the backend id contains a colon, but it should not contain "!". // When parsing the topic key we need to be careful to find the last instance in @@ -136,7 +132,7 @@ const string REASON_REQ_OVER_POOL_MEM = "The total memory needed is the per-node MEM_LIMIT times the number of nodes " "executing the query. See the Admission Control documentation for more information."; const string REASON_REQ_OVER_NODE_MEM = - "request memory needed $0 per node is greater than process mem limit $1.\n\n" + "request memory needed $0 per node is greater than process mem limit $1 of $2.\n\n" "Use the MEM_LIMIT query option to indicate how much memory is required per node."; // Queue decision details @@ -150,7 +146,7 @@ const string POOL_MEM_NOT_AVAILABLE = "Not enough aggregate memory available in "with max mem resources $1. Needed $2 but only $3 was available."; // $0 = host name, $1 = host mem needed, $3 = host mem available const string HOST_MEM_NOT_AVAILABLE = "Not enough memory available on host $0." - "Needed $1 but only $2 was available."; + "Needed $1 but only $2 out of $3 was available."; // Parses the pool name and backend_id from the topic key if it is valid. // Returns true if the topic key is valid and pool_name and backend_id are set. 
@@ -348,10 +344,10 @@ bool AdmissionController::HasAvailableMemResources(const QuerySchedule& schedule } // Case 2: - int64_t proc_mem_limit = GetProcMemLimit(); for (const auto& entry : schedule.per_backend_exec_params()) { const TNetworkAddress& host = entry.first; const string host_id = TNetworkAddressToString(host); + int64_t proc_mem_limit = entry.second.proc_mem_limit; int64_t mem_reserved = host_mem_reserved_[host_id]; int64_t mem_admitted = host_mem_admitted_[host_id]; VLOG_ROW << "Checking memory on host=" << host_id @@ -363,7 +359,8 @@ bool AdmissionController::HasAvailableMemResources(const QuerySchedule& schedule if (effective_host_mem_reserved + per_node_mem_needed > proc_mem_limit) { *mem_unavailable_reason = Substitute(HOST_MEM_NOT_AVAILABLE, host_id, PrintBytes(per_node_mem_needed), - PrintBytes(max(proc_mem_limit - effective_host_mem_reserved, 0L))); + PrintBytes(max(proc_mem_limit - effective_host_mem_reserved, 0L)), + PrintBytes(proc_mem_limit)); return false; } } @@ -403,15 +400,22 @@ bool AdmissionController::RejectImmediately(QuerySchedule* schedule, // the checks isn't particularly important, though some thought was given to ordering // them in a way that might make the sense for a user. - // Compute the max (over all backends) min_mem_reservation_bytes and the cluster total - // (across all backends) min_mem_reservation_bytes. + // Compute the max (over all backends) min_mem_reservation_bytes, the cluster total + // (across all backends) min_mem_reservation_bytes and the min (over all backends) + // min_proc_mem_limit. 
int64_t max_min_mem_reservation_bytes = -1; int64_t cluster_min_mem_reservation_bytes = 0; + pair<const TNetworkAddress*, int64_t> min_proc_mem_limit( + nullptr, std::numeric_limits<int64_t>::max()); for (const auto& e : schedule->per_backend_exec_params()) { cluster_min_mem_reservation_bytes += e.second.min_mem_reservation_bytes; if (e.second.min_mem_reservation_bytes > max_min_mem_reservation_bytes) { max_min_mem_reservation_bytes = e.second.min_mem_reservation_bytes; } + if (e.second.proc_mem_limit < min_proc_mem_limit.second) { + min_proc_mem_limit.first = &e.first; + min_proc_mem_limit.second = e.second.proc_mem_limit; + } } // Checks related to the min buffer reservation against configured query memory limits: @@ -447,25 +451,26 @@ bool AdmissionController::RejectImmediately(QuerySchedule* schedule, *rejection_reason = REASON_DISABLED_MAX_MEM_RESOURCES; return true; } - if (pool_cfg.max_mem_resources > 0 - && cluster_min_mem_reservation_bytes > pool_cfg.max_mem_resources) { - *rejection_reason = Substitute(REASON_MIN_RESERVATION_OVER_POOL_MEM, - PrintBytes(pool_cfg.max_mem_resources), - PrintBytes(cluster_min_mem_reservation_bytes)); - return true; - } - if (pool_cfg.max_mem_resources > 0 - && schedule->GetClusterMemoryEstimate() > pool_cfg.max_mem_resources) { - *rejection_reason = Substitute(REASON_REQ_OVER_POOL_MEM, - PrintBytes(schedule->GetClusterMemoryEstimate()), - PrintBytes(pool_cfg.max_mem_resources)); - return true; - } - if (pool_cfg.max_mem_resources > 0 && - schedule->GetPerHostMemoryEstimate() > GetProcMemLimit()) { - *rejection_reason = Substitute(REASON_REQ_OVER_NODE_MEM, - PrintBytes(schedule->GetPerHostMemoryEstimate()), PrintBytes(GetProcMemLimit())); - return true; + if (pool_cfg.max_mem_resources > 0) { + if (cluster_min_mem_reservation_bytes > pool_cfg.max_mem_resources) { + *rejection_reason = Substitute(REASON_MIN_RESERVATION_OVER_POOL_MEM, + PrintBytes(pool_cfg.max_mem_resources), + PrintBytes(cluster_min_mem_reservation_bytes)); + 
return true; + } + if (schedule->GetClusterMemoryEstimate() > pool_cfg.max_mem_resources) { + *rejection_reason = Substitute(REASON_REQ_OVER_POOL_MEM, + PrintBytes(schedule->GetClusterMemoryEstimate()), + PrintBytes(pool_cfg.max_mem_resources)); + return true; + } + int64_t perHostMemoryEstimate = schedule->GetPerHostMemoryEstimate(); + if (perHostMemoryEstimate > min_proc_mem_limit.second) { + *rejection_reason = Substitute(REASON_REQ_OVER_NODE_MEM, + PrintBytes(perHostMemoryEstimate), PrintBytes(min_proc_mem_limit.second), + TNetworkAddressToString(*min_proc_mem_limit.first)); + return true; + } } // Checks related to the pool queue size: @@ -537,7 +542,8 @@ Status AdmissionController::AdmitQuery(QuerySchedule* schedule) { } // We cannot immediately admit but do not need to reject, so queue the request - VLOG_QUERY << "Queuing, query id=" << PrintId(schedule->query_id()); + VLOG_QUERY << "Queuing, query id=" << PrintId(schedule->query_id()) + << " reason: " << not_admitted_reason; stats->Queue(*schedule); queue->Enqueue(&queue_node); } @@ -875,8 +881,8 @@ void AdmissionController::DequeueLoop() { // TODO: Requests further in the queue may be blocked unnecessarily. Consider a // better policy once we have better test scenarios. 
if (!CanAdmitRequest(schedule, pool_config, true, &not_admitted_reason)) { - VLOG_RPC << "Could not dequeue query id=" << PrintId(schedule.query_id()) - << " reason: " << not_admitted_reason; + VLOG_QUERY << "Could not dequeue query id=" << PrintId(schedule.query_id()) + << " reason: " << not_admitted_reason; break; } VLOG_RPC << "Dequeuing query=" << PrintId(schedule.query_id()); http://git-wip-us.apache.org/repos/asf/impala/blob/466188b3/be/src/scheduling/admission-controller.h ---------------------------------------------------------------------- diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h index 3341b1b..406e09e 100644 --- a/be/src/scheduling/admission-controller.h +++ b/be/src/scheduling/admission-controller.h @@ -173,9 +173,6 @@ class ExecEnv; /// above. Note the pool's max_mem_resources (#1) is not contented. /// TODO: Improve the dequeuing policy. IMPALA-2968. /// -/// TODO: Assumes all impalads have the same proc mem limit. Should send proc mem limit -/// via statestore (e.g. ideally in TBackendDescriptor) and check per-node -/// reservations against this value. /// TODO: Remove less important debug logging after more cluster testing. Should have a /// better idea of what is perhaps unnecessary. class AdmissionController { http://git-wip-us.apache.org/repos/asf/impala/blob/466188b3/be/src/scheduling/query-schedule.h ---------------------------------------------------------------------- diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h index fa200c6..e5d8f6a 100644 --- a/be/src/scheduling/query-schedule.h +++ b/be/src/scheduling/query-schedule.h @@ -65,6 +65,10 @@ struct BackendExecParams { // operators in all fragment instances that execute on this backend. This is used for // an optimization in InitialReservation. Measured in bytes. int64_t initial_mem_reservation_total_claims; + + // The process memory limit of this backend. 
Obtained from the scheduler's executors + // configuration which is updated by membership updates from the statestore. + int64_t proc_mem_limit; }; /// map from an impalad host address to the list of assigned fragment instance params. http://git-wip-us.apache.org/repos/asf/impala/blob/466188b3/be/src/scheduling/scheduler.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc index e25b777..20c3210 100644 --- a/be/src/scheduling/scheduler.cc +++ b/be/src/scheduling/scheduler.cc @@ -692,7 +692,7 @@ Status Scheduler::Schedule(QuerySchedule* schedule) { ExecutorsConfigPtr config_ptr = GetExecutorsConfig(); RETURN_IF_ERROR(ComputeScanRangeAssignment(*config_ptr, schedule)); ComputeFragmentExecParams(*config_ptr, schedule); - ComputeBackendExecParams(schedule); + ComputeBackendExecParams(*config_ptr, schedule); #ifndef NDEBUG schedule->Validate(); #endif @@ -709,7 +709,8 @@ Status Scheduler::Schedule(QuerySchedule* schedule) { return Status::OK(); } -void Scheduler::ComputeBackendExecParams(QuerySchedule* schedule) { +void Scheduler::ComputeBackendExecParams( + const BackendConfig& executor_config, QuerySchedule* schedule) { PerBackendExecParams per_backend_params; for (const FragmentExecParams& f : schedule->fragment_exec_params()) { for (const FInstanceExecParams& i : f.instance_exec_params) { @@ -726,6 +727,12 @@ void Scheduler::ComputeBackendExecParams(QuerySchedule* schedule) { f.fragment.initial_mem_reservation_total_claims; } } + + for (auto& backend: per_backend_params) { + const TNetworkAddress& host = backend.first; + backend.second.proc_mem_limit = + LookUpBackendDesc(executor_config, host).proc_mem_limit; + } schedule->set_per_backend_exec_params(per_backend_params); stringstream min_mem_reservation_ss; http://git-wip-us.apache.org/repos/asf/impala/blob/466188b3/be/src/scheduling/scheduler.h ---------------------------------------------------------------------- 
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h index b87b239..d0302e6 100644 --- a/be/src/scheduling/scheduler.h +++ b/be/src/scheduling/scheduler.h @@ -411,7 +411,8 @@ class Scheduler { /// Computes BackendExecParams for all backends assigned in the query. Must be called /// after ComputeFragmentExecParams(). - void ComputeBackendExecParams(QuerySchedule* schedule); + void ComputeBackendExecParams( + const BackendConfig& executor_config, QuerySchedule* schedule); /// Compute the FragmentExecParams for all plans in the schedule's /// TQueryExecRequest.plan_exec_info. http://git-wip-us.apache.org/repos/asf/impala/blob/466188b3/be/src/service/impala-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index 36224d4..3625a7d 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -53,6 +53,7 @@ #include "runtime/data-stream-mgr.h" #include "runtime/exec-env.h" #include "runtime/lib-cache.h" +#include "runtime/mem-tracker.h" #include "runtime/timestamp-value.h" #include "runtime/timestamp-value.inline.h" #include "runtime/tmp-file-mgr.h" @@ -1654,6 +1655,8 @@ void ImpalaServer::AddLocalBackendToStatestore( local_backend_descriptor.__set_is_executor(FLAGS_is_executor); local_backend_descriptor.__set_address(exec_env_->backend_address()); local_backend_descriptor.ip_address = exec_env_->ip_address(); + local_backend_descriptor.__set_proc_mem_limit( + exec_env_->process_mem_tracker()->limit()); if (FLAGS_use_krpc) { const TNetworkAddress& krpc_address = exec_env_->krpc_address(); DCHECK(IsResolvedAddress(krpc_address)); http://git-wip-us.apache.org/repos/asf/impala/blob/466188b3/bin/start-impala-cluster.py ---------------------------------------------------------------------- diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py index cd926da..1242ab9 100755 --- a/bin/start-impala-cluster.py 
+++ b/bin/start-impala-cluster.py @@ -84,6 +84,10 @@ parser.add_option("--kudu_master_hosts", default=KUDU_MASTER_HOSTS, # replica initialization. The ith delay is applied to the ith impalad. parser.add_option("--catalog_init_delays", dest="catalog_init_delays", default="", help=SUPPRESS_HELP) +# For testing: Semi-colon separated list of startup arguments to be passed per impalad. +# The ith group of options is applied to the ith impalad. +parser.add_option("--per_impalad_args", dest="per_impalad_args", type="string" + ,default="", help=SUPPRESS_HELP) options, args = parser.parse_args() @@ -236,6 +240,10 @@ def start_impalad_instances(cluster_size, num_coordinators, use_exclusive_coordi if options.catalog_init_delays != "": delay_list = [delay.strip() for delay in options.catalog_init_delays.split(",")] + per_impalad_args = [] + if options.per_impalad_args != "": + per_impalad_args = [args.strip() for args in options.per_impalad_args.split(";")] + # Start each impalad instance and optionally redirect the output to a log file. for i in range(cluster_size): if i == 0: @@ -273,6 +281,10 @@ def start_impalad_instances(cluster_size, num_coordinators, use_exclusive_coordi if options.disable_krpc: args = "-use_krpc=false %s" % (args) + # Appended at the end so they can override previous args. 
+ if i < len(per_impalad_args): + args = "%s %s" % (args, per_impalad_args[i]) + stderr_log_file_path = os.path.join(options.log_dir, '%s-error.log' % service_name) exec_impala_process(IMPALAD_PATH, args, stderr_log_file_path) http://git-wip-us.apache.org/repos/asf/impala/blob/466188b3/common/thrift/StatestoreService.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/StatestoreService.thrift b/common/thrift/StatestoreService.thrift index 3702932..4f2dada 100644 --- a/common/thrift/StatestoreService.thrift +++ b/common/thrift/StatestoreService.thrift @@ -71,6 +71,9 @@ struct TBackendDescriptor { // IP address + port of KRPC based ImpalaInternalService on this backend 7: optional Types.TNetworkAddress krpc_address; + + // The process memory limit of this backend (in bytes). + 8: required i64 proc_mem_limit; } // Description of a single entry in a topic http://git-wip-us.apache.org/repos/asf/impala/blob/466188b3/tests/custom_cluster/test_admission_controller.py ---------------------------------------------------------------------- diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py index a8070a6..9aa74a5 100644 --- a/tests/custom_cluster/test_admission_controller.py +++ b/tests/custom_cluster/test_admission_controller.py @@ -24,6 +24,7 @@ import pytest import re import sys import threading +from copy import copy from time import sleep, time from tests.beeswax.impala_beeswax import ImpalaBeeswaxException @@ -426,6 +427,61 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite): exec_options['mem_limit'] = self.PROC_MEM_TEST_LIMIT self.execute_query_expect_success(self.client, query, exec_options) + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args=impalad_admission_ctrl_flags(max_requests=2, max_queued=1, + pool_max_mem=10 * PROC_MEM_TEST_LIMIT, + queue_wait_timeout_ms=2 * STATESTORE_RPC_FREQUENCY_MS), 
+ start_args="--per_impalad_args=-mem_limit=3G;-mem_limit=3G;-mem_limit=2G", + statestored_args=_STATESTORED_ARGS) + def test_heterogeneous_proc_mem_limit(self, vector): + """ Test to ensure that the admission controller takes into account the actual proc + mem limits of each impalad. Starts a cluster where the last impalad has a smaller + proc mem limit than other impalads and runs queries where admission/rejection decision + depends on the coordinator knowing the other impalad's mem limits. + The queue_wait_timeout_ms has been set to be more than the prioritized statestore + update time, so that the queries don't time out before receiving updates to pool + stats""" + # Choose a query that runs on all 3 backends. + query = "select * from functional.alltypesagg, (select 1) B limit 1" + # Successfully run a query with mem limit equal to the lowest process memory among + # impalads + exec_options = copy(vector.get_value('exec_option')) + exec_options['mem_limit'] = "2G" + self.execute_query_expect_success(self.client, query, exec_options) + # Test that a query scheduled to run on a single node and submitted to the impalad + # with higher proc mem limit succeeds. + exec_options = copy(vector.get_value('exec_option')) + exec_options['mem_limit'] = "3G" + exec_options['num_nodes'] = "1" + self.execute_query_expect_success(self.client, query, exec_options) + # Exercise rejection checks in admission controller. + try: + exec_options = copy(vector.get_value('exec_option')) + exec_options['mem_limit'] = "3G" + self.execute_query(query, exec_options) + except ImpalaBeeswaxException as e: + assert re.search("Rejected query from pool \S+: request memory needed 3.00 GB per " + "node is greater than process mem limit 2.00 GB of \S+", str(e)), str(e) + # Exercise queuing checks in admission controller. 
+ try: + impalad_with_2g_mem = self.cluster.impalads[2].service.create_beeswax_client() + impalad_with_2g_mem.set_configuration_option('mem_limit', '1G') + impalad_with_2g_mem.execute_async("select sleep(1000)") + # Wait for statestore update to update the mem admitted in each node. + sleep(STATESTORE_RPC_FREQUENCY_MS/1000) + exec_options = copy(vector.get_value('exec_option')) + exec_options['mem_limit'] = "2G" + # Since Queuing is synchronous and we can't close the previous query till this + # returns, we wait for this to timeout instead. + self.execute_query(query, exec_options) + except ImpalaBeeswaxException as e: + assert re.search("Queued reason: Not enough memory available on host \S+.Needed " + "2.00 GB but only 1.00 GB out of 2.00 GB was available.", str(e)), str(e) + finally: + if impalad_with_2g_mem is not None: + impalad_with_2g_mem.close() + class TestAdmissionControllerStress(TestAdmissionControllerBase): """Submits a number of queries (parameterized) with some delay between submissions (parameterized) and the ability to submit to one impalad or many in a round-robin
