http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/sorter.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorter.h b/be/src/runtime/sorter.h
index 80c5558..cafab72 100644
--- a/be/src/runtime/sorter.h
+++ b/be/src/runtime/sorter.h
@@ -20,7 +20,7 @@
 
 #include <deque>
 
-#include "runtime/buffered-block-mgr.h"
+#include "runtime/bufferpool/buffer-pool.h"
 #include "util/tuple-row-compare.h"
 
 namespace impala {
@@ -31,8 +31,7 @@ class RowBatch;
 
 /// Sorter contains the external sort implementation. Its purpose is to sort 
arbitrarily
 /// large input data sets with a fixed memory budget by spilling data to disk 
if
-/// necessary. BufferedBlockMgr is used to allocate and manage blocks of data 
to be
-/// sorted.
+/// necessary.
 //
 /// The client API for Sorter is as follows:
 /// AddBatch() is used to add input rows to be sorted. Multiple tuples in an 
input row are
@@ -52,20 +51,20 @@ class RowBatch;
 /// GetNext() is used to retrieve sorted rows. It can be called multiple times.
 /// AddBatch()/AddBatchNoSpill(), InputDone() and GetNext() must be called in 
that order.
 //
-/// Batches of input rows are collected into a sequence of pinned 
BufferedBlockMgr blocks
-/// called a run. The maximum size of a run is determined by the number of 
blocks that
+/// Batches of input rows are collected into a sequence of pinned BufferPool 
pages
+/// called a run. The maximum size of a run is determined by the number of 
pages that
 /// can be pinned by the Sorter. After the run is full, it is sorted in 
memory, unpinned
 /// and the next run is constructed. The variable-length column data (e.g. 
string slots)
-/// in the materialized sort tuples are stored in a separate sequence of 
blocks from the
-/// tuples themselves.  When the blocks containing tuples in a run are 
unpinned, the
+/// in the materialized sort tuples are stored in a separate sequence of pages 
from the
+/// tuples themselves.  When the pages containing tuples in a run are 
unpinned, the
 /// var-len slot pointers are converted to offsets from the start of the first 
var-len
-/// data block. When a block is read back, these offsets are converted back to 
pointers.
+/// data page. When a page is read back, these offsets are converted back to 
pointers.
 /// The in-memory sorter sorts the fixed-length tuples in-place. The output 
rows have the
 /// same schema as the materialized sort tuples.
 //
 /// After the input is consumed, the sorter is left with one or more sorted 
runs. If
 /// there are multiple runs, the runs are merged using SortedRunMerger. At 
least one
-/// block per run (two if there are var-length slots) must be pinned in memory 
during
+/// page per run (two if there are var-length slots) must be pinned in memory 
during
 /// a merge, so multiple merges may be necessary if the number of runs is too 
large.
 /// First a series of intermediate merges are performed, until the number of 
runs is
 /// small enough to do a single final merge that returns batches of sorted 
rows to the
@@ -73,7 +72,7 @@ class RowBatch;
 ///
 /// If there is a single sorted run (i.e. no merge required), only tuple rows 
are
 /// copied into the output batch supplied by GetNext(), and the data itself is 
left in
-/// pinned blocks held by the sorter.
+/// pinned pages held by the sorter.
 ///
 /// When merges are performed, one input batch is created to hold tuple rows 
for each
 /// input run, and one batch is created to hold deep copied rows (i.e. ptrs + 
data) from
@@ -84,7 +83,7 @@ class RowBatch;
 /// During a merge, one row batch is created for each input run, and one batch 
is created
 /// for the output of the merge (if is not the final merge). It is assumed 
that the memory
 /// for these batches have already been accounted for in the memory budget for 
the sort.
-/// That is, the memory for these batches does not come out of the block 
buffer manager.
+/// That is, the memory for these batches does not come out of the buffer pool.
 //
 /// TODO: Not necessary to actually copy var-len data - instead take ownership 
of the
 /// var-length data in the input batch. Copying can be deferred until a run is 
