IMPALA-5644,IMPALA-5810: Min reservation improvements

Rejects queries during admission control if:
* the largest (across all backends) min buffer reservation is
  greater than the query mem_limit or buffer_pool_limit
* the sum of the min buffer reservations across the cluster
  is larger than the pool max mem resources

There are some other interesting cases to consider later:
* every per-backend min buffer reservation is less than the
  associated backend's process mem_limit; the current
  admission control code doesn't know about other backends'
  proc mem_limits.

Also reduces minimum non-reservation memory (IMPALA-5810).
See the JIRA for experimental results that show this
slightly improves min memory requirements for small queries.
One reason to tweak this is to compensate for the fact that
BufferedBlockMgr didn't count small buffers against the
BlockMgr limit, but BufferPool counts all buffers against
it.

Testing:
* Adds new test cases in test_admission_controller.py
* Adds BE tests in reservation-tracker-test for the
  reservation-util code.

Change-Id: Iabe87ce8f460356cfe4d1be4d7092c5900f9d79b
Reviewed-on: http://gerrit.cloudera.org:8080/7678
Reviewed-by: Matthew Jacobs <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/7264c547
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/7264c547
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/7264c547

Branch: refs/heads/master
Commit: 7264c547516569b6c1152bcb61724ee8c2ee79d0
Parents: 5ec4046
Author: Matthew Jacobs <[email protected]>
Authored: Tue Aug 15 10:06:25 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Tue Aug 22 08:27:12 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/bufferpool/CMakeLists.txt        |   1 +
 .../bufferpool/reservation-tracker-test.cc      |  23 +++
 be/src/runtime/bufferpool/reservation-util.cc   |  41 ++++++
 be/src/runtime/bufferpool/reservation-util.h    |  74 ++++++++++
 be/src/runtime/query-state.cc                   |  19 +--
 be/src/scheduling/admission-controller.cc       | 141 ++++++++++++++-----
 be/src/scheduling/admission-controller.h        |   8 +-
 be/src/scheduling/scheduler.h                   |   3 +-
 common/thrift/generate_error_codes.py           |   5 +
 .../queries/QueryTest/analytic-fns.test         |  14 +-
 .../queries/QueryTest/spilling-aggs.test        |   2 +-
 .../custom_cluster/test_admission_controller.py |  32 ++++-
 tests/query_test/test_mem_usage_scaling.py      |  29 +---
 13 files changed, 293 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7264c547/be/src/runtime/bufferpool/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/CMakeLists.txt 
b/be/src/runtime/bufferpool/CMakeLists.txt
index 231230b..4b1778b 100644
--- a/be/src/runtime/bufferpool/CMakeLists.txt
+++ b/be/src/runtime/bufferpool/CMakeLists.txt
@@ -25,6 +25,7 @@ add_library(BufferPool
   buffer-allocator.cc
   buffer-pool.cc
   reservation-tracker.cc
+  reservation-util.cc
   suballocator.cc
   system-allocator.cc
 )

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7264c547/be/src/runtime/bufferpool/reservation-tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker-test.cc 
b/be/src/runtime/bufferpool/reservation-tracker-test.cc
index 5d22eb9..0d57488 100644
--- a/be/src/runtime/bufferpool/reservation-tracker-test.cc
+++ b/be/src/runtime/bufferpool/reservation-tracker-test.cc
@@ -20,6 +20,7 @@
 #include <vector>
 
 #include "runtime/bufferpool/reservation-tracker.h"
+#include "runtime/bufferpool/reservation-util.h"
 #include "common/init.h"
 #include "common/object-pool.h"
 #include "runtime/mem-tracker.h"
@@ -476,6 +477,28 @@ TEST_F(ReservationTrackerTest, TransferReservation) {
   grandparent->Close();
   grandparent_mem_tracker->Close();
 }
