IMPALA-3134: Support different proc mem limits among impalads for
admission control checks

Currently the admission controller assumes that all backends have the
same process mem limit as the impalad it is running on. With this
patch, each impalad publishes its process mem limit to the statestore
in its TBackendDescriptor. The limit of every backend is therefore
available to the admission controller, which uses it to make correct
admission decisions. This assumes that the per-process memory limit
does not change dynamically.

Testing:
Added an end-to-end test, test_heterogeneous_proc_mem_limit, in
tests/custom_cluster/test_admission_controller.py.

IMPALA-5662: Log the queuing reason for a query

The queuing reason is now logged, at VLOG_QUERY level, both when a
query is first queued and whenever an attempt to dequeue it fails.

Change-Id: Idb72eee790cc17466bbfa82e30f369a65f2b060e
Reviewed-on: http://gerrit.cloudera.org:8080/10396
Reviewed-by: Bikramjeet Vig <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/466188b3
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/466188b3
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/466188b3

Branch: refs/heads/2.x
Commit: 466188b3970595e2e04d7ecf6a5141a7d3012909
Parents: b07bb27
Author: Bikramjeet Vig <[email protected]>
Authored: Fri May 4 14:42:48 2018 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Fri May 25 23:17:16 2018 +0000

----------------------------------------------------------------------
 be/src/scheduling/admission-controller.cc       | 70 +++++++++++---------
 be/src/scheduling/admission-controller.h        |  3 -
 be/src/scheduling/query-schedule.h              |  4 ++
 be/src/scheduling/scheduler.cc                  | 11 ++-
 be/src/scheduling/scheduler.h                   |  3 +-
 be/src/service/impala-server.cc                 |  3 +
 bin/start-impala-cluster.py                     | 12 ++++
 common/thrift/StatestoreService.thrift          |  3 +
 .../custom_cluster/test_admission_controller.py | 56 ++++++++++++++++
 9 files changed, 127 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/466188b3/be/src/scheduling/admission-controller.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.cc 
b/be/src/scheduling/admission-controller.cc
index eef18ef..c9a2139 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -46,10 +46,6 @@ std::string PrintBytes(int64_t value) {
   return PrettyPrinter::Print(value, TUnit::BYTES);
 }
 
-int64_t GetProcMemLimit() {
-  return ExecEnv::GetInstance()->process_mem_tracker()->limit();
-}
-
 // Delimiter used for topic keys of the form 
"<pool_name><delimiter><backend_id>".
 // "!" is used because the backend id contains a colon, but it should not 
contain "!".
 // When parsing the topic key we need to be careful to find the last instance 
in
@@ -136,7 +132,7 @@ const string REASON_REQ_OVER_POOL_MEM =
     "The total memory needed is the per-node MEM_LIMIT times the number of 
nodes "
     "executing the query. See the Admission Control documentation for more 
information.";
 const string REASON_REQ_OVER_NODE_MEM =
-    "request memory needed $0 per node is greater than process mem limit 
$1.\n\n"
+    "request memory needed $0 per node is greater than process mem limit $1 of 
$2.\n\n"
     "Use the MEM_LIMIT query option to indicate how much memory is required 
per node.";
 
 // Queue decision details
@@ -150,7 +146,7 @@ const string POOL_MEM_NOT_AVAILABLE = "Not enough aggregate 
memory available in
     "with max mem resources $1. Needed $2 but only $3 was available.";
 // $0 = host name, $1 = host mem needed, $3 = host mem available
 const string HOST_MEM_NOT_AVAILABLE = "Not enough memory available on host $0."
-    "Needed $1 but only $2 was available.";
+    "Needed $1 but only $2 out of $3 was available.";
 
 // Parses the pool name and backend_id from the topic key if it is valid.
 // Returns true if the topic key is valid and pool_name and backend_id are set.
@@ -348,10 +344,10 @@ bool AdmissionController::HasAvailableMemResources(const 
QuerySchedule& schedule
   }
 
   // Case 2:
-  int64_t proc_mem_limit = GetProcMemLimit();
   for (const auto& entry : schedule.per_backend_exec_params()) {
     const TNetworkAddress& host = entry.first;
     const string host_id = TNetworkAddressToString(host);
+    int64_t proc_mem_limit = entry.second.proc_mem_limit;
     int64_t mem_reserved = host_mem_reserved_[host_id];
     int64_t mem_admitted = host_mem_admitted_[host_id];
     VLOG_ROW << "Checking memory on host=" << host_id
@@ -363,7 +359,8 @@ bool AdmissionController::HasAvailableMemResources(const 
QuerySchedule& schedule
     if (effective_host_mem_reserved + per_node_mem_needed > proc_mem_limit) {
       *mem_unavailable_reason = Substitute(HOST_MEM_NOT_AVAILABLE, host_id,
           PrintBytes(per_node_mem_needed),
-          PrintBytes(max(proc_mem_limit - effective_host_mem_reserved, 0L)));
+          PrintBytes(max(proc_mem_limit - effective_host_mem_reserved, 0L)),
+          PrintBytes(proc_mem_limit));
       return false;
     }
   }
@@ -403,15 +400,22 @@ bool 
AdmissionController::RejectImmediately(QuerySchedule* schedule,
   // the checks isn't particularly important, though some thought was given to 
ordering
   // them in a way that might make the sense for a user.
 
-  // Compute the max (over all backends) min_mem_reservation_bytes and the 
cluster total
-  // (across all backends) min_mem_reservation_bytes.
+  // Compute the max (over all backends) min_mem_reservation_bytes, the 
cluster total
+  // (across all backends) min_mem_reservation_bytes and the min (over all 
backends)
+  // min_proc_mem_limit.
   int64_t max_min_mem_reservation_bytes = -1;
   int64_t cluster_min_mem_reservation_bytes = 0;
+  pair<const TNetworkAddress*, int64_t> min_proc_mem_limit(
+      nullptr, std::numeric_limits<int64_t>::max());
   for (const auto& e : schedule->per_backend_exec_params()) {
     cluster_min_mem_reservation_bytes += e.second.min_mem_reservation_bytes;
     if (e.second.min_mem_reservation_bytes > max_min_mem_reservation_bytes) {
       max_min_mem_reservation_bytes = e.second.min_mem_reservation_bytes;
     }
+    if (e.second.proc_mem_limit < min_proc_mem_limit.second) {
+      min_proc_mem_limit.first = &e.first;
+      min_proc_mem_limit.second = e.second.proc_mem_limit;
+    }
   }
 
   // Checks related to the min buffer reservation against configured query 
memory limits:
@@ -447,25 +451,26 @@ bool 
AdmissionController::RejectImmediately(QuerySchedule* schedule,
     *rejection_reason = REASON_DISABLED_MAX_MEM_RESOURCES;
     return true;
   }
-  if (pool_cfg.max_mem_resources > 0
-      && cluster_min_mem_reservation_bytes > pool_cfg.max_mem_resources) {
-    *rejection_reason = Substitute(REASON_MIN_RESERVATION_OVER_POOL_MEM,
-        PrintBytes(pool_cfg.max_mem_resources),
-        PrintBytes(cluster_min_mem_reservation_bytes));
-    return true;
-  }
-  if (pool_cfg.max_mem_resources > 0
-      && schedule->GetClusterMemoryEstimate() > pool_cfg.max_mem_resources) {
-    *rejection_reason = Substitute(REASON_REQ_OVER_POOL_MEM,
-        PrintBytes(schedule->GetClusterMemoryEstimate()),
-        PrintBytes(pool_cfg.max_mem_resources));
-    return true;
-  }
-  if (pool_cfg.max_mem_resources > 0 &&
-      schedule->GetPerHostMemoryEstimate() > GetProcMemLimit()) {
-    *rejection_reason = Substitute(REASON_REQ_OVER_NODE_MEM,
-        PrintBytes(schedule->GetPerHostMemoryEstimate()), 
PrintBytes(GetProcMemLimit()));
-    return true;
+  if (pool_cfg.max_mem_resources > 0) {
+    if (cluster_min_mem_reservation_bytes > pool_cfg.max_mem_resources) {
+      *rejection_reason = Substitute(REASON_MIN_RESERVATION_OVER_POOL_MEM,
+          PrintBytes(pool_cfg.max_mem_resources),
+          PrintBytes(cluster_min_mem_reservation_bytes));
+      return true;
+    }
+    if (schedule->GetClusterMemoryEstimate() > pool_cfg.max_mem_resources) {
+      *rejection_reason = Substitute(REASON_REQ_OVER_POOL_MEM,
+          PrintBytes(schedule->GetClusterMemoryEstimate()),
+          PrintBytes(pool_cfg.max_mem_resources));
+      return true;
+    }
+    int64_t perHostMemoryEstimate = schedule->GetPerHostMemoryEstimate();
+    if (perHostMemoryEstimate > min_proc_mem_limit.second) {
+      *rejection_reason = Substitute(REASON_REQ_OVER_NODE_MEM,
+          PrintBytes(perHostMemoryEstimate), 
PrintBytes(min_proc_mem_limit.second),
+          TNetworkAddressToString(*min_proc_mem_limit.first));
+      return true;
+    }
   }
 
   // Checks related to the pool queue size:
@@ -537,7 +542,8 @@ Status AdmissionController::AdmitQuery(QuerySchedule* 
schedule) {
     }
 
     // We cannot immediately admit but do not need to reject, so queue the 
request
-    VLOG_QUERY << "Queuing, query id=" << PrintId(schedule->query_id());
+    VLOG_QUERY << "Queuing, query id=" << PrintId(schedule->query_id())
+               << " reason: " << not_admitted_reason;
     stats->Queue(*schedule);
     queue->Enqueue(&queue_node);
   }