unpinned.
@@ -96,17 +95,23 @@ class Sorter {
   /// 'sort_tuple_exprs' are the slot exprs used to materialize the tuples to 
be
   /// sorted. 'compare_less_than' is a comparator for the sort tuples (returns 
true if
   /// lhs < rhs). 'merge_batch_size_' is the size of the batches created to 
provide rows
-  /// to the merger and retrieve rows from an intermediate merger. 
'enable_spilling'
-  /// should be set to false to reduce the number of requested buffers if the 
caller will
-  /// use AddBatchNoSpill().
+  /// to the merger and retrieve rows from an intermediate merger. 'node_id' 
is the ID of
+  /// the exec node using the sorter for error reporting. 'enable_spilling' 
should be set
+  /// to false to reduce the number of requested buffers if the caller will use
+  /// AddBatchNoSpill().
+  ///
+  /// The Sorter assumes that it has exclusive use of the client's
+  /// reservations for sorting, and may increase the size of the client's 
reservation.
+  /// The caller is responsible for ensuring that the minimum reservation 
(returned from
+  /// ComputeMinReservation()) is available.
   Sorter(const TupleRowComparator& compare_less_than,
       const std::vector<ScalarExpr*>& sort_tuple_exprs, RowDescriptor* 
output_row_desc,
-      MemTracker* mem_tracker, RuntimeProfile* profile, RuntimeState* state,
-      bool enable_spilling = true);
-
+      MemTracker* mem_tracker, BufferPool::ClientHandle* client, int64_t 
page_len,
+      RuntimeProfile* profile, RuntimeState* state, int node_id,
+      bool enable_spilling);
   ~Sorter();
 
-  /// Initial set-up of the sorter for execution. Registers with the block mgr.
+  /// Initial set-up of the sorter for execution.
   /// The evaluators for 'sort_tuple_exprs_' will be created and stored in 
'obj_pool'.
   /// All allocation from the evaluators will be from 'expr_mem_pool'.
   Status Prepare(ObjectPool* obj_pool, MemPool* expr_mem_pool) 
WARN_UNUSED_RESULT;
@@ -143,24 +148,29 @@ class Sorter {
   /// Close the Sorter and free resources.
   void Close(RuntimeState* state);
 
+  /// Compute the minimum amount of buffer memory in bytes required to execute 
a
+  /// sort with the current sorter.
+  int64_t ComputeMinReservation();
+
  private:
+  class Page;
   class Run;
   class TupleIterator;
   class TupleSorter;
 
   /// Create a SortedRunMerger from sorted runs in 'sorted_runs_' and assign 
it to
   /// 'merger_'. Attempts to set up merger with 'max_num_runs' runs but may 
set it
-  /// up with fewer if it cannot pin the initial blocks of all of the runs. 
Fails
+  /// up with fewer if it cannot pin the initial pages of all of the runs. 
Fails
   /// if it cannot merge at least two runs. The runs to be merged are removed 
from
   /// 'sorted_runs_'.  The Sorter sets the 'deep_copy_input' flag to true for 
the
-  /// merger, since the blocks containing input run data will be deleted as 
input
+  /// merger, since the pages containing input run data will be deleted as 
input
   /// runs are read.
   Status CreateMerger(int max_num_runs) WARN_UNUSED_RESULT;
 
   /// Repeatedly replaces multiple smaller runs in sorted_runs_ with a single 
larger
   /// merged run until there are few enough runs to be merged with a single 
merger.
   /// Returns when 'merger_' is set up to merge the final runs.
-  /// At least 1 (2 if var-len slots) block from each sorted run must be 
pinned for
+  /// At least 1 (2 if var-len slots) page from each sorted run must be pinned 
for
   /// a merge. If the number of sorted runs is too large, merge sets of 
smaller runs
   /// into large runs until a final merge can be performed. An intermediate 
row batch
   /// containing deep copied rows is used for the output of each intermediate 
merge.
@@ -177,6 +187,9 @@ class Sorter {
   /// Helper that cleans up all runs in the sorter.
   void CleanupAllRuns();
 
+  /// ID of the ExecNode that owns the sorter, used for error reporting.
+  const int node_id_;
+
   /// Runtime state instance used to check for cancellation. Not owned.
   RuntimeState* const state_;
 
@@ -184,11 +197,11 @@ class Sorter {
   const TupleRowComparator& compare_less_than_;
   boost::scoped_ptr<TupleSorter> in_mem_tuple_sorter_;
 
-  /// Block manager object used to allocate, pin and release runs. Not owned 
by Sorter.
-  BufferedBlockMgr* block_mgr_;
+  /// Client used to allocate pages from the buffer pool. Not owned.
+  BufferPool::ClientHandle* const buffer_pool_client_;
 
-  /// Handle to block mgr to make allocations from.
-  BufferedBlockMgr::Client* block_mgr_client_;
+  /// The length of each page to use.
+  const int64_t page_len_;
 
   /// True if the tuples to be sorted have var-length slots.
   bool has_var_len_slots_;
@@ -211,7 +224,7 @@ class Sorter {
   /// BEGIN: Members that must be Reset()
 
   /// The current unsorted run that is being collected. Is sorted and added to
-  /// sorted_runs_ after it is full (i.e. number of blocks allocated == max 
available
+  /// sorted_runs_ after it is full (i.e. number of pages allocated == max 
available
   /// buffers) or after the input is complete. Owned and placed in obj_pool_.
   /// When it is added to sorted_runs_, it is set to NULL.
   Run* unsorted_run_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/test-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index 37b4363..23dfa4c 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -20,7 +20,6 @@
 #include <limits>
 #include <memory>
 
-#include "runtime/buffered-block-mgr.h"
 #include "runtime/query-exec-mgr.h"
 #include "runtime/tmp-file-mgr.h"
 #include "runtime/query-state.h"
@@ -38,8 +37,8 @@ scoped_ptr<MetricGroup> TestEnv::static_metrics_;
 
 TestEnv::TestEnv()
   : have_tmp_file_mgr_args_(false),
-    buffer_pool_min_buffer_len_(-1),
-    buffer_pool_capacity_(-1) {}
+    buffer_pool_min_buffer_len_(64 * 1024),
+    buffer_pool_capacity_(0) {}
 
 Status TestEnv::Init() {
   if (static_metrics_ == NULL) {
@@ -59,9 +58,7 @@ Status TestEnv::Init() {
   } else {
     RETURN_IF_ERROR(tmp_file_mgr()->Init(metrics()));
   }
-  if (buffer_pool_min_buffer_len_ != -1 && buffer_pool_capacity_ != -1) {
-    exec_env_->InitBufferPool(buffer_pool_min_buffer_len_, 
buffer_pool_capacity_);
-  }
+  exec_env_->InitBufferPool(buffer_pool_min_buffer_len_, 
buffer_pool_capacity_);
   return Status::OK();
 }
 
@@ -88,6 +85,7 @@ void TestEnv::TearDownQueries() {
   for (RuntimeState* runtime_state : runtime_states_) 
runtime_state->ReleaseResources();
   runtime_states_.clear();
   for (QueryState* query_state : query_states_) {
+    query_state->ReleaseInitialReservationRefcount();
     exec_env_->query_exec_mgr()->ReleaseQueryState(query_state);
   }
   query_states_.clear();
@@ -137,17 +135,4 @@ Status TestEnv::CreateQueryState(
   *runtime_state = rs;
   return Status::OK();
 }
-
-Status TestEnv::CreateQueryStateWithBlockMgr(int64_t query_id, int max_buffers,
-    int block_size, const TQueryOptions* query_options, RuntimeState** 
runtime_state) {
-  RETURN_IF_ERROR(CreateQueryState(query_id, query_options, runtime_state));
-
-  shared_ptr<BufferedBlockMgr> mgr;
-  RETURN_IF_ERROR(BufferedBlockMgr::Create(*runtime_state,
-      (*runtime_state)->query_state()->query_mem_tracker(),
-      (*runtime_state)->runtime_profile(), tmp_file_mgr(),
-      CalculateMemLimit(max_buffers, block_size), block_size, &mgr));
-  (*runtime_state)->set_block_mgr(mgr);
-  return Status::OK();
-}
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/test-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.h b/be/src/runtime/test-env.h
index f314452..e721510 100644
--- a/be/src/runtime/test-env.h
+++ b/be/src/runtime/test-env.h
@@ -55,13 +55,8 @@ class TestEnv {
   Status CreateQueryState(
       int64_t query_id, const TQueryOptions* query_options, RuntimeState** 
runtime_state);
 
-  /// Same as CreateQueryState() but also creates a BufferedBlockMgr with the 
provided
-  /// parameters. If 'max_buffers' is -1, there is no limit, otherwise the 
limit is
-  /// max_buffers * block_size.
-  Status CreateQueryStateWithBlockMgr(int64_t query_id, int max_buffers, int 
block_size,
-      const TQueryOptions* query_options, RuntimeState** runtime_state);
-  /// Destroy all query states and associated RuntimeStates, BufferedBlockMgrs,
-  /// etc, that were created since the last TearDownQueries() call.
+  /// Destroy all query states and associated RuntimeStates, etc, that were 
created since
+  /// the last TearDownQueries() call.
   void TearDownQueries();
 
   /// Calculate memory limit accounting for overflow and negative values.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/tmp-file-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr-test.cc 
b/be/src/runtime/tmp-file-mgr-test.cc
index c94ba1d..343ec93 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -145,6 +145,12 @@ class TmpFileMgrTest : public ::testing::Test {
     return bytes_allocated;
   }
 
+  /// Helpers for tests to call WriteHandle's private methods via friendship.
+  void Cancel(TmpFileMgr::WriteHandle* handle) { handle->Cancel(); }
+  void WaitForWrite(TmpFileMgr::WriteHandle* handle) {
+    handle->WaitForWrite();
+  }
+
   // Write callback, which signals 'cb_cv_' and increments 'cb_counter_'.
   void SignalCallback(Status write_status) {
     {
@@ -481,8 +487,8 @@ TEST_F(TmpFileMgrTest, TestEncryptionDuringCancellation) {
   string file_path = handle->TmpFilePath();
 
   // Cancel the write - prior to the IMPALA-4820 fix decryption could race 
with the write.
-  handle->Cancel();
-  handle->WaitForWrite();
+  Cancel(handle.get());
+  WaitForWrite(handle.get());
   ASSERT_OK(file_group.RestoreData(move(handle), data_mem_range));
   WaitForCallbacks(1);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/tmp-file-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h
index c71c370..ba7210d 100644
--- a/be/src/runtime/tmp-file-mgr.h
+++ b/be/src/runtime/tmp-file-mgr.h
@@ -281,21 +281,9 @@ class TmpFileMgr {
       DCHECK(read_range_ == nullptr);
     }
 
-    /// Cancels any in-flight writes or reads. Reads are cancelled 
synchronously and
-    /// writes are cancelled asynchronously. After Cancel() is called, writes 
are not
-    /// retried. The write callback may be called with a CANCELLED status 
(unless
-    /// it succeeded or encountered a different error first).
-    /// TODO: IMPALA-3200: make this private once BufferedBlockMgr doesn't 
need it.
-    void Cancel();
-
     /// Cancel any in-flight read synchronously.
     void CancelRead();
 
-    /// Blocks until the write completes either successfully or unsuccessfully.
-    /// May return before the write callback has been called.
-    /// TODO: IMPALA-3200: make this private once BufferedBlockMgr doesn't 
need it.
-    void WaitForWrite();
-
     /// Path of temporary file backing the block. Intended for use in testing.
     /// Returns empty string if no backing file allocated.
     std::string TmpFilePath() const;
@@ -307,6 +295,7 @@ class TmpFileMgr {
 
    private:
     friend class FileGroup;
+    friend class TmpFileMgrTest;
 
     WriteHandle(RuntimeProfile::Counter* encryption_timer, WriteDoneCallback 
cb);
 
@@ -327,6 +316,16 @@ class TmpFileMgr {
     /// then calls 'cb_'.
     void WriteComplete(const Status& write_status);
 
+    /// Cancels any in-flight writes or reads. Reads are cancelled 
synchronously and
+    /// writes are cancelled asynchronously. After Cancel() is called, writes 
are not
+    /// retried. The write callback may be called with a CANCELLED status 
(unless
+    /// it succeeded or encountered a different error first).
+    void Cancel();
+
+    /// Blocks until the write completes either successfully or unsuccessfully.
+    /// May return before the write callback has been called.
+    void WaitForWrite();
+
     /// Encrypts the data in 'buffer' in-place and computes 'hash_'.
     Status EncryptAndHash(MemRange buffer) WARN_UNUSED_RESULT;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc 
b/be/src/service/client-request-state.cc
index 6be04f6..bf0f9b4 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -399,9 +399,9 @@ Status ClientRequestState::ExecQueryOrDmlRequest(
     ss << query_exec_request.per_host_mem_estimate;
     summary_profile_.AddInfoString(PER_HOST_MEM_KEY, ss.str());
   }
-  if (query_exec_request.__isset.per_host_min_reservation) {
+  if (query_exec_request.query_ctx.__isset.per_host_min_reservation) {
     stringstream ss;
-    ss << query_exec_request.per_host_min_reservation;
+    ss << query_exec_request.query_ctx.per_host_min_reservation;
     summary_profile_.AddInfoString(PER_HOST_MEMORY_RESERVATION_KEY, ss.str());
   }
   if (!query_exec_request.query_ctx.__isset.parent_query_id &&

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 8dcd7af..c123902 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -261,10 +261,10 @@ Status impala::SetQueryOption(const string& key, const 
string& value,
       case TImpalaQueryOptions::QUERY_TIMEOUT_S:
         query_options->__set_query_timeout_s(atoi(value.c_str()));
         break;
-      case TImpalaQueryOptions::MAX_BLOCK_MGR_MEMORY: {
+      case TImpalaQueryOptions::BUFFER_POOL_LIMIT: {
         int64_t mem;
-        RETURN_IF_ERROR(ParseMemValue(value, "block mgr memory limit", &mem));
-        query_options->__set_max_block_mgr_memory(mem);
+        RETURN_IF_ERROR(ParseMemValue(value, "buffer pool limit", &mem));
+        query_options->__set_buffer_pool_limit(mem);
         break;
       }
       case TImpalaQueryOptions::APPX_COUNT_DISTINCT: {
@@ -505,6 +505,28 @@ Status impala::SetQueryOption(const string& key, const 
string& value,
         query_options->__set_disable_codegen_rows_threshold(val);
         break;
       }
+      case TImpalaQueryOptions::DEFAULT_SPILLABLE_BUFFER_SIZE: {
+        int64_t buffer_size_bytes;
+        RETURN_IF_ERROR(
+            ParseMemValue(value, "Spillable buffer size", &buffer_size_bytes));
+        if (!BitUtil::IsPowerOf2(buffer_size_bytes)) {
+          return Status(
+              Substitute("Buffer size must be a power of two: $0", 
buffer_size_bytes));
+        }
+        query_options->__set_default_spillable_buffer_size(buffer_size_bytes);
+        break;
+      }
+      case TImpalaQueryOptions::MIN_SPILLABLE_BUFFER_SIZE: {
+        int64_t buffer_size_bytes;
+        RETURN_IF_ERROR(
+            ParseMemValue(value, "Spillable buffer size", &buffer_size_bytes));
+        if (!BitUtil::IsPowerOf2(buffer_size_bytes)) {
+          return Status(
+              Substitute("Buffer size must be a power of two: $0", 
buffer_size_bytes));
+        }
+        query_options->__set_min_spillable_buffer_size(buffer_size_bytes);
+        break;
+      }
       default:
         // We hit this DCHECK(false) if we forgot to add the corresponding 
entry here
         // when we add a new query option.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/service/query-options.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 603c783..8d6af02 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -35,7 +35,7 @@ class TQueryOptions;
 // the DCHECK.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::DISABLE_CODEGEN_ROWS_THRESHOLD + 1);\
+      TImpalaQueryOptions::MIN_SPILLABLE_BUFFER_SIZE + 1);\
   QUERY_OPT_FN(abort_on_default_limit_exceeded, 
ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR)\
   QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -62,7 +62,7 @@ class TQueryOptions;
   QUERY_OPT_FN(v_cpu_cores, V_CPU_CORES)\
   QUERY_OPT_FN(rm_initial_mem, RM_INITIAL_MEM)\
   QUERY_OPT_FN(query_timeout_s, QUERY_TIMEOUT_S)\
-  QUERY_OPT_FN(max_block_mgr_memory, MAX_BLOCK_MGR_MEMORY)\
+  QUERY_OPT_FN(buffer_pool_limit, BUFFER_POOL_LIMIT)\
   QUERY_OPT_FN(appx_count_distinct, APPX_COUNT_DISTINCT)\
   QUERY_OPT_FN(disable_unsafe_spills, DISABLE_UNSAFE_SPILLS)\
   QUERY_OPT_FN(seq_compression_mode, SEQ_COMPRESSION_MODE)\
@@ -93,6 +93,8 @@ class TQueryOptions;
   QUERY_OPT_FN(parquet_read_statistics, PARQUET_READ_STATISTICS)\
   QUERY_OPT_FN(default_join_distribution_mode, DEFAULT_JOIN_DISTRIBUTION_MODE)\
   QUERY_OPT_FN(disable_codegen_rows_threshold, DISABLE_CODEGEN_ROWS_THRESHOLD)\
+  QUERY_OPT_FN(default_spillable_buffer_size, DEFAULT_SPILLABLE_BUFFER_SIZE)\
+  QUERY_OPT_FN(min_spillable_buffer_size, MIN_SPILLABLE_BUFFER_SIZE)\
   ;
 
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/util/bloom-filter.h
----------------------------------------------------------------------
diff --git a/be/src/util/bloom-filter.h b/be/src/util/bloom-filter.h
index 5ebd9b5..913b331 100644
--- a/be/src/util/bloom-filter.h
+++ b/be/src/util/bloom-filter.h
@@ -28,7 +28,7 @@
 #include "common/compiler-util.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 #include "gutil/macros.h"
-#include "runtime/buffered-block-mgr.h"
+#include "util/cpu-info.h"
 #include "util/hash-util.h"
 
 namespace impala {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/util/static-asserts.cc
----------------------------------------------------------------------
diff --git a/be/src/util/static-asserts.cc b/be/src/util/static-asserts.cc
index cf12e36..7662906 100644
--- a/be/src/util/static-asserts.cc
+++ b/be/src/util/static-asserts.cc
@@ -18,7 +18,6 @@
 #include <boost/static_assert.hpp>
 
 #include "common/hdfs.h"
-#include "runtime/buffered-tuple-stream.h"
 #include "runtime/string-value.h"
 #include "runtime/timestamp-value.h"
 #include "udf/udf.h"
@@ -37,7 +36,6 @@ class UnusedClass {
   BOOST_STATIC_ASSERT(sizeof(boost::gregorian::date) == 4);
   BOOST_STATIC_ASSERT(sizeof(hdfsFS) == sizeof(void*));
   BOOST_STATIC_ASSERT(sizeof(hdfsFile) == sizeof(void*));
-  BOOST_STATIC_ASSERT(sizeof(BufferedTupleStream::RowIdx) == sizeof(void*));
 
   // If the memory layout of any of these types changes, it will be necessary 
to change
   // LlvmCodeGen::GetUdfValType(), and we may also run into calling convention 
problems

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/common/thrift/Frontend.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index 79da0d6..3a88915 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -389,23 +389,11 @@ struct TQueryExecRequest {
   // Estimated per-host peak memory consumption in bytes. Used for resource 
management.
   8: optional i64 per_host_mem_estimate
 
-  // Minimum query-wide buffer reservation required per host in bytes. This is 
the peak
-  // minimum reservation that may be required by the concurrently-executing 
operators at
-  // any point in query execution. It may be less than the initial reservation 
total
-  // claims (below) if execution of some operators never overlaps, which 
allows reuse of
-  // reservations.
-  9: optional i64 per_host_min_reservation;
-
-  // Total of the initial buffer reservations that we expect to be claimed per 
host.
-  // I.e. the sum over all operators in all fragment instances that execute on 
that host.
-  // Measured in bytes.
-  10: optional i64 per_host_initial_reservation_total_claims;
-
   // List of replica hosts.  Used by the host_idx field of TScanRangeLocation.
-  11: required list<Types.TNetworkAddress> host_list
+  9: required list<Types.TNetworkAddress> host_list
 
   // Column lineage graph
-  12: optional LineageGraph.TLineageGraph lineage_graph
+  10: optional LineageGraph.TLineageGraph lineage_graph
 }
 
 enum TCatalogOpType {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift 
b/common/thrift/ImpalaInternalService.thrift
index 4aefe55..b477299 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -130,7 +130,7 @@ struct TQueryOptions {
   26: optional i32 query_timeout_s = 0
 
   // test hook to cap max memory for spilling operators (to force them to 
spill).
-  27: optional i64 max_block_mgr_memory
+  27: optional i64 buffer_pool_limit
 
   // If true, transforms all count(distinct) aggregations into NDV()
   28: optional bool appx_count_distinct = 0
@@ -255,6 +255,14 @@ struct TQueryOptions {
   // If the number of rows processed per node is below the threshold codegen 
will be
   // automatically disabled by the planner.
   57: optional i32 disable_codegen_rows_threshold = 50000
+
+  // The default spillable buffer size in bytes, which may be overridden by 
the planner.
+  // Defaults to 2MB.
+  58: optional i64 default_spillable_buffer_size = 2097152;
+
+  // The minimum spillable buffer size in bytes. The planner will not choose a size
+  // smaller than this. Defaults to 64KB.
+  59: optional i64 min_spillable_buffer_size = 65536;
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2
@@ -375,6 +383,18 @@ struct TQueryCtx {
   // String containing a timestamp (in UTC) set as the query submission time. 
It
   // represents the same point in time as now_string
   17: required string utc_timestamp_string
+
+  // Minimum query-wide buffer reservation required per host in bytes. This is 
the peak
+  // minimum reservation that may be required by the concurrently-executing 
operators at
+  // any point in query execution. It may be less than the initial reservation 
total
+  // claims (below) if execution of some operators never overlaps, which 
allows reuse of
+  // reservations.
+  18: optional i64 per_host_min_reservation;
+
+  // Total of the initial buffer reservations that we expect to be claimed per 
host.
+  // I.e. the sum over all operators in all fragment instances that execute on 
that host.
+  // Measured in bytes.
+  19: optional i64 per_host_initial_reservation_total_claims;
 }
 
 // Specification of one output destination of a plan fragment

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift 
b/common/thrift/ImpalaService.thrift
index ec82bf1..ced884b 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -159,7 +159,7 @@ enum TImpalaQueryOptions {
   QUERY_TIMEOUT_S,
 
   // Test hook for spill to disk operators
-  MAX_BLOCK_MGR_MEMORY,
+  BUFFER_POOL_LIMIT,
 
   // Transforms all count(distinct) aggregations into NDV()
   APPX_COUNT_DISTINCT,
@@ -279,6 +279,12 @@ enum TImpalaQueryOptions {
   // If the number of rows processed per node is below the threshold and 
disable_codegen
   // is unset, codegen will be automatically be disabled by the planner.
   DISABLE_CODEGEN_ROWS_THRESHOLD,
+
+  // The default spillable buffer size, in bytes.
+  DEFAULT_SPILLABLE_BUFFER_SIZE,
+
+  // The minimum spillable buffer size, in bytes.
+  MIN_SPILLABLE_BUFFER_SIZE,
 }
 
 // The summary of a DML statement.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index c1ff302..468ca44 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -481,6 +481,21 @@ struct TUnnestNode {
   1: required Exprs.TExpr collection_expr
 }
 
+// This contains all of the information computed by the planner as part of the resource
+// profile that is needed by the backend to execute.
+struct TBackendResourceProfile {
+  // The minimum reservation for this plan node in bytes.
+  1: required i64 min_reservation
+
+  // The maximum reservation for this plan node in bytes. MAX_INT64 means 
effectively
+  // unlimited.
+  2: required i64 max_reservation
+
+  // The spillable buffer size in bytes to use for this node, chosen by the 
planner.
+  // Set iff the node uses spillable buffers.
+  3: optional i64 spillable_buffer_size
+}
+
 // This is essentially a union of all messages corresponding to subclasses
 // of PlanNode.
 struct TPlanNode {
@@ -526,6 +541,9 @@ struct TPlanNode {
 
   // Runtime filters assigned to this plan node
   24: optional list<TRuntimeFilterDesc> runtime_filters
+
+  // Resource profile for this plan node.
+  25: required TBackendResourceProfile resource_profile
 }
 
 // A flattened representation of a tree of PlanNodes, obtained by depth-first

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/common/thrift/generate_error_codes.py
----------------------------------------------------------------------
diff --git a/common/thrift/generate_error_codes.py 
b/common/thrift/generate_error_codes.py
index 80e054e..ccd713c 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -240,11 +240,11 @@ error_codes = (
 
   ("PARTITIONED_HASH_JOIN_REPARTITION_FAILS", 76, "Cannot perform hash join at 
node with "
    "id $0. Repartitioning did not reduce the size of a spilled partition. 
Repartitioning "
-   "level $1. Number of rows $2."),
+   "level $1. Number of rows $2:\\n$3\\n$4"),
 
   ("PARTITIONED_AGG_REPARTITION_FAILS", 77,  "Cannot perform aggregation at 
node with "
    "id $0. Repartitioning did not reduce the size of a spilled partition. 
Repartitioning "
-   "level $1. Number of rows $2."),
+   "level $1. Number of rows $2:\\n$3\\n$4"),
 
   ("AVRO_TRUNCATED_BLOCK", 78, "File '$0' is corrupt: truncated data block at 
offset $1"),
 
@@ -322,10 +322,14 @@ error_codes = (
 
   # TODO: IMPALA-3200: make sure that this references the correct query option.
   ("MAX_ROW_SIZE", 104, "Row of size $0 could not be materialized in plan node 
with "
-    "id $1. Limit is $2, which can be increased with query option 
max_row_size"),
+    "id $1. Increase the <TBD> query option (currently $2) to process larger 
rows."),
 
   ("IR_VERIFY_FAILED", 105,
    "Failed to verify generated IR function $0, see log for more details."),
+
+  ("MINIMUM_RESERVATION_UNAVAILABLE", 106, "Failed to get minimum memory 
reservation of "
+     "$0 on daemon $1:$2 for query $3 because it would exceed an applicable 
query, "
+     "request pool or process memory limit. Memory usage:\\n$4"),
 )
 
 import sys

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/common/RuntimeEnv.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/common/RuntimeEnv.java 
b/fe/src/main/java/org/apache/impala/common/RuntimeEnv.java
index 2041090..3c33bf1 100644
--- a/fe/src/main/java/org/apache/impala/common/RuntimeEnv.java
+++ b/fe/src/main/java/org/apache/impala/common/RuntimeEnv.java
@@ -17,8 +17,6 @@
 
 package org.apache.impala.common;
 
-import org.apache.impala.service.BackendConfig;
-
 /**
  * Contains runtime-specific parameters such as the number of CPU cores. 
Currently only
  * used in Plan cost estimation. The static RuntimeEnv members can be set so 
that tests
@@ -33,9 +31,6 @@ public class RuntimeEnv {
   // PlanNode.computeResourceProfile(). Currently the backend only support a 
single
   // spillable buffer size, so this is equal to 
PlanNode.DEFAULT_SPILLABLE_BUFFER_BYTES,
   // except in planner tests.
-  // TODO: IMPALA-3200: this get from query option
-  private long minSpillableBufferBytes_;
-
   // Indicates whether this is an environment for testing.
   private boolean isTestEnv_;
 
@@ -48,15 +43,10 @@ public class RuntimeEnv {
    */
   public void reset() {
     numCores_ = Runtime.getRuntime().availableProcessors();
-    minSpillableBufferBytes_ = BackendConfig.INSTANCE.getReadSize();
   }
 
   public int getNumCores() { return numCores_; }
   public void setNumCores(int numCores) { this.numCores_ = numCores; }
-  public long getMinSpillableBufferBytes() { return minSpillableBufferBytes_; }
-  public void setMinSpillableBufferBytes(long minSpillableBufferBytes) {
-    minSpillableBufferBytes_ = minSpillableBufferBytes;
-  }
   public void setTestEnv(boolean v) { isTestEnv_ = v; }
   public boolean isTestEnv() { return isTestEnv_; }
   public boolean isKuduSupported() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java 
b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
index 004c84e..c938f76 100644
--- a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java
@@ -30,7 +30,6 @@ import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.FunctionCallExpr;
 import org.apache.impala.analysis.SlotId;
 import org.apache.impala.common.InternalException;
-import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.thrift.TAggregationNode;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TExpr;
@@ -302,24 +301,24 @@ public class AggregationNode extends PlanNode {
 
     // Must be kept in sync with 
PartitionedAggregationNode::MinRequiredBuffers() in be.
     long perInstanceMinBuffers;
+    long bufferSize = queryOptions.getDefault_spillable_buffer_size();
     if (aggInfo_.getGroupingExprs().isEmpty() || useStreamingPreagg_) {
       perInstanceMinBuffers = 0;
     } else {
       final int PARTITION_FANOUT = 16;
-      long minBuffers = 2 * PARTITION_FANOUT + 1 + (aggInfo_.needsSerialize() 
? 1 : 0);
-      long bufferSize = getDefaultSpillableBufferBytes();
+      long minBuffers = PARTITION_FANOUT + 1 + (aggInfo_.needsSerialize() ? 1 
: 0);
       if (perInstanceDataBytes != -1) {
         long bytesPerBuffer = perInstanceDataBytes / PARTITION_FANOUT;
         // Scale down the buffer size if we think there will be excess free 
space with the
         // default buffer size, e.g. with small dimension tables.
         bufferSize = Math.min(bufferSize, Math.max(
-            RuntimeEnv.INSTANCE.getMinSpillableBufferBytes(),
+            queryOptions.getMin_spillable_buffer_size(),
             BitUtil.roundUpToPowerOf2(bytesPerBuffer)));
       }
       perInstanceMinBuffers = bufferSize * minBuffers;
     }
 
-    nodeResourceProfile_ =
-        new ResourceProfile(perInstanceMemEstimate, perInstanceMinBuffers);
+    nodeResourceProfile_ = ResourceProfile.spillableWithMinReservation(
+        perInstanceMemEstimate, perInstanceMinBuffers, bufferSize);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java 
b/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
index d4bafcf..0322d88 100644
--- a/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
@@ -248,8 +248,11 @@ public class AnalyticEvalNode extends PlanNode {
     // TODO: come up with estimate based on window
     long perInstanceMemEstimate = 0;
 
+    // Analytic always uses the default spillable buffer size.
+    long bufferSize = queryOptions.getDefault_spillable_buffer_size();
     // Must be kept in sync with MIN_REQUIRED_BUFFERS in AnalyticEvalNode in 
be.
-    long perInstanceMinBufferBytes = 2 * getDefaultSpillableBufferBytes();
-    nodeResourceProfile_ = new ResourceProfile(perInstanceMemEstimate, 
perInstanceMinBufferBytes);
+    long perInstanceMinBufferBytes = 2 * bufferSize;
+    nodeResourceProfile_ = ResourceProfile.spillableWithMinReservation(
+        perInstanceMemEstimate, perInstanceMinBufferBytes, bufferSize);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
index 879d9d8..cea9b53 100644
--- a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java
@@ -332,7 +332,7 @@ public class DataSourceScanNode extends ScanNode {
   @Override
   public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     // TODO: What's a good estimate of memory consumption?
-    nodeResourceProfile_ = new ResourceProfile(1024L * 1024L * 1024L, 0);
+    nodeResourceProfile_ = ResourceProfile.noReservation(1024L * 1024L * 
1024L);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java 
b/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java
index d1369f5..af4f9a6 100644
--- a/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/DataStreamSink.java
@@ -51,7 +51,7 @@ public class DataStreamSink extends DataSink {
 
   @Override
   public void computeResourceProfile(TQueryOptions queryOptions) {
-    resourceProfile_ = new ResourceProfile(0, 0);
+    resourceProfile_ = ResourceProfile.noReservation(0);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java 
b/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
index 0d0acc9..3fb8bae 100644
--- a/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
@@ -62,7 +62,7 @@ public class EmptySetNode extends PlanNode {
   @Override
   public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     // TODO: add an estimate
-    nodeResourceProfile_ = new ResourceProfile(0, 0);
+    nodeResourceProfile_ = ResourceProfile.noReservation(0);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java 
b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
index 478a054..87d2fd2 100644
--- a/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/ExchangeNode.java
@@ -184,7 +184,7 @@ public class ExchangeNode extends PlanNode {
   @Override
   public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     // TODO: add an estimate
-    nodeResourceProfile_ =  new ResourceProfile(0, 0);
+    nodeResourceProfile_ =  ResourceProfile.noReservation(0);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
index bbecbf1..d56aa98 100644
--- a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java
@@ -497,7 +497,7 @@ public class HBaseScanNode extends ScanNode {
   @Override
   public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     // TODO: What's a good estimate of memory consumption?
-    nodeResourceProfile_ =  new ResourceProfile(1024L * 1024L * 1024L, 0);
+    nodeResourceProfile_ =  ResourceProfile.noReservation(1024L * 1024L * 
1024L);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/HBaseTableSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HBaseTableSink.java 
b/fe/src/main/java/org/apache/impala/planner/HBaseTableSink.java
index 947665e..28939ed 100644
--- a/fe/src/main/java/org/apache/impala/planner/HBaseTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/HBaseTableSink.java
@@ -44,7 +44,7 @@ public class HBaseTableSink extends TableSink {
 
   @Override
   public void computeResourceProfile(TQueryOptions queryOptions) {
-    resourceProfile_ = new ResourceProfile(0, 0);
+    resourceProfile_ = ResourceProfile.noReservation(0);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java 
b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
index e828125..5ff17c5 100644
--- a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
@@ -28,7 +28,6 @@ import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.InternalException;
-import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.thrift.TEqJoinCondition;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.THashJoinNode;
@@ -223,17 +222,18 @@ public class HashJoinNode extends JoinNode {
     long minBuffers = PARTITION_FANOUT + 1
         + (joinOp_ == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN ? 3 : 0);
 
-    long bufferSize = getDefaultSpillableBufferBytes();
+    long bufferSize = queryOptions.getDefault_spillable_buffer_size();
     if (perInstanceDataBytes != -1) {
       long bytesPerBuffer = perInstanceDataBytes / PARTITION_FANOUT;
       // Scale down the buffer size if we think there will be excess free 
space with the
       // default buffer size, e.g. if the right side is a small dimension 
table.
       bufferSize = Math.min(bufferSize, Math.max(
-          RuntimeEnv.INSTANCE.getMinSpillableBufferBytes(),
+          queryOptions.getMin_spillable_buffer_size(),
           BitUtil.roundUpToPowerOf2(bytesPerBuffer)));
     }
 
     long perInstanceMinBufferBytes = bufferSize * minBuffers;
-    nodeResourceProfile_ = new ResourceProfile(perInstanceMemEstimate, 
perInstanceMinBufferBytes);
+    nodeResourceProfile_ = ResourceProfile.spillableWithMinReservation(
+        perInstanceMemEstimate, perInstanceMinBufferBytes, bufferSize);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 0ba5bc6..bf183be 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -1021,7 +1021,7 @@ public class HdfsScanNode extends ScanNode {
   public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     Preconditions.checkNotNull(scanRanges_, "Cost estimation requires scan 
ranges.");
     if (scanRanges_.isEmpty()) {
-      nodeResourceProfile_ = new ResourceProfile(0, 0);
+      nodeResourceProfile_ = ResourceProfile.noReservation(0);
       return;
     }
     Preconditions.checkState(0 < numNodes_ && numNodes_ <= scanRanges_.size());
@@ -1075,7 +1075,7 @@ public class HdfsScanNode extends ScanNode {
           PrintUtils.printBytes(perHostUpperBound)));
       perInstanceMemEstimate = perHostUpperBound;
     }
-    nodeResourceProfile_ = new ResourceProfile(perInstanceMemEstimate, 0);
+    nodeResourceProfile_ = 
ResourceProfile.noReservation(perInstanceMemEstimate);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java 
b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
index fed4ffd..46709c0 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
@@ -99,7 +99,7 @@ public class HdfsTableSink extends TableSink {
           PlanNode.checkedMultiply(numPartitionsPerInstance, 
perPartitionMemReq);
       perInstanceMemEstimate = Math.min(perInstanceInputBytes, 
perInstanceMemReq);
     }
-    resourceProfile_ = new ResourceProfile(perInstanceMemEstimate, 0);
+    resourceProfile_ = ResourceProfile.noReservation(perInstanceMemEstimate);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java 
b/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
index 69cc133..14acb26 100644
--- a/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/JoinBuildSink.java
@@ -87,6 +87,6 @@ public class JoinBuildSink extends DataSink {
   @Override
   public void computeResourceProfile(TQueryOptions queryOptions) {
     // The memory consumption is counted against the join PlanNode.
-    resourceProfile_ = new ResourceProfile(0, 0);
+    resourceProfile_ = ResourceProfile.noReservation(0);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index 57403e4..37a4e5c 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -270,7 +270,7 @@ public class KuduScanNode extends ScanNode {
 
   @Override
   public void computeNodeResourceProfile(TQueryOptions queryOptions) {
-    nodeResourceProfile_ = new ResourceProfile(0, 0);
+    nodeResourceProfile_ = ResourceProfile.noReservation(0);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java 
b/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
index b7dcdd8..f75b170 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduTableSink.java
@@ -60,7 +60,7 @@ public class KuduTableSink extends TableSink {
   @Override
   public void computeResourceProfile(TQueryOptions queryOptions) {
     // TODO: add a memory estimate
-    resourceProfile_ = new ResourceProfile(0, 0);
+    resourceProfile_ = ResourceProfile.noReservation(0);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java 
b/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
index 0ec8e4f..16a3caf 100644
--- a/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/NestedLoopJoinNode.java
@@ -20,9 +20,6 @@ package org.apache.impala.planner;
 import java.util.Collections;
 import java.util.List;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.BinaryPredicate;
 import org.apache.impala.analysis.Expr;
@@ -86,7 +83,7 @@ public class NestedLoopJoinNode extends JoinNode {
       perInstanceMemEstimate =
           (long) Math.ceil(getChild(1).cardinality_ * getChild(1).avgRowSize_);
     }
-    nodeResourceProfile_ = new ResourceProfile(perInstanceMemEstimate, 0);
+    nodeResourceProfile_ = 
ResourceProfile.noReservation(perInstanceMemEstimate);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/PlanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java 
b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
index 9723c4a..2557f98 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
@@ -33,7 +33,7 @@ import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.PrintUtils;
 import org.apache.impala.common.TreeNode;
 import org.apache.impala.planner.RuntimeFilterGenerator.RuntimeFilter;
-import org.apache.impala.service.BackendConfig;
+import org.apache.impala.thrift.TBackendResourceProfile;
 import org.apache.impala.thrift.TExecStats;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TPlan;
@@ -408,6 +408,8 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
       msg.addToRuntime_filters(filter.toThrift());
     }
     msg.setDisable_codegen(disableCodegen_);
+    Preconditions.checkState(nodeResourceProfile_.isValid());
+    msg.resource_profile = nodeResourceProfile_.toThrift();
     toThrift(msg);
     container.addToNodes(msg);
     // For the purpose of the BE consider ExchangeNodes to have no children.
@@ -677,16 +679,6 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
   }
 
   /**
-   * The default size of buffer used in spilling nodes. Used in
-   * computeNodeResourceProfile().
-   */
-  protected final static long getDefaultSpillableBufferBytes() {
-    // BufferedBlockMgr uses --read_size to determine buffer size.
-    // TODO: IMPALA-3200: get from query option
-    return BackendConfig.INSTANCE.getReadSize();
-  }
-
-  /**
    * The input cardinality is the sum of output cardinalities of its children.
    * For scan nodes the input cardinality is the expected number of rows 
scanned.
    */

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java 
b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
index fba9149..07eb58b 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
@@ -37,7 +37,7 @@ public class PlanRootSink extends DataSink {
   @Override
   public void computeResourceProfile(TQueryOptions queryOptions) {
     // TODO: add a memory estimate
-    resourceProfile_ = new ResourceProfile(0, 0);
+    resourceProfile_ = ResourceProfile.noReservation(0);
   }
 
   protected TDataSink toThrift() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/Planner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java 
b/fe/src/main/java/org/apache/impala/planner/Planner.java
index 4cfd57e..ed6e8df 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -63,7 +63,7 @@ public class Planner {
   public static final long MIN_PER_HOST_MEM_ESTIMATE_BYTES = 10 * 1024 * 1024;
 
   public static final ResourceProfile MIN_PER_HOST_RESOURCES =
-      new ResourceProfile(MIN_PER_HOST_MEM_ESTIMATE_BYTES, 0);
+      ResourceProfile.withMinReservation(MIN_PER_HOST_MEM_ESTIMATE_BYTES, 0);
 
   private final PlannerContext ctx_;
 
@@ -262,9 +262,9 @@ public class Planner {
       TQueryExecRequest request, TExplainLevel explainLevel) {
     StringBuilder str = new StringBuilder();
     boolean hasHeader = false;
-    if (request.isSetPer_host_min_reservation()) {
+    if (request.query_ctx.isSetPer_host_min_reservation()) {
       str.append(String.format("Per-Host Resource Reservation: Memory=%s\n",
-              PrintUtils.printBytes(request.getPer_host_min_reservation()))) ;
+          
PrintUtils.printBytes(request.query_ctx.getPer_host_min_reservation())));
       hasHeader = true;
     }
     if (request.isSetPer_host_mem_estimate()) {
@@ -344,7 +344,7 @@ public class Planner {
    * per-host resource values in 'request'.
    */
   public void computeResourceReqs(List<PlanFragment> planRoots,
-      TQueryExecRequest request) {
+      TQueryCtx queryCtx, TQueryExecRequest request) {
     Preconditions.checkState(!planRoots.isEmpty());
     Preconditions.checkNotNull(request);
     TQueryOptions queryOptions = ctx_.getRootAnalyzer().getQueryOptions();
@@ -389,8 +389,8 @@ public class Planner {
     perHostPeakResources = MIN_PER_HOST_RESOURCES.max(perHostPeakResources);
 
     
request.setPer_host_mem_estimate(perHostPeakResources.getMemEstimateBytes());
-    
request.setPer_host_min_reservation(perHostPeakResources.getMinReservationBytes());
-    
request.setPer_host_initial_reservation_total_claims(perHostInitialReservationTotal);
+    
queryCtx.setPer_host_min_reservation(perHostPeakResources.getMinReservationBytes());
+    
queryCtx.setPer_host_initial_reservation_total_claims(perHostInitialReservationTotal);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Per-host min buffer : " + 
perHostPeakResources.getMinReservationBytes());
       LOG.trace(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java 
b/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java
index 18cde7e..3c13812 100644
--- a/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java
+++ b/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java
@@ -18,6 +18,7 @@
 package org.apache.impala.planner;
 
 import org.apache.impala.common.PrintUtils;
+import org.apache.impala.thrift.TBackendResourceProfile;
 import org.apache.impala.util.MathUtil;
 
 /**
@@ -35,25 +36,56 @@ public class ResourceProfile {
   private final long memEstimateBytes_;
 
   // Minimum buffer reservation required to execute in bytes.
+  // The valid range is [0, maxReservationBytes_].
   private final long minReservationBytes_;
 
-  private ResourceProfile(boolean isValid, long memEstimateBytes, long 
minReservationBytes) {
+  // Maximum buffer reservation allowed for this plan node.
+  // The valid range is [minReservationBytes_, Long.MAX_VALUE].
+  private final long maxReservationBytes_;
+
+  // The spillable buffer size to use in a plan node. Only valid for resource 
profiles
+  // for spilling PlanNodes. Operations like sum(), max(), etc., produce 
profiles without
+  // valid spillableBufferBytes_ values. -1 means invalid.
+  private final long spillableBufferBytes_;
+
+  private ResourceProfile(boolean isValid, long memEstimateBytes,
+      long minReservationBytes, long maxReservationBytes, long 
spillableBufferBytes) {
     isValid_ = isValid;
     memEstimateBytes_ = memEstimateBytes;
     minReservationBytes_ = minReservationBytes;
+    maxReservationBytes_ = maxReservationBytes;
+    spillableBufferBytes_ = spillableBufferBytes;
+  }
+
+  // Create a resource profile whose min and max reservation are both zero.
+  public static ResourceProfile noReservation(long memEstimateBytes) {
+    return new ResourceProfile(true, memEstimateBytes, 0, 0, -1);
+  }
+
+  // Create a resource profile with a minimum reservation (but no maximum).
+  public static ResourceProfile withMinReservation(long memEstimateBytes,
+      long minReservationBytes) {
+    return new ResourceProfile(
+        true, memEstimateBytes, minReservationBytes, Long.MAX_VALUE, -1);
   }
 
-  public ResourceProfile(long memEstimateBytes, long minReservationBytes) {
-    this(true, memEstimateBytes, minReservationBytes);
+  // Create a resource profile with a minimum reservation (but no maximum) and 
a
+  // spillable buffer size.
+  public static ResourceProfile spillableWithMinReservation(long 
memEstimateBytes,
+      long minReservationBytes, long spillableBufferBytes) {
+    return new ResourceProfile(true, memEstimateBytes, minReservationBytes,
+        Long.MAX_VALUE, spillableBufferBytes);
   }
 
   public static ResourceProfile invalid() {
-    return new ResourceProfile(false, -1, -1);
+    return new ResourceProfile(false, -1, -1, -1, -1);
   }
 
   public boolean isValid() { return isValid_; }
   public long getMemEstimateBytes() { return memEstimateBytes_; }
   public long getMinReservationBytes() { return minReservationBytes_; }
+  public long getMaxReservationBytes() { return maxReservationBytes_; }
+  public long getSpillableBufferBytes() { return spillableBufferBytes_; }
 
   // Return a string with the resource profile information suitable for 
display in an
   // explain plan in a format like: "resource1=value resource2=value"
@@ -63,6 +95,12 @@ public class ResourceProfile {
     output.append(isValid_ ? PrintUtils.printBytes(memEstimateBytes_) : 
"invalid");
     output.append(" mem-reservation=");
     output.append(isValid_ ? PrintUtils.printBytes(minReservationBytes_) : 
"invalid");
+    // TODO: output maxReservation_ here if the planner becomes more 
sophisticated in
+    // choosing it (beyond 0/unlimited).
+    if (isValid_ && spillableBufferBytes_ != -1) {
+      output.append(" spill-buffer=");
+      output.append(PrintUtils.printBytes(spillableBufferBytes_));
+    }
     return output.toString();
   }
 
@@ -70,25 +108,39 @@ public class ResourceProfile {
   public ResourceProfile max(ResourceProfile other) {
     if (!isValid()) return other;
     if (!other.isValid()) return this;
-    return new ResourceProfile(
+    return new ResourceProfile(true,
         Math.max(getMemEstimateBytes(), other.getMemEstimateBytes()),
-        Math.max(getMinReservationBytes(), other.getMinReservationBytes()));
+        Math.max(getMinReservationBytes(), other.getMinReservationBytes()),
+        Math.max(getMaxReservationBytes(), other.getMaxReservationBytes()), 
-1);
   }
 
   // Returns a profile with the sum of each value in 'this' and 'other'.
   public ResourceProfile sum(ResourceProfile other) {
     if (!isValid()) return other;
     if (!other.isValid()) return this;
-    return new ResourceProfile(
+    return new ResourceProfile(true,
         MathUtil.saturatingAdd(getMemEstimateBytes(), 
other.getMemEstimateBytes()),
-        MathUtil.saturatingAdd(getMinReservationBytes(), 
other.getMinReservationBytes()));
+        
MathUtil.saturatingAdd(getMinReservationBytes(),other.getMinReservationBytes()),
+        MathUtil.saturatingAdd(getMaxReservationBytes(), 
other.getMaxReservationBytes()),
+        -1);
   }
 
   // Returns a profile with all values multiplied by 'factor'.
   public ResourceProfile multiply(int factor) {
     if (!isValid()) return this;
-    return new ResourceProfile(
+    return new ResourceProfile(true,
         MathUtil.saturatingMultiply(memEstimateBytes_, factor),
-        MathUtil.saturatingMultiply(minReservationBytes_, factor));
+        MathUtil.saturatingMultiply(minReservationBytes_, factor),
+        MathUtil.saturatingMultiply(maxReservationBytes_, factor), -1);
+  }
+
+  public TBackendResourceProfile toThrift() {
+    TBackendResourceProfile result = new TBackendResourceProfile();
+    result.setMin_reservation(minReservationBytes_);
+    result.setMax_reservation(maxReservationBytes_);
+    if (spillableBufferBytes_ != -1) {
+      result.setSpillable_buffer_size(spillableBufferBytes_);
+    }
+    return result;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/SelectNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SelectNode.java 
b/fe/src/main/java/org/apache/impala/planner/SelectNode.java
index 97dfa5b..3ffc975 100644
--- a/fe/src/main/java/org/apache/impala/planner/SelectNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SelectNode.java
@@ -84,7 +84,7 @@ public class SelectNode extends PlanNode {
   @Override
   public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     // TODO: add an estimate
-    nodeResourceProfile_ = new ResourceProfile(0, 0);
+    nodeResourceProfile_ = ResourceProfile.noReservation(0);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java 
b/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java
index bed1c9a..bdf3a01 100644
--- a/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SingularRowSrcNode.java
@@ -68,7 +68,7 @@ public class SingularRowSrcNode extends PlanNode {
   @Override
   public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     // TODO: add an estimate
-    nodeResourceProfile_ = new ResourceProfile(0, 0);
+    nodeResourceProfile_ = ResourceProfile.noReservation(0);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/SortNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SortNode.java 
b/fe/src/main/java/org/apache/impala/planner/SortNode.java
index aee8fda..75e8034 100644
--- a/fe/src/main/java/org/apache/impala/planner/SortNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SortNode.java
@@ -29,7 +29,6 @@ import org.apache.impala.analysis.SlotDescriptor;
 import org.apache.impala.analysis.SlotRef;
 import org.apache.impala.analysis.SortInfo;
 import org.apache.impala.common.InternalException;
-import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TPlanNode;
 import org.apache.impala.thrift.TPlanNodeType;
@@ -255,7 +254,7 @@ public class SortNode extends PlanNode {
     if (type_ == TSortType.TOPN) {
       long perInstanceMemEstimate =
               (long) Math.ceil((cardinality_ + offset_) * avgRowSize_);
-      nodeResourceProfile_ = new ResourceProfile(perInstanceMemEstimate, 0);
+      nodeResourceProfile_ = ResourceProfile.noReservation(perInstanceMemEstimate);
       return;
     }
 
@@ -265,44 +264,40 @@ public class SortNode extends PlanNode {
     // size sqrt(N) blocks, and we could merge sqrt(N) such runs with sqrt(N) blocks
     // of memory.
     double fullInputSize = getChild(0).cardinality_ * avgRowSize_;
-    boolean hasVarLenSlots = false;
+    boolean usesVarLenBlocks = false;
     for (SlotDescriptor slotDesc: info_.getSortTupleDescriptor().getSlots()) {
       if (slotDesc.isMaterialized() && !slotDesc.getType().isFixedLengthType()) {
-        hasVarLenSlots = true;
+        usesVarLenBlocks = true;
         break;
       }
     }
 
-    // The block size used by the sorter is the same as the configured I/O read size.
-    long blockSize = BackendConfig.INSTANCE.getReadSize();
-    // The external sorter writes fixed-len and var-len data in separate sequences of
-    // blocks on disk and reads from both sequences when merging. This effectively
-    // doubles the block size when there are var-len columns present.
-    if (hasVarLenSlots) blockSize *= 2;
+    // Sort always uses the default spillable buffer size.
+    long bufferSize = queryOptions.getDefault_spillable_buffer_size();
 
+    // The external sorter writes fixed-len and var-len data in separate sequences of
+    // pages on disk and reads from both sequences when merging. This effectively
+    // doubles the number of pages required when there are var-len columns present.
+    // Must be kept in sync with ComputeMinReservation() in Sorter in be.
+    int pageMultiplier = usesVarLenBlocks ? 2 : 1;
+    long perInstanceMemEstimate;
+    long perInstanceMinReservation;
     if (type_ == TSortType.PARTIAL) {
       // The memory limit cannot be less than the size of the required blocks.
-      long mem_limit =
-          PARTIAL_SORT_MEM_LIMIT > blockSize ? PARTIAL_SORT_MEM_LIMIT : blockSize;
+      long mem_limit = Math.max(PARTIAL_SORT_MEM_LIMIT, bufferSize * pageMultiplier);
       // 'fullInputSize' will be negative if stats are missing, just use the limit.
-      long perInstanceMemEstimate = fullInputSize < 0 ?
+      perInstanceMemEstimate = fullInputSize < 0 ?
           mem_limit :
           Math.min((long) Math.ceil(fullInputSize), mem_limit);
-      nodeResourceProfile_ = new ResourceProfile(perInstanceMemEstimate, blockSize);
+      perInstanceMinReservation = bufferSize * pageMultiplier;
     } else {
-      Preconditions.checkState(type_ == TSortType.TOTAL);
-      double numInputBlocks = Math.ceil(fullInputSize / blockSize);
-      long perInstanceMemEstimate =
-          blockSize * (long) Math.ceil(Math.sqrt(numInputBlocks));
-
-      // Must be kept in sync with min_buffers_required in Sorter in be.
-      long perInstanceMinReservation = 3 * getDefaultSpillableBufferBytes();
-      if (info_.getSortTupleDescriptor().hasVarLenSlots()) {
-        perInstanceMinReservation *= 2;
-      }
-      nodeResourceProfile_ =
-          new ResourceProfile(perInstanceMemEstimate, perInstanceMinReservation);
+      double numInputBlocks = Math.ceil(fullInputSize / (bufferSize * pageMultiplier));
+      perInstanceMemEstimate =
+          bufferSize * (long) Math.ceil(Math.sqrt(numInputBlocks));
+      perInstanceMinReservation = 3 * bufferSize * pageMultiplier;
     }
+    nodeResourceProfile_ = ResourceProfile.spillableWithMinReservation(
+        perInstanceMemEstimate, perInstanceMinReservation, bufferSize);
   }
 
   private static String getDisplayName(TSortType type) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/SubplanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SubplanNode.java b/fe/src/main/java/org/apache/impala/planner/SubplanNode.java
index c09efe5..e41290e 100644
--- a/fe/src/main/java/org/apache/impala/planner/SubplanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SubplanNode.java
@@ -95,7 +95,7 @@ public class SubplanNode extends PlanNode {
   @Override
   public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     // TODO: add an estimate
-    nodeResourceProfile_ = new ResourceProfile(0, 0);
+    nodeResourceProfile_ = ResourceProfile.noReservation(0);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/UnionNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/UnionNode.java b/fe/src/main/java/org/apache/impala/planner/UnionNode.java
index 44e2967..302f62d 100644
--- a/fe/src/main/java/org/apache/impala/planner/UnionNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/UnionNode.java
@@ -131,7 +131,7 @@ public class UnionNode extends PlanNode {
   @Override
   public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     // TODO: add an estimate
-    nodeResourceProfile_ = new ResourceProfile(0, 0);
+    nodeResourceProfile_ = ResourceProfile.noReservation(0);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/planner/UnnestNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/UnnestNode.java b/fe/src/main/java/org/apache/impala/planner/UnnestNode.java
index 695ec24..7e0a87e 100644
--- a/fe/src/main/java/org/apache/impala/planner/UnnestNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/UnnestNode.java
@@ -76,7 +76,7 @@ public class UnnestNode extends PlanNode {
   @Override
   public void computeNodeResourceProfile(TQueryOptions queryOptions) {
     // TODO: add an estimate
-    nodeResourceProfile_ = new ResourceProfile(0, 0);
+    nodeResourceProfile_ = ResourceProfile.noReservation(0);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/main/java/org/apache/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 2c71a9b..60e84b4 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -1005,7 +1005,7 @@ public class Frontend {
     }
 
     // Compute resource requirements of the final plans.
-    planner.computeResourceReqs(planRoots, result);
+    planner.computeResourceReqs(planRoots, queryCtx, result);
 
     // create per-plan exec info;
     // also assemble list of names of tables with missing or corrupt stats for

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 8289ee8..b0f1e2e 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -425,8 +425,6 @@ public class PlannerTest extends PlannerTestBase {
     TQueryOptions options = defaultQueryOptions();
     options.setExplain_level(TExplainLevel.EXTENDED);
     options.setNum_scanner_threads(1); // Required so that output doesn't vary by machine
-    // TODO: IMPALA-3200 - this should become a query option.
-    RuntimeEnv.INSTANCE.setMinSpillableBufferBytes(64 * 1024);
     runPlannerTestFile("spillable-buffer-sizing", options, false);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test b/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
index ed4f684..f4ae6c3 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
@@ -102,7 +102,7 @@ having 1024 * 1024 * count(*) % 2 = 0
   and (sm between 5 and 10)
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=138.00MB mem-reservation=264.00MB
+|  Per-Host Resources: mem-estimate=138.00MB mem-reservation=1.06MB
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
@@ -110,7 +110,7 @@ PLAN-ROOT SINK
 |  output: sum(2 + id), count(*)
 |  group by: timestamp_col = TIMESTAMP '2016-11-15 00:00:00'
 |  having: sum(2 + id) <= 10, sum(2 + id) > 1, sum(2 + id) >= 5, 1048576 * count(*) % 2 = 0
-|  mem-estimate=10.00MB mem-reservation=264.00MB
+|  mem-estimate=10.00MB mem-reservation=1.06MB spill-buffer=64.00KB
 |  tuple-ids=1 row-size=17B cardinality=0
 |
 00:SCAN HDFS [functional.alltypes]
@@ -129,7 +129,7 @@ left outer join functional.alltypes b
 where round(1.11 + 2.22 + 3.33 + 4.44, 1) < cast(b.double_col as decimal(3, 2))
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=256.02MB mem-reservation=136.00MB
+|  Per-Host Resources: mem-estimate=256.02MB mem-reservation=1.06MB
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
@@ -138,7 +138,7 @@ PLAN-ROOT SINK
 |  fk/pk conjuncts: assumed fk/pk
 |  other join predicates: a.int_col <= b.bigint_col + 97, a.int_col >= 0 + b.bigint_col
 |  other predicates: CAST(b.double_col AS DECIMAL(3,2)) > 11.1
-|  mem-estimate=15.68KB mem-reservation=136.00MB
+|  mem-estimate=15.68KB mem-reservation=1.06MB spill-buffer=64.00KB
 |  tuple-ids=0,1N row-size=28B cardinality=7300
 |
 |--01:SCAN HDFS [functional.alltypes b]
@@ -203,7 +203,7 @@ group by timestamp_col = cast('2015-11-15' as timestamp) + interval 1 year
 having 1024 * 1024 * count(*) % 2 = 0
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=138.00MB mem-reservation=528.00MB
+|  Per-Host Resources: mem-estimate=138.00MB mem-reservation=2.12MB
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
@@ -211,13 +211,13 @@ PLAN-ROOT SINK
 |  output: sum(2 + id), count:merge(*)
 |  group by: timestamp_col = TIMESTAMP '2016-11-15 00:00:00'
 |  having: 1048576 * count(*) % 2 = 0
-|  mem-estimate=10.00MB mem-reservation=264.00MB
+|  mem-estimate=10.00MB mem-reservation=1.06MB spill-buffer=64.00KB
 |  tuple-ids=2 row-size=17B cardinality=0
 |
 01:AGGREGATE
 |  output: count(*)
 |  group by: timestamp_col = TIMESTAMP '2016-11-15 00:00:00', 2 + id
-|  mem-estimate=10.00MB mem-reservation=264.00MB
+|  mem-estimate=10.00MB mem-reservation=1.06MB spill-buffer=64.00KB
 |  tuple-ids=1 row-size=17B cardinality=7300
 |
 00:SCAN HDFS [functional.alltypes]
@@ -234,20 +234,20 @@ from functional.alltypes
 having 1024 * 1024 * count(*) % 2 = 0
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=138.00MB mem-reservation=264.00MB
+|  Per-Host Resources: mem-estimate=138.00MB mem-reservation=1.06MB
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
 02:AGGREGATE [FINALIZE]
 |  output: sum(2 + id), count:merge(*)
 |  having: 1048576 * zeroifnull(count(*)) % 2 = 0
-|  mem-estimate=10.00MB mem-reservation=0B
+|  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB
 |  tuple-ids=2 row-size=16B cardinality=0
 |
 01:AGGREGATE
 |  output: count(*)
 |  group by: 2 + id
-|  mem-estimate=10.00MB mem-reservation=264.00MB
+|  mem-estimate=10.00MB mem-reservation=1.06MB spill-buffer=64.00KB
 |  tuple-ids=1 row-size=16B cardinality=7300
 |
 00:SCAN HDFS [functional.alltypes]
@@ -265,7 +265,7 @@ select first_value(1 + 1 + int_col - (1 - 1)) over
 from functional.alltypes
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=144.00MB mem-reservation=64.00MB
+|  Per-Host Resources: mem-estimate=130.00MB mem-reservation=16.00MB
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
@@ -274,13 +274,13 @@ PLAN-ROOT SINK
 |  partition by: concat('ab', string_col)
 |  order by: greatest(20, bigint_col) ASC
 |  window: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
-|  mem-estimate=0B mem-reservation=16.00MB
+|  mem-estimate=0B mem-reservation=4.00MB spill-buffer=2.00MB
 |  tuple-ids=3,2 row-size=61B cardinality=7300
 |
 01:SORT
 |  order by: concat('ab', string_col) ASC NULLS FIRST, greatest(20, bigint_col) ASC
 |  materialized: concat('ab', string_col), greatest(20, bigint_col)
-|  mem-estimate=16.00MB mem-reservation=48.00MB
+|  mem-estimate=2.00MB mem-reservation=12.00MB spill-buffer=2.00MB
 |  tuple-ids=3 row-size=53B cardinality=7300
 |
 00:SCAN HDFS [functional.alltypes]
@@ -296,13 +296,13 @@ select int_col from functional.alltypes
 order by id * abs((factorial(5) / power(2, 4)))
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=136.00MB mem-reservation=24.00MB
+|  Per-Host Resources: mem-estimate=130.00MB mem-reservation=6.00MB
 PLAN-ROOT SINK
 |  mem-estimate=0B mem-reservation=0B
 |
 01:SORT
 |  order by: id * 7.5 ASC
-|  mem-estimate=8.00MB mem-reservation=24.00MB
+|  mem-estimate=2.00MB mem-reservation=6.00MB spill-buffer=2.00MB
 |  tuple-ids=1 row-size=8B cardinality=7300
 |
 00:SCAN HDFS [functional.alltypes]
@@ -347,7 +347,7 @@ PLAN-ROOT SINK
 |
 01:AGGREGATE [FINALIZE]
 |  output: sum(id + 10 + 20 + 30)
-|  mem-estimate=10.00MB mem-reservation=0B
+|  mem-estimate=10.00MB mem-reservation=0B spill-buffer=2.00MB
 |  tuple-ids=4 row-size=8B cardinality=1
 |
 00:SCAN HDFS [functional.alltypes]

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test b/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test
index e64691a..d367424 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test
@@ -78,7 +78,7 @@ select count(*)
 from functional.alltypes t1
 join functional.alltypestiny t2 on t1.id = t2.id
 ---- DISTRIBUTEDPLAN
-Per-Host Resource Reservation: Memory=136.00MB
+Per-Host Resource Reservation: Memory=1.06MB
 Per-Host Resource Estimates: Memory=180.00MB
 Codegen disabled by planner
 

Reply via email to