+
+TEST_F(ReservationTrackerTest, ReservationUtil) {
+  const int64_t MEG = 1024 * 1024;
+  const int64_t GIG = 1024 * 1024 * 1024;
+  EXPECT_EQ(75 * MEG, ReservationUtil::RESERVATION_MEM_MIN_REMAINING);
+
+  EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit(0));
+  EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit(-1));
+  EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit(75 * MEG));
+  EXPECT_EQ(8 * GIG, ReservationUtil::GetReservationLimitFromMemLimit(10 * 
GIG));
+
+  EXPECT_EQ(75 * MEG, ReservationUtil::GetMinMemLimitFromReservation(0));
+  EXPECT_EQ(75 * MEG, ReservationUtil::GetMinMemLimitFromReservation(-1));
+  EXPECT_EQ(500 * MEG, ReservationUtil::GetMinMemLimitFromReservation(400 * 
MEG));
+  EXPECT_EQ(5 * GIG, ReservationUtil::GetMinMemLimitFromReservation(4 * GIG));
+
+  EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit(
+      ReservationUtil::GetMinMemLimitFromReservation(0)));
+  EXPECT_EQ(4 * GIG, ReservationUtil::GetReservationLimitFromMemLimit(
+      ReservationUtil::GetMinMemLimitFromReservation(4 * GIG)));
+}
+
 }
 
 IMPALA_TEST_MAIN();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7264c547/be/src/runtime/bufferpool/reservation-util.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-util.cc 