@@ -875,8 +881,8 @@ void AdmissionController::DequeueLoop() {
         // TODO: Requests further in the queue may be blocked unnecessarily. 
Consider a
         // better policy once we have better test scenarios.
         if (!CanAdmitRequest(schedule, pool_config, true, 
&not_admitted_reason)) {
-          VLOG_RPC << "Could not dequeue query id=" << 
PrintId(schedule.query_id())
-                   << " reason: " << not_admitted_reason;
+          VLOG_QUERY << "Could not dequeue query id=" << 
PrintId(schedule.query_id())
+                     << " reason: " << not_admitted_reason;
           break;
         }
         VLOG_RPC << "Dequeuing query=" << PrintId(schedule.query_id());

http://git-wip-us.apache.org/repos/asf/impala/blob/466188b3/be/src/scheduling/admission-controller.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.h 
b/be/src/scheduling/admission-controller.h
index 3341b1b..406e09e 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -173,9 +173,6 @@ class ExecEnv;
 /// above. Note the pool's max_mem_resources (#1) is not contented.
 /// TODO: Improve the dequeuing policy. IMPALA-2968.
 ///
-/// TODO: Assumes all impalads have the same proc mem limit. Should send proc 
mem limit
-///       via statestore (e.g. ideally in TBackendDescriptor) and check 
per-node
-///       reservations against this value.
 /// TODO: Remove less important debug logging after more cluster testing. 
Should have a
 ///       better idea of what is perhaps unnecessary.
 class AdmissionController {

http://git-wip-us.apache.org/repos/asf/impala/blob/466188b3/be/src/scheduling/query-schedule.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.h 
b/be/src/scheduling/query-schedule.h
index fa200c6..e5d8f6a 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -65,6 +65,10 @@ struct BackendExecParams {
   // operators in all fragment instances that execute on this backend. This is 
used for
   // an optimization in InitialReservation. Measured in bytes.
   int64_t initial_mem_reservation_total_claims;
+
+  // The process memory limit of this backend. Obtained from the scheduler's 
executors
+  // configuration which is updated by membership updates from the statestore.
+  int64_t proc_mem_limit;
 };
 
 /// map from an impalad host address to the list of assigned fragment instance 
params.

http://git-wip-us.apache.org/repos/asf/impala/blob/466188b3/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index e25b777..20c3210 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -692,7 +692,7 @@ Status Scheduler::Schedule(QuerySchedule* schedule) {
   ExecutorsConfigPtr config_ptr = GetExecutorsConfig();
   RETURN_IF_ERROR(ComputeScanRangeAssignment(*config_ptr, schedule));
   ComputeFragmentExecParams(*config_ptr, schedule);
-  ComputeBackendExecParams(schedule);
+  ComputeBackendExecParams(*config_ptr, schedule);
 #ifndef NDEBUG
   schedule->Validate();
 #endif
@@ -709,7 +709,8 @@ Status Scheduler::Schedule(QuerySchedule* schedule) {
   return Status::OK();
 }
 
-void Scheduler::ComputeBackendExecParams(QuerySchedule* schedule) {
+void Scheduler::ComputeBackendExecParams(
+    const BackendConfig& executor_config, QuerySchedule* schedule) {
   PerBackendExecParams per_backend_params;
   for (const FragmentExecParams& f : schedule->fragment_exec_params()) {
     for (const FInstanceExecParams& i : f.instance_exec_params) {
@@ -726,6 +727,12 @@ void Scheduler::ComputeBackendExecParams(QuerySchedule* 
schedule) {
           f.fragment.initial_mem_reservation_total_claims;
     }
   }
+
+  for (auto& backend: per_backend_params) {
+    const TNetworkAddress& host = backend.first;
+    backend.second.proc_mem_limit =
+        LookUpBackendDesc(executor_config, host).proc_mem_limit;
+  }
   schedule->set_per_backend_exec_params(per_backend_params);
 
   stringstream min_mem_reservation_ss;

http://git-wip-us.apache.org/repos/asf/impala/blob/466188b3/be/src/scheduling/scheduler.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index b87b239..d0302e6 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -411,7 +411,8 @@ class Scheduler {
 
   /// Computes BackendExecParams for all backends assigned in the query. Must 
be called
   /// after ComputeFragmentExecParams().
-  void ComputeBackendExecParams(QuerySchedule* schedule);
+  void ComputeBackendExecParams(
+      const BackendConfig& executor_config, QuerySchedule* schedule);
 
   /// Compute the FragmentExecParams for all plans in the schedule's
   /// TQueryExecRequest.plan_exec_info.

http://git-wip-us.apache.org/repos/asf/impala/blob/466188b3/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 36224d4..3625a7d 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -53,6 +53,7 @@
 #include "runtime/data-stream-mgr.h"
 #include "runtime/exec-env.h"
 #include "runtime/lib-cache.h"
+#include "runtime/mem-tracker.h"
 #include "runtime/timestamp-value.h"
 #include "runtime/timestamp-value.inline.h"
 #include "runtime/tmp-file-mgr.h"
@@ -1654,6 +1655,8 @@ void ImpalaServer::AddLocalBackendToStatestore(
   local_backend_descriptor.__set_is_executor(FLAGS_is_executor);
   local_backend_descriptor.__set_address(exec_env_->backend_address());
   local_backend_descriptor.ip_address = exec_env_->ip_address();
+  local_backend_descriptor.__set_proc_mem_limit(
+      exec_env_->process_mem_tracker()->limit());
   if (FLAGS_use_krpc) {
     const TNetworkAddress& krpc_address = exec_env_->krpc_address();
     DCHECK(IsResolvedAddress(krpc_address));

http://git-wip-us.apache.org/repos/asf/impala/blob/466188b3/bin/start-impala-cluster.py
----------------------------------------------------------------------
diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py
index cd926da..1242ab9 100755
--- a/bin/start-impala-cluster.py
+++ b/bin/start-impala-cluster.py
@@ -84,6 +84,10 @@ parser.add_option("--kudu_master_hosts", 
default=KUDU_MASTER_HOSTS,
 # replica initialization. The ith delay is applied to the ith impalad.
 parser.add_option("--catalog_init_delays", dest="catalog_init_delays", 
default="",
                   help=SUPPRESS_HELP)
+# For testing: Semi-colon separated list of startup arguments to be passed per 
impalad.
+# The ith group of options is applied to the ith impalad.
+parser.add_option("--per_impalad_args", dest="per_impalad_args", type="string"
+                  ,default="", help=SUPPRESS_HELP)
 
 options, args = parser.parse_args()
 
@@ -236,6 +240,10 @@ def start_impalad_instances(cluster_size, 
num_coordinators, use_exclusive_coordi
   if options.catalog_init_delays != "":
     delay_list = [delay.strip() for delay in 
options.catalog_init_delays.split(",")]
 
+  per_impalad_args = []
+  if options.per_impalad_args != "":
+    per_impalad_args = [args.strip() for args in 
options.per_impalad_args.split(";")]
+
   # Start each impalad instance and optionally redirect the output to a log 
file.
   for i in range(cluster_size):
     if i == 0:
@@ -273,6 +281,10 @@ def start_impalad_instances(cluster_size, 
num_coordinators, use_exclusive_coordi
     if options.disable_krpc:
       args = "-use_krpc=false %s" % (args)
 
+    # Appended at the end so they can override previous args.
+    if i < len(per_impalad_args):
+      args = "%s %s" % (args, per_impalad_args[i])
+
     stderr_log_file_path = os.path.join(options.log_dir, '%s-error.log' % 
service_name)
     exec_impala_process(IMPALAD_PATH, args, stderr_log_file_path)
 

http://git-wip-us.apache.org/repos/asf/impala/blob/466188b3/common/thrift/StatestoreService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/StatestoreService.thrift 
b/common/thrift/StatestoreService.thrift
index 3702932..4f2dada 100644
--- a/common/thrift/StatestoreService.thrift
+++ b/common/thrift/StatestoreService.thrift
@@ -71,6 +71,9 @@ struct TBackendDescriptor {
 
   // IP address + port of KRPC based ImpalaInternalService on this backend
   7: optional Types.TNetworkAddress krpc_address;
+
+  // The process memory limit of this backend (in bytes).
+  8: required i64 proc_mem_limit;
 }
 
 // Description of a single entry in a topic

http://git-wip-us.apache.org/repos/asf/impala/blob/466188b3/tests/custom_cluster/test_admission_controller.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_admission_controller.py 
b/tests/custom_cluster/test_admission_controller.py
index a8070a6..9aa74a5 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -24,6 +24,7 @@ import pytest
 import re
 import sys
 import threading
+from copy import copy
 from time import sleep, time
 
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
@@ -426,6 +427,61 @@ class TestAdmissionController(TestAdmissionControllerBase, 
HS2TestSuite):
     exec_options['mem_limit'] = self.PROC_MEM_TEST_LIMIT
     self.execute_query_expect_success(self.client, query, exec_options)
 
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args=impalad_admission_ctrl_flags(max_requests=2, max_queued=1,
+      pool_max_mem=10 * PROC_MEM_TEST_LIMIT,
+      queue_wait_timeout_ms=2 * STATESTORE_RPC_FREQUENCY_MS),
+      
start_args="--per_impalad_args=-mem_limit=3G;-mem_limit=3G;-mem_limit=2G",
+      statestored_args=_STATESTORED_ARGS)
+  def test_heterogeneous_proc_mem_limit(self, vector):
+    """ Test to ensure that the admission controller takes into account the 
actual proc
+    mem limits of each impalad. Starts a cluster where the last impalad has a 
smaller
+    proc mem limit than other impalads and runs queries where 
admission/rejection decision
+    depends on the coordinator knowing the other impalad's mem limits.
+    The queue_wait_timeout_ms has been set to be more than the prioritized 
statestore
+    update time, so that the queries don't time out before receiving updates 
to pool
+    stats"""
+    # Choose a query that runs on all 3 backends.
+    query = "select * from functional.alltypesagg, (select 1) B limit 1"
+    # Successfully run a query with mem limit equal to the lowest process 
memory among
+    # impalads
+    exec_options = copy(vector.get_value('exec_option'))
+    exec_options['mem_limit'] = "2G"
+    self.execute_query_expect_success(self.client, query, exec_options)
+    # Test that a query scheduled to run on a single node and submitted to the 
impalad
+    # with higher proc mem limit succeeds.
+    exec_options = copy(vector.get_value('exec_option'))
+    exec_options['mem_limit'] = "3G"
+    exec_options['num_nodes'] = "1"
+    self.execute_query_expect_success(self.client, query, exec_options)
+    # Exercise rejection checks in admission controller.
+    try:
+      exec_options = copy(vector.get_value('exec_option'))
+      exec_options['mem_limit'] = "3G"
+      self.execute_query(query, exec_options)
+    except ImpalaBeeswaxException as e:
+      assert re.search("Rejected query from pool \S+: request memory needed 
3.00 GB per "
+          "node is greater than process mem limit 2.00 GB of \S+", str(e)), 
str(e)
+    # Exercise queuing checks in admission controller.
+    try:
+      impalad_with_2g_mem = 
self.cluster.impalads[2].service.create_beeswax_client()
+      impalad_with_2g_mem.set_configuration_option('mem_limit', '1G')
+      impalad_with_2g_mem.execute_async("select sleep(1000)")
+      # Wait for statestore update to update the mem admitted in each node.
+      sleep(STATESTORE_RPC_FREQUENCY_MS/1000)
+      exec_options = copy(vector.get_value('exec_option'))
+      exec_options['mem_limit'] = "2G"
+      # Since Queuing is synchronous and we can't close the previous query 
till this
+      # returns, we wait for this to timeout instead.
+      self.execute_query(query, exec_options)
+    except ImpalaBeeswaxException as e:
+      assert re.search("Queued reason: Not enough memory available on host 
\S+.Needed "
+          "2.00 GB but only 1.00 GB out of 2.00 GB was available.", str(e)), 
str(e)
+    finally:
+      if impalad_with_2g_mem is not None:
+        impalad_with_2g_mem.close()
+
 class TestAdmissionControllerStress(TestAdmissionControllerBase):
   """Submits a number of queries (parameterized) with some delay between 
submissions
   (parameterized) and the ability to submit to one impalad or many in a 
round-robin

Reply via email to