http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc 
b/be/src/exec/partitioned-aggregation-node.cc
index 7067961..fc0a4a6 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -31,10 +31,12 @@
 #include "exprs/scalar-expr-evaluator.h"
 #include "exprs/slot-ref.h"
 #include "gutil/strings/substitute.h"
-#include "runtime/buffered-tuple-stream.inline.h"
+#include "runtime/buffered-tuple-stream-v2.inline.h"
 #include "runtime/descriptors.h"
+#include "runtime/exec-env.h"
 #include "runtime/mem-pool.h"
 #include "runtime/mem-tracker.h"
+#include "runtime/query-state.h"
 #include "runtime/raw-value.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
@@ -111,7 +113,6 @@ PartitionedAggregationNode::PartitionedAggregationNode(
     needs_finalize_(tnode.agg_node.need_finalize),
     is_streaming_preagg_(tnode.agg_node.use_streaming_preaggregation),
     needs_serialize_(false),
-    block_mgr_client_(NULL),
     output_partition_(NULL),
     process_batch_no_grouping_fn_(NULL),
     process_batch_fn_(NULL),
@@ -224,24 +225,6 @@ Status PartitionedAggregationNode::Prepare(RuntimeState* 
state) {
     RETURN_IF_ERROR(HashTableCtx::Create(pool_, state, build_exprs_,
         grouping_exprs_, true, vector<bool>(build_exprs_.size(), true),
         state->fragment_hash_seed(), MAX_PARTITION_DEPTH, 1, expr_mem_pool(), 
&ht_ctx_));
-    RETURN_IF_ERROR(state_->block_mgr()->RegisterClient(
-        Substitute("PartitionedAggregationNode id=$0 ptr=$1", id_, this),
-        MinRequiredBuffers(), true, mem_tracker(), state, &block_mgr_client_));
-  }
-
-  // TODO: Is there a need to create the stream here? If memory reservations 
work we may
-  // be able to create this stream lazily and only whenever we need to spill.
-  if (!is_streaming_preagg_ && needs_serialize_ && block_mgr_client_ != NULL) {
-    serialize_stream_.reset(new BufferedTupleStream(state, 
&intermediate_row_desc_,
-        state->block_mgr(), block_mgr_client_, false /* 
use_initial_small_buffers */,
-        false /* read_write */));
-    RETURN_IF_ERROR(serialize_stream_->Init(id(), runtime_profile(), false));
-    bool got_buffer;
-    RETURN_IF_ERROR(serialize_stream_->PrepareForWrite(&got_buffer));
-    if (!got_buffer) {
-      return state_->block_mgr()->MemLimitTooLowError(block_mgr_client_, id());
-    }
-    DCHECK(serialize_stream_->has_write_block());
   }
   AddCodegenDisabledMessage(state);
   return Status::OK();
@@ -265,8 +248,16 @@ Status PartitionedAggregationNode::Open(RuntimeState* 
state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   // Open the child before consuming resources in this node.
   RETURN_IF_ERROR(child(0)->Open(state));
-
   RETURN_IF_ERROR(ExecNode::Open(state));
+
+  // Claim reservation after the child has been opened to reduce the peak 
reservation
+  // requirement.
+  if (!buffer_pool_client_.is_registered() && !grouping_exprs_.empty()) {
+    DCHECK_GE(resource_profile_.min_reservation,
+        resource_profile_.spillable_buffer_size * MinRequiredBuffers());
+    RETURN_IF_ERROR(ClaimBufferReservation(state));
+  }
+
   if (ht_ctx_.get() != nullptr) RETURN_IF_ERROR(ht_ctx_->Open(state));
   RETURN_IF_ERROR(AggFnEvaluator::Open(agg_fn_evals_, state));
   if (grouping_exprs_.empty()) {
@@ -278,6 +269,25 @@ Status PartitionedAggregationNode::Open(RuntimeState* 
state) {
     RETURN_IF_ERROR(state_->GetQueryStatus());
     singleton_output_tuple_returned_ = false;
   } else {
+    if (ht_allocator_ == nullptr) {
+      // Allocate 'serialize_stream_' and 'ht_allocator_' on the first Open() 
call.
+      ht_allocator_.reset(new Suballocator(state_->exec_env()->buffer_pool(),
+          &buffer_pool_client_, resource_profile_.spillable_buffer_size));
+
+      if (!is_streaming_preagg_ && needs_serialize_) {
+        serialize_stream_.reset(new BufferedTupleStreamV2(state, 
&intermediate_row_desc_,
+            &buffer_pool_client_, resource_profile_.spillable_buffer_size,
+            resource_profile_.spillable_buffer_size));
+        RETURN_IF_ERROR(serialize_stream_->Init(id(), false));
+        bool got_buffer;
+        // Reserve the memory for 'serialize_stream_' so we don't need to 
scrounge up
+        // another buffer during spilling.
+        RETURN_IF_ERROR(serialize_stream_->PrepareForWrite(&got_buffer));
+        DCHECK(got_buffer)
+            << "Accounted in min reservation" << 
buffer_pool_client_.DebugString();
+        DCHECK(serialize_stream_->has_write_iterator());
+      }
+    }
     RETURN_IF_ERROR(CreateHashPartitions(0));
   }
 
@@ -520,9 +530,12 @@ Status 
PartitionedAggregationNode::GetRowsStreaming(RuntimeState* state,
     bool ht_needs_expansion = false;
     for (int i = 0; i < PARTITION_FANOUT; ++i) {
       HashTable* hash_tbl = GetHashTable(i);
-      DCHECK(hash_tbl != NULL);
-      remaining_capacity[i] = hash_tbl->NumInsertsBeforeResize();
-      ht_needs_expansion |= remaining_capacity[i] < child_batch_->num_rows();
+      if (hash_tbl == nullptr) {
+        remaining_capacity[i] = 0;
+      } else {
+        remaining_capacity[i] = hash_tbl->NumInsertsBeforeResize();
+        ht_needs_expansion |= remaining_capacity[i] < child_batch_->num_rows();
+      }
     }
 
     // Stop expanding hash tables if we're not reducing the input 
sufficiently. As our
@@ -533,9 +546,12 @@ Status 
PartitionedAggregationNode::GetRowsStreaming(RuntimeState* state,
     if (ht_needs_expansion && ShouldExpandPreaggHashTables()) {
       for (int i = 0; i < PARTITION_FANOUT; ++i) {
         HashTable* ht = GetHashTable(i);
-        if (remaining_capacity[i] < child_batch_->num_rows()) {
+        if (ht != nullptr && remaining_capacity[i] < child_batch_->num_rows()) 
{
           SCOPED_TIMER(ht_resize_timer_);
-          if (ht->CheckAndResize(child_batch_->num_rows(), ht_ctx_.get())) {
+          bool resized;
+          RETURN_IF_ERROR(
+              ht->CheckAndResize(child_batch_->num_rows(), ht_ctx_.get(), 
&resized));
+          if (resized) {
             remaining_capacity[i] = ht->NumInsertsBeforeResize();
           }
         }
@@ -548,7 +564,7 @@ Status 
PartitionedAggregationNode::GetRowsStreaming(RuntimeState* state,
           child_batch_.get(), out_batch, ht_ctx_.get(), remaining_capacity));
     } else {
       RETURN_IF_ERROR(ProcessBatchStreaming(needs_serialize_, prefetch_mode,
-          child_batch_.get(), out_batch, ht_ctx_.get(), remaining_capacity ));
+          child_batch_.get(), out_batch, ht_ctx_.get(), remaining_capacity));
     }
 
     child_batch_->Reset(); // All rows from child_batch_ were processed.
@@ -557,7 +573,7 @@ Status 
PartitionedAggregationNode::GetRowsStreaming(RuntimeState* state,
   if (child_eos_) {
     child(0)->Close(state);
     child_batch_.reset();
-    MoveHashPartitions(child(0)->rows_returned());
+    RETURN_IF_ERROR(MoveHashPartitions(child(0)->rows_returned()));
   }
 
   num_rows_returned_ += out_batch->num_rows();
@@ -570,8 +586,10 @@ bool 
PartitionedAggregationNode::ShouldExpandPreaggHashTables() const {
   int64_t ht_rows = 0;
   for (int i = 0; i < PARTITION_FANOUT; ++i) {
     HashTable* ht = hash_partitions_[i]->hash_tbl.get();
-    ht_mem += ht->CurrentMemSize();
-    ht_rows += ht->size();
+    if (ht != nullptr) {
+      ht_mem += ht->CurrentMemSize();
+      ht_rows += ht->size();
+    }
   }
 
   // Need some rows in tables to have valid statistics.
@@ -678,7 +696,6 @@ void PartitionedAggregationNode::Close(RuntimeState* state) 
{
   if (serialize_stream_.get() != nullptr) {
     serialize_stream_->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
   }
-  if (block_mgr_client_ != nullptr) 
state->block_mgr()->ClearReservations(block_mgr_client_);
   ScalarExpr::Close(grouping_exprs_);
   ScalarExpr::Close(build_exprs_);
   AggFn::Close(agg_fns_);
@@ -705,56 +722,55 @@ Status 
PartitionedAggregationNode::Partition::InitStreams() {
     }
   }
 
-  aggregated_row_stream.reset(new BufferedTupleStream(parent->state_,
-      &parent->intermediate_row_desc_, parent->state_->block_mgr(),
-      parent->block_mgr_client_, true /* use_initial_small_buffers */,
-      false /* read_write */, external_varlen_slots));
-  RETURN_IF_ERROR(
-      aggregated_row_stream->Init(parent->id(), parent->runtime_profile(), 
true));
+  aggregated_row_stream.reset(new BufferedTupleStreamV2(parent->state_,
+      &parent->intermediate_row_desc_, &parent->buffer_pool_client_,
+      parent->resource_profile_.spillable_buffer_size,
+      parent->resource_profile_.spillable_buffer_size, external_varlen_slots));
+  RETURN_IF_ERROR(aggregated_row_stream->Init(parent->id(), true));
   bool got_buffer;
   RETURN_IF_ERROR(aggregated_row_stream->PrepareForWrite(&got_buffer));
   if (!got_buffer) {
-    return parent->state_->block_mgr()->MemLimitTooLowError(
-        parent->block_mgr_client_, parent->id());
+    stringstream ss;
+    parent->DebugString(2, &ss);
+    DCHECK(parent->is_streaming_preagg_)
+        << "Merge agg should have enough reservation " << parent->id_ << "\n"
+        << parent->buffer_pool_client_.DebugString() << "\n"
+        << ss.str();
+    DiscardAggregatedRowStream();
   }
 
   if (!parent->is_streaming_preagg_) {
-    unaggregated_row_stream.reset(new BufferedTupleStream(parent->state_,
-        parent->child(0)->row_desc(), parent->state_->block_mgr(),
-      parent->block_mgr_client_, true /* use_initial_small_buffers */,
-        false /* read_write */));
+    unaggregated_row_stream.reset(new BufferedTupleStreamV2(parent->state_,
+        parent->child(0)->row_desc(), &parent->buffer_pool_client_,
+        parent->resource_profile_.spillable_buffer_size,
+        parent->resource_profile_.spillable_buffer_size));
     // This stream is only used to spill, no need to ever have this pinned.
-    RETURN_IF_ERROR(unaggregated_row_stream->Init(parent->id(), 
parent->runtime_profile(),
-        false));
-    // TODO: allocate this buffer later only if we spill the partition.
-    RETURN_IF_ERROR(unaggregated_row_stream->PrepareForWrite(&got_buffer));
-    if (!got_buffer) {
-      return parent->state_->block_mgr()->MemLimitTooLowError(
-          parent->block_mgr_client_, parent->id());
-    }
-    DCHECK(unaggregated_row_stream->has_write_block());
+    RETURN_IF_ERROR(unaggregated_row_stream->Init(parent->id(), false));
+    // Save memory by waiting until we spill to allocate the write buffer for 
the
+    // unaggregated row stream.
+    DCHECK(!unaggregated_row_stream->has_write_iterator());
   }
   return Status::OK();
 }
 
-bool PartitionedAggregationNode::Partition::InitHashTable() {
-  DCHECK(hash_tbl.get() == NULL);
+Status PartitionedAggregationNode::Partition::InitHashTable(bool* got_memory) {
+  DCHECK(aggregated_row_stream != nullptr);
+  DCHECK(hash_tbl == nullptr);
   // We use the upper PARTITION_FANOUT num bits to pick the partition so only 
the
   // remaining bits can be used for the hash table.
   // TODO: we could switch to 64 bit hashes and then we don't need a max size.
   // It might be reasonable to limit individual hash table size for other 
reasons
   // though. Always start with small buffers.
-  hash_tbl.reset(HashTable::Create(parent->state_, parent->block_mgr_client_,
-      false, 1, NULL, 1L << (32 - NUM_PARTITIONING_BITS),
-      PAGG_DEFAULT_HASH_TABLE_SZ));
+  hash_tbl.reset(HashTable::Create(parent->ht_allocator_.get(), false, 1, 
nullptr,
+      1L << (32 - NUM_PARTITIONING_BITS), PAGG_DEFAULT_HASH_TABLE_SZ));
   // Please update the error message in CreateHashPartitions() if initial size 
of
   // hash table changes.
-  return hash_tbl->Init();
+  return hash_tbl->Init(got_memory);
 }
 
 Status PartitionedAggregationNode::Partition::SerializeStreamForSpilling() {
   DCHECK(!parent->is_streaming_preagg_);
-  if (parent->needs_serialize_ && aggregated_row_stream->num_rows() != 0) {
+  if (parent->needs_serialize_) {
     // We need to do a lot more work in this case. This step effectively does 
a merge
     // aggregation in this node. We need to serialize the intermediates, spill 
the
     // intermediates and then feed them into the aggregate function's merge 
step.
@@ -767,70 +783,69 @@ Status 
PartitionedAggregationNode::Partition::SerializeStreamForSpilling() {
     // for those UDAs.
     DCHECK(parent->serialize_stream_.get() != NULL);
     DCHECK(!parent->serialize_stream_->is_pinned());
-    DCHECK(parent->serialize_stream_->has_write_block());
 
     // Serialize and copy the spilled partition's stream into the new stream.
-    Status status = Status::OK();
-    bool failed_to_add = false;
-    BufferedTupleStream* new_stream = parent->serialize_stream_.get();
+    Status status;
+    BufferedTupleStreamV2* new_stream = parent->serialize_stream_.get();
     HashTable::Iterator it = hash_tbl->Begin(parent->ht_ctx_.get());
     while (!it.AtEnd()) {
       Tuple* tuple = it.GetTuple();
       it.Next();
       AggFnEvaluator::Serialize(agg_fn_evals, tuple);
       if (UNLIKELY(!new_stream->AddRow(reinterpret_cast<TupleRow*>(&tuple), 
&status))) {
-        failed_to_add = true;
-        break;
+        DCHECK(!status.ok()) << "Stream was unpinned - AddRow() only fails on 
error";
+        // Even if we can't add to new_stream, finish up processing this agg 
stream to make
+        // clean up easier (someone has to finalize this stream and we don't 
want to remember
+        // where we are).
+        parent->CleanupHashTbl(agg_fn_evals, it);
+        hash_tbl->Close();
+        hash_tbl.reset();
+        aggregated_row_stream->Close(NULL, 
RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+        return status;
       }
     }
 
-    // Even if we can't add to new_stream, finish up processing this agg 
stream to make
-    // clean up easier (someone has to finalize this stream and we don't want 
to remember
-    // where we are).
-    if (failed_to_add) {
-      parent->CleanupHashTbl(agg_fn_evals, it);
-      hash_tbl->Close();
-      hash_tbl.reset();
-      aggregated_row_stream->Close(NULL, 
RowBatch::FlushMode::NO_FLUSH_RESOURCES);
-      RETURN_IF_ERROR(status);
-      return 
parent->state_->block_mgr()->MemLimitTooLowError(parent->block_mgr_client_,
-          parent->id());
-    }
-    DCHECK(status.ok());
-
     aggregated_row_stream->Close(NULL, 
RowBatch::FlushMode::NO_FLUSH_RESOURCES);
     aggregated_row_stream.swap(parent->serialize_stream_);
     // Recreate the serialize_stream (and reserve 1 buffer) now in preparation 
for
     // when we need to spill again. We need to have this available before we 
need
     // to spill to make sure it is available. This should be acquirable since 
we just
     // freed at least one buffer from this partition's (old) 
aggregated_row_stream.
-    parent->serialize_stream_.reset(
-        new BufferedTupleStream(parent->state_, 
&parent->intermediate_row_desc_,
-            parent->state_->block_mgr(), parent->block_mgr_client_,
-            false /* use_initial_small_buffers */, false /* read_write */));
-    status = parent->serialize_stream_->Init(parent->id(), 
parent->runtime_profile(),
-        false);
+    parent->serialize_stream_.reset(new BufferedTupleStreamV2(parent->state_,
+        &parent->intermediate_row_desc_, &parent->buffer_pool_client_,
+        parent->resource_profile_.spillable_buffer_size,
+        parent->resource_profile_.spillable_buffer_size));
+    status = parent->serialize_stream_->Init(parent->id(), false);
     if (status.ok()) {
       bool got_buffer;
       status = parent->serialize_stream_->PrepareForWrite(&got_buffer);
-      if (status.ok() && !got_buffer) {
-        status = parent->state_->block_mgr()->MemLimitTooLowError(
-            parent->block_mgr_client_, parent->id());
-      }
+      DCHECK(!status.ok() || got_buffer) << "Accounted in min reservation";
     }
     if (!status.ok()) {
       hash_tbl->Close();
       hash_tbl.reset();
       return status;
     }
-    DCHECK(parent->serialize_stream_->has_write_block());
+    DCHECK(parent->serialize_stream_->has_write_iterator());
   }
   return Status::OK();
 }
 
-Status PartitionedAggregationNode::Partition::Spill() {
+void PartitionedAggregationNode::Partition::DiscardAggregatedRowStream() {
+  DCHECK(parent->is_streaming_preagg_);
+  DCHECK(aggregated_row_stream != nullptr);
+  DCHECK_EQ(aggregated_row_stream->num_rows(), 0);
+  if (hash_tbl != nullptr) hash_tbl->Close();
+  hash_tbl.reset();
+  aggregated_row_stream->Close(nullptr, 
RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+  aggregated_row_stream.reset();
+}
+
+Status PartitionedAggregationNode::Partition::Spill(bool more_aggregate_rows) {
+  DCHECK(!parent->is_streaming_preagg_);
   DCHECK(!is_closed);
   DCHECK(!is_spilled());
+  RETURN_IF_ERROR(parent->state_->StartSpilling(parent->mem_tracker()));
 
   RETURN_IF_ERROR(SerializeStreamForSpilling());
 
@@ -846,34 +861,18 @@ Status PartitionedAggregationNode::Partition::Spill() {
   hash_tbl->Close();
   hash_tbl.reset();
 
-  // Try to switch both streams to IO-sized buffers to avoid allocating small 
buffers
-  // for spilled partition.
-  bool got_buffer = true;
-  if (aggregated_row_stream->using_small_buffers()) {
-    RETURN_IF_ERROR(aggregated_row_stream->SwitchToIoBuffers(&got_buffer));
-  }
-  // Unpin the stream as soon as possible to increase the chances that the
-  // SwitchToIoBuffers() call below will succeed.  If we're repartitioning, 
rows that
-  // were already aggregated (rows from the input partition's aggregated 
stream) will
-  // need to be added to this hash partition's aggregated stream, so we need 
to leave
-  // the write block pinned.
-  // TODO: when not repartitioning, don't leave the write block pinned.
-  DCHECK(!got_buffer || aggregated_row_stream->has_write_block())
-      << aggregated_row_stream->DebugString();
-  RETURN_IF_ERROR(
-      
aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
-
-  if (got_buffer && unaggregated_row_stream->using_small_buffers()) {
-    RETURN_IF_ERROR(unaggregated_row_stream->SwitchToIoBuffers(&got_buffer));
-  }
-  if (!got_buffer) {
-    // We'll try again to get the buffers when the stream fills up the small 
buffers.
-    VLOG_QUERY << "Not enough memory to switch to IO-sized buffer for 
partition "
-               << this << " of agg=" << parent->id_ << " agg small buffers="
-               << aggregated_row_stream->using_small_buffers()
-               << " unagg small buffers="
-               << unaggregated_row_stream->using_small_buffers();
-    VLOG_FILE << GetStackTrace();
+  // Unpin the stream to free memory, but leave a write buffer in place so we 
can
+  // continue appending rows to one of the streams in the partition.
+  DCHECK(aggregated_row_stream->has_write_iterator());
+  DCHECK(!unaggregated_row_stream->has_write_iterator());
+  if (more_aggregate_rows) {
+    
aggregated_row_stream->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL_EXCEPT_CURRENT);
+  } else {
+    aggregated_row_stream->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL);
+    bool got_buffer;
+    RETURN_IF_ERROR(unaggregated_row_stream->PrepareForWrite(&got_buffer));
+    DCHECK(got_buffer)
+        << "Accounted in min reservation" << 
parent->buffer_pool_client_.DebugString();
   }
 
   COUNTER_ADD(parent->num_spilled_partitions_, 1);
@@ -933,33 +932,27 @@ Tuple* 
PartitionedAggregationNode::ConstructIntermediateTuple(
 }
 
 Tuple* PartitionedAggregationNode::ConstructIntermediateTuple(
-    const vector<AggFnEvaluator*>& agg_fn_evals, BufferedTupleStream* stream,
+    const vector<AggFnEvaluator*>& agg_fn_evals, BufferedTupleStreamV2* stream,
     Status* status) noexcept {
   DCHECK(stream != NULL && status != NULL);
   // Allocate space for the entire tuple in the stream.
   const int fixed_size = intermediate_tuple_desc_->byte_size();
   const int varlen_size = GroupingExprsVarlenSize();
-  uint8_t* varlen_buffer;
-  uint8_t* fixed_buffer = stream->AllocateRow(fixed_size, varlen_size, 
&varlen_buffer,
-      status);
-  if (UNLIKELY(fixed_buffer == NULL)) {
-    if (!status->ok() || !stream->using_small_buffers()) return NULL;
-    // IMPALA-2352: Make a best effort to switch to IO buffers and re-allocate.
-    // If SwitchToIoBuffers() fails the caller of this function can try to free
-    // some space, e.g. through spilling, and re-attempt to allocate space for
-    // this row.
-    bool got_buffer;
-    *status = stream->SwitchToIoBuffers(&got_buffer);
-    if (!status->ok() || !got_buffer) return NULL;
-    fixed_buffer = stream->AllocateRow(fixed_size, varlen_size, 
&varlen_buffer, status);
-    if (fixed_buffer == NULL) return NULL;
-  }
-
-  Tuple* intermediate_tuple = reinterpret_cast<Tuple*>(fixed_buffer);
-  intermediate_tuple->Init(fixed_size);
-  CopyGroupingValues(intermediate_tuple, varlen_buffer, varlen_size);
-  InitAggSlots(agg_fn_evals, intermediate_tuple);
-  return intermediate_tuple;
+  const int tuple_size = fixed_size + varlen_size;
+  uint8_t* tuple_data = stream->AddRowCustomBegin(tuple_size, status);
+  if (UNLIKELY(tuple_data == nullptr)) {
+    // If we failed to allocate and did not hit an error (indicated by a 
non-ok status),
+    // the caller of this function can try to free some space, e.g. through 
spilling, and
+    // re-attempt to allocate space for this row.
+    return nullptr;
+  }
+  Tuple* tuple = reinterpret_cast<Tuple*>(tuple_data);
+  tuple->Init(fixed_size);
+  uint8_t* varlen_buffer = tuple_data + fixed_size;
+  CopyGroupingValues(tuple, varlen_buffer, varlen_size);
+  InitAggSlots(agg_fn_evals, tuple);
+  stream->AddRowCustomEnd(tuple_size);
+  return tuple;
 }
 
 int PartitionedAggregationNode::GroupingExprsVarlenSize() {
@@ -1079,30 +1072,30 @@ Tuple* PartitionedAggregationNode::GetOutputTuple(
   return dst;
 }
 
-Status PartitionedAggregationNode::AppendSpilledRow(BufferedTupleStream* 
stream,
-    TupleRow* row) {
-  DCHECK(stream != NULL);
+template <bool AGGREGATED_ROWS>
+Status PartitionedAggregationNode::AppendSpilledRow(
+    Partition* __restrict__ partition, TupleRow* __restrict__ row) {
+  DCHECK(!is_streaming_preagg_);
+  DCHECK(partition->is_spilled());
+  BufferedTupleStreamV2* stream = AGGREGATED_ROWS ?
+      partition->aggregated_row_stream.get() :
+      partition->unaggregated_row_stream.get();
   DCHECK(!stream->is_pinned());
-  DCHECK(stream->has_write_block());
-  if (LIKELY(stream->AddRow(row, &process_batch_status_))) return Status::OK();
+  Status status;
+  if (LIKELY(stream->AddRow(row, &status))) return Status::OK();
+  RETURN_IF_ERROR(status);
 
-  // Adding fails iff either we hit an error or haven't switched to I/O 
buffers.
-  RETURN_IF_ERROR(process_batch_status_);
+  // Keep trying to free memory by spilling until we succeed or hit an error.
+  // Running out of partitions to spill is treated as an error by 
SpillPartition().
   while (true) {
-    bool got_buffer;
-    RETURN_IF_ERROR(stream->SwitchToIoBuffers(&got_buffer));
-    if (got_buffer) break;
-    RETURN_IF_ERROR(SpillPartition());
+    RETURN_IF_ERROR(SpillPartition(AGGREGATED_ROWS));
+    if (stream->AddRow(row, &status)) return Status::OK();
+    RETURN_IF_ERROR(status);
   }
-
-  // Adding the row should succeed after the I/O buffer switch.
-  if (stream->AddRow(row, &process_batch_status_)) return Status::OK();
-  DCHECK(!process_batch_status_.ok());
-  return process_batch_status_;
 }
 
-void PartitionedAggregationNode::DebugString(int indentation_level,
-    stringstream* out) const {
+void PartitionedAggregationNode::DebugString(
+    int indentation_level, stringstream* out) const {
   *out << string(indentation_level * 2, ' ');
   *out << "PartitionedAggregationNode("
        << "intermediate_tuple_id=" << intermediate_tuple_id_
@@ -1114,85 +1107,100 @@ void PartitionedAggregationNode::DebugString(int 
indentation_level,
   *out << ")";
 }
 
-Status PartitionedAggregationNode::CreateHashPartitions(int level) {
+Status PartitionedAggregationNode::CreateHashPartitions(
+    int level, int single_partition_idx) {
   if (is_streaming_preagg_) DCHECK_EQ(level, 0);
   if (UNLIKELY(level >= MAX_PARTITION_DEPTH)) {
-    return Status(TErrorCode::PARTITIONED_AGG_MAX_PARTITION_DEPTH, id_, 
MAX_PARTITION_DEPTH);
+    return Status(
+        TErrorCode::PARTITIONED_AGG_MAX_PARTITION_DEPTH, id_, 
MAX_PARTITION_DEPTH);
   }
   ht_ctx_->set_level(level);
 
   DCHECK(hash_partitions_.empty());
+  int num_partitions_created = 0;
   for (int i = 0; i < PARTITION_FANOUT; ++i) {
-    Partition* new_partition = new Partition(this, level);
-    DCHECK(new_partition != NULL);
-    hash_partitions_.push_back(partition_pool_->Add(new_partition));
-    RETURN_IF_ERROR(new_partition->InitStreams());
-    hash_tbls_[i] = NULL;
-  }
-  if (!is_streaming_preagg_) {
-    
DCHECK_GT(state_->block_mgr()->num_reserved_buffers_remaining(block_mgr_client_),
 0);
+    hash_tbls_[i] = nullptr;
+    if (single_partition_idx == -1 || i == single_partition_idx) {
+      Partition* new_partition = partition_pool_->Add(new Partition(this, 
level, i));
+      ++num_partitions_created;
+      hash_partitions_.push_back(new_partition);
+      RETURN_IF_ERROR(new_partition->InitStreams());
+    } else {
+      hash_partitions_.push_back(nullptr);
+    }
   }
-
   // Now that all the streams are reserved (meaning we have enough memory to 
execute
   // the algorithm), allocate the hash tables. These can fail and we can still 
continue.
   for (int i = 0; i < PARTITION_FANOUT; ++i) {
-    if (UNLIKELY(!hash_partitions_[i]->InitHashTable())) {
-      // We don't spill on preaggregations. If we have so little memory that 
we can't
-      // allocate small hash tables, the mem limit is just too low.
-      if (is_streaming_preagg_) {
-        int64_t alloc_size = PAGG_DEFAULT_HASH_TABLE_SZ * 
HashTable::BucketSize();
-        string details = Substitute("Cannot perform aggregation at node with 
id $0."
-            " Failed to initialize hash table in preaggregation. The memory 
limit"
-            " is too low to execute the query.", id_);
-        return mem_tracker()->MemLimitExceeded(state_, details, alloc_size);
+    Partition* partition = hash_partitions_[i];
+    if (partition == nullptr) continue;
+    if (partition->aggregated_row_stream == nullptr) {
+      // Failed to create the aggregated row stream - cannot create a hash 
table.
+      // Just continue with a NULL hash table so rows will be passed through.
+      DCHECK(is_streaming_preagg_);
+    } else {
+      bool got_memory;
+      RETURN_IF_ERROR(partition->InitHashTable(&got_memory));
+      // Spill the partition if we cannot create a hash table for a merge 
aggregation.
+      if (UNLIKELY(!got_memory)) {
+        if (is_streaming_preagg_) {
+          partition->DiscardAggregatedRowStream();
+        } else {
+          // If we're repartitioning, we will be writing aggregated rows first.
+          RETURN_IF_ERROR(partition->Spill(level > 0));
+        }
       }
-      RETURN_IF_ERROR(hash_partitions_[i]->Spill());
     }
-    hash_tbls_[i] = hash_partitions_[i]->hash_tbl.get();
+    hash_tbls_[i] = partition->hash_tbl.get();
   }
 
-  COUNTER_ADD(partitions_created_, hash_partitions_.size());
+  COUNTER_ADD(partitions_created_, num_partitions_created);
   if (!is_streaming_preagg_) {
     COUNTER_SET(max_partition_level_, level);
   }
   return Status::OK();
 }
 
-Status PartitionedAggregationNode::CheckAndResizeHashPartitions(int num_rows,
-    const HashTableCtx* ht_ctx) {
+Status PartitionedAggregationNode::CheckAndResizeHashPartitions(
+    bool partitioning_aggregated_rows, int num_rows, const HashTableCtx* 
ht_ctx) {
   DCHECK(!is_streaming_preagg_);
   for (int i = 0; i < PARTITION_FANOUT; ++i) {
     Partition* partition = hash_partitions_[i];
+    if (partition == nullptr) continue;
     while (!partition->is_spilled()) {
       {
         SCOPED_TIMER(ht_resize_timer_);
-        if (partition->hash_tbl->CheckAndResize(num_rows, ht_ctx)) break;
+        bool resized;
+        RETURN_IF_ERROR(partition->hash_tbl->CheckAndResize(num_rows, ht_ctx, 
&resized));
+        if (resized) break;
       }
-      RETURN_IF_ERROR(SpillPartition());
+      RETURN_IF_ERROR(SpillPartition(partitioning_aggregated_rows));
     }
   }
   return Status::OK();
 }
 
 int64_t PartitionedAggregationNode::LargestSpilledPartition() const {
+  DCHECK(!is_streaming_preagg_);
   int64_t max_rows = 0;
   for (int i = 0; i < hash_partitions_.size(); ++i) {
     Partition* partition = hash_partitions_[i];
-    if (partition->is_closed || !partition->is_spilled()) continue;
-    int64_t rows = partition->aggregated_row_stream->num_rows() +
-        partition->unaggregated_row_stream->num_rows();
+    if (partition == nullptr || partition->is_closed || 
!partition->is_spilled()) {
+      continue;
+    }
+    int64_t rows = partition->aggregated_row_stream->num_rows()
+        + partition->unaggregated_row_stream->num_rows();
     if (rows > max_rows) max_rows = rows;
   }
   return max_rows;
 }
 
 Status PartitionedAggregationNode::NextPartition() {
-  DCHECK(output_partition_ == NULL);
+  DCHECK(output_partition_ == nullptr);
 
   // Keep looping until we get to a partition that fits in memory.
-  Partition* partition = NULL;
+  Partition* partition = nullptr;
   while (true) {
-    partition = NULL;
     // First return partitions that are fully aggregated (and in memory).
     if (!aggregated_partitions_.empty()) {
       partition = aggregated_partitions_.front();
@@ -1201,56 +1209,23 @@ Status PartitionedAggregationNode::NextPartition() {
       break;
     }
 
-    if (partition == NULL) {
-      DCHECK(!spilled_partitions_.empty());
-      DCHECK(!is_streaming_preagg_);
-      DCHECK_EQ(state_->block_mgr()->num_pinned_buffers(block_mgr_client_),
-          needs_serialize_ ? 1 : 0);
-
-      // TODO: we can probably do better than just picking the first 
partition. We
-      // can base this on the amount written to disk, etc.
-      partition = spilled_partitions_.front();
-      DCHECK(partition->is_spilled());
-
-      // Create the new hash partitions to repartition into.
-      // TODO: we don't need to repartition here. We are now working on 1 / 
FANOUT
-      // of the input so it's reasonably likely it can fit. We should look at 
this
-      // partitions size and just do the aggregation if it fits in memory.
-      RETURN_IF_ERROR(CreateHashPartitions(partition->level + 1));
-      COUNTER_ADD(num_repartitions_, 1);
-
-      // Rows in this partition could have been spilled into two streams, 
depending
-      // on if it is an aggregated intermediate, or an unaggregated row.
-      // Note: we must process the aggregated rows first to save a hash table 
lookup
-      // in ProcessBatch().
-      
RETURN_IF_ERROR(ProcessStream<true>(partition->aggregated_row_stream.get()));
-      
RETURN_IF_ERROR(ProcessStream<false>(partition->unaggregated_row_stream.get()));
-
-      COUNTER_ADD(num_row_repartitioned_, 
partition->aggregated_row_stream->num_rows());
-      COUNTER_ADD(num_row_repartitioned_,
-          partition->unaggregated_row_stream->num_rows());
+    // No aggregated partitions in memory - we should not be using any 
reservation aside
+    // from 'serialize_stream_'.
+    DCHECK_EQ(serialize_stream_ != nullptr ? 
serialize_stream_->BytesPinned(false) : 0,
+        buffer_pool_client_.GetUsedReservation()) << 
buffer_pool_client_.DebugString();
 
-      partition->Close(false);
-      spilled_partitions_.pop_front();
-
-      // Done processing this partition. Move the new partitions into
-      // spilled_partitions_/aggregated_partitions_.
-      int64_t num_input_rows = partition->aggregated_row_stream->num_rows() +
-          partition->unaggregated_row_stream->num_rows();
-
-      // Check if there was any reduction in the size of partitions after 
repartitioning.
-      int64_t largest_partition = LargestSpilledPartition();
-      DCHECK_GE(num_input_rows, largest_partition) << "Cannot have a partition 
with "
-          "more rows than the input";
-      if (UNLIKELY(num_input_rows == largest_partition)) {
-        return Status(TErrorCode::PARTITIONED_AGG_REPARTITION_FAILS, id_,
-            partition->level + 1, num_input_rows);
-      }
-      RETURN_IF_ERROR(MoveHashPartitions(num_input_rows));
-    }
-  }
+    // Try to fit a single spilled partition in memory. We can often do this 
because
+    // we only need to fit 1/PARTITION_FANOUT of the data in memory.
+    // TODO: in some cases when the partition probably won't fit in memory it 
could
+    // be better to skip directly to repartitioning.
+    RETURN_IF_ERROR(BuildSpilledPartition(&partition));
+    if (partition != nullptr) break;
 
-  DCHECK(partition->hash_tbl.get() != NULL);
+    // If we can't fit the partition in memory, repartition it.
+    RETURN_IF_ERROR(RepartitionSpilledPartition());
+  }
+  DCHECK(!partition->is_spilled());
+  DCHECK(partition->hash_tbl.get() != nullptr);
   DCHECK(partition->aggregated_row_stream->is_pinned());
 
   output_partition_ = partition;
@@ -1259,8 +1234,105 @@ Status PartitionedAggregationNode::NextPartition() {
   return Status::OK();
 }
 
-template<bool AGGREGATED_ROWS>
-Status PartitionedAggregationNode::ProcessStream(BufferedTupleStream* 
input_stream) {
+Status PartitionedAggregationNode::BuildSpilledPartition(Partition** 
built_partition) {
+  DCHECK(!spilled_partitions_.empty());
+  DCHECK(!is_streaming_preagg_);
+  // Leave the partition in 'spilled_partitions_' to be closed if we hit an 
error.
+  Partition* src_partition = spilled_partitions_.front();
+  DCHECK(src_partition->is_spilled());
+
+  // Create a new hash partition from the rows of the spilled partition. This 
is simpler
+  // than trying to finish building a partially-built partition in place. We 
only
+  // initialise one hash partition that all rows in 'src_partition' will hash 
to.
+  RETURN_IF_ERROR(CreateHashPartitions(src_partition->level, 
src_partition->idx));
+  Partition* dst_partition = hash_partitions_[src_partition->idx];
+  DCHECK(dst_partition != nullptr);
+
+  // Rebuild the hash table over spilled aggregate rows then start adding 
unaggregated
+  // rows to the hash table. It's possible the partition will spill at either 
stage.
+  // In that case we need to finish processing 'src_partition' so that all 
rows are
+  // appended to 'dst_partition'.
+  // TODO: if the partition spills again but the aggregation reduces the input
+  // significantly, we could do better here by keeping the incomplete hash 
table in
+  // memory and only spilling unaggregated rows that didn't fit in the hash 
table
+  // (somewhat similar to the passthrough pre-aggregation).
+  
RETURN_IF_ERROR(ProcessStream<true>(src_partition->aggregated_row_stream.get()));
+  
RETURN_IF_ERROR(ProcessStream<false>(src_partition->unaggregated_row_stream.get()));
+  src_partition->Close(false);
+  spilled_partitions_.pop_front();
+  hash_partitions_.clear();
+
+  if (dst_partition->is_spilled()) {
+    PushSpilledPartition(dst_partition);
+    *built_partition = nullptr;
+    // Spilled the partition - we should not be using any reservation except 
from
+    // 'serialize_stream_'.
+    DCHECK_EQ(serialize_stream_ != nullptr ? 
serialize_stream_->BytesPinned(false) : 0,
+        buffer_pool_client_.GetUsedReservation()) << 
buffer_pool_client_.DebugString();
+  } else {
+    *built_partition = dst_partition;
+  }
+  return Status::OK();
+}
+
+Status PartitionedAggregationNode::RepartitionSpilledPartition() {
+  DCHECK(!spilled_partitions_.empty());
+  DCHECK(!is_streaming_preagg_);
+  // Leave the partition in 'spilled_partitions_' to be closed if we hit an 
error.
+  Partition* partition = spilled_partitions_.front();
+  DCHECK(partition->is_spilled());
+
+  // Create the new hash partitions to repartition into. This will allocate a
+  // write buffer for each partition's aggregated row stream.
+  RETURN_IF_ERROR(CreateHashPartitions(partition->level + 1));
+  COUNTER_ADD(num_repartitions_, 1);
+
+  // Rows in this partition could have been spilled into two streams, depending
+  // on if it is an aggregated intermediate, or an unaggregated row. Aggregated
+  // rows are processed first to save a hash table lookup in ProcessBatch().
+  RETURN_IF_ERROR(ProcessStream<true>(partition->aggregated_row_stream.get()));
+
+  // Prepare write buffers so we can append spilled rows to unaggregated 
partitions.
+  for (Partition* hash_partition : hash_partitions_) {
+    if (!hash_partition->is_spilled()) continue;
+    // The aggregated rows have been repartitioned. Free up at least a 
buffer's worth of
+    // reservation and use it to pin the unaggregated write buffer.
+    
hash_partition->aggregated_row_stream->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL);
+    bool got_buffer;
+    RETURN_IF_ERROR(
+        hash_partition->unaggregated_row_stream->PrepareForWrite(&got_buffer));
+    DCHECK(got_buffer)
+        << "Accounted in min reservation" << buffer_pool_client_.DebugString();
+  }
+  
RETURN_IF_ERROR(ProcessStream<false>(partition->unaggregated_row_stream.get()));
+
+  COUNTER_ADD(num_row_repartitioned_, 
partition->aggregated_row_stream->num_rows());
+  COUNTER_ADD(num_row_repartitioned_, 
partition->unaggregated_row_stream->num_rows());
+
+  partition->Close(false);
+  spilled_partitions_.pop_front();
+
+  // Done processing this partition. Move the new partitions into
+  // spilled_partitions_/aggregated_partitions_.
+  int64_t num_input_rows = partition->aggregated_row_stream->num_rows()
+      + partition->unaggregated_row_stream->num_rows();
+
+  // Check if there was any reduction in the size of partitions after 
repartitioning.
+  int64_t largest_partition = LargestSpilledPartition();
+  DCHECK_GE(num_input_rows, largest_partition) << "Partition had more rows 
than input";
+  if (UNLIKELY(num_input_rows == largest_partition)) {
+    stringstream ss;
+    DebugString(2, &ss);
+    return Status(TErrorCode::PARTITIONED_AGG_REPARTITION_FAILS, id_,
+        partition->level + 1, num_input_rows, 
buffer_pool_client_.DebugString(),
+        ss.str());
+  }
+  RETURN_IF_ERROR(MoveHashPartitions(num_input_rows));
+  return Status::OK();
+}
+
+template <bool AGGREGATED_ROWS>
+Status PartitionedAggregationNode::ProcessStream(BufferedTupleStreamV2* 
input_stream) {
   DCHECK(!is_streaming_preagg_);
   if (input_stream->num_rows() > 0) {
     while (true) {
@@ -1268,7 +1340,7 @@ Status 
PartitionedAggregationNode::ProcessStream(BufferedTupleStream* input_stre
       RETURN_IF_ERROR(input_stream->PrepareForRead(true, &got_buffer));
       if (got_buffer) break;
       // Did not have a buffer to read the input stream. Spill and try again.
-      RETURN_IF_ERROR(SpillPartition());
+      RETURN_IF_ERROR(SpillPartition(AGGREGATED_ROWS));
     }
 
     TPrefetchMode::type prefetch_mode = state_->query_options().prefetch_mode;
@@ -1288,16 +1360,17 @@ Status 
PartitionedAggregationNode::ProcessStream(BufferedTupleStream* input_stre
   return Status::OK();
 }
 
-Status PartitionedAggregationNode::SpillPartition() {
+Status PartitionedAggregationNode::SpillPartition(bool more_aggregate_rows) {
   int64_t max_freed_mem = 0;
   int partition_idx = -1;
 
   // Iterate over the partitions and pick the largest partition that is not 
spilled.
   for (int i = 0; i < hash_partitions_.size(); ++i) {
+    if (hash_partitions_[i] == nullptr) continue;
     if (hash_partitions_[i]->is_closed) continue;
     if (hash_partitions_[i]->is_spilled()) continue;
     // Pass 'true' because we need to keep the write block pinned. See 
Partition::Spill().
-    int64_t mem = 
hash_partitions_[i]->aggregated_row_stream->bytes_in_mem(true);
+    int64_t mem = 
hash_partitions_[i]->aggregated_row_stream->BytesPinned(true);
     mem += hash_partitions_[i]->hash_tbl->ByteSize();
     mem += hash_partitions_[i]->agg_fn_pool->total_reserved_bytes();
     DCHECK_GT(mem, 0); // At least the hash table buckets should occupy memory.
@@ -1306,26 +1379,26 @@ Status PartitionedAggregationNode::SpillPartition() {
       partition_idx = i;
     }
   }
-  if (partition_idx == -1) {
-    // Could not find a partition to spill. This means the mem limit was just 
too low.
-    return state_->block_mgr()->MemLimitTooLowError(block_mgr_client_, id());
-  }
-
+  DCHECK_NE(partition_idx, -1) << "Should have been able to spill a partition 
to "
+                               << "reclaim memory: " << 
buffer_pool_client_.DebugString();
   hash_tbls_[partition_idx] = NULL;
-  return hash_partitions_[partition_idx]->Spill();
+  return hash_partitions_[partition_idx]->Spill(more_aggregate_rows);
 }
 
 Status PartitionedAggregationNode::MoveHashPartitions(int64_t num_input_rows) {
   DCHECK(!hash_partitions_.empty());
   stringstream ss;
-  ss << "PA(node_id=" << id() << ") partitioned(level="
-     << hash_partitions_[0]->level << ") "
-     << num_input_rows << " rows into:" << endl;
+  ss << "PA(node_id=" << id() << ") partitioned(level=" << 
hash_partitions_[0]->level
+     << ") " << num_input_rows << " rows into:" << endl;
   for (int i = 0; i < hash_partitions_.size(); ++i) {
     Partition* partition = hash_partitions_[i];
-    int64_t aggregated_rows = partition->aggregated_row_stream->num_rows();
+    if (partition == nullptr) continue;
+    int64_t aggregated_rows = 0;
+    if (partition->aggregated_row_stream != nullptr) {
+      aggregated_rows = partition->aggregated_row_stream->num_rows();
+    }
     int64_t unaggregated_rows = 0;
-    if (partition->unaggregated_row_stream != NULL) {
+    if (partition->unaggregated_row_stream != nullptr) {
       unaggregated_rows = partition->unaggregated_row_stream->num_rows();
     }
     double total_rows = aggregated_rows + unaggregated_rows;
@@ -1341,54 +1414,46 @@ Status 
PartitionedAggregationNode::MoveHashPartitions(int64_t num_input_rows) {
     if (total_rows == 0) {
       partition->Close(false);
     } else if (partition->is_spilled()) {
-      DCHECK(partition->hash_tbl.get() == NULL);
-      // We need to unpin all the spilled partitions to make room to allocate 
new
-      // hash_partitions_ when we repartition the spilled partitions.
-      // TODO: we only need to do this when we have memory pressure. This 
might be
-      // okay though since the block mgr should only write these to disk if 
there
-      // is memory pressure.
-      RETURN_IF_ERROR(
-          
partition->aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL));
-      RETURN_IF_ERROR(partition->unaggregated_row_stream->UnpinStream(
-          BufferedTupleStream::UNPIN_ALL));
-
-      // Push new created partitions at the front. This means a depth first 
walk
-      // (more finely partitioned partitions are processed first). This allows 
us
-      // to delete blocks earlier and bottom out the recursion earlier.
-      spilled_partitions_.push_front(partition);
+      PushSpilledPartition(partition);
     } else {
       aggregated_partitions_.push_back(partition);
     }
-
   }
   VLOG(2) << ss.str();
   hash_partitions_.clear();
   return Status::OK();
 }
 
+void PartitionedAggregationNode::PushSpilledPartition(Partition* partition) {
+  DCHECK(partition->is_spilled());
+  DCHECK(partition->hash_tbl == nullptr);
+  // Ensure all pages in the spilled partition's streams are unpinned by 
invalidating
+  // the streams' read and write iterators. We may need all the memory to 
process the
+  // next spilled partitions.
+  
partition->aggregated_row_stream->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL);
+  
partition->unaggregated_row_stream->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL);
+  spilled_partitions_.push_front(partition);
+}
+
 void PartitionedAggregationNode::ClosePartitions() {
-  for (int i = 0; i < hash_partitions_.size(); ++i) {
-    hash_partitions_[i]->Close(true);
-  }
-  for (list<Partition*>::iterator it = aggregated_partitions_.begin();
-      it != aggregated_partitions_.end(); ++it) {
-    (*it)->Close(true);
-  }
-  for (list<Partition*>::iterator it = spilled_partitions_.begin();
-      it != spilled_partitions_.end(); ++it) {
-    (*it)->Close(true);
+  for (Partition* partition : hash_partitions_) {
+    if (partition != nullptr) partition->Close(true);
   }
+  hash_partitions_.clear();
+  for (Partition* partition : aggregated_partitions_) partition->Close(true);
   aggregated_partitions_.clear();
+  for (Partition* partition : spilled_partitions_) partition->Close(true);
   spilled_partitions_.clear();
-  hash_partitions_.clear();
   memset(hash_tbls_, 0, sizeof(hash_tbls_));
   partition_pool_->Clear();
 }
 
 Status PartitionedAggregationNode::QueryMaintenance(RuntimeState* state) {
   AggFnEvaluator::FreeLocalAllocations(agg_fn_evals_);
-  for (int i = 0; i < hash_partitions_.size(); ++i) {
-    AggFnEvaluator::FreeLocalAllocations(hash_partitions_[i]->agg_fn_evals);
+  for (Partition* partition : hash_partitions_) {
+    if (partition != nullptr) {
+      AggFnEvaluator::FreeLocalAllocations(partition->agg_fn_evals);
+    }
   }
   if (ht_ctx_.get() != nullptr) ht_ctx_->FreeLocalAllocations();
   return ExecNode::QueryMaintenance(state);
@@ -1972,4 +2037,8 @@ Status 
PartitionedAggregationNode::CodegenProcessBatchStreaming(
   return Status::OK();
 }
 
+// Instantiate required templates.
+template Status PartitionedAggregationNode::AppendSpilledRow<false>(
+    Partition*, TupleRow*);
+template Status PartitionedAggregationNode::AppendSpilledRow<true>(Partition*, 
TupleRow*);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/partitioned-aggregation-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.h 
b/be/src/exec/partitioned-aggregation-node.h
index 066dc28..4f8b622 100644
--- a/be/src/exec/partitioned-aggregation-node.h
+++ b/be/src/exec/partitioned-aggregation-node.h
@@ -19,13 +19,15 @@
 #ifndef IMPALA_EXEC_PARTITIONED_AGGREGATION_NODE_H
 #define IMPALA_EXEC_PARTITIONED_AGGREGATION_NODE_H
 
+#include <deque>
+
 #include <boost/scoped_ptr.hpp>
 
 #include "exec/exec-node.h"
 #include "exec/hash-table.h"
-#include "runtime/buffered-block-mgr.h"
-#include "runtime/buffered-tuple-stream.h"
-#include "runtime/descriptors.h"  // for TupleId
+#include "runtime/buffered-tuple-stream-v2.h"
+#include "runtime/bufferpool/suballocator.h"
+#include "runtime/descriptors.h" // for TupleId
 #include "runtime/mem-pool.h"
 #include "runtime/string-value.h"
 
@@ -229,7 +231,9 @@ class PartitionedAggregationNode : public ExecNode {
   std::vector<int> string_grouping_exprs_;
 
   RuntimeState* state_;
-  BufferedBlockMgr::Client* block_mgr_client_;
+
+  /// Allocator for hash table memory.
+  boost::scoped_ptr<Suballocator> ht_allocator_;
 
   /// MemPool used to allocate memory for when we don't have grouping and 
don't initialize
   /// the partitioning structures, or during Close() when creating new output 
tuples.
@@ -337,12 +341,12 @@ class PartitionedAggregationNode : public ExecNode {
   HashTable* hash_tbls_[PARTITION_FANOUT];
 
   /// All partitions that have been spilled and need further processing.
-  std::list<Partition*> spilled_partitions_;
+  std::deque<Partition*> spilled_partitions_;
 
   /// All partitions that are aggregated and can just return the results in 
GetNext().
   /// After consuming all the input, hash_partitions_ is split into 
spilled_partitions_
   /// and aggregated_partitions_, depending on if it was spilled or not.
-  std::list<Partition*> aggregated_partitions_;
+  std::deque<Partition*> aggregated_partitions_;
 
   /// END: Members that must be Reset()
   /////////////////////////////////////////
@@ -352,31 +356,42 @@ class PartitionedAggregationNode : public ExecNode {
   /// initially use small buffers. Streaming pre-aggregations do not spill and 
do not
   /// require an unaggregated stream.
   struct Partition {
-    Partition(PartitionedAggregationNode* parent, int level)
-      : parent(parent), is_closed(false), level(level) {}
+    Partition(PartitionedAggregationNode* parent, int level, int idx)
+      : parent(parent), is_closed(false), level(level), idx(idx) {}
 
     ~Partition();
 
     /// Initializes aggregated_row_stream and unaggregated_row_stream (if a 
spilling
-    /// aggregation), reserving one buffer for each. The buffers backing these 
streams
-    /// are reserved, so this function will not fail with a continuable OOM. 
If we fail
-    /// to init these buffers, the mem limit is too low to run this algorithm.
-    Status InitStreams();
+    /// aggregation), allocating one buffer for each. Spilling merge 
aggregations must
+    /// have enough reservation for the initial buffer for the stream, so this 
should
+    /// not fail due to OOM. Preaggregations do not reserve any buffers: if 
does not
+    /// have enough reservation for the initial buffer, the aggregated row 
stream is not
+    /// created and an OK status is returned.
+    Status InitStreams() WARN_UNUSED_RESULT;
 
-    /// Initializes the hash table. Returns false on OOM.
-    bool InitHashTable();
+    /// Initializes the hash table. 'aggregated_row_stream' must be non-NULL.
+    /// Sets 'got_memory' to true if the hash table was initialised or false 
on OOM.
+    Status InitHashTable(bool* got_memory) WARN_UNUSED_RESULT;
 
     /// Called in case we need to serialize aggregated rows. This step 
effectively does
     /// a merge aggregation in this node.
-    Status SerializeStreamForSpilling();
+    Status SerializeStreamForSpilling() WARN_UNUSED_RESULT;
 
     /// Closes this partition. If finalize_rows is true, this iterates over 
all rows
     /// in aggregated_row_stream and finalizes them (this is only used in the 
cancellation
     /// path).
     void Close(bool finalize_rows);
 
-    /// Spills this partition, unpinning streams and cleaning up hash tables 
as necessary.
-    Status Spill();
+    /// Spill this partition. 'more_aggregate_rows' = true means that more 
aggregate rows
+    /// may be appended to the the partition before appending unaggregated 
rows. On
+    /// success, one of the streams is left with a write iterator: the 
aggregated stream
+    /// if 'more_aggregate_rows' is true or the unaggregated stream otherwise.
+    Status Spill(bool more_aggregate_rows) WARN_UNUSED_RESULT;
+
+    /// Discards the aggregated row stream and hash table. Only valid to call 
if this is
+    /// a streaming preaggregation and the initial memory allocation for hash 
tables or
+    /// the aggregated stream failed. The aggregated stream must have 0 rows.
+    void DiscardAggregatedRowStream();
 
     bool is_spilled() const { return hash_tbl.get() == NULL; }
 
@@ -390,9 +405,12 @@ class PartitionedAggregationNode : public ExecNode {
     /// etc.
     const int level;
 
+    /// The index of this partition within 'hash_partitions_' at its level.
+    const int idx;
+
     /// Hash table for this partition.
     /// Can be NULL if this partition is no longer maintaining a hash table 
(i.e.
-    /// is spilled).
+    /// is spilled or we are passing through all rows for this partition).
     boost::scoped_ptr<HashTable> hash_tbl;
 
     /// Clone of parent's agg_fn_evals_ and backing MemPool.
@@ -401,18 +419,24 @@ class PartitionedAggregationNode : public ExecNode {
 
     /// Tuple stream used to store aggregated rows. When the partition is not 
spilled,
     /// (meaning the hash table is maintained), this stream is pinned and 
contains the
-    /// memory referenced by the hash table. When it is spilled, aggregate 
rows are
-    /// just appended to this stream.
-    boost::scoped_ptr<BufferedTupleStream> aggregated_row_stream;
+    /// memory referenced by the hash table. When it is spilled, this consumes 
reservation
+    /// for a write buffer only during repartitioning of aggregated rows.
+    ///
+    /// For streaming preaggs, this may be NULL if sufficient memory is not 
available.
+    /// In that case hash_tbl is also NULL and all rows for the partition will 
be passed
+    /// through.
+    boost::scoped_ptr<BufferedTupleStreamV2> aggregated_row_stream;
 
     /// Unaggregated rows that are spilled. Always NULL for streaming 
pre-aggregations.
-    boost::scoped_ptr<BufferedTupleStream> unaggregated_row_stream;
+    /// Always unpinned. Has a write buffer allocated when the partition is 
spilled and
+    /// unaggregated rows are being processed.
+    boost::scoped_ptr<BufferedTupleStreamV2> unaggregated_row_stream;
   };
 
   /// Stream used to store serialized spilled rows. Only used if 
needs_serialize_
   /// is set. This stream is never pinned and only used in Partition::Spill as 
a
   /// a temporary buffer.
-  boost::scoped_ptr<BufferedTupleStream> serialize_stream_;
+  boost::scoped_ptr<BufferedTupleStreamV2> serialize_stream_;
 
   /// Accessor for 'hash_tbls_' that verifies consistency with the partitions.
   HashTable* ALWAYS_INLINE GetHashTable(int partition_idx) {
@@ -447,7 +471,7 @@ class PartitionedAggregationNode : public ExecNode {
   /// FunctionContexts, so is stored outside the stream. If stream's small 
buffers get
   /// full, it will attempt to switch to IO-buffers.
   Tuple* ConstructIntermediateTuple(const std::vector<AggFnEvaluator*>& 
agg_fn_evals,
-      BufferedTupleStream* stream, Status* status) noexcept;
+      BufferedTupleStreamV2* stream, Status* status) noexcept;
 
   /// Constructs intermediate tuple, allocating memory from pool instead of 
the stream.
   /// Returns NULL and sets status if there is not enough memory to allocate 
the tuple.
@@ -495,7 +519,7 @@ class PartitionedAggregationNode : public ExecNode {
 
   /// Do the aggregation for all tuple rows in the batch when there is no 
grouping.
   /// This function is replaced by codegen.
-  Status ProcessBatchNoGrouping(RowBatch* batch);
+  Status ProcessBatchNoGrouping(RowBatch* batch) WARN_UNUSED_RESULT;
 
   /// Processes a batch of rows. This is the core function of the algorithm. 
We partition
   /// the rows into hash_partitions_, spilling as necessary.
@@ -507,9 +531,9 @@ class PartitionedAggregationNode : public ExecNode {
   //
   /// This function is replaced by codegen. We pass in ht_ctx_.get() as an 
argument for
   /// performance.
-  template<bool AGGREGATED_ROWS>
-  Status IR_ALWAYS_INLINE ProcessBatch(RowBatch* batch,
-      TPrefetchMode::type prefetch_mode, HashTableCtx* ht_ctx);
+  template <bool AGGREGATED_ROWS>
+  Status IR_ALWAYS_INLINE ProcessBatch(RowBatch* batch, TPrefetchMode::type 
prefetch_mode,
+      HashTableCtx* ht_ctx) WARN_UNUSED_RESULT;
 
   /// Evaluates the rows in 'batch' starting at 'start_row_idx' and stores the 
results in
   /// the expression values cache in 'ht_ctx'. The number of rows evaluated 
depends on
@@ -524,7 +548,8 @@ class PartitionedAggregationNode : public ExecNode {
   /// ProcessBatch for codegen to substitute function calls with codegen'd 
versions.
   /// May spill partitions if not enough memory is available.
   template <bool AGGREGATED_ROWS>
-  Status IR_ALWAYS_INLINE ProcessRow(TupleRow* row, HashTableCtx* ht_ctx);
+  Status IR_ALWAYS_INLINE ProcessRow(
+      TupleRow* row, HashTableCtx* ht_ctx) WARN_UNUSED_RESULT;
 
   /// Create a new intermediate tuple in partition, initialized with row. 
ht_ctx is
   /// the context for the partition's hash table and hash is the precomputed 
hash of
@@ -533,35 +558,33 @@ class PartitionedAggregationNode : public ExecNode {
   /// tuple to the partition's stream. Must be inlined into ProcessBatch for 
codegen
   /// to substitute function calls with codegen'd versions.  insert_it is an 
iterator
   /// for insertion returned from HashTable::FindBuildRowBucket().
-  template<bool AGGREGATED_ROWS>
-  Status IR_ALWAYS_INLINE AddIntermediateTuple(Partition* partition,
-      TupleRow* row, uint32_t hash, HashTable::Iterator insert_it);
-
-  /// Append a row to a spilled partition. May spill partitions if needed to 
switch to
-  /// I/O buffers. Selects the correct stream according to the argument. 
Inlined into
-  /// ProcessBatch().
-  template<bool AGGREGATED_ROWS>
-  Status IR_ALWAYS_INLINE AppendSpilledRow(Partition* partition, TupleRow* 
row);
+  template <bool AGGREGATED_ROWS>
+  Status IR_ALWAYS_INLINE AddIntermediateTuple(Partition* partition, TupleRow* 
row,
+      uint32_t hash, HashTable::Iterator insert_it) WARN_UNUSED_RESULT;
 
-  /// Append a row to a stream of a spilled partition. May spill partitions if 
needed
-  /// to append the row.
-  Status AppendSpilledRow(BufferedTupleStream* stream, TupleRow* row);
+  /// Append a row to a spilled partition. The row may be aggregated or 
unaggregated
+  /// according to AGGREGATED_ROWS. May spill partitions if needed to append 
the row
+  /// buffers.
+  template <bool AGGREGATED_ROWS>
+  Status IR_ALWAYS_INLINE AppendSpilledRow(
+      Partition* partition, TupleRow* row) WARN_UNUSED_RESULT;
 
   /// Reads all the rows from input_stream and process them by calling 
ProcessBatch().
-  template<bool AGGREGATED_ROWS>
-  Status ProcessStream(BufferedTupleStream* input_stream);
+  template <bool AGGREGATED_ROWS>
+  Status ProcessStream(BufferedTupleStreamV2* input_stream) WARN_UNUSED_RESULT;
 
   /// Output 'singleton_output_tuple_' and transfer memory to 'row_batch'.
   void GetSingletonOutput(RowBatch* row_batch);
 
   /// Get rows for the next rowbatch from the next partition. Sets 
'partition_eos_' to
   /// true if all rows from all partitions have been returned or the limit is 
reached.
-  Status GetRowsFromPartition(RuntimeState* state, RowBatch* row_batch);
+  Status GetRowsFromPartition(
+      RuntimeState* state, RowBatch* row_batch) WARN_UNUSED_RESULT;
 
   /// Get output rows from child for streaming pre-aggregation. Aggregates 
some rows with
   /// hash table and passes through other rows converted into the intermediate
   /// tuple format. Sets 'child_eos_' once all rows from child have been 
returned.
-  Status GetRowsStreaming(RuntimeState* state, RowBatch* row_batch);
+  Status GetRowsStreaming(RuntimeState* state, RowBatch* row_batch) 
WARN_UNUSED_RESULT;
 
   /// Return true if we should keep expanding hash tables in the preagg. If 
false,
   /// the preagg should pass through any rows it can't fit in its tables.
@@ -582,7 +605,7 @@ class PartitionedAggregationNode : public ExecNode {
   /// 'ht_ctx' is passed in as a way to avoid aliasing of 'this' confusing the 
optimiser.
   Status ProcessBatchStreaming(bool needs_serialize, TPrefetchMode::type 
prefetch_mode,
       RowBatch* in_batch, RowBatch* out_batch, HashTableCtx* ht_ctx,
-      int remaining_capacity[PARTITION_FANOUT]);
+      int remaining_capacity[PARTITION_FANOUT]) WARN_UNUSED_RESULT;
 
   /// Tries to add intermediate to the hash table 'hash_tbl' of 'partition' 
for streaming
   /// aggregation. The input row must have been evaluated with 'ht_ctx', with 
'hash' set
@@ -592,18 +615,24 @@ class PartitionedAggregationNode : public ExecNode {
   /// keeps track of how many more entries can be added to the hash table so 
we can avoid
   /// retrying inserts. It is decremented if an insert succeeds and set to 
zero if an
   /// insert fails. If an error occurs, returns false and sets 'status'.
-  bool IR_ALWAYS_INLINE TryAddToHashTable(HashTableCtx* ht_ctx,
-      Partition* partition, HashTable* hash_tbl, TupleRow* in_row, uint32_t 
hash,
-      int* remaining_capacity, Status* status);
+  bool IR_ALWAYS_INLINE TryAddToHashTable(HashTableCtx* ht_ctx, Partition* 
partition,
+      HashTable* hash_tbl, TupleRow* in_row, uint32_t hash, int* 
remaining_capacity,
+      Status* status) WARN_UNUSED_RESULT;
 
   /// Initializes hash_partitions_. 'level' is the level for the partitions to 
create.
+  /// If 'single_partition_idx' is provided, it must be a number in range
+  /// [0, PARTITION_FANOUT), and only that partition is created - the others 
are
+  /// initialized to NULL.
   /// Also sets ht_ctx_'s level to 'level'.
-  Status CreateHashPartitions(int level);
+  Status CreateHashPartitions(
+      int level, int single_partition_idx = -1) WARN_UNUSED_RESULT;
 
   /// Ensure that hash tables for all in-memory partitions are large enough to 
fit
   /// 'num_rows' additional hash table entries. If there is not enough memory 
to
-  /// resize the hash tables, may spill partitions.
-  Status CheckAndResizeHashPartitions(int num_rows, const HashTableCtx* 
ht_ctx);
+  /// resize the hash tables, may spill partitions. 'aggregated_rows' is true 
if
+  /// we're currently partitioning aggregated rows.
+  Status CheckAndResizeHashPartitions(
+      bool aggregated_rows, int num_rows, const HashTableCtx* ht_ctx) 
WARN_UNUSED_RESULT;
 
   /// Iterates over all the partitions in hash_partitions_ and returns the 
number of rows
   /// of the largest spilled partition (in terms of number of aggregated and 
unaggregated
@@ -614,16 +643,39 @@ class PartitionedAggregationNode : public ExecNode {
   /// initializes output_iterator_ and output_partition_. This either removes
   /// a partition from aggregated_partitions_ (and is done) or removes the next
   /// partition from aggregated_partitions_ and repartitions it.
-  Status NextPartition();
-
-  /// Picks a partition from hash_partitions_ to spill.
-  Status SpillPartition();
+  Status NextPartition() WARN_UNUSED_RESULT;
+
+  /// Tries to build the first partition in 'spilled_partitions_'.
+  /// If successful, set *built_partition to the partition. The caller owns 
the partition
+  /// and is responsible for closing it. If unsuccessful because the partition 
could not
+  /// fit in memory, set *built_partition to NULL and append the spilled 
partition to the
+  /// head of 'spilled_partitions_' so it can be processed by
+  /// RepartitionSpilledPartition().
+  Status BuildSpilledPartition(Partition** built_partition) WARN_UNUSED_RESULT;
+
+  /// Repartitions the first partition in 'spilled_partitions_' into 
PARTITION_FANOUT
+  /// output partitions. On success, each output partition is either:
+  /// * closed, if no rows were added to the partition.
+  /// * in 'spilled_partitions_', if the partition spilled.
+  /// * in 'aggregated_partitions_', if the output partition was not spilled.
+  Status RepartitionSpilledPartition() WARN_UNUSED_RESULT;
+
+  /// Picks a partition from 'hash_partitions_' to spill. 
'more_aggregate_rows' is passed
+  /// to Partition::Spill() when spilling the partition. See the 
Partition::Spill()
+  /// comment for further explanation.
+  Status SpillPartition(bool more_aggregate_rows) WARN_UNUSED_RESULT;
 
   /// Moves the partitions in hash_partitions_ to aggregated_partitions_ or
   /// spilled_partitions_. Partitions moved to spilled_partitions_ are 
unpinned.
   /// input_rows is the number of input rows that have been repartitioned.
   /// Used for diagnostics.
-  Status MoveHashPartitions(int64_t input_rows);
+  Status MoveHashPartitions(int64_t input_rows) WARN_UNUSED_RESULT;
+
+  /// Adds a partition to the front of 'spilled_partitions_' for later 
processing.
+  /// 'spilled_partitions_' uses LIFO so more finely partitioned partitions 
are processed
+  /// first). This allows us to delete pages earlier and bottom out the 
recursion
+  /// earlier and also improves time locality of access to spilled data on 
disk.
+  void PushSpilledPartition(Partition* partition);
 
   /// Calls Close() on every Partition in 'aggregated_partitions_',
   /// 'spilled_partitions_', and 'hash_partitions_' and then resets the lists,
@@ -638,7 +690,7 @@ class PartitionedAggregationNode : public ExecNode {
   /// and returns the IR function in 'fn'. Returns non-OK status if codegen
   /// is unsuccessful.
   Status CodegenUpdateSlot(LlvmCodeGen* codegen, int agg_fn_idx,
-      SlotDescriptor* slot_desc, llvm::Function** fn);
+      SlotDescriptor* slot_desc, llvm::Function** fn) WARN_UNUSED_RESULT;
 
   /// Codegen a call to a function implementing the UDA interface with input 
values
   /// from 'input_vals'. 'dst_val' should contain the previous value of the 
aggregate
@@ -647,10 +699,10 @@ class PartitionedAggregationNode : public ExecNode {
   /// the insert position of 'builder'.
   Status CodegenCallUda(LlvmCodeGen* codegen, LlvmBuilder* builder, AggFn* 
agg_fn,
       llvm::Value* agg_fn_ctx_arg, const std::vector<CodegenAnyVal>& 
input_vals,
-      const CodegenAnyVal& dst_val, CodegenAnyVal* updated_dst_val);
+      const CodegenAnyVal& dst_val, CodegenAnyVal* updated_dst_val) 
WARN_UNUSED_RESULT;
 
   /// Codegen UpdateTuple(). Returns non-OK status if codegen is unsuccessful.
-  Status CodegenUpdateTuple(LlvmCodeGen* codegen, llvm::Function** fn);
+  Status CodegenUpdateTuple(LlvmCodeGen* codegen, llvm::Function** fn) 
WARN_UNUSED_RESULT;
 
   /// Codegen the non-streaming process row batch loop. The loop has already 
been
   /// compiled to IR and loaded into the codegen object. UpdateAggTuple has 
also been
@@ -659,26 +711,28 @@ class PartitionedAggregationNode : public ExecNode {
   /// 'process_batch_no_grouping_fn_' will be updated with the codegened 
function
   /// depending on whether this is a grouping or non-grouping aggregation.
   /// Assumes AGGREGATED_ROWS = false.
-  Status CodegenProcessBatch(LlvmCodeGen* codegen, TPrefetchMode::type 
prefetch_mode);
+  Status CodegenProcessBatch(
+      LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) 
WARN_UNUSED_RESULT;
 
   /// Codegen the materialization loop for streaming preaggregations.
   /// 'process_batch_streaming_fn_' will be updated with the codegened 
function.
   Status CodegenProcessBatchStreaming(
-      LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode);
+      LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) 
WARN_UNUSED_RESULT;
 
-  /// We need two buffers per partition, one for the aggregated stream and one
-  /// for the unaggregated stream. We need an additional buffer to read the 
stream
-  /// we are currently repartitioning.
+  /// Compute minimum buffer requirement for grouping aggregations.
+  /// We need one buffer per partition, which is used either as the write 
buffer for the
+  /// aggregated stream or the unaggregated stream. We need an additional 
buffer to read
+  /// the stream we are currently repartitioning.
   /// If we need to serialize, we need an additional buffer while spilling a 
partition
   /// as the partitions aggregate stream needs to be serialized and rewritten.
   /// We do not spill streaming preaggregations, so we do not need to reserve 
any buffers.
   int MinRequiredBuffers() const {
-    // Must be kept in sync with AggregationNode.computeResourceProfile() in 
fe.
-    if (is_streaming_preagg_) return 0;
-    return 2 * PARTITION_FANOUT + 1 + (needs_serialize_ ? 1 : 0);
+    DCHECK(!grouping_exprs_.empty());
+    // Must be kept in sync with AggregationNode.computeNodeResourceProfile() 
in fe.
+    if (is_streaming_preagg_) return 0; // Need 0 buffers to pass through rows.
+    return PARTITION_FANOUT + 1 + (needs_serialize_ ? 1 : 0);
   }
 };
-
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/partitioned-hash-join-builder-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder-ir.cc 
b/be/src/exec/partitioned-hash-join-builder-ir.cc
index e5f649e..df58036 100644
--- a/be/src/exec/partitioned-hash-join-builder-ir.cc
+++ b/be/src/exec/partitioned-hash-join-builder-ir.cc
@@ -19,7 +19,7 @@
 
 #include "codegen/impala-ir.h"
 #include "exec/hash-table.inline.h"
-#include "runtime/buffered-tuple-stream.inline.h"
+#include "runtime/buffered-tuple-stream-v2.inline.h"
 #include "runtime/raw-value.inline.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-filter.h"
@@ -30,7 +30,7 @@
 using namespace impala;
 
 inline bool PhjBuilder::AppendRow(
-    BufferedTupleStream* stream, TupleRow* row, Status* status) {
+    BufferedTupleStreamV2* stream, TupleRow* row, Status* status) {
   if (LIKELY(stream->AddRow(row, status))) return true;
   if (UNLIKELY(!status->ok())) return false;
   return AppendRowStreamFull(stream, row, status);
@@ -73,12 +73,12 @@ Status PhjBuilder::ProcessBuildBatch(
 
 bool PhjBuilder::Partition::InsertBatch(TPrefetchMode::type prefetch_mode,
     HashTableCtx* ht_ctx, RowBatch* batch,
-    const vector<BufferedTupleStream::RowIdx>& indices) {
+    const vector<BufferedTupleStreamV2::FlatRowPtr>& flat_rows, Status* 
status) {
   // Compute the hash values and prefetch the hash table buckets.
   const int num_rows = batch->num_rows();
   HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
   const int prefetch_size = expr_vals_cache->capacity();
-  const BufferedTupleStream::RowIdx* row_indices = indices.data();
+  const BufferedTupleStreamV2::FlatRowPtr* flat_rows_data = flat_rows.data();
   for (int prefetch_group_row = 0; prefetch_group_row < num_rows;
        prefetch_group_row += prefetch_size) {
     int cur_row = prefetch_group_row;
@@ -97,9 +97,9 @@ bool PhjBuilder::Partition::InsertBatch(TPrefetchMode::type 
prefetch_mode,
     expr_vals_cache->ResetForRead();
     FOREACH_ROW_LIMIT(batch, cur_row, prefetch_size, batch_iter) {
       TupleRow* row = batch_iter.Get();
-      BufferedTupleStream::RowIdx row_idx = row_indices[cur_row];
+      BufferedTupleStreamV2::FlatRowPtr flat_row = flat_rows_data[cur_row];
       if (!expr_vals_cache->IsRowNull()
-          && UNLIKELY(!hash_tbl_->Insert(ht_ctx, row_idx, row))) {
+          && UNLIKELY(!hash_tbl_->Insert(ht_ctx, flat_row, row, status))) {
         return false;
       }
       expr_vals_cache->NextRow();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/partitioned-hash-join-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.cc 
b/be/src/exec/partitioned-hash-join-builder.cc
index 4a5885b..a2f7c96 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -25,8 +25,10 @@
 #include "exec/hash-table.inline.h"
 #include "exprs/scalar-expr.h"
 #include "exprs/scalar-expr-evaluator.h"
-#include "runtime/buffered-tuple-stream.h"
+#include "runtime/buffered-tuple-stream-v2.h"
+#include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
+#include "runtime/query-state.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-filter-bank.h"
 #include "runtime/runtime-filter.h"
@@ -44,19 +46,23 @@ static const string PREPARE_FOR_READ_FAILED_ERROR_MSG =
     "the memory limit may help this query to complete successfully.";
 
 using namespace impala;
-using namespace llvm;
-using namespace strings;
-using std::unique_ptr;
+using llvm::ConstantInt;
+using llvm::Function;
+using llvm::Type;
+using llvm::Value;
+using strings::Substitute;
 
 PhjBuilder::PhjBuilder(int join_node_id, TJoinOp::type join_op,
     const RowDescriptor* probe_row_desc, const RowDescriptor* build_row_desc,
-    RuntimeState* state)
+    RuntimeState* state, BufferPool::ClientHandle* buffer_pool_client,
+    int64_t spillable_buffer_size)
   : DataSink(build_row_desc),
     runtime_state_(state),
     join_node_id_(join_node_id),
     join_op_(join_op),
     probe_row_desc_(probe_row_desc),
-    block_mgr_client_(NULL),
+    buffer_pool_client_(buffer_pool_client),
+    spillable_buffer_size_(spillable_buffer_size),
     non_empty_build_(false),
     partitions_created_(NULL),
     largest_partition_percent_(NULL),
@@ -137,9 +143,6 @@ Status PhjBuilder::Prepare(RuntimeState* state, MemTracker* 
parent_mem_tracker)
     RETURN_IF_ERROR(ScalarExprEvaluator::Create(*filter_exprs_[i], state, 
&pool_,
         expr_mem_pool(), &filter_ctxs_[i].expr_eval));
   }
-  RETURN_IF_ERROR(state->block_mgr()->RegisterClient(
-      Substitute("PartitionedHashJoin id=$0 builder=$1", join_node_id_, this),
-      MinRequiredBuffers(), true, mem_tracker_.get(), state, 
&block_mgr_client_));
 
   partitions_created_ = ADD_COUNTER(profile(), "PartitionsCreated", 
TUnit::UNIT);
   largest_partition_percent_ =
@@ -169,6 +172,11 @@ Status PhjBuilder::Open(RuntimeState* state) {
   for (const FilterContext& ctx : filter_ctxs_) {
     RETURN_IF_ERROR(ctx.expr_eval->Open(state));
   }
+  if (ht_allocator_ == nullptr) {
+    // Create 'ht_allocator_' on the first call to Open().
+    ht_allocator_.reset(new Suballocator(
+        state->exec_env()->buffer_pool(), buffer_pool_client_, 
spillable_buffer_size_));
+  }
   RETURN_IF_ERROR(CreateHashPartitions(0));
   AllocateRuntimeFilters();
 
@@ -248,7 +256,6 @@ void PhjBuilder::Close(RuntimeState* state) {
     if (ctx.expr_eval != nullptr) ctx.expr_eval->Close(state);
   }
   ScalarExpr::Close(filter_exprs_);
-  if (block_mgr_client_ != NULL) 
state->block_mgr()->ClearReservations(block_mgr_client_);
   ScalarExpr::Close(build_exprs_);
   pool_.Clear();
   DataSink::Close(state);
@@ -264,13 +271,11 @@ void PhjBuilder::Reset() {
 Status PhjBuilder::CreateAndPreparePartition(int level, Partition** partition) 
{
   all_partitions_.emplace_back(new Partition(runtime_state_, this, level));
   *partition = all_partitions_.back().get();
-  RETURN_IF_ERROR((*partition)->build_rows()->Init(join_node_id_, profile(), 
true));
+  RETURN_IF_ERROR((*partition)->build_rows()->Init(join_node_id_, true));
   bool got_buffer;
   RETURN_IF_ERROR((*partition)->build_rows()->PrepareForWrite(&got_buffer));
-  if (!got_buffer) {
-    return runtime_state_->block_mgr()->MemLimitTooLowError(
-        block_mgr_client_, join_node_id_);
-  }
+  DCHECK(got_buffer)
+      << "Accounted in min reservation" << buffer_pool_client_->DebugString();
   return Status::OK();
 }
 
@@ -288,22 +293,11 @@ Status PhjBuilder::CreateHashPartitions(int level) {
 }
 
 bool PhjBuilder::AppendRowStreamFull(
-    BufferedTupleStream* stream, TupleRow* row, Status* status) noexcept {
+    BufferedTupleStreamV2* stream, TupleRow* row, Status* status) noexcept {
   while (true) {
-    // Check if the stream is still using small buffers and try to switch to 
IO-buffers.
-    if (stream->using_small_buffers()) {
-      bool got_buffer;
-      *status = stream->SwitchToIoBuffers(&got_buffer);
-      if (!status->ok()) return false;
-
-      if (got_buffer) {
-        if (LIKELY(stream->AddRow(row, status))) return true;
-        if (!status->ok()) return false;
-      }
-    }
     // We ran out of memory. Pick a partition to spill. If we ran out of 
unspilled
     // partitions, SpillPartition() will return an error status.
-    *status = SpillPartition(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+    *status = SpillPartition(BufferedTupleStreamV2::UNPIN_ALL_EXCEPT_CURRENT);
     if (!status->ok()) return false;
     if (stream->AddRow(row, status)) return true;
     if (!status->ok()) return false;
@@ -313,7 +307,7 @@ bool PhjBuilder::AppendRowStreamFull(
 }
 
 // TODO: can we do better with a different spilling heuristic?
-Status PhjBuilder::SpillPartition(BufferedTupleStream::UnpinMode mode) {
+Status PhjBuilder::SpillPartition(BufferedTupleStreamV2::UnpinMode mode) {
   DCHECK_EQ(hash_partitions_.size(), PARTITION_FANOUT);
   int64_t max_freed_mem = 0;
   int partition_idx = -1;
@@ -323,7 +317,7 @@ Status 
PhjBuilder::SpillPartition(BufferedTupleStream::UnpinMode mode) {
     Partition* candidate = hash_partitions_[i];
     if (candidate->IsClosed()) continue;
     if (candidate->is_spilled()) continue;
-    int64_t mem = candidate->build_rows()->bytes_in_mem(false);
+    int64_t mem = candidate->build_rows()->BytesPinned(false);
     if (candidate->hash_tbl() != NULL) {
       // The hash table should not have matches, since we have not probed it 
yet.
       // Losing match info would lead to incorrect results (IMPALA-1488).
@@ -337,9 +331,9 @@ Status 
PhjBuilder::SpillPartition(BufferedTupleStream::UnpinMode mode) {
   }
 
   if (partition_idx == -1) {
-    // Could not find a partition to spill. This means the mem limit was just 
too low.
-    return runtime_state_->block_mgr()->MemLimitTooLowError(
-        block_mgr_client_, join_node_id_);
+    return Status(Substitute("Internal error: could not find a partition to 
spill in "
+                             " hash join $1: \n$2\nClient:\n$3",
+        join_node_id_, DebugString(), buffer_pool_client_->DebugString()));
   }
 
   VLOG(2) << "Spilling partition: " << partition_idx << endl << DebugString();
@@ -373,8 +367,7 @@ Status PhjBuilder::BuildHashTablesAndPrepareProbeStreams() {
       partition->Close(NULL);
     } else if (partition->is_spilled()) {
       // We don't need any build-side data for spilled partitions in memory.
-      RETURN_IF_ERROR(
-          
partition->build_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL));
+      partition->build_rows()->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL);
     }
   }
 
@@ -393,7 +386,7 @@ Status PhjBuilder::BuildHashTablesAndPrepareProbeStreams() {
     RETURN_IF_ERROR(partition->BuildHashTable(&built));
     // If we did not have enough memory to build this hash table, we need to 
spill this
     // partition (clean up the hash table, unpin build).
-    if (!built) 
RETURN_IF_ERROR(partition->Spill(BufferedTupleStream::UNPIN_ALL));
+    if (!built) 
RETURN_IF_ERROR(partition->Spill(BufferedTupleStreamV2::UNPIN_ALL));
   }
 
   // We may have spilled additional partitions while building hash tables, we 
need to
@@ -429,11 +422,11 @@ Status PhjBuilder::InitSpilledPartitionProbeStreams() {
 
   while (probe_streams_to_create > 0) {
     // Create stream in vector, so that it will be cleaned up after any 
failure.
-    
spilled_partition_probe_streams_.emplace_back(std::make_unique<BufferedTupleStream>(
-        runtime_state_, probe_row_desc_, runtime_state_->block_mgr(), 
block_mgr_client_,
-        false /* use_initial_small_buffers */, false /* read_write */));
-    BufferedTupleStream* probe_stream = 
spilled_partition_probe_streams_.back().get();
-    RETURN_IF_ERROR(probe_stream->Init(join_node_id_, profile(), false));
+    spilled_partition_probe_streams_.emplace_back(
+        make_unique<BufferedTupleStreamV2>(runtime_state_, probe_row_desc_,
+            buffer_pool_client_, spillable_buffer_size_, 
spillable_buffer_size_));
+    BufferedTupleStreamV2* probe_stream = 
spilled_partition_probe_streams_.back().get();
+    RETURN_IF_ERROR(probe_stream->Init(join_node_id_, false));
 
     // Loop until either the stream gets a buffer or all partitions are 
spilled (in which
     // case SpillPartition() returns an error).
@@ -442,7 +435,7 @@ Status PhjBuilder::InitSpilledPartitionProbeStreams() {
       RETURN_IF_ERROR(probe_stream->PrepareForWrite(&got_buffer));
       if (got_buffer) break;
 
-      RETURN_IF_ERROR(SpillPartition(BufferedTupleStream::UNPIN_ALL));
+      RETURN_IF_ERROR(SpillPartition(BufferedTupleStreamV2::UNPIN_ALL));
       ++probe_streams_to_create;
     }
     --probe_streams_to_create;
@@ -450,7 +443,7 @@ Status PhjBuilder::InitSpilledPartitionProbeStreams() {
   return Status::OK();
 }
 
-vector<unique_ptr<BufferedTupleStream>> PhjBuilder::TransferProbeStreams() {
+vector<unique_ptr<BufferedTupleStreamV2>> PhjBuilder::TransferProbeStreams() {
   return std::move(spilled_partition_probe_streams_);
 }
 
@@ -460,7 +453,7 @@ void PhjBuilder::CloseAndDeletePartitions() {
   all_partitions_.clear();
   hash_partitions_.clear();
   null_aware_partition_ = NULL;
-  for (unique_ptr<BufferedTupleStream>& stream : 
spilled_partition_probe_streams_) {
+  for (unique_ptr<BufferedTupleStreamV2>& stream : 
spilled_partition_probe_streams_) {
     stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
   }
   spilled_partition_probe_streams_.clear();
@@ -512,14 +505,14 @@ void PhjBuilder::PublishRuntimeFilters(int64_t 
num_build_rows) {
 }
 
 Status PhjBuilder::RepartitionBuildInput(
-    Partition* input_partition, int level, BufferedTupleStream* 
input_probe_rows) {
+    Partition* input_partition, int level, BufferedTupleStreamV2* 
input_probe_rows) {
   DCHECK_GE(level, 1);
   SCOPED_TIMER(repartition_timer_);
   COUNTER_ADD(num_repartitions_, 1);
   RuntimeState* state = runtime_state_;
 
   // Setup the read buffer and the new partitions.
-  BufferedTupleStream* build_rows = input_partition->build_rows();
+  BufferedTupleStreamV2* build_rows = input_partition->build_rows();
   DCHECK(build_rows != NULL);
   bool got_read_buffer;
   RETURN_IF_ERROR(build_rows->PrepareForRead(true, &got_read_buffer));
@@ -552,7 +545,7 @@ Status PhjBuilder::RepartitionBuildInput(
     bool got_buffer;
     RETURN_IF_ERROR(input_probe_rows->PrepareForRead(true, &got_buffer));
     if (got_buffer) break;
-    
RETURN_IF_ERROR(SpillPartition(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
+    
RETURN_IF_ERROR(SpillPartition(BufferedTupleStreamV2::UNPIN_ALL_EXCEPT_CURRENT));
   }
 
   RETURN_IF_ERROR(FlushFinal(state));
@@ -580,12 +573,9 @@ bool PhjBuilder::HashTableStoresNulls() const {
 
 PhjBuilder::Partition::Partition(RuntimeState* state, PhjBuilder* parent, int 
level)
   : parent_(parent), is_spilled_(false), level_(level) {
-  // If we're repartitioning, we can assume the build input is fairly large 
and small
-  // buffers will most likely just waste memory.
-  bool use_initial_small_buffers = level == 0;
-  build_rows_ =
-      std::make_unique<BufferedTupleStream>(state, parent_->row_desc_, 
state->block_mgr(),
-          parent_->block_mgr_client_, use_initial_small_buffers, false /* 
read_write */);
+  build_rows_ = make_unique<BufferedTupleStreamV2>(state, parent_->row_desc_,
+      parent_->buffer_pool_client_, parent->spillable_buffer_size_,
+      parent->spillable_buffer_size_);
 }
 
 PhjBuilder::Partition::~Partition() {
@@ -612,30 +602,15 @@ void PhjBuilder::Partition::Close(RowBatch* batch) {
   }
 }
 
-Status PhjBuilder::Partition::Spill(BufferedTupleStream::UnpinMode mode) {
+Status PhjBuilder::Partition::Spill(BufferedTupleStreamV2::UnpinMode mode) {
   DCHECK(!IsClosed());
-  // Close the hash table as soon as possible to release memory.
+  
RETURN_IF_ERROR(parent_->runtime_state_->StartSpilling(parent_->mem_tracker()));
+  // Close the hash table and unpin the stream backing it to free memory.
   if (hash_tbl() != NULL) {
     hash_tbl_->Close();
     hash_tbl_.reset();
   }
-
-  // Unpin the stream as soon as possible to increase the chances that the
-  // SwitchToIoBuffers() call below will succeed.
-  RETURN_IF_ERROR(build_rows_->UnpinStream(mode));
-
-  if (build_rows_->using_small_buffers()) {
-    bool got_buffer;
-    RETURN_IF_ERROR(build_rows_->SwitchToIoBuffers(&got_buffer));
-    if (!got_buffer) {
-      // We'll try again to get the buffers when the stream fills up the small 
buffers.
-      VLOG_QUERY << "Not enough memory to switch to IO-sized buffer for 
partition "
-                 << this << " of join=" << parent_->join_node_id_
-                 << " build small buffers=" << 
build_rows_->using_small_buffers();
-      VLOG_FILE << GetStackTrace();
-    }
-  }
-
+  build_rows_->UnpinStream(mode);
   if (!is_spilled_) {
     COUNTER_ADD(parent_->num_spilled_partitions_, 1);
     if (parent_->num_spilled_partitions_->value() == 1) {
@@ -652,14 +627,14 @@ Status PhjBuilder::Partition::BuildHashTable(bool* built) 
{
   *built = false;
 
   // Before building the hash table, we need to pin the rows in memory.
-  RETURN_IF_ERROR(build_rows_->PinStream(false, built));
+  RETURN_IF_ERROR(build_rows_->PinStream(built));
   if (!*built) return Status::OK();
 
   RuntimeState* state = parent_->runtime_state_;
   HashTableCtx* ctx = parent_->ht_ctx_.get();
   ctx->set_level(level()); // Set the hash function for building the hash 
table.
   RowBatch batch(parent_->row_desc_, state->batch_size(), 
parent_->mem_tracker());
-  vector<BufferedTupleStream::RowIdx> indices;
+  vector<BufferedTupleStreamV2::FlatRowPtr> flat_rows;
   bool eos = false;
 
   // Allocate the partition-local hash table. Initialize the number of buckets 
based on
@@ -674,22 +649,22 @@ Status PhjBuilder::Partition::BuildHashTable(bool* built) 
{
   //
   // TODO: Try to allocate the hash table before pinning the stream to avoid 
needlessly
   // reading all of the spilled rows from disk when we won't succeed anyway.
-  int64_t estimated_num_buckets = build_rows()->RowConsumesMemory() ?
-      HashTable::EstimateNumBuckets(build_rows()->num_rows()) :
-      state->batch_size() * 2;
-  hash_tbl_.reset(HashTable::Create(state, parent_->block_mgr_client_,
+  int64_t estimated_num_buckets = 
HashTable::EstimateNumBuckets(build_rows()->num_rows());
+  hash_tbl_.reset(HashTable::Create(parent_->ht_allocator_.get(),
       true /* store_duplicates */, 
parent_->row_desc_->tuple_descriptors().size(),
       build_rows(), 1 << (32 - NUM_PARTITIONING_BITS), estimated_num_buckets));
-  if (!hash_tbl_->Init()) goto not_built;
+  bool success;
+  Status status = hash_tbl_->Init(&success);
+  if (!status.ok() || !success) goto not_built;
+  status = build_rows_->PrepareForRead(false, &success);
+  if (!status.ok()) goto not_built;
+  DCHECK(success) << "Stream was already pinned.";
 
-  bool got_read_buffer;
-  RETURN_IF_ERROR(build_rows_->PrepareForRead(false, &got_read_buffer));
-  DCHECK(got_read_buffer) << "Stream was already pinned.";
   do {
-    RETURN_IF_ERROR(build_rows_->GetNext(&batch, &eos, &indices));
-    DCHECK_EQ(batch.num_rows(), indices.size());
-    DCHECK_LE(batch.num_rows(), hash_tbl_->EmptyBuckets())
-        << build_rows()->RowConsumesMemory();
+    status = build_rows_->GetNext(&batch, &eos, &flat_rows);
+    if (!status.ok()) goto not_built;
+    DCHECK_EQ(batch.num_rows(), flat_rows.size());
+    DCHECK_LE(batch.num_rows(), hash_tbl_->EmptyBuckets());
     TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
     if (parent_->insert_batch_fn_ != NULL) {
       InsertBatchFn insert_batch_fn;
@@ -699,11 +674,12 @@ Status PhjBuilder::Partition::BuildHashTable(bool* built) 
{
         insert_batch_fn = parent_->insert_batch_fn_;
       }
       DCHECK(insert_batch_fn != NULL);
-      if (UNLIKELY(!insert_batch_fn(this, prefetch_mode, ctx, &batch, 
indices))) {
+      if (UNLIKELY(
+              !insert_batch_fn(this, prefetch_mode, ctx, &batch, flat_rows, 
&status))) {
         goto not_built;
       }
-    } else {
-      if (UNLIKELY(!InsertBatch(prefetch_mode, ctx, &batch, indices))) goto 
not_built;
+    } else if (UNLIKELY(!InsertBatch(prefetch_mode, ctx, &batch, flat_rows, 
&status))) {
+      goto not_built;
     }
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(state->GetQueryStatus());
@@ -725,7 +701,7 @@ not_built:
     hash_tbl_->Close();
     hash_tbl_.reset();
   }
-  return Status::OK();
+  return status;
 }
 
 void PhjBuilder::Codegen(LlvmCodeGen* codegen) {
@@ -774,7 +750,8 @@ string PhjBuilder::DebugString() const {
     DCHECK(partition->build_rows() != NULL);
     ss << endl
        << "    Build Rows: " << partition->build_rows()->num_rows()
-       << " (Blocks pinned: " << partition->build_rows()->blocks_pinned() << 
")" << endl;
+       << " (Bytes pinned: " << partition->build_rows()->BytesPinned(false) << 
")"
+       << endl;
     if (partition->hash_tbl() != NULL) {
       ss << "    Hash Table Rows: " << partition->hash_tbl()->size() << endl;
     }

Reply via email to