b/be/src/runtime/bufferpool/reservation-util.cc
new file mode 100644
index 0000000..85718ab
--- /dev/null
+++ b/be/src/runtime/bufferpool/reservation-util.cc
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/bufferpool/reservation-util.h"
+
+#include <algorithm>
+
+namespace impala {
+
+// Most operators that accumulate memory use reservations, so the majority of 
memory
+// should be allocated to buffer reservations, as a heuristic.
+const double ReservationUtil::RESERVATION_MEM_FRACTION = 0.8;
+const int64_t ReservationUtil::RESERVATION_MEM_MIN_REMAINING = 75 * 1024 * 
1024;
+
+int64_t ReservationUtil::GetReservationLimitFromMemLimit(int64_t mem_limit) {
+  int64_t max_reservation = std::min<int64_t>(
+      RESERVATION_MEM_FRACTION * mem_limit, mem_limit - 
RESERVATION_MEM_MIN_REMAINING);
+  return std::max<int64_t>(0, max_reservation);
+}
+
+int64_t ReservationUtil::GetMinMemLimitFromReservation(int64_t 
buffer_reservation) {
+  buffer_reservation = std::max<int64_t>(0, buffer_reservation);
+  return std::max<int64_t>(
+      buffer_reservation * (1.0 / ReservationUtil::RESERVATION_MEM_FRACTION),
+      buffer_reservation + ReservationUtil::RESERVATION_MEM_MIN_REMAINING);
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7264c547/be/src/runtime/bufferpool/reservation-util.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-util.h 
b/be/src/runtime/bufferpool/reservation-util.h
new file mode 100644
index 0000000..cd54c65
--- /dev/null
+++ b/be/src/runtime/bufferpool/reservation-util.h
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_BUFFERPOOL_RESERVATION_UTIL_H_
+#define IMPALA_RUNTIME_BUFFERPOOL_RESERVATION_UTIL_H_
+
+#include <stdint.h>
+
+namespace impala {
+
+/// Utility code related to buffer reservations.
+class ReservationUtil {
+ public:
+  /// There are currently two classes of memory: reserved memory (i.e. memory 
that is
+  /// reserved with reservation trackers/allocated by the buffer pool), and 
unreserved
+  /// memory (i.e. everything else; code that hasn't yet been updated to use 
reserved
+  /// memory). Eventually, all memory should be in the former category, but 
each operator
+  /// must be converted to use reserved memory and that work is ongoing. See 
IMPALA-4834.
+  /// In the meantime, the system memory must be shared between these two 
classes of
+  /// memory. RESERVATION_MEM_FRACTION and RESERVATION_MEM_MIN_REMAINING are 
used to
+  /// determine an upper bound on reserved memory for a query. Operators 
operate reliably
+  /// when they are using bounded reserved memory (e.g. staying under a limit 
by
+  /// spilling), but will generally fail if they hit a limit when trying to 
allocate
+  /// unreserved memory. Thus we need to ensure there is always space left in 
the query
+  /// memory limit for unreserved memory.
+
+  /// The fraction of the query mem limit that is used as the maximum buffer 
reservation
+  /// limit, i.e. the bound on reserved memory. It is expected that unreserved 
memory
+  /// (i.e. not accounted by buffer reservation trackers) stays within
+  /// (1 - RESERVATION_MEM_FRACTION).
+  /// TODO: remove once all operators use buffer reservations.
+  static const double RESERVATION_MEM_FRACTION;
+
+  /// The minimum amount of memory that should be left after buffer 
reservations, i.e.
+  /// this is the minimum amount of memory that should be left for unreserved 
memory.
+  /// TODO: remove once all operators use buffer reservations.
+  static const int64_t RESERVATION_MEM_MIN_REMAINING;
+
+  /// Helper function to get the query buffer reservation limit (in bytes) 
given a query
+  /// mem_limit. In other words, this determines the maximum portion of the 
mem_limit
+  /// that should go to reserved memory. The limit on reservations is computed 
as:
+  /// min(query_limit * RESERVATION_MEM_FRACTION,
+  ///     query_limit - RESERVATION_MEM_MIN_REMAINING)
+  /// TODO: remove once all operators use buffer reservations.
+  static int64_t GetReservationLimitFromMemLimit(int64_t mem_limit);
+
+  /// Helper function to get the minimum query mem_limit (in bytes) that will 
be large
+  /// enough for a buffer reservation of size 'buffer_reservation' bytes. In 
other words,
+  /// this determines the minimum mem_limit that will be large enough to 
accommodate
+  /// 'buffer_reservation' reserved memory, as well as some amount of 
unreserved memory
+  /// (determined by a heuristic).
+  /// The returned mem_limit X satisfies:
+  ///    buffer_reservation <= GetReservationLimitFromMemLimit(X)
+  /// TODO: remove once all operators use buffer reservations.
+  static int64_t GetMinMemLimitFromReservation(int64_t buffer_reservation);
+};
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7264c547/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 1c9b03b..5ac4998 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -24,6 +24,7 @@
 #include "runtime/backend-client.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/bufferpool/reservation-tracker.h"
+#include "runtime/bufferpool/reservation-util.h"
 #include "runtime/exec-env.h"
 #include "runtime/fragment-instance-state.h"
 #include "runtime/initial-reservations.h"
@@ -38,20 +39,6 @@
 
 using namespace impala;
 
-// The fraction of the query mem limit that is used for buffer reservations. 
Most
-// operators that accumulate memory use reservations, so the majority of 
memory should
-// be allocated to buffer reservations, as a heuristic.
-// TODO: this will go away once all operators use buffer reservations.
-static const double RESERVATION_MEM_FRACTION = 0.8;
-
-// The minimum amount of memory that should be left after buffer reservations.
-// The limit on reservations is computed as:
-// min(query_limit * RESERVATION_MEM_FRACTION,
-//     query_limit - RESERVATION_MEM_MIN_REMAINING)
-// TODO: this will go away once all operators use buffer reservations and we 
have accurate
-// minimum requirements.
-static const int64_t RESERVATION_MEM_MIN_REMAINING = 100 * 1024 * 1024;
-
 QueryState::ScopedRef::ScopedRef(const TUniqueId& query_id) {
   DCHECK(ExecEnv::GetInstance()->query_exec_mgr() != nullptr);
   query_state_ = 
ExecEnv::GetInstance()->query_exec_mgr()->GetQueryState(query_id);
@@ -172,9 +159,7 @@ Status QueryState::InitBufferPoolState() {
     max_reservation = numeric_limits<int64_t>::max();
   } else {
     DCHECK_GE(mem_limit, 0);
-    max_reservation = min<int64_t>(
-        mem_limit * RESERVATION_MEM_FRACTION, mem_limit - 
RESERVATION_MEM_MIN_REMAINING);
-    max_reservation = max<int64_t>(0, max_reservation);
+    max_reservation = 
ReservationUtil::GetReservationLimitFromMemLimit(mem_limit);
   }
   VLOG_QUERY << "Buffer pool limit for " << PrintId(query_id()) << ": " << 
max_reservation;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7264c547/be/src/scheduling/admission-controller.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.cc 
b/be/src/scheduling/admission-controller.cc
index 07f473f..339935d 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -23,6 +23,7 @@
 #include <gutil/strings/substitute.h>
 
 #include "common/logging.h"
+#include "runtime/bufferpool/reservation-util.h"
 #include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
 #include "scheduling/scheduler.h"
@@ -111,9 +112,21 @@ const string PROFILE_INFO_VAL_TIME_OUT = "Timed out 
(queued)";
 const string PROFILE_INFO_KEY_QUEUE_DETAIL = "Admission queue details";
 const string PROFILE_INFO_VAL_QUEUE_DETAIL = "waited $0 ms, reason: $1";
 
-// Error status string formats
-// $0 = pool, $1 = rejection reason (see REASON_XXX below)
-const string STATUS_REJECTED = "Rejected query from pool $0 : $1";
+// Error status string details
+const string REASON_MEM_LIMIT_TOO_LOW_FOR_RESERVATION =
+    "minimum memory reservation is greater than memory available to the query "
+    "for buffer reservations. Mem available for buffer reservations based on 
mem_limit: "
+    "$0, memory reservation needed: $1. Set mem_limit to at least $2. See the 
query "
+    "profile for more information.";
+const string REASON_BUFFER_LIMIT_TOO_LOW_FOR_RESERVATION =
+    "minimum memory reservation is greater than memory available to the query "
+    "for buffer reservations. Mem available for buffer reservations based on "
+    "buffer_pool_limit: $0, memory reservation needed: $1. See the query 
profile for "
+    "more information.";
+const string REASON_MIN_RESERVATION_OVER_POOL_MEM =
+    "minimum memory reservation needed is greater than pool max mem resources. 
pool "
+    "max mem resources: $0, cluster-wide memory reservation needed: $1. See 
the query "
+    "profile for more information.";
 const string REASON_DISABLED_MAX_MEM_RESOURCES =
     "disabled by pool max mem resources set to 0";
 const string REASON_DISABLED_REQUESTS_LIMIT = "disabled by requests limit set 
to 0";
@@ -133,9 +146,6 @@ const string QUEUED_NUM_RUNNING = "number of running 
queries $0 is over limit $1
 // $0 = queue size
 const string QUEUED_QUEUE_NOT_EMPTY = "queue is not empty (size $0); queued 
queries are "
     "executed first";
-// $0 = timeout in milliseconds, $1 = queue detail
-const string STATUS_TIME_OUT = "Admission for query exceeded timeout $0ms. 
Queued "
-    "reason: $1";
 // $0 = pool name, $1 = pool max memory, $2 = pool mem needed, $3 = pool mem 
available
 const string POOL_MEM_NOT_AVAILABLE = "Not enough aggregate memory available 
in pool $0 "
     "with max mem resources $1. Needed $2 but only $3 was available.";
@@ -383,35 +393,90 @@ bool AdmissionController::CanAdmitRequest(const 
QuerySchedule& schedule,
   return true;
 }
 
-Status AdmissionController::RejectImmediately(QuerySchedule* schedule,
-    const TPoolConfig& pool_cfg) {
-  const string& pool_name = schedule->request_pool();
-  PoolStats* stats = GetPoolStats(pool_name);
-  const int64_t cluster_mem_needed = schedule->GetClusterMemoryEstimate();
-  string reject_reason;
+bool AdmissionController::RejectImmediately(QuerySchedule* schedule,
+    const TPoolConfig& pool_cfg, string* rejection_reason) {
+  DCHECK(rejection_reason != nullptr && rejection_reason->empty());
+  // This function checks for a number of cases where the query can be rejected
+  // immediately. The first check that fails is the error that is reported. 
The order of
+  // the checks isn't particularly important, though some thought was given to 
ordering
+  // them in a way that might make sense for a user.
+
+  // Compute the max (over all backends) min_reservation_bytes and the cluster 
total
+  // (across all backends) min_reservation_bytes.
+  int64_t max_min_reservation_bytes = -1;
+  int64_t cluster_min_reservation_bytes = 0;
+  for (const auto& e: schedule->per_backend_exec_params()) {
+    cluster_min_reservation_bytes += e.second.min_reservation_bytes;
+    if (e.second.min_reservation_bytes > max_min_reservation_bytes) {
+      max_min_reservation_bytes = e.second.min_reservation_bytes;
+    }
+  }
+
+  // Checks related to the min buffer reservation against configured query 
memory limits:
+  if (schedule->query_options().__isset.buffer_pool_limit &&
+      schedule->query_options().buffer_pool_limit > 0) {
+    const int64_t buffer_pool_limit = 
schedule->query_options().buffer_pool_limit;
+    if (max_min_reservation_bytes > buffer_pool_limit) {
+      *rejection_reason = 
Substitute(REASON_BUFFER_LIMIT_TOO_LOW_FOR_RESERVATION,
+          PrintBytes(buffer_pool_limit), 
PrintBytes(max_min_reservation_bytes));
+      return true;
+    }
+  } else if (schedule->query_options().__isset.mem_limit &&
+      schedule->query_options().mem_limit > 0) {
+    const int64_t mem_limit = schedule->query_options().mem_limit;
+    const int64_t max_reservation =
+        ReservationUtil::GetReservationLimitFromMemLimit(mem_limit);
+    if (max_min_reservation_bytes > max_reservation) {
+      const int64_t required_mem_limit =
+          
ReservationUtil::GetMinMemLimitFromReservation(max_min_reservation_bytes);
+      *rejection_reason = Substitute(REASON_MEM_LIMIT_TOO_LOW_FOR_RESERVATION,
+          PrintBytes(mem_limit), PrintBytes(max_min_reservation_bytes),
+          PrintBytes(required_mem_limit));
+      return true;
+    }
+  }
+
+  // Checks related to pool max_requests:
   if (pool_cfg.max_requests == 0) {
-    reject_reason = REASON_DISABLED_REQUESTS_LIMIT;
-  } else if (pool_cfg.max_mem_resources == 0) {
-    reject_reason = REASON_DISABLED_MAX_MEM_RESOURCES;
-  } else if (pool_cfg.max_mem_resources > 0 &&
-      cluster_mem_needed > pool_cfg.max_mem_resources) {
-    reject_reason = Substitute(REASON_REQ_OVER_POOL_MEM, 
PrintBytes(cluster_mem_needed),
+    *rejection_reason = REASON_DISABLED_REQUESTS_LIMIT;
+    return true;
+  }
+
+  // Checks related to pool max_mem_resources:
+  if (pool_cfg.max_mem_resources == 0) {
+    *rejection_reason = REASON_DISABLED_MAX_MEM_RESOURCES;
+    return true;
+  }
+  if (pool_cfg.max_mem_resources > 0 &&
+      cluster_min_reservation_bytes > pool_cfg.max_mem_resources) {
+    *rejection_reason = Substitute(REASON_MIN_RESERVATION_OVER_POOL_MEM,
+        PrintBytes(pool_cfg.max_mem_resources),
+        PrintBytes(cluster_min_reservation_bytes));
+    return true;
+  }
+  if (pool_cfg.max_mem_resources > 0 &&
+      schedule->GetClusterMemoryEstimate() > pool_cfg.max_mem_resources) {
+    *rejection_reason = Substitute(REASON_REQ_OVER_POOL_MEM,
+        PrintBytes(schedule->GetClusterMemoryEstimate()),
         PrintBytes(pool_cfg.max_mem_resources));
-  } else if (pool_cfg.max_mem_resources > 0 &&
+    return true;
+  }
+  if (pool_cfg.max_mem_resources > 0 &&
       schedule->GetPerHostMemoryEstimate() > GetProcMemLimit()) {
-    reject_reason = Substitute(REASON_REQ_OVER_NODE_MEM,
+    *rejection_reason = Substitute(REASON_REQ_OVER_NODE_MEM,
         PrintBytes(schedule->GetPerHostMemoryEstimate()), 
PrintBytes(GetProcMemLimit()));
-  } else if (stats->agg_num_queued() >= pool_cfg.max_queued) {
-    reject_reason = Substitute(REASON_QUEUE_FULL, pool_cfg.max_queued,
+    return true;
+  }
+
+  // Checks related to the pool queue size:
+  PoolStats* stats = GetPoolStats(schedule->request_pool());
+  if (stats->agg_num_queued() >= pool_cfg.max_queued) {
+    *rejection_reason = Substitute(REASON_QUEUE_FULL, pool_cfg.max_queued,
         stats->agg_num_queued());
-  } else {
-    return Status::OK(); // Not rejected
+    return true;
   }
-  schedule->set_is_admitted(false);
-  schedule->summary_profile()->AddInfoString(PROFILE_INFO_KEY_ADMISSION_RESULT,
-      PROFILE_INFO_VAL_REJECTED);
-  stats->metrics()->total_rejected->Increment(1);
-  return Status(Substitute(STATUS_REJECTED, pool_name, reject_reason));
+
+  return false;
 }
 
 void AdmissionController::PoolStats::UpdateConfigMetrics(const TPoolConfig& 
pool_cfg) {
@@ -446,7 +511,17 @@ Status AdmissionController::AdmitQuery(QuerySchedule* 
schedule) {
                << " PoolConfig: max_requests=" << max_requests << " 
max_queued="
                << max_queued << " max_mem=" << PrintBytes(max_mem);
     VLOG_QUERY << "Stats: " << stats->DebugString();
-    RETURN_IF_ERROR(RejectImmediately(schedule, pool_cfg));
+    string rejection_reason;
+    if (RejectImmediately(schedule, pool_cfg, &rejection_reason)) {
+      schedule->set_is_admitted(false);
+      
schedule->summary_profile()->AddInfoString(PROFILE_INFO_KEY_ADMISSION_RESULT,
+          PROFILE_INFO_VAL_REJECTED);
+      stats->metrics()->total_rejected->Increment(1);
+      const ErrorMsg& rejected_msg = ErrorMsg(TErrorCode::ADMISSION_REJECTED,
+          pool_name, rejection_reason);
+      VLOG_QUERY << rejected_msg.msg();
+      return Status::Expected(rejected_msg);
+    }
     pools_for_updates_.insert(pool_name);
 
     if (CanAdmitRequest(*schedule, pool_cfg, false, &not_admitted_reason)) {
@@ -513,8 +588,10 @@ Status AdmissionController::AdmitQuery(QuerySchedule* 
schedule) {
       
schedule->summary_profile()->AddInfoString(PROFILE_INFO_KEY_ADMISSION_RESULT,
           PROFILE_INFO_VAL_TIME_OUT);
       stats->Dequeue(*schedule, true);
-      return Status(Substitute(STATUS_TIME_OUT, queue_wait_timeout_ms,
-          not_admitted_reason));
+      const ErrorMsg& rejected_msg = ErrorMsg(TErrorCode::ADMISSION_TIMED_OUT,
+          queue_wait_timeout_ms, pool_name, not_admitted_reason);
+      VLOG_QUERY << rejected_msg.msg();
+      return Status::Expected(rejected_msg);
     }
     // The dequeue thread updates the stats (to avoid a race condition) so we 
do
     // not change them here.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7264c547/be/src/scheduling/admission-controller.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.h 
b/be/src/scheduling/admission-controller.h
index d3ec49d..3e49cfb 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -460,10 +460,12 @@ class AdmissionController {
   /// admission_ctrl_lock_.
   void UpdateHostMemAdmitted(const QuerySchedule& schedule, int64_t 
per_node_mem);
 
-  /// Returns an error status if this request must be rejected immediately, 
e.g. requires
-  /// more memory than possible to reserve or the queue is already full.
+  /// Returns true if this request must be rejected immediately, e.g. requires 
more
+  /// memory than possible to reserve or the queue is already full. If true,
+  /// rejection_reason is set to an explanation of why the request was rejected.
   /// Must hold admission_ctrl_lock_.
-  Status RejectImmediately(QuerySchedule* schedule, const TPoolConfig& 
pool_cfg);
+  bool RejectImmediately(QuerySchedule* schedule, const TPoolConfig& pool_cfg,
+      std::string* rejection_reason);
 
   /// Gets or creates the PoolStats for pool_name. Must hold 
admission_ctrl_lock_.
   PoolStats* GetPoolStats(const std::string& pool_name);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7264c547/be/src/scheduling/scheduler.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index 3126436..07e0cfc 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -89,8 +89,7 @@ class Scheduler {
   impala::Status Init();
 
   /// Populates given query schedule and assigns fragments to hosts based on 
scan
-  /// ranges in the query exec request. Submits schedule to admission control 
before
-  /// returning.
+  /// ranges in the query exec request.
   Status Schedule(QuerySchedule* schedule);
 
  private:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7264c547/common/thrift/generate_error_codes.py
----------------------------------------------------------------------
diff --git a/common/thrift/generate_error_codes.py 
b/common/thrift/generate_error_codes.py
index ad0e342..5a72d80 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -328,6 +328,11 @@ error_codes = (
   ("MINIMUM_RESERVATION_UNAVAILABLE", 106, "Failed to get minimum memory 
reservation of "
      "$0 on daemon $1:$2 for query $3 because it would exceed an applicable 
query, "
      "request pool or process memory limit. Memory usage:\\n$4"),
+
+  ("ADMISSION_REJECTED", 107, "Rejected query from pool $0: $1"),
+
+  ("ADMISSION_TIMED_OUT", 108, "Admission for query exceeded timeout $0ms in 
pool $1. "
+     "Queued reason: $2"),
 )
 
 import sys

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7264c547/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test 
b/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test
index 27459ef..f8a024a 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test
@@ -1583,18 +1583,8 @@ from functional.alltypestiny order by id
 int, bigint, bigint, double
 ====
 ---- QUERY
-# Regression test for IMPALA-2265, IMPALA-2559. The buffer_pool_limit is tuned 
to
-# reproduce the issue when running this query against functional_parquet.
-SET default_spillable_buffer_size=8m;
-SET buffer_pool_limit=16m;
-SELECT lag(-180, 13) over (ORDER BY t1.int_col ASC, t2.int_col ASC) AS int_col
-FROM functional_parquet.alltypes t1 CROSS JOIN functional_parquet.alltypes t2 
LIMIT 10;
----- CATCH
-Failed to get minimum memory reservation
-====
----- QUERY
-# Check that the above query can succeed with the minimum buffers (3 buffers 
for sort,
-# 2 buffer for analytic).
+# Check that a large analytic query can succeed with the minimum buffers 
(3 buffers
+# for sort, 2 buffers for analytic).
 SET default_spillable_buffer_size=8m;
 SET buffer_pool_limit=40m;
 SELECT lag(-180, 13) over (ORDER BY t1.int_col ASC, t2.int_col ASC) AS int_col

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7264c547/testdata/workloads/functional-query/queries/QueryTest/spilling-aggs.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/spilling-aggs.test 
b/testdata/workloads/functional-query/queries/QueryTest/spilling-aggs.test
index d9f60cc..34628c0 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/spilling-aggs.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/spilling-aggs.test
@@ -183,7 +183,7 @@ row_regex: .*SpilledPartitions: .* \([1-9][0-9]*\)
 # IMPALA-3304: test that avg() can spill with a query mem limit.
 # This test only covers that use FIXED_UDA_INTERMEDIATE, not functions that 
allocate
 # strings for intermediate values.
-set mem_limit=200m;
+set mem_limit=150m;
 select l_orderkey, avg(l_tax), avg(l_quantity), avg(l_discount), 
avg(l_extendedprice)
 from tpch_parquet.lineitem
 group by 1

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7264c547/tests/custom_cluster/test_admission_controller.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_admission_controller.py 
b/tests/custom_cluster/test_admission_controller.py
index 250298b..ff4f4c6 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -313,7 +313,7 @@ class TestAdmissionController(TestAdmissionControllerBase, 
HS2TestSuite):
         "select * from functional.alltypestiny"]
     for query in non_trivial_queries:
       ex = self.execute_query_expect_failure(self.client, query)
-      assert re.search("Rejected query from pool default-pool : request memory 
needed "
+      assert re.search("Rejected query from pool default-pool: request memory 
needed "
           ".* is greater than pool max mem resources 10.00 MB", str(ex))
 
   @pytest.mark.execute_serially
@@ -321,15 +321,35 @@ class 
TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=1,
           pool_max_mem=10 * 1024 * 1024, proc_mem_limit=1024 * 1024 * 1024),
       statestored_args=_STATESTORED_ARGS)
-  def test_initial_reservation(self):
-    """Test behaviour with admission control enabled if the initial 
reservation cannot be
-    acquired. The query options are set so that the query will be admitted, 
but acquiring
-    the initial reservation will fail because it is larger than mem_limit.
+  def test_reject_min_reservation(self):
+    """Test that the query will be rejected by admission control if:
+       a) the largest per-backend min buffer reservation is larger than the 
query mem
+          limit
+       b) the largest per-backend min buffer reservation is larger than the
+          buffer_pool_limit query option
+       c) the cluster-wide min-buffer reservation size is larger than the pool 
memory
+          resources.
     """
     query = "select distinct * from functional_parquet.alltypesagg"
     opts = {'mem_limit': '10MB', 'num_nodes': '1'}
     ex = self.execute_query_expect_failure(self.client, query, opts)
-    assert "Failed to get minimum memory reservation" in str(ex)
+    assert ("minimum memory reservation is greater than memory available to 
the query "
+        "for buffer reservations. Mem available for buffer reservations based 
on "
+        "mem_limit: 10.00 MB, memory reservation needed: 34.00 MB. "
+        "Set mem_limit to at least 109.00 MB.") in str(ex)
+
+    query = "select distinct * from functional_parquet.alltypesagg"
+    opts = {'buffer_pool_limit': '10MB', 'num_nodes': '1'}
+    ex = self.execute_query_expect_failure(self.client, query, opts)
+    assert ("minimum memory reservation is greater than memory available to 
the query "
+        "for buffer reservations. Mem available for buffer reservations based 
on "
+        "buffer_pool_limit: 10.00 MB, memory reservation needed: 34.00 MB.") 
in str(ex)
+
+    opts = {'mem_limit': '150MB', 'num_nodes': '1'}
+    ex = self.execute_query_expect_failure(self.client, query, opts)
+    assert ("minimum memory reservation needed is greater than pool max mem 
resources. "
+        "pool max mem resources: 10.00 MB, cluster-wide memory reservation 
needed: "
+        "34.00 MB") in str(ex)
 
   # Process mem_limit used in test_mem_limit_upper_bound
   PROC_MEM_TEST_LIMIT = 1024 * 1024 * 1024

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7264c547/tests/query_test/test_mem_usage_scaling.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_mem_usage_scaling.py 
b/tests/query_test/test_mem_usage_scaling.py
index eac95e6..efc500b 100644
--- a/tests/query_test/test_mem_usage_scaling.py
+++ b/tests/query_test/test_mem_usage_scaling.py
@@ -26,8 +26,9 @@ from tests.common.test_vector import ImpalaTestDimension
 
 # Substrings of the expected error messages when the mem limit is too low
 MEM_LIMIT_EXCEEDED_MSG = "Memory limit exceeded"
-INITIAL_RESERVATION_MSG = "Failed to get minimum memory reservation"
-MEM_LIMIT_ERROR_MSGS = [MEM_LIMIT_EXCEEDED_MSG, INITIAL_RESERVATION_MSG]
+MEM_LIMIT_TOO_LOW_FOR_RESERVATION = ("minimum memory reservation is greater 
than memory "
+  "available to the query for buffer reservations")
+MEM_LIMIT_ERROR_MSGS = [MEM_LIMIT_EXCEEDED_MSG, 
MEM_LIMIT_TOO_LOW_FOR_RESERVATION]
 
 class TestQueryMemLimitScaling(ImpalaTestSuite):
   """Test class to do functional validation of per query memory limits. """
@@ -90,30 +91,6 @@ class TestExprMemUsage(ImpalaTestSuite):
       table_format=vector.get_value('table_format'))
 
 
-class TestInitialReservation(ImpalaTestSuite):
-  @classmethod
-  def get_workload(self):
-    # Note: this workload doesn't run exhaustively. See IMPALA-3947 before 
trying to move
-    # this test to exhaustive.
-    return 'tpch'
-
-  @classmethod
-  def add_test_dimensions(cls):
-    super(TestInitialReservation, cls).add_test_dimensions()
-    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
-    cls.ImpalaTestMatrix.add_constraint(lambda v:\
-        v.get_value('table_format').file_format in ['parquet'])
-
-  def test_initial_reservation(self, vector):
-    """Test failure to get the initial reservation."""
-    exec_options = copy(vector.get_value('exec_option'))
-    exec_options['mem_limit'] = '20m'
-    query = """select * from tpch_parquet.lineitem l1
-               join tpch_parquet.lineitem l2 on l1.l_orderkey = 
l2.l_orderkey"""
-    result = self.execute_query_expect_failure(self.client, query, 
exec_options)
-    assert (INITIAL_RESERVATION_MSG in str(result))
-
-
 class TestLowMemoryLimits(ImpalaTestSuite):
   '''Super class for the memory limit tests with the TPC-H and TPC-DS 
queries'''
 

Reply via email to