http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/partitioned-aggregation-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc index 7067961..fc0a4a6 100644 --- a/be/src/exec/partitioned-aggregation-node.cc +++ b/be/src/exec/partitioned-aggregation-node.cc @@ -31,10 +31,12 @@ #include "exprs/scalar-expr-evaluator.h" #include "exprs/slot-ref.h" #include "gutil/strings/substitute.h" -#include "runtime/buffered-tuple-stream.inline.h" +#include "runtime/buffered-tuple-stream-v2.inline.h" #include "runtime/descriptors.h" +#include "runtime/exec-env.h" #include "runtime/mem-pool.h" #include "runtime/mem-tracker.h" +#include "runtime/query-state.h" #include "runtime/raw-value.h" #include "runtime/row-batch.h" #include "runtime/runtime-state.h" @@ -111,7 +113,6 @@ PartitionedAggregationNode::PartitionedAggregationNode( needs_finalize_(tnode.agg_node.need_finalize), is_streaming_preagg_(tnode.agg_node.use_streaming_preaggregation), needs_serialize_(false), - block_mgr_client_(NULL), output_partition_(NULL), process_batch_no_grouping_fn_(NULL), process_batch_fn_(NULL), @@ -224,24 +225,6 @@ Status PartitionedAggregationNode::Prepare(RuntimeState* state) { RETURN_IF_ERROR(HashTableCtx::Create(pool_, state, build_exprs_, grouping_exprs_, true, vector<bool>(build_exprs_.size(), true), state->fragment_hash_seed(), MAX_PARTITION_DEPTH, 1, expr_mem_pool(), &ht_ctx_)); - RETURN_IF_ERROR(state_->block_mgr()->RegisterClient( - Substitute("PartitionedAggregationNode id=$0 ptr=$1", id_, this), - MinRequiredBuffers(), true, mem_tracker(), state, &block_mgr_client_)); - } - - // TODO: Is there a need to create the stream here? If memory reservations work we may - // be able to create this stream lazily and only whenever we need to spill. - if (!is_streaming_preagg_ && needs_serialize_ && block_mgr_client_ != NULL) { - serialize_stream_.reset(new BufferedTupleStream(state, &intermediate_row_desc_, - state->block_mgr(), block_mgr_client_, false /* use_initial_small_buffers */, - false /* read_write */)); - RETURN_IF_ERROR(serialize_stream_->Init(id(), runtime_profile(), false)); - bool got_buffer; - RETURN_IF_ERROR(serialize_stream_->PrepareForWrite(&got_buffer)); - if (!got_buffer) { - return state_->block_mgr()->MemLimitTooLowError(block_mgr_client_, id()); - } - DCHECK(serialize_stream_->has_write_block()); } AddCodegenDisabledMessage(state); return Status::OK(); @@ -265,8 +248,16 @@ Status PartitionedAggregationNode::Open(RuntimeState* state) { SCOPED_TIMER(runtime_profile_->total_time_counter()); // Open the child before consuming resources in this node. RETURN_IF_ERROR(child(0)->Open(state)); - RETURN_IF_ERROR(ExecNode::Open(state)); + + // Claim reservation after the child has been opened to reduce the peak reservation + // requirement. 
+ if (!buffer_pool_client_.is_registered() && !grouping_exprs_.empty()) { + DCHECK_GE(resource_profile_.min_reservation, + resource_profile_.spillable_buffer_size * MinRequiredBuffers()); + RETURN_IF_ERROR(ClaimBufferReservation(state)); + } + if (ht_ctx_.get() != nullptr) RETURN_IF_ERROR(ht_ctx_->Open(state)); RETURN_IF_ERROR(AggFnEvaluator::Open(agg_fn_evals_, state)); if (grouping_exprs_.empty()) { @@ -278,6 +269,25 @@ Status PartitionedAggregationNode::Open(RuntimeState* state) { RETURN_IF_ERROR(state_->GetQueryStatus()); singleton_output_tuple_returned_ = false; } else { + if (ht_allocator_ == nullptr) { + // Allocate 'serialize_stream_' and 'ht_allocator_' on the first Open() call. + ht_allocator_.reset(new Suballocator(state_->exec_env()->buffer_pool(), + &buffer_pool_client_, resource_profile_.spillable_buffer_size)); + + if (!is_streaming_preagg_ && needs_serialize_) { + serialize_stream_.reset(new BufferedTupleStreamV2(state, &intermediate_row_desc_, + &buffer_pool_client_, resource_profile_.spillable_buffer_size, + resource_profile_.spillable_buffer_size)); + RETURN_IF_ERROR(serialize_stream_->Init(id(), false)); + bool got_buffer; + // Reserve the memory for 'serialize_stream_' so we don't need to scrounge up + // another buffer during spilling. + RETURN_IF_ERROR(serialize_stream_->PrepareForWrite(&got_buffer)); + DCHECK(got_buffer) + << "Accounted in min reservation" << buffer_pool_client_.DebugString(); + DCHECK(serialize_stream_->has_write_iterator()); + } + } RETURN_IF_ERROR(CreateHashPartitions(0)); } @@ -520,9 +530,12 @@ Status PartitionedAggregationNode::GetRowsStreaming(RuntimeState* state, bool ht_needs_expansion = false; for (int i = 0; i < PARTITION_FANOUT; ++i) { HashTable* hash_tbl = GetHashTable(i); - DCHECK(hash_tbl != NULL); - remaining_capacity[i] = hash_tbl->NumInsertsBeforeResize(); - ht_needs_expansion |= remaining_capacity[i] < child_batch_->num_rows(); + if (hash_tbl == nullptr) { + remaining_capacity[i] = 0; + } else { + remaining_capacity[i] = hash_tbl->NumInsertsBeforeResize(); + ht_needs_expansion |= remaining_capacity[i] < child_batch_->num_rows(); + } } // Stop expanding hash tables if we're not reducing the input sufficiently. As our @@ -533,9 +546,12 @@ Status PartitionedAggregationNode::GetRowsStreaming(RuntimeState* state, if (ht_needs_expansion && ShouldExpandPreaggHashTables()) { for (int i = 0; i < PARTITION_FANOUT; ++i) { HashTable* ht = GetHashTable(i); - if (remaining_capacity[i] < child_batch_->num_rows()) { + if (ht != nullptr && remaining_capacity[i] < child_batch_->num_rows()) { SCOPED_TIMER(ht_resize_timer_); - if (ht->CheckAndResize(child_batch_->num_rows(), ht_ctx_.get())) { + bool resized; + RETURN_IF_ERROR( + ht->CheckAndResize(child_batch_->num_rows(), ht_ctx_.get(), &resized)); + if (resized) { remaining_capacity[i] = ht->NumInsertsBeforeResize(); } } @@ -548,7 +564,7 @@ Status PartitionedAggregationNode::GetRowsStreaming(RuntimeState* state, child_batch_.get(), out_batch, ht_ctx_.get(), remaining_capacity)); } else { RETURN_IF_ERROR(ProcessBatchStreaming(needs_serialize_, prefetch_mode, - child_batch_.get(), out_batch, ht_ctx_.get(), remaining_capacity )); + child_batch_.get(), out_batch, ht_ctx_.get(), remaining_capacity)); } child_batch_->Reset(); // All rows from child_batch_ were processed. 
@@ -557,7 +573,7 @@ Status PartitionedAggregationNode::GetRowsStreaming(RuntimeState* state, if (child_eos_) { child(0)->Close(state); child_batch_.reset(); - MoveHashPartitions(child(0)->rows_returned()); + RETURN_IF_ERROR(MoveHashPartitions(child(0)->rows_returned())); } num_rows_returned_ += out_batch->num_rows(); @@ -570,8 +586,10 @@ bool PartitionedAggregationNode::ShouldExpandPreaggHashTables() const { int64_t ht_rows = 0; for (int i = 0; i < PARTITION_FANOUT; ++i) { HashTable* ht = hash_partitions_[i]->hash_tbl.get(); - ht_mem += ht->CurrentMemSize(); - ht_rows += ht->size(); + if (ht != nullptr) { + ht_mem += ht->CurrentMemSize(); + ht_rows += ht->size(); + } } // Need some rows in tables to have valid statistics. @@ -678,7 +696,6 @@ void PartitionedAggregationNode::Close(RuntimeState* state) { if (serialize_stream_.get() != nullptr) { serialize_stream_->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES); } - if (block_mgr_client_ != nullptr) state->block_mgr()->ClearReservations(block_mgr_client_); ScalarExpr::Close(grouping_exprs_); ScalarExpr::Close(build_exprs_); AggFn::Close(agg_fns_); @@ -705,56 +722,55 @@ Status PartitionedAggregationNode::Partition::InitStreams() { } } - aggregated_row_stream.reset(new BufferedTupleStream(parent->state_, - &parent->intermediate_row_desc_, parent->state_->block_mgr(), - parent->block_mgr_client_, true /* use_initial_small_buffers */, - false /* read_write */, external_varlen_slots)); - RETURN_IF_ERROR( - aggregated_row_stream->Init(parent->id(), parent->runtime_profile(), true)); + aggregated_row_stream.reset(new BufferedTupleStreamV2(parent->state_, + &parent->intermediate_row_desc_, &parent->buffer_pool_client_, + parent->resource_profile_.spillable_buffer_size, + parent->resource_profile_.spillable_buffer_size, external_varlen_slots)); + RETURN_IF_ERROR(aggregated_row_stream->Init(parent->id(), true)); bool got_buffer; RETURN_IF_ERROR(aggregated_row_stream->PrepareForWrite(&got_buffer)); if (!got_buffer) { - return parent->state_->block_mgr()->MemLimitTooLowError( - parent->block_mgr_client_, parent->id()); + stringstream ss; + parent->DebugString(2, &ss); + DCHECK(parent->is_streaming_preagg_) + << "Merge agg should have enough reservation " << parent->id_ << "\n" + << parent->buffer_pool_client_.DebugString() << "\n" + << ss.str(); + DiscardAggregatedRowStream(); } if (!parent->is_streaming_preagg_) { - unaggregated_row_stream.reset(new BufferedTupleStream(parent->state_, - parent->child(0)->row_desc(), parent->state_->block_mgr(), - parent->block_mgr_client_, true /* use_initial_small_buffers */, - false /* read_write */)); + unaggregated_row_stream.reset(new BufferedTupleStreamV2(parent->state_, + parent->child(0)->row_desc(), &parent->buffer_pool_client_, + parent->resource_profile_.spillable_buffer_size, + parent->resource_profile_.spillable_buffer_size)); // This stream is only used to spill, no need to ever have this pinned. - RETURN_IF_ERROR(unaggregated_row_stream->Init(parent->id(), parent->runtime_profile(), - false)); - // TODO: allocate this buffer later only if we spill the partition. - RETURN_IF_ERROR(unaggregated_row_stream->PrepareForWrite(&got_buffer)); - if (!got_buffer) { - return parent->state_->block_mgr()->MemLimitTooLowError( - parent->block_mgr_client_, parent->id()); - } - DCHECK(unaggregated_row_stream->has_write_block()); + RETURN_IF_ERROR(unaggregated_row_stream->Init(parent->id(), false)); + // Save memory by waiting until we spill to allocate the write buffer for the + // unaggregated row stream. 
+ DCHECK(!unaggregated_row_stream->has_write_iterator()); } return Status::OK(); } -bool PartitionedAggregationNode::Partition::InitHashTable() { - DCHECK(hash_tbl.get() == NULL); +Status PartitionedAggregationNode::Partition::InitHashTable(bool* got_memory) { + DCHECK(aggregated_row_stream != nullptr); + DCHECK(hash_tbl == nullptr); // We use the upper PARTITION_FANOUT num bits to pick the partition so only the // remaining bits can be used for the hash table. // TODO: we could switch to 64 bit hashes and then we don't need a max size. // It might be reasonable to limit individual hash table size for other reasons // though. Always start with small buffers. - hash_tbl.reset(HashTable::Create(parent->state_, parent->block_mgr_client_, - false, 1, NULL, 1L << (32 - NUM_PARTITIONING_BITS), - PAGG_DEFAULT_HASH_TABLE_SZ)); + hash_tbl.reset(HashTable::Create(parent->ht_allocator_.get(), false, 1, nullptr, + 1L << (32 - NUM_PARTITIONING_BITS), PAGG_DEFAULT_HASH_TABLE_SZ)); // Please update the error message in CreateHashPartitions() if initial size of // hash table changes. - return hash_tbl->Init(); + return hash_tbl->Init(got_memory); } Status PartitionedAggregationNode::Partition::SerializeStreamForSpilling() { DCHECK(!parent->is_streaming_preagg_); - if (parent->needs_serialize_ && aggregated_row_stream->num_rows() != 0) { + if (parent->needs_serialize_) { // We need to do a lot more work in this case. This step effectively does a merge // aggregation in this node. We need to serialize the intermediates, spill the // intermediates and then feed them into the aggregate function's merge step. @@ -767,70 +783,69 @@ Status PartitionedAggregationNode::Partition::SerializeStreamForSpilling() { // for those UDAs. DCHECK(parent->serialize_stream_.get() != NULL); DCHECK(!parent->serialize_stream_->is_pinned()); - DCHECK(parent->serialize_stream_->has_write_block()); // Serialize and copy the spilled partition's stream into the new stream. - Status status = Status::OK(); - bool failed_to_add = false; - BufferedTupleStream* new_stream = parent->serialize_stream_.get(); + Status status; + BufferedTupleStreamV2* new_stream = parent->serialize_stream_.get(); HashTable::Iterator it = hash_tbl->Begin(parent->ht_ctx_.get()); while (!it.AtEnd()) { Tuple* tuple = it.GetTuple(); it.Next(); AggFnEvaluator::Serialize(agg_fn_evals, tuple); if (UNLIKELY(!new_stream->AddRow(reinterpret_cast<TupleRow*>(&tuple), &status))) { - failed_to_add = true; - break; + DCHECK(!status.ok()) << "Stream was unpinned - AddRow() only fails on error"; + // Even if we can't add to new_stream, finish up processing this agg stream to make + // clean up easier (someone has to finalize this stream and we don't want to remember + // where we are). + parent->CleanupHashTbl(agg_fn_evals, it); + hash_tbl->Close(); + hash_tbl.reset(); + aggregated_row_stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES); + return status; } } - // Even if we can't add to new_stream, finish up processing this agg stream to make - // clean up easier (someone has to finalize this stream and we don't want to remember - // where we are). 
- if (failed_to_add) { - parent->CleanupHashTbl(agg_fn_evals, it); - hash_tbl->Close(); - hash_tbl.reset(); - aggregated_row_stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES); - RETURN_IF_ERROR(status); - return parent->state_->block_mgr()->MemLimitTooLowError(parent->block_mgr_client_, - parent->id()); - } - DCHECK(status.ok()); - aggregated_row_stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES); aggregated_row_stream.swap(parent->serialize_stream_); // Recreate the serialize_stream (and reserve 1 buffer) now in preparation for // when we need to spill again. We need to have this available before we need // to spill to make sure it is available. This should be acquirable since we just // freed at least one buffer from this partition's (old) aggregated_row_stream. - parent->serialize_stream_.reset( - new BufferedTupleStream(parent->state_, &parent->intermediate_row_desc_, - parent->state_->block_mgr(), parent->block_mgr_client_, - false /* use_initial_small_buffers */, false /* read_write */)); - status = parent->serialize_stream_->Init(parent->id(), parent->runtime_profile(), - false); + parent->serialize_stream_.reset(new BufferedTupleStreamV2(parent->state_, + &parent->intermediate_row_desc_, &parent->buffer_pool_client_, + parent->resource_profile_.spillable_buffer_size, + parent->resource_profile_.spillable_buffer_size)); + status = parent->serialize_stream_->Init(parent->id(), false); if (status.ok()) { bool got_buffer; status = parent->serialize_stream_->PrepareForWrite(&got_buffer); - if (status.ok() && !got_buffer) { - status = parent->state_->block_mgr()->MemLimitTooLowError( - parent->block_mgr_client_, parent->id()); - } + DCHECK(!status.ok() || got_buffer) << "Accounted in min reservation"; } if (!status.ok()) { hash_tbl->Close(); hash_tbl.reset(); return status; } - DCHECK(parent->serialize_stream_->has_write_block()); + DCHECK(parent->serialize_stream_->has_write_iterator()); } return Status::OK(); } -Status PartitionedAggregationNode::Partition::Spill() { +void PartitionedAggregationNode::Partition::DiscardAggregatedRowStream() { + DCHECK(parent->is_streaming_preagg_); + DCHECK(aggregated_row_stream != nullptr); + DCHECK_EQ(aggregated_row_stream->num_rows(), 0); + if (hash_tbl != nullptr) hash_tbl->Close(); + hash_tbl.reset(); + aggregated_row_stream->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES); + aggregated_row_stream.reset(); +} + +Status PartitionedAggregationNode::Partition::Spill(bool more_aggregate_rows) { + DCHECK(!parent->is_streaming_preagg_); DCHECK(!is_closed); DCHECK(!is_spilled()); + RETURN_IF_ERROR(parent->state_->StartSpilling(parent->mem_tracker())); RETURN_IF_ERROR(SerializeStreamForSpilling()); @@ -846,34 +861,18 @@ Status PartitionedAggregationNode::Partition::Spill() { hash_tbl->Close(); hash_tbl.reset(); - // Try to switch both streams to IO-sized buffers to avoid allocating small buffers - // for spilled partition. - bool got_buffer = true; - if (aggregated_row_stream->using_small_buffers()) { - RETURN_IF_ERROR(aggregated_row_stream->SwitchToIoBuffers(&got_buffer)); - } - // Unpin the stream as soon as possible to increase the chances that the - // SwitchToIoBuffers() call below will succeed. If we're repartitioning, rows that - // were already aggregated (rows from the input partition's aggregated stream) will - // need to be added to this hash partition's aggregated stream, so we need to leave - // the write block pinned. - // TODO: when not repartitioning, don't leave the write block pinned. 
- DCHECK(!got_buffer || aggregated_row_stream->has_write_block()) - << aggregated_row_stream->DebugString(); - RETURN_IF_ERROR( - aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT)); - - if (got_buffer && unaggregated_row_stream->using_small_buffers()) { - RETURN_IF_ERROR(unaggregated_row_stream->SwitchToIoBuffers(&got_buffer)); - } - if (!got_buffer) { - // We'll try again to get the buffers when the stream fills up the small buffers. - VLOG_QUERY << "Not enough memory to switch to IO-sized buffer for partition " - << this << " of agg=" << parent->id_ << " agg small buffers=" - << aggregated_row_stream->using_small_buffers() - << " unagg small buffers=" - << unaggregated_row_stream->using_small_buffers(); - VLOG_FILE << GetStackTrace(); + // Unpin the stream to free memory, but leave a write buffer in place so we can + // continue appending rows to one of the streams in the partition. + DCHECK(aggregated_row_stream->has_write_iterator()); + DCHECK(!unaggregated_row_stream->has_write_iterator()); + if (more_aggregate_rows) { + aggregated_row_stream->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL_EXCEPT_CURRENT); + } else { + aggregated_row_stream->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL); + bool got_buffer; + RETURN_IF_ERROR(unaggregated_row_stream->PrepareForWrite(&got_buffer)); + DCHECK(got_buffer) + << "Accounted in min reservation" << parent->buffer_pool_client_.DebugString(); } COUNTER_ADD(parent->num_spilled_partitions_, 1); @@ -933,33 +932,27 @@ Tuple* PartitionedAggregationNode::ConstructIntermediateTuple( } Tuple* PartitionedAggregationNode::ConstructIntermediateTuple( - const vector<AggFnEvaluator*>& agg_fn_evals, BufferedTupleStream* stream, + const vector<AggFnEvaluator*>& agg_fn_evals, BufferedTupleStreamV2* stream, Status* status) noexcept { DCHECK(stream != NULL && status != NULL); // Allocate space for the entire tuple in the stream. const int fixed_size = intermediate_tuple_desc_->byte_size(); const int varlen_size = GroupingExprsVarlenSize(); - uint8_t* varlen_buffer; - uint8_t* fixed_buffer = stream->AllocateRow(fixed_size, varlen_size, &varlen_buffer, - status); - if (UNLIKELY(fixed_buffer == NULL)) { - if (!status->ok() || !stream->using_small_buffers()) return NULL; - // IMPALA-2352: Make a best effort to switch to IO buffers and re-allocate. - // If SwitchToIoBuffers() fails the caller of this function can try to free - // some space, e.g. through spilling, and re-attempt to allocate space for - // this row. - bool got_buffer; - *status = stream->SwitchToIoBuffers(&got_buffer); - if (!status->ok() || !got_buffer) return NULL; - fixed_buffer = stream->AllocateRow(fixed_size, varlen_size, &varlen_buffer, status); - if (fixed_buffer == NULL) return NULL; - } - - Tuple* intermediate_tuple = reinterpret_cast<Tuple*>(fixed_buffer); - intermediate_tuple->Init(fixed_size); - CopyGroupingValues(intermediate_tuple, varlen_buffer, varlen_size); - InitAggSlots(agg_fn_evals, intermediate_tuple); - return intermediate_tuple; + const int tuple_size = fixed_size + varlen_size; + uint8_t* tuple_data = stream->AddRowCustomBegin(tuple_size, status); + if (UNLIKELY(tuple_data == nullptr)) { + // If we failed to allocate and did not hit an error (indicated by a non-ok status), + // the caller of this function can try to free some space, e.g. through spilling, and + // re-attempt to allocate space for this row. 
+ return nullptr; + } + Tuple* tuple = reinterpret_cast<Tuple*>(tuple_data); + tuple->Init(fixed_size); + uint8_t* varlen_buffer = tuple_data + fixed_size; + CopyGroupingValues(tuple, varlen_buffer, varlen_size); + InitAggSlots(agg_fn_evals, tuple); + stream->AddRowCustomEnd(tuple_size); + return tuple; } int PartitionedAggregationNode::GroupingExprsVarlenSize() { @@ -1079,30 +1072,30 @@ Tuple* PartitionedAggregationNode::GetOutputTuple( return dst; } -Status PartitionedAggregationNode::AppendSpilledRow(BufferedTupleStream* stream, - TupleRow* row) { - DCHECK(stream != NULL); +template <bool AGGREGATED_ROWS> +Status PartitionedAggregationNode::AppendSpilledRow( + Partition* __restrict__ partition, TupleRow* __restrict__ row) { + DCHECK(!is_streaming_preagg_); + DCHECK(partition->is_spilled()); + BufferedTupleStreamV2* stream = AGGREGATED_ROWS ? + partition->aggregated_row_stream.get() : + partition->unaggregated_row_stream.get(); DCHECK(!stream->is_pinned()); - DCHECK(stream->has_write_block()); - if (LIKELY(stream->AddRow(row, &process_batch_status_))) return Status::OK(); + Status status; + if (LIKELY(stream->AddRow(row, &status))) return Status::OK(); + RETURN_IF_ERROR(status); - // Adding fails iff either we hit an error or haven't switched to I/O buffers. - RETURN_IF_ERROR(process_batch_status_); + // Keep trying to free memory by spilling until we succeed or hit an error. + // Running out of partitions to spill is treated as an error by SpillPartition(). while (true) { - bool got_buffer; - RETURN_IF_ERROR(stream->SwitchToIoBuffers(&got_buffer)); - if (got_buffer) break; - RETURN_IF_ERROR(SpillPartition()); + RETURN_IF_ERROR(SpillPartition(AGGREGATED_ROWS)); + if (stream->AddRow(row, &status)) return Status::OK(); + RETURN_IF_ERROR(status); } - - // Adding the row should succeed after the I/O buffer switch. 
- if (stream->AddRow(row, &process_batch_status_)) return Status::OK(); - DCHECK(!process_batch_status_.ok()); - return process_batch_status_; } -void PartitionedAggregationNode::DebugString(int indentation_level, - stringstream* out) const { +void PartitionedAggregationNode::DebugString( + int indentation_level, stringstream* out) const { *out << string(indentation_level * 2, ' '); *out << "PartitionedAggregationNode(" << "intermediate_tuple_id=" << intermediate_tuple_id_ @@ -1114,85 +1107,100 @@ void PartitionedAggregationNode::DebugString(int indentation_level, *out << ")"; } -Status PartitionedAggregationNode::CreateHashPartitions(int level) { +Status PartitionedAggregationNode::CreateHashPartitions( + int level, int single_partition_idx) { if (is_streaming_preagg_) DCHECK_EQ(level, 0); if (UNLIKELY(level >= MAX_PARTITION_DEPTH)) { - return Status(TErrorCode::PARTITIONED_AGG_MAX_PARTITION_DEPTH, id_, MAX_PARTITION_DEPTH); + return Status( + TErrorCode::PARTITIONED_AGG_MAX_PARTITION_DEPTH, id_, MAX_PARTITION_DEPTH); } ht_ctx_->set_level(level); DCHECK(hash_partitions_.empty()); + int num_partitions_created = 0; for (int i = 0; i < PARTITION_FANOUT; ++i) { - Partition* new_partition = new Partition(this, level); - DCHECK(new_partition != NULL); - hash_partitions_.push_back(partition_pool_->Add(new_partition)); - RETURN_IF_ERROR(new_partition->InitStreams()); - hash_tbls_[i] = NULL; - } - if (!is_streaming_preagg_) { - DCHECK_GT(state_->block_mgr()->num_reserved_buffers_remaining(block_mgr_client_), 0); + hash_tbls_[i] = nullptr; + if (single_partition_idx == -1 || i == single_partition_idx) { + Partition* new_partition = partition_pool_->Add(new Partition(this, level, i)); + ++num_partitions_created; + hash_partitions_.push_back(new_partition); + RETURN_IF_ERROR(new_partition->InitStreams()); + } else { + hash_partitions_.push_back(nullptr); + } } - // Now that all the streams are reserved (meaning we have enough memory to execute // the algorithm), allocate the hash tables. These can fail and we can still continue. for (int i = 0; i < PARTITION_FANOUT; ++i) { - if (UNLIKELY(!hash_partitions_[i]->InitHashTable())) { - // We don't spill on preaggregations. If we have so little memory that we can't - // allocate small hash tables, the mem limit is just too low. - if (is_streaming_preagg_) { - int64_t alloc_size = PAGG_DEFAULT_HASH_TABLE_SZ * HashTable::BucketSize(); - string details = Substitute("Cannot perform aggregation at node with id $0." - " Failed to initialize hash table in preaggregation. The memory limit" - " is too low to execute the query.", id_); - return mem_tracker()->MemLimitExceeded(state_, details, alloc_size); + Partition* partition = hash_partitions_[i]; + if (partition == nullptr) continue; + if (partition->aggregated_row_stream == nullptr) { + // Failed to create the aggregated row stream - cannot create a hash table. + // Just continue with a NULL hash table so rows will be passed through. + DCHECK(is_streaming_preagg_); + } else { + bool got_memory; + RETURN_IF_ERROR(partition->InitHashTable(&got_memory)); + // Spill the partition if we cannot create a hash table for a merge aggregation. + if (UNLIKELY(!got_memory)) { + if (is_streaming_preagg_) { + partition->DiscardAggregatedRowStream(); + } else { + // If we're repartitioning, we will be writing aggregated rows first. 
+ RETURN_IF_ERROR(partition->Spill(level > 0)); + } } - RETURN_IF_ERROR(hash_partitions_[i]->Spill()); } - hash_tbls_[i] = hash_partitions_[i]->hash_tbl.get(); + hash_tbls_[i] = partition->hash_tbl.get(); } - COUNTER_ADD(partitions_created_, hash_partitions_.size()); + COUNTER_ADD(partitions_created_, num_partitions_created); if (!is_streaming_preagg_) { COUNTER_SET(max_partition_level_, level); } return Status::OK(); } -Status PartitionedAggregationNode::CheckAndResizeHashPartitions(int num_rows, - const HashTableCtx* ht_ctx) { +Status PartitionedAggregationNode::CheckAndResizeHashPartitions( + bool partitioning_aggregated_rows, int num_rows, const HashTableCtx* ht_ctx) { DCHECK(!is_streaming_preagg_); for (int i = 0; i < PARTITION_FANOUT; ++i) { Partition* partition = hash_partitions_[i]; + if (partition == nullptr) continue; while (!partition->is_spilled()) { { SCOPED_TIMER(ht_resize_timer_); - if (partition->hash_tbl->CheckAndResize(num_rows, ht_ctx)) break; + bool resized; + RETURN_IF_ERROR(partition->hash_tbl->CheckAndResize(num_rows, ht_ctx, &resized)); + if (resized) break; } - RETURN_IF_ERROR(SpillPartition()); + RETURN_IF_ERROR(SpillPartition(partitioning_aggregated_rows)); } } return Status::OK(); } int64_t PartitionedAggregationNode::LargestSpilledPartition() const { + DCHECK(!is_streaming_preagg_); int64_t max_rows = 0; for (int i = 0; i < hash_partitions_.size(); ++i) { Partition* partition = hash_partitions_[i]; - if (partition->is_closed || !partition->is_spilled()) continue; - int64_t rows = partition->aggregated_row_stream->num_rows() + - partition->unaggregated_row_stream->num_rows(); + if (partition == nullptr || partition->is_closed || !partition->is_spilled()) { + continue; + } + int64_t rows = partition->aggregated_row_stream->num_rows() + + partition->unaggregated_row_stream->num_rows(); if (rows > max_rows) max_rows = rows; } return max_rows; } Status PartitionedAggregationNode::NextPartition() { - DCHECK(output_partition_ == NULL); + DCHECK(output_partition_ == nullptr); // Keep looping until we get to a partition that fits in memory. - Partition* partition = NULL; + Partition* partition = nullptr; while (true) { - partition = NULL; // First return partitions that are fully aggregated (and in memory). if (!aggregated_partitions_.empty()) { partition = aggregated_partitions_.front(); @@ -1201,56 +1209,23 @@ Status PartitionedAggregationNode::NextPartition() { break; } - if (partition == NULL) { - DCHECK(!spilled_partitions_.empty()); - DCHECK(!is_streaming_preagg_); - DCHECK_EQ(state_->block_mgr()->num_pinned_buffers(block_mgr_client_), - needs_serialize_ ? 1 : 0); - - // TODO: we can probably do better than just picking the first partition. We - // can base this on the amount written to disk, etc. - partition = spilled_partitions_.front(); - DCHECK(partition->is_spilled()); - - // Create the new hash partitions to repartition into. - // TODO: we don't need to repartition here. We are now working on 1 / FANOUT - // of the input so it's reasonably likely it can fit. We should look at this - // partitions size and just do the aggregation if it fits in memory. - RETURN_IF_ERROR(CreateHashPartitions(partition->level + 1)); - COUNTER_ADD(num_repartitions_, 1); - - // Rows in this partition could have been spilled into two streams, depending - // on if it is an aggregated intermediate, or an unaggregated row. - // Note: we must process the aggregated rows first to save a hash table lookup - // in ProcessBatch(). 
- RETURN_IF_ERROR(ProcessStream<true>(partition->aggregated_row_stream.get())); - RETURN_IF_ERROR(ProcessStream<false>(partition->unaggregated_row_stream.get())); - - COUNTER_ADD(num_row_repartitioned_, partition->aggregated_row_stream->num_rows()); - COUNTER_ADD(num_row_repartitioned_, - partition->unaggregated_row_stream->num_rows()); + // No aggregated partitions in memory - we should not be using any reservation aside + // from 'serialize_stream_'. + DCHECK_EQ(serialize_stream_ != nullptr ? serialize_stream_->BytesPinned(false) : 0, + buffer_pool_client_.GetUsedReservation()) << buffer_pool_client_.DebugString(); - partition->Close(false); - spilled_partitions_.pop_front(); - - // Done processing this partition. Move the new partitions into - // spilled_partitions_/aggregated_partitions_. - int64_t num_input_rows = partition->aggregated_row_stream->num_rows() + - partition->unaggregated_row_stream->num_rows(); - - // Check if there was any reduction in the size of partitions after repartitioning. - int64_t largest_partition = LargestSpilledPartition(); - DCHECK_GE(num_input_rows, largest_partition) << "Cannot have a partition with " - "more rows than the input"; - if (UNLIKELY(num_input_rows == largest_partition)) { - return Status(TErrorCode::PARTITIONED_AGG_REPARTITION_FAILS, id_, - partition->level + 1, num_input_rows); - } - RETURN_IF_ERROR(MoveHashPartitions(num_input_rows)); - } - } + // Try to fit a single spilled partition in memory. We can often do this because + // we only need to fit 1/PARTITION_FANOUT of the data in memory. + // TODO: in some cases when the partition probably won't fit in memory it could + // be better to skip directly to repartitioning. + RETURN_IF_ERROR(BuildSpilledPartition(&partition)); + if (partition != nullptr) break; - DCHECK(partition->hash_tbl.get() != NULL); + // If we can't fit the partition in memory, repartition it. + RETURN_IF_ERROR(RepartitionSpilledPartition()); + } + DCHECK(!partition->is_spilled()); + DCHECK(partition->hash_tbl.get() != nullptr); DCHECK(partition->aggregated_row_stream->is_pinned()); output_partition_ = partition; @@ -1259,8 +1234,105 @@ Status PartitionedAggregationNode::NextPartition() { return Status::OK(); } -template<bool AGGREGATED_ROWS> -Status PartitionedAggregationNode::ProcessStream(BufferedTupleStream* input_stream) { +Status PartitionedAggregationNode::BuildSpilledPartition(Partition** built_partition) { + DCHECK(!spilled_partitions_.empty()); + DCHECK(!is_streaming_preagg_); + // Leave the partition in 'spilled_partitions_' to be closed if we hit an error. + Partition* src_partition = spilled_partitions_.front(); + DCHECK(src_partition->is_spilled()); + + // Create a new hash partition from the rows of the spilled partition. This is simpler + // than trying to finish building a partially-built partition in place. We only + // initialise one hash partition that all rows in 'src_partition' will hash to. + RETURN_IF_ERROR(CreateHashPartitions(src_partition->level, src_partition->idx)); + Partition* dst_partition = hash_partitions_[src_partition->idx]; + DCHECK(dst_partition != nullptr); + + // Rebuild the hash table over spilled aggregate rows then start adding unaggregated + // rows to the hash table. It's possible the partition will spill at either stage. + // In that case we need to finish processing 'src_partition' so that all rows are + // appended to 'dst_partition'. 
+ // TODO: if the partition spills again but the aggregation reduces the input + // significantly, we could do better here by keeping the incomplete hash table in + // memory and only spilling unaggregated rows that didn't fit in the hash table + // (somewhat similar to the passthrough pre-aggregation). + RETURN_IF_ERROR(ProcessStream<true>(src_partition->aggregated_row_stream.get())); + RETURN_IF_ERROR(ProcessStream<false>(src_partition->unaggregated_row_stream.get())); + src_partition->Close(false); + spilled_partitions_.pop_front(); + hash_partitions_.clear(); + + if (dst_partition->is_spilled()) { + PushSpilledPartition(dst_partition); + *built_partition = nullptr; + // Spilled the partition - we should not be using any reservation except from + // 'serialize_stream_'. + DCHECK_EQ(serialize_stream_ != nullptr ? serialize_stream_->BytesPinned(false) : 0, + buffer_pool_client_.GetUsedReservation()) << buffer_pool_client_.DebugString(); + } else { + *built_partition = dst_partition; + } + return Status::OK(); +} + +Status PartitionedAggregationNode::RepartitionSpilledPartition() { + DCHECK(!spilled_partitions_.empty()); + DCHECK(!is_streaming_preagg_); + // Leave the partition in 'spilled_partitions_' to be closed if we hit an error. + Partition* partition = spilled_partitions_.front(); + DCHECK(partition->is_spilled()); + + // Create the new hash partitions to repartition into. This will allocate a + // write buffer for each partition's aggregated row stream. + RETURN_IF_ERROR(CreateHashPartitions(partition->level + 1)); + COUNTER_ADD(num_repartitions_, 1); + + // Rows in this partition could have been spilled into two streams, depending + // on if it is an aggregated intermediate, or an unaggregated row. Aggregated + // rows are processed first to save a hash table lookup in ProcessBatch(). + RETURN_IF_ERROR(ProcessStream<true>(partition->aggregated_row_stream.get())); + + // Prepare write buffers so we can append spilled rows to unaggregated partitions. + for (Partition* hash_partition : hash_partitions_) { + if (!hash_partition->is_spilled()) continue; + // The aggregated rows have been repartitioned. Free up at least a buffer's worth of + // reservation and use it to pin the unaggregated write buffer. + hash_partition->aggregated_row_stream->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL); + bool got_buffer; + RETURN_IF_ERROR( + hash_partition->unaggregated_row_stream->PrepareForWrite(&got_buffer)); + DCHECK(got_buffer) + << "Accounted in min reservation" << buffer_pool_client_.DebugString(); + } + RETURN_IF_ERROR(ProcessStream<false>(partition->unaggregated_row_stream.get())); + + COUNTER_ADD(num_row_repartitioned_, partition->aggregated_row_stream->num_rows()); + COUNTER_ADD(num_row_repartitioned_, partition->unaggregated_row_stream->num_rows()); + + partition->Close(false); + spilled_partitions_.pop_front(); + + // Done processing this partition. Move the new partitions into + // spilled_partitions_/aggregated_partitions_. + int64_t num_input_rows = partition->aggregated_row_stream->num_rows() + + partition->unaggregated_row_stream->num_rows(); + + // Check if there was any reduction in the size of partitions after repartitioning. 
+ int64_t largest_partition = LargestSpilledPartition(); + DCHECK_GE(num_input_rows, largest_partition) << "Partition had more rows than input"; + if (UNLIKELY(num_input_rows == largest_partition)) { + stringstream ss; + DebugString(2, &ss); + return Status(TErrorCode::PARTITIONED_AGG_REPARTITION_FAILS, id_, + partition->level + 1, num_input_rows, buffer_pool_client_.DebugString(), + ss.str()); + } + RETURN_IF_ERROR(MoveHashPartitions(num_input_rows)); + return Status::OK(); +} + +template <bool AGGREGATED_ROWS> +Status PartitionedAggregationNode::ProcessStream(BufferedTupleStreamV2* input_stream) { DCHECK(!is_streaming_preagg_); if (input_stream->num_rows() > 0) { while (true) { @@ -1268,7 +1340,7 @@ Status PartitionedAggregationNode::ProcessStream(BufferedTupleStream* input_stre RETURN_IF_ERROR(input_stream->PrepareForRead(true, &got_buffer)); if (got_buffer) break; // Did not have a buffer to read the input stream. Spill and try again. - RETURN_IF_ERROR(SpillPartition()); + RETURN_IF_ERROR(SpillPartition(AGGREGATED_ROWS)); } TPrefetchMode::type prefetch_mode = state_->query_options().prefetch_mode; @@ -1288,16 +1360,17 @@ Status PartitionedAggregationNode::ProcessStream(BufferedTupleStream* input_stre return Status::OK(); } -Status PartitionedAggregationNode::SpillPartition() { +Status PartitionedAggregationNode::SpillPartition(bool more_aggregate_rows) { int64_t max_freed_mem = 0; int partition_idx = -1; // Iterate over the partitions and pick the largest partition that is not spilled. for (int i = 0; i < hash_partitions_.size(); ++i) { + if (hash_partitions_[i] == nullptr) continue; if (hash_partitions_[i]->is_closed) continue; if (hash_partitions_[i]->is_spilled()) continue; // Pass 'true' because we need to keep the write block pinned. See Partition::Spill(). - int64_t mem = hash_partitions_[i]->aggregated_row_stream->bytes_in_mem(true); + int64_t mem = hash_partitions_[i]->aggregated_row_stream->BytesPinned(true); mem += hash_partitions_[i]->hash_tbl->ByteSize(); mem += hash_partitions_[i]->agg_fn_pool->total_reserved_bytes(); DCHECK_GT(mem, 0); // At least the hash table buckets should occupy memory. @@ -1306,26 +1379,26 @@ Status PartitionedAggregationNode::SpillPartition() { partition_idx = i; } } - if (partition_idx == -1) { - // Could not find a partition to spill. This means the mem limit was just too low. 
- return state_->block_mgr()->MemLimitTooLowError(block_mgr_client_, id()); - } - + DCHECK_NE(partition_idx, -1) << "Should have been able to spill a partition to " + << "reclaim memory: " << buffer_pool_client_.DebugString(); hash_tbls_[partition_idx] = NULL; - return hash_partitions_[partition_idx]->Spill(); + return hash_partitions_[partition_idx]->Spill(more_aggregate_rows); } Status PartitionedAggregationNode::MoveHashPartitions(int64_t num_input_rows) { DCHECK(!hash_partitions_.empty()); stringstream ss; - ss << "PA(node_id=" << id() << ") partitioned(level=" - << hash_partitions_[0]->level << ") " - << num_input_rows << " rows into:" << endl; + ss << "PA(node_id=" << id() << ") partitioned(level=" << hash_partitions_[0]->level + << ") " << num_input_rows << " rows into:" << endl; for (int i = 0; i < hash_partitions_.size(); ++i) { Partition* partition = hash_partitions_[i]; - int64_t aggregated_rows = partition->aggregated_row_stream->num_rows(); + if (partition == nullptr) continue; + int64_t aggregated_rows = 0; + if (partition->aggregated_row_stream != nullptr) { + aggregated_rows = partition->aggregated_row_stream->num_rows(); + } int64_t unaggregated_rows = 0; - if (partition->unaggregated_row_stream != NULL) { + if (partition->unaggregated_row_stream != nullptr) { unaggregated_rows = partition->unaggregated_row_stream->num_rows(); } double total_rows = aggregated_rows + unaggregated_rows; @@ -1341,54 +1414,46 @@ Status PartitionedAggregationNode::MoveHashPartitions(int64_t num_input_rows) { if (total_rows == 0) { partition->Close(false); } else if (partition->is_spilled()) { - DCHECK(partition->hash_tbl.get() == NULL); - // We need to unpin all the spilled partitions to make room to allocate new - // hash_partitions_ when we repartition the spilled partitions. - // TODO: we only need to do this when we have memory pressure. This might be - // okay though since the block mgr should only write these to disk if there - // is memory pressure. - RETURN_IF_ERROR( - partition->aggregated_row_stream->UnpinStream(BufferedTupleStream::UNPIN_ALL)); - RETURN_IF_ERROR(partition->unaggregated_row_stream->UnpinStream( - BufferedTupleStream::UNPIN_ALL)); - - // Push new created partitions at the front. This means a depth first walk - // (more finely partitioned partitions are processed first). This allows us - // to delete blocks earlier and bottom out the recursion earlier. - spilled_partitions_.push_front(partition); + PushSpilledPartition(partition); } else { aggregated_partitions_.push_back(partition); } - } VLOG(2) << ss.str(); hash_partitions_.clear(); return Status::OK(); } +void PartitionedAggregationNode::PushSpilledPartition(Partition* partition) { + DCHECK(partition->is_spilled()); + DCHECK(partition->hash_tbl == nullptr); + // Ensure all pages in the spilled partition's streams are unpinned by invalidating + // the streams' read and write iterators. We may need all the memory to process the + // next spilled partitions. 
+ partition->aggregated_row_stream->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL); + partition->unaggregated_row_stream->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL); + spilled_partitions_.push_front(partition); +} + void PartitionedAggregationNode::ClosePartitions() { - for (int i = 0; i < hash_partitions_.size(); ++i) { - hash_partitions_[i]->Close(true); - } - for (list<Partition*>::iterator it = aggregated_partitions_.begin(); - it != aggregated_partitions_.end(); ++it) { - (*it)->Close(true); - } - for (list<Partition*>::iterator it = spilled_partitions_.begin(); - it != spilled_partitions_.end(); ++it) { - (*it)->Close(true); + for (Partition* partition : hash_partitions_) { + if (partition != nullptr) partition->Close(true); } + hash_partitions_.clear(); + for (Partition* partition : aggregated_partitions_) partition->Close(true); aggregated_partitions_.clear(); + for (Partition* partition : spilled_partitions_) partition->Close(true); spilled_partitions_.clear(); - hash_partitions_.clear(); memset(hash_tbls_, 0, sizeof(hash_tbls_)); partition_pool_->Clear(); } Status PartitionedAggregationNode::QueryMaintenance(RuntimeState* state) { AggFnEvaluator::FreeLocalAllocations(agg_fn_evals_); - for (int i = 0; i < hash_partitions_.size(); ++i) { - AggFnEvaluator::FreeLocalAllocations(hash_partitions_[i]->agg_fn_evals); + for (Partition* partition : hash_partitions_) { + if (partition != nullptr) { + AggFnEvaluator::FreeLocalAllocations(partition->agg_fn_evals); + } } if (ht_ctx_.get() != nullptr) ht_ctx_->FreeLocalAllocations(); return ExecNode::QueryMaintenance(state); @@ -1972,4 +2037,8 @@ Status PartitionedAggregationNode::CodegenProcessBatchStreaming( return Status::OK(); } +// Instantiate required templates. +template Status PartitionedAggregationNode::AppendSpilledRow<false>( + Partition*, TupleRow*); +template Status PartitionedAggregationNode::AppendSpilledRow<true>(Partition*, TupleRow*); }
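To make the control flow of the rewritten AppendSpilledRow() above concrete: under the buffer pool model, AddRow() on an unpinned stream fails recoverably only when no write buffer is available, so the node spills an in-memory hash partition to release reservation and retries until the row is appended or a real error occurs. The sketch below is illustrative only and uses simplified stand-in types (Status, Stream, Partition and SpillOnePartition here are hypothetical placeholders, not Impala's classes):

// Illustrative sketch only (not part of the commit): models the
// "append -> spill to free reservation -> retry" loop used by
// PartitionedAggregationNode::AppendSpilledRow() in the patch above.
#include <cassert>
#include <cstdio>
#include <vector>

struct Status {
  bool ok = true;
  static Status OK() { return {}; }
  static Status Error() { return {false}; }
};

// Stand-in for a BufferedTupleStreamV2-like stream: AddRow() returns false
// with an OK status when the write buffer is full (the recoverable case) and
// false with a non-OK status on a real error.
struct Stream {
  int capacity = 0;  // rows that fit in the currently pinned write buffer
  int rows = 0;
  bool AddRow(Status* status) {
    if (rows < capacity) { ++rows; return true; }
    *status = Status::OK();
    return false;
  }
};

struct Partition {
  bool spilled = false;
};

// Stand-in for SpillPartition(): spilling an in-memory partition releases
// reservation, modeled here by growing the target stream's write capacity.
Status SpillOnePartition(std::vector<Partition>* partitions, Stream* target) {
  for (Partition& p : *partitions) {
    if (!p.spilled) {
      p.spilled = true;
      target->capacity += 4;  // pretend one buffer's worth was reclaimed
      return Status::OK();
    }
  }
  return Status::Error();  // nothing left to spill: reservation accounting bug
}

// Mirrors the retry loop: keep spilling until the append succeeds or an
// error (including "no partition left to spill") occurs.
Status AppendSpilledRow(std::vector<Partition>* partitions, Stream* stream) {
  Status status;
  if (stream->AddRow(&status)) return Status::OK();
  if (!status.ok) return status;
  while (true) {
    Status spill_status = SpillOnePartition(partitions, stream);
    if (!spill_status.ok) return spill_status;
    if (stream->AddRow(&status)) return Status::OK();
    if (!status.ok) return status;
  }
}

int main() {
  std::vector<Partition> partitions(2);
  Stream spilled_stream;
  for (int i = 0; i < 8; ++i) assert(AppendSpilledRow(&partitions, &spilled_stream).ok);
  std::printf("appended %d rows\n", spilled_stream.rows);
}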
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/partitioned-aggregation-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-aggregation-node.h b/be/src/exec/partitioned-aggregation-node.h index 066dc28..4f8b622 100644 --- a/be/src/exec/partitioned-aggregation-node.h +++ b/be/src/exec/partitioned-aggregation-node.h @@ -19,13 +19,15 @@ #ifndef IMPALA_EXEC_PARTITIONED_AGGREGATION_NODE_H #define IMPALA_EXEC_PARTITIONED_AGGREGATION_NODE_H +#include <deque> + #include <boost/scoped_ptr.hpp> #include "exec/exec-node.h" #include "exec/hash-table.h" -#include "runtime/buffered-block-mgr.h" -#include "runtime/buffered-tuple-stream.h" -#include "runtime/descriptors.h" // for TupleId +#include "runtime/buffered-tuple-stream-v2.h" +#include "runtime/bufferpool/suballocator.h" +#include "runtime/descriptors.h" // for TupleId #include "runtime/mem-pool.h" #include "runtime/string-value.h" @@ -229,7 +231,9 @@ class PartitionedAggregationNode : public ExecNode { std::vector<int> string_grouping_exprs_; RuntimeState* state_; - BufferedBlockMgr::Client* block_mgr_client_; + + /// Allocator for hash table memory. + boost::scoped_ptr<Suballocator> ht_allocator_; /// MemPool used to allocate memory for when we don't have grouping and don't initialize /// the partitioning structures, or during Close() when creating new output tuples. @@ -337,12 +341,12 @@ class PartitionedAggregationNode : public ExecNode { HashTable* hash_tbls_[PARTITION_FANOUT]; /// All partitions that have been spilled and need further processing. - std::list<Partition*> spilled_partitions_; + std::deque<Partition*> spilled_partitions_; /// All partitions that are aggregated and can just return the results in GetNext(). /// After consuming all the input, hash_partitions_ is split into spilled_partitions_ /// and aggregated_partitions_, depending on if it was spilled or not. - std::list<Partition*> aggregated_partitions_; + std::deque<Partition*> aggregated_partitions_; /// END: Members that must be Reset() ///////////////////////////////////////// @@ -352,31 +356,42 @@ class PartitionedAggregationNode : public ExecNode { /// initially use small buffers. Streaming pre-aggregations do not spill and do not /// require an unaggregated stream. struct Partition { - Partition(PartitionedAggregationNode* parent, int level) - : parent(parent), is_closed(false), level(level) {} + Partition(PartitionedAggregationNode* parent, int level, int idx) + : parent(parent), is_closed(false), level(level), idx(idx) {} ~Partition(); /// Initializes aggregated_row_stream and unaggregated_row_stream (if a spilling - /// aggregation), reserving one buffer for each. The buffers backing these streams - /// are reserved, so this function will not fail with a continuable OOM. If we fail - /// to init these buffers, the mem limit is too low to run this algorithm. - Status InitStreams(); + /// aggregation), allocating one buffer for each. Spilling merge aggregations must + /// have enough reservation for the initial buffer for the stream, so this should + /// not fail due to OOM. Preaggregations do not reserve any buffers: if it does not + /// have enough reservation for the initial buffer, the aggregated row stream is not + /// created and an OK status is returned. + Status InitStreams() WARN_UNUSED_RESULT; - /// Initializes the hash table. Returns false on OOM. - bool InitHashTable(); + /// Initializes the hash table. 'aggregated_row_stream' must be non-NULL. 
+ /// Sets 'got_memory' to true if the hash table was initialised or false on OOM. + Status InitHashTable(bool* got_memory) WARN_UNUSED_RESULT; /// Called in case we need to serialize aggregated rows. This step effectively does /// a merge aggregation in this node. - Status SerializeStreamForSpilling(); + Status SerializeStreamForSpilling() WARN_UNUSED_RESULT; /// Closes this partition. If finalize_rows is true, this iterates over all rows /// in aggregated_row_stream and finalizes them (this is only used in the cancellation /// path). void Close(bool finalize_rows); - /// Spills this partition, unpinning streams and cleaning up hash tables as necessary. - Status Spill(); + /// Spill this partition. 'more_aggregate_rows' = true means that more aggregate rows + /// may be appended to the partition before appending unaggregated rows. On + /// success, one of the streams is left with a write iterator: the aggregated stream + /// if 'more_aggregate_rows' is true or the unaggregated stream otherwise. + Status Spill(bool more_aggregate_rows) WARN_UNUSED_RESULT; + + /// Discards the aggregated row stream and hash table. Only valid to call if this is + /// a streaming preaggregation and the initial memory allocation for hash tables or + /// the aggregated stream failed. The aggregated stream must have 0 rows. + void DiscardAggregatedRowStream(); bool is_spilled() const { return hash_tbl.get() == NULL; } @@ -390,9 +405,12 @@ class PartitionedAggregationNode : public ExecNode { /// etc. const int level; + /// The index of this partition within 'hash_partitions_' at its level. + const int idx; + /// Hash table for this partition. /// Can be NULL if this partition is no longer maintaining a hash table (i.e. - /// is spilled). + /// is spilled or we are passing through all rows for this partition). boost::scoped_ptr<HashTable> hash_tbl; /// Clone of parent's agg_fn_evals_ and backing MemPool. @@ -401,18 +419,24 @@ class PartitionedAggregationNode : public ExecNode { /// Tuple stream used to store aggregated rows. When the partition is not spilled, /// (meaning the hash table is maintained), this stream is pinned and contains the - /// memory referenced by the hash table. When it is spilled, aggregate rows are - /// just appended to this stream. - boost::scoped_ptr<BufferedTupleStream> aggregated_row_stream; + /// memory referenced by the hash table. When it is spilled, this consumes reservation + /// for a write buffer only during repartitioning of aggregated rows. + /// + /// For streaming preaggs, this may be NULL if sufficient memory is not available. + /// In that case hash_tbl is also NULL and all rows for the partition will be passed + /// through. + boost::scoped_ptr<BufferedTupleStreamV2> aggregated_row_stream; /// Unaggregated rows that are spilled. Always NULL for streaming pre-aggregations. - boost::scoped_ptr<BufferedTupleStream> unaggregated_row_stream; + /// Always unpinned. Has a write buffer allocated when the partition is spilled and + /// unaggregated rows are being processed. + boost::scoped_ptr<BufferedTupleStreamV2> unaggregated_row_stream; }; /// Stream used to store serialized spilled rows. Only used if needs_serialize_ /// is set. This stream is never pinned and only used in Partition::Spill as /// a temporary buffer. - boost::scoped_ptr<BufferedTupleStream> serialize_stream_; + boost::scoped_ptr<BufferedTupleStreamV2> serialize_stream_; /// Accessor for 'hash_tbls_' that verifies consistency with the partitions. 
HashTable* ALWAYS_INLINE GetHashTable(int partition_idx) { @@ -447,7 +471,7 @@ class PartitionedAggregationNode : public ExecNode { /// FunctionContexts, so is stored outside the stream. If stream's small buffers get /// full, it will attempt to switch to IO-buffers. Tuple* ConstructIntermediateTuple(const std::vector<AggFnEvaluator*>& agg_fn_evals, - BufferedTupleStream* stream, Status* status) noexcept; + BufferedTupleStreamV2* stream, Status* status) noexcept; /// Constructs intermediate tuple, allocating memory from pool instead of the stream. /// Returns NULL and sets status if there is not enough memory to allocate the tuple. @@ -495,7 +519,7 @@ class PartitionedAggregationNode : public ExecNode { /// Do the aggregation for all tuple rows in the batch when there is no grouping. /// This function is replaced by codegen. - Status ProcessBatchNoGrouping(RowBatch* batch); + Status ProcessBatchNoGrouping(RowBatch* batch) WARN_UNUSED_RESULT; /// Processes a batch of rows. This is the core function of the algorithm. We partition /// the rows into hash_partitions_, spilling as necessary. @@ -507,9 +531,9 @@ class PartitionedAggregationNode : public ExecNode { // /// This function is replaced by codegen. We pass in ht_ctx_.get() as an argument for /// performance. - template<bool AGGREGATED_ROWS> - Status IR_ALWAYS_INLINE ProcessBatch(RowBatch* batch, - TPrefetchMode::type prefetch_mode, HashTableCtx* ht_ctx); + template <bool AGGREGATED_ROWS> + Status IR_ALWAYS_INLINE ProcessBatch(RowBatch* batch, TPrefetchMode::type prefetch_mode, + HashTableCtx* ht_ctx) WARN_UNUSED_RESULT; /// Evaluates the rows in 'batch' starting at 'start_row_idx' and stores the results in /// the expression values cache in 'ht_ctx'. The number of rows evaluated depends on @@ -524,7 +548,8 @@ class PartitionedAggregationNode : public ExecNode { /// ProcessBatch for codegen to substitute function calls with codegen'd versions. /// May spill partitions if not enough memory is available. template <bool AGGREGATED_ROWS> - Status IR_ALWAYS_INLINE ProcessRow(TupleRow* row, HashTableCtx* ht_ctx); + Status IR_ALWAYS_INLINE ProcessRow( + TupleRow* row, HashTableCtx* ht_ctx) WARN_UNUSED_RESULT; /// Create a new intermediate tuple in partition, initialized with row. ht_ctx is /// the context for the partition's hash table and hash is the precomputed hash of @@ -533,35 +558,33 @@ class PartitionedAggregationNode : public ExecNode { /// tuple to the partition's stream. Must be inlined into ProcessBatch for codegen /// to substitute function calls with codegen'd versions. insert_it is an iterator /// for insertion returned from HashTable::FindBuildRowBucket(). - template<bool AGGREGATED_ROWS> - Status IR_ALWAYS_INLINE AddIntermediateTuple(Partition* partition, - TupleRow* row, uint32_t hash, HashTable::Iterator insert_it); - - /// Append a row to a spilled partition. May spill partitions if needed to switch to - /// I/O buffers. Selects the correct stream according to the argument. Inlined into - /// ProcessBatch(). - template<bool AGGREGATED_ROWS> - Status IR_ALWAYS_INLINE AppendSpilledRow(Partition* partition, TupleRow* row); + template <bool AGGREGATED_ROWS> + Status IR_ALWAYS_INLINE AddIntermediateTuple(Partition* partition, TupleRow* row, + uint32_t hash, HashTable::Iterator insert_it) WARN_UNUSED_RESULT; - /// Append a row to a stream of a spilled partition. May spill partitions if needed - /// to append the row. - Status AppendSpilledRow(BufferedTupleStream* stream, TupleRow* row); + /// Append a row to a spilled partition. 
The row may be aggregated or unaggregated + /// according to AGGREGATED_ROWS. May spill partitions if needed to make room + /// to append the row. + template <bool AGGREGATED_ROWS> + Status IR_ALWAYS_INLINE AppendSpilledRow( + Partition* partition, TupleRow* row) WARN_UNUSED_RESULT; /// Reads all the rows from input_stream and processes them by calling ProcessBatch(). - template<bool AGGREGATED_ROWS> - Status ProcessStream(BufferedTupleStream* input_stream); + template <bool AGGREGATED_ROWS> + Status ProcessStream(BufferedTupleStreamV2* input_stream) WARN_UNUSED_RESULT; /// Output 'singleton_output_tuple_' and transfer memory to 'row_batch'. void GetSingletonOutput(RowBatch* row_batch); /// Get rows for the next rowbatch from the next partition. Sets 'partition_eos_' to /// true if all rows from all partitions have been returned or the limit is reached. - Status GetRowsFromPartition(RuntimeState* state, RowBatch* row_batch); + Status GetRowsFromPartition( + RuntimeState* state, RowBatch* row_batch) WARN_UNUSED_RESULT; /// Get output rows from child for streaming pre-aggregation. Aggregates some rows with /// hash table and passes through other rows converted into the intermediate /// tuple format. Sets 'child_eos_' once all rows from child have been returned. - Status GetRowsStreaming(RuntimeState* state, RowBatch* row_batch); + Status GetRowsStreaming(RuntimeState* state, RowBatch* row_batch) WARN_UNUSED_RESULT; /// Return true if we should keep expanding hash tables in the preagg. If false, /// the preagg should pass through any rows it can't fit in its tables. @@ -582,7 +605,7 @@ class PartitionedAggregationNode : public ExecNode { /// 'ht_ctx' is passed in as a way to avoid aliasing of 'this' confusing the optimiser. Status ProcessBatchStreaming(bool needs_serialize, TPrefetchMode::type prefetch_mode, RowBatch* in_batch, RowBatch* out_batch, HashTableCtx* ht_ctx, - int remaining_capacity[PARTITION_FANOUT]); + int remaining_capacity[PARTITION_FANOUT]) WARN_UNUSED_RESULT; /// Tries to add intermediate to the hash table 'hash_tbl' of 'partition' for streaming /// aggregation. The input row must have been evaluated with 'ht_ctx', with 'hash' set @@ -592,18 +615,24 @@ /// keeps track of how many more entries can be added to the hash table so we can avoid /// retrying inserts. It is decremented if an insert succeeds and set to zero if an /// insert fails. If an error occurs, returns false and sets 'status'. - bool IR_ALWAYS_INLINE TryAddToHashTable(HashTableCtx* ht_ctx, - Partition* partition, HashTable* hash_tbl, TupleRow* in_row, uint32_t hash, - int* remaining_capacity, Status* status); + bool IR_ALWAYS_INLINE TryAddToHashTable(HashTableCtx* ht_ctx, Partition* partition, + HashTable* hash_tbl, TupleRow* in_row, uint32_t hash, int* remaining_capacity, + Status* status) WARN_UNUSED_RESULT; /// Initializes hash_partitions_. 'level' is the level for the partitions to create. + /// If 'single_partition_idx' is provided, it must be a number in range + /// [0, PARTITION_FANOUT), and only that partition is created - the others are + /// initialized to NULL. /// Also sets ht_ctx_'s level to 'level'. - Status CreateHashPartitions(int level); + Status CreateHashPartitions( + int level, int single_partition_idx = -1) WARN_UNUSED_RESULT; /// Ensure that hash tables for all in-memory partitions are large enough to fit /// 'num_rows' additional hash table entries. If there is not enough memory to - /// resize the hash tables, may spill partitions. 
- Status CheckAndResizeHashPartitions(int num_rows, const HashTableCtx* ht_ctx); + /// resize the hash tables, may spill partitions. 'aggregated_rows' is true if + /// we're currently partitioning aggregated rows. + Status CheckAndResizeHashPartitions( + bool aggregated_rows, int num_rows, const HashTableCtx* ht_ctx) WARN_UNUSED_RESULT; /// Iterates over all the partitions in hash_partitions_ and returns the number of rows /// of the largest spilled partition (in terms of number of aggregated and unaggregated @@ -614,16 +643,39 @@ /// initializes output_iterator_ and output_partition_. This either removes /// a partition from aggregated_partitions_ (and is done) or removes the next /// partition from spilled_partitions_ and repartitions it. - Status NextPartition(); - - /// Picks a partition from hash_partitions_ to spill. - Status SpillPartition(); + Status NextPartition() WARN_UNUSED_RESULT; + + /// Tries to build the first partition in 'spilled_partitions_'. + /// If successful, set *built_partition to the partition. The caller owns the partition + /// and is responsible for closing it. If unsuccessful because the partition could not + /// fit in memory, set *built_partition to NULL and append the spilled partition to the + /// head of 'spilled_partitions_' so it can be processed by + /// RepartitionSpilledPartition(). + Status BuildSpilledPartition(Partition** built_partition) WARN_UNUSED_RESULT; + + /// Repartitions the first partition in 'spilled_partitions_' into PARTITION_FANOUT + /// output partitions. On success, each output partition is either: + /// * closed, if no rows were added to the partition. + /// * in 'spilled_partitions_', if the partition spilled. + /// * in 'aggregated_partitions_', if the output partition was not spilled. + Status RepartitionSpilledPartition() WARN_UNUSED_RESULT; + + /// Picks a partition from 'hash_partitions_' to spill. 'more_aggregate_rows' is passed + /// to Partition::Spill() when spilling the partition. See the Partition::Spill() + /// comment for further explanation. + Status SpillPartition(bool more_aggregate_rows) WARN_UNUSED_RESULT; /// Moves the partitions in hash_partitions_ to aggregated_partitions_ or /// spilled_partitions_. Partitions moved to spilled_partitions_ are unpinned. /// input_rows is the number of input rows that have been repartitioned. /// Used for diagnostics. - Status MoveHashPartitions(int64_t input_rows); + Status MoveHashPartitions(int64_t input_rows) WARN_UNUSED_RESULT; + + /// Adds a partition to the front of 'spilled_partitions_' for later processing. + /// 'spilled_partitions_' uses LIFO so more finely partitioned partitions are processed + /// first. This allows us to delete pages earlier and bottom out the recursion + /// earlier and also improves time locality of access to spilled data on disk. + void PushSpilledPartition(Partition* partition); /// Calls Close() on every Partition in 'aggregated_partitions_', /// 'spilled_partitions_', and 'hash_partitions_' and then resets the lists, @@ -638,7 +690,7 @@ /// and returns the IR function in 'fn'. Returns non-OK status if codegen /// is unsuccessful. Status CodegenUpdateSlot(LlvmCodeGen* codegen, int agg_fn_idx, - SlotDescriptor* slot_desc, llvm::Function** fn); + SlotDescriptor* slot_desc, llvm::Function** fn) WARN_UNUSED_RESULT; /// Codegen a call to a function implementing the UDA interface with input values /// from 'input_vals'. 
'dst_val' should contain the previous value of the aggregate @@ -647,10 +699,10 @@ class PartitionedAggregationNode : public ExecNode { /// the insert position of 'builder'. Status CodegenCallUda(LlvmCodeGen* codegen, LlvmBuilder* builder, AggFn* agg_fn, llvm::Value* agg_fn_ctx_arg, const std::vector<CodegenAnyVal>& input_vals, - const CodegenAnyVal& dst_val, CodegenAnyVal* updated_dst_val); + const CodegenAnyVal& dst_val, CodegenAnyVal* updated_dst_val) WARN_UNUSED_RESULT; /// Codegen UpdateTuple(). Returns non-OK status if codegen is unsuccessful. - Status CodegenUpdateTuple(LlvmCodeGen* codegen, llvm::Function** fn); + Status CodegenUpdateTuple(LlvmCodeGen* codegen, llvm::Function** fn) WARN_UNUSED_RESULT; /// Codegen the non-streaming process row batch loop. The loop has already been /// compiled to IR and loaded into the codegen object. UpdateAggTuple has also been @@ -659,26 +711,28 @@ class PartitionedAggregationNode : public ExecNode { /// 'process_batch_no_grouping_fn_' will be updated with the codegened function /// depending on whether this is a grouping or non-grouping aggregation. /// Assumes AGGREGATED_ROWS = false. - Status CodegenProcessBatch(LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode); + Status CodegenProcessBatch( + LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) WARN_UNUSED_RESULT; /// Codegen the materialization loop for streaming preaggregations. /// 'process_batch_streaming_fn_' will be updated with the codegened function. Status CodegenProcessBatchStreaming( - LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode); + LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) WARN_UNUSED_RESULT; - /// We need two buffers per partition, one for the aggregated stream and one - /// for the unaggregated stream. We need an additional buffer to read the stream - /// we are currently repartitioning. + /// Compute minimum buffer requirement for grouping aggregations. + /// We need one buffer per partition, which is used either as the write buffer for the + /// aggregated stream or the unaggregated stream. We need an additional buffer to read + /// the stream we are currently repartitioning. /// If we need to serialize, we need an additional buffer while spilling a partition /// as the partition's aggregate stream needs to be serialized and rewritten. /// We do not spill streaming preaggregations, so we do not need to reserve any buffers. int MinRequiredBuffers() const { - // Must be kept in sync with AggregationNode.computeResourceProfile() in fe. - if (is_streaming_preagg_) return 0; - return 2 * PARTITION_FANOUT + 1 + (needs_serialize_ ? 1 : 0); + DCHECK(!grouping_exprs_.empty()); + // Must be kept in sync with AggregationNode.computeNodeResourceProfile() in fe. + if (is_streaming_preagg_) return 0; // Need 0 buffers to pass through rows. + return PARTITION_FANOUT + 1 + (needs_serialize_ ? 
1 : 0); } }; - } #endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/partitioned-hash-join-builder-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-builder-ir.cc b/be/src/exec/partitioned-hash-join-builder-ir.cc index e5f649e..df58036 100644 --- a/be/src/exec/partitioned-hash-join-builder-ir.cc +++ b/be/src/exec/partitioned-hash-join-builder-ir.cc @@ -19,7 +19,7 @@ #include "codegen/impala-ir.h" #include "exec/hash-table.inline.h" -#include "runtime/buffered-tuple-stream.inline.h" +#include "runtime/buffered-tuple-stream-v2.inline.h" #include "runtime/raw-value.inline.h" #include "runtime/row-batch.h" #include "runtime/runtime-filter.h" @@ -30,7 +30,7 @@ using namespace impala; inline bool PhjBuilder::AppendRow( - BufferedTupleStream* stream, TupleRow* row, Status* status) { + BufferedTupleStreamV2* stream, TupleRow* row, Status* status) { if (LIKELY(stream->AddRow(row, status))) return true; if (UNLIKELY(!status->ok())) return false; return AppendRowStreamFull(stream, row, status); @@ -73,12 +73,12 @@ Status PhjBuilder::ProcessBuildBatch( bool PhjBuilder::Partition::InsertBatch(TPrefetchMode::type prefetch_mode, HashTableCtx* ht_ctx, RowBatch* batch, - const vector<BufferedTupleStream::RowIdx>& indices) { + const vector<BufferedTupleStreamV2::FlatRowPtr>& flat_rows, Status* status) { // Compute the hash values and prefetch the hash table buckets. const int num_rows = batch->num_rows(); HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache(); const int prefetch_size = expr_vals_cache->capacity(); - const BufferedTupleStream::RowIdx* row_indices = indices.data(); + const BufferedTupleStreamV2::FlatRowPtr* flat_rows_data = flat_rows.data(); for (int prefetch_group_row = 0; prefetch_group_row < num_rows; prefetch_group_row += prefetch_size) { int cur_row = prefetch_group_row; @@ -97,9 +97,9 @@ bool PhjBuilder::Partition::InsertBatch(TPrefetchMode::type prefetch_mode, expr_vals_cache->ResetForRead(); FOREACH_ROW_LIMIT(batch, cur_row, prefetch_size, batch_iter) { TupleRow* row = batch_iter.Get(); - BufferedTupleStream::RowIdx row_idx = row_indices[cur_row]; + BufferedTupleStreamV2::FlatRowPtr flat_row = flat_rows_data[cur_row]; if (!expr_vals_cache->IsRowNull() - && UNLIKELY(!hash_tbl_->Insert(ht_ctx, row_idx, row))) { + && UNLIKELY(!hash_tbl_->Insert(ht_ctx, flat_row, row, status))) { return false; } expr_vals_cache->NextRow(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/partitioned-hash-join-builder.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc index 4a5885b..a2f7c96 100644 --- a/be/src/exec/partitioned-hash-join-builder.cc +++ b/be/src/exec/partitioned-hash-join-builder.cc @@ -25,8 +25,10 @@ #include "exec/hash-table.inline.h" #include "exprs/scalar-expr.h" #include "exprs/scalar-expr-evaluator.h" -#include "runtime/buffered-tuple-stream.h" +#include "runtime/buffered-tuple-stream-v2.h" +#include "runtime/exec-env.h" #include "runtime/mem-tracker.h" +#include "runtime/query-state.h" #include "runtime/row-batch.h" #include "runtime/runtime-filter-bank.h" #include "runtime/runtime-filter.h" @@ -44,19 +46,23 @@ static const string PREPARE_FOR_READ_FAILED_ERROR_MSG = "the memory limit may help this query to complete successfully."; using namespace impala; -using namespace llvm; -using namespace 
strings; -using std::unique_ptr; +using llvm::ConstantInt; +using llvm::Function; +using llvm::Type; +using llvm::Value; +using strings::Substitute; PhjBuilder::PhjBuilder(int join_node_id, TJoinOp::type join_op, const RowDescriptor* probe_row_desc, const RowDescriptor* build_row_desc, - RuntimeState* state) + RuntimeState* state, BufferPool::ClientHandle* buffer_pool_client, + int64_t spillable_buffer_size) : DataSink(build_row_desc), runtime_state_(state), join_node_id_(join_node_id), join_op_(join_op), probe_row_desc_(probe_row_desc), - block_mgr_client_(NULL), + buffer_pool_client_(buffer_pool_client), + spillable_buffer_size_(spillable_buffer_size), non_empty_build_(false), partitions_created_(NULL), largest_partition_percent_(NULL), @@ -137,9 +143,6 @@ Status PhjBuilder::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) RETURN_IF_ERROR(ScalarExprEvaluator::Create(*filter_exprs_[i], state, &pool_, expr_mem_pool(), &filter_ctxs_[i].expr_eval)); } - RETURN_IF_ERROR(state->block_mgr()->RegisterClient( - Substitute("PartitionedHashJoin id=$0 builder=$1", join_node_id_, this), - MinRequiredBuffers(), true, mem_tracker_.get(), state, &block_mgr_client_)); partitions_created_ = ADD_COUNTER(profile(), "PartitionsCreated", TUnit::UNIT); largest_partition_percent_ = @@ -169,6 +172,11 @@ Status PhjBuilder::Open(RuntimeState* state) { for (const FilterContext& ctx : filter_ctxs_) { RETURN_IF_ERROR(ctx.expr_eval->Open(state)); } + if (ht_allocator_ == nullptr) { + // Create 'ht_allocator_' on the first call to Open(). + ht_allocator_.reset(new Suballocator( + state->exec_env()->buffer_pool(), buffer_pool_client_, spillable_buffer_size_)); + } RETURN_IF_ERROR(CreateHashPartitions(0)); AllocateRuntimeFilters(); @@ -248,7 +256,6 @@ void PhjBuilder::Close(RuntimeState* state) { if (ctx.expr_eval != nullptr) ctx.expr_eval->Close(state); } ScalarExpr::Close(filter_exprs_); - if (block_mgr_client_ != NULL) state->block_mgr()->ClearReservations(block_mgr_client_); ScalarExpr::Close(build_exprs_); pool_.Clear(); DataSink::Close(state); @@ -264,13 +271,11 @@ void PhjBuilder::Reset() { Status PhjBuilder::CreateAndPreparePartition(int level, Partition** partition) { all_partitions_.emplace_back(new Partition(runtime_state_, this, level)); *partition = all_partitions_.back().get(); - RETURN_IF_ERROR((*partition)->build_rows()->Init(join_node_id_, profile(), true)); + RETURN_IF_ERROR((*partition)->build_rows()->Init(join_node_id_, true)); bool got_buffer; RETURN_IF_ERROR((*partition)->build_rows()->PrepareForWrite(&got_buffer)); - if (!got_buffer) { - return runtime_state_->block_mgr()->MemLimitTooLowError( - block_mgr_client_, join_node_id_); - } + DCHECK(got_buffer) + << "Accounted in min reservation" << buffer_pool_client_->DebugString(); return Status::OK(); } @@ -288,22 +293,11 @@ Status PhjBuilder::CreateHashPartitions(int level) { } bool PhjBuilder::AppendRowStreamFull( - BufferedTupleStream* stream, TupleRow* row, Status* status) noexcept { + BufferedTupleStreamV2* stream, TupleRow* row, Status* status) noexcept { while (true) { - // Check if the stream is still using small buffers and try to switch to IO-buffers. - if (stream->using_small_buffers()) { - bool got_buffer; - *status = stream->SwitchToIoBuffers(&got_buffer); - if (!status->ok()) return false; - - if (got_buffer) { - if (LIKELY(stream->AddRow(row, status))) return true; - if (!status->ok()) return false; - } - } // We ran out of memory. Pick a partition to spill. 
If we ran out of unspilled // partitions, SpillPartition() will return an error status. - *status = SpillPartition(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT); + *status = SpillPartition(BufferedTupleStreamV2::UNPIN_ALL_EXCEPT_CURRENT); if (!status->ok()) return false; if (stream->AddRow(row, status)) return true; if (!status->ok()) return false; @@ -313,7 +307,7 @@ bool PhjBuilder::AppendRowStreamFull( } // TODO: can we do better with a different spilling heuristic? -Status PhjBuilder::SpillPartition(BufferedTupleStream::UnpinMode mode) { +Status PhjBuilder::SpillPartition(BufferedTupleStreamV2::UnpinMode mode) { DCHECK_EQ(hash_partitions_.size(), PARTITION_FANOUT); int64_t max_freed_mem = 0; int partition_idx = -1; @@ -323,7 +317,7 @@ Status PhjBuilder::SpillPartition(BufferedTupleStream::UnpinMode mode) { Partition* candidate = hash_partitions_[i]; if (candidate->IsClosed()) continue; if (candidate->is_spilled()) continue; - int64_t mem = candidate->build_rows()->bytes_in_mem(false); + int64_t mem = candidate->build_rows()->BytesPinned(false); if (candidate->hash_tbl() != NULL) { // The hash table should not have matches, since we have not probed it yet. // Losing match info would lead to incorrect results (IMPALA-1488). @@ -337,9 +331,9 @@ } if (partition_idx == -1) { - // Could not find a partition to spill. This means the mem limit was just too low. - return runtime_state_->block_mgr()->MemLimitTooLowError( - block_mgr_client_, join_node_id_); + return Status(Substitute("Internal error: could not find a partition to spill in " + "hash join $0:\n$1\nClient:\n$2", + join_node_id_, DebugString(), buffer_pool_client_->DebugString())); } VLOG(2) << "Spilling partition: " << partition_idx << endl << DebugString(); @@ -373,8 +367,7 @@ Status PhjBuilder::BuildHashTablesAndPrepareProbeStreams() { partition->Close(NULL); } else if (partition->is_spilled()) { // We don't need any build-side data for spilled partitions in memory. - RETURN_IF_ERROR( - partition->build_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL)); + partition->build_rows()->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL); } } @@ -393,7 +386,7 @@ RETURN_IF_ERROR(partition->BuildHashTable(&built)); // If we did not have enough memory to build this hash table, we need to spill this // partition (clean up the hash table, unpin build). - if (!built) RETURN_IF_ERROR(partition->Spill(BufferedTupleStream::UNPIN_ALL)); + if (!built) RETURN_IF_ERROR(partition->Spill(BufferedTupleStreamV2::UNPIN_ALL)); } // We may have spilled additional partitions while building hash tables, we need to @@ -429,11 +422,11 @@ Status PhjBuilder::InitSpilledPartitionProbeStreams() { while (probe_streams_to_create > 0) { // Create stream in vector, so that it will be cleaned up after any failure. 
- spilled_partition_probe_streams_.emplace_back(std::make_unique<BufferedTupleStream>( - runtime_state_, probe_row_desc_, runtime_state_->block_mgr(), block_mgr_client_, - false /* use_initial_small_buffers */, false /* read_write */)); - BufferedTupleStream* probe_stream = spilled_partition_probe_streams_.back().get(); - RETURN_IF_ERROR(probe_stream->Init(join_node_id_, profile(), false)); + spilled_partition_probe_streams_.emplace_back( + make_unique<BufferedTupleStreamV2>(runtime_state_, probe_row_desc_, + buffer_pool_client_, spillable_buffer_size_, spillable_buffer_size_)); + BufferedTupleStreamV2* probe_stream = spilled_partition_probe_streams_.back().get(); + RETURN_IF_ERROR(probe_stream->Init(join_node_id_, false)); // Loop until either the stream gets a buffer or all partitions are spilled (in which // case SpillPartition() returns an error). @@ -442,7 +435,7 @@ RETURN_IF_ERROR(probe_stream->PrepareForWrite(&got_buffer)); if (got_buffer) break; - RETURN_IF_ERROR(SpillPartition(BufferedTupleStream::UNPIN_ALL)); + RETURN_IF_ERROR(SpillPartition(BufferedTupleStreamV2::UNPIN_ALL)); ++probe_streams_to_create; } --probe_streams_to_create; @@ -450,7 +443,7 @@ return Status::OK(); } -vector<unique_ptr<BufferedTupleStream>> PhjBuilder::TransferProbeStreams() { +vector<unique_ptr<BufferedTupleStreamV2>> PhjBuilder::TransferProbeStreams() { return std::move(spilled_partition_probe_streams_); } @@ -460,7 +453,7 @@ void PhjBuilder::CloseAndDeletePartitions() { all_partitions_.clear(); hash_partitions_.clear(); null_aware_partition_ = NULL; - for (unique_ptr<BufferedTupleStream>& stream : spilled_partition_probe_streams_) { + for (unique_ptr<BufferedTupleStreamV2>& stream : spilled_partition_probe_streams_) { stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES); } spilled_partition_probe_streams_.clear(); @@ -512,14 +505,14 @@ void PhjBuilder::PublishRuntimeFilters(int64_t num_build_rows) { } Status PhjBuilder::RepartitionBuildInput( - Partition* input_partition, int level, BufferedTupleStream* input_probe_rows) { + Partition* input_partition, int level, BufferedTupleStreamV2* input_probe_rows) { DCHECK_GE(level, 1); SCOPED_TIMER(repartition_timer_); COUNTER_ADD(num_repartitions_, 1); RuntimeState* state = runtime_state_; // Set up the read buffer and the new partitions. - BufferedTupleStream* build_rows = input_partition->build_rows(); + BufferedTupleStreamV2* build_rows = input_partition->build_rows(); DCHECK(build_rows != NULL); bool got_read_buffer; RETURN_IF_ERROR(build_rows->PrepareForRead(true, &got_read_buffer)); @@ -552,7 +545,7 @@ Status PhjBuilder::RepartitionBuildInput( bool got_buffer; RETURN_IF_ERROR(input_probe_rows->PrepareForRead(true, &got_buffer)); if (got_buffer) break; - RETURN_IF_ERROR(SpillPartition(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT)); + RETURN_IF_ERROR(SpillPartition(BufferedTupleStreamV2::UNPIN_ALL_EXCEPT_CURRENT)); } RETURN_IF_ERROR(FlushFinal(state)); @@ -580,12 +573,9 @@ bool PhjBuilder::HashTableStoresNulls() const { PhjBuilder::Partition::Partition(RuntimeState* state, PhjBuilder* parent, int level) : parent_(parent), is_spilled_(false), level_(level) { - // If we're repartitioning, we can assume the build input is fairly large and small - // buffers will most likely just waste memory. 
- bool use_initial_small_buffers = level == 0; - build_rows_ = - std::make_unique<BufferedTupleStream>(state, parent_->row_desc_, state->block_mgr(), - parent_->block_mgr_client_, use_initial_small_buffers, false /* read_write */); + build_rows_ = make_unique<BufferedTupleStreamV2>(state, parent_->row_desc_, + parent_->buffer_pool_client_, parent->spillable_buffer_size_, + parent->spillable_buffer_size_); } PhjBuilder::Partition::~Partition() { @@ -612,30 +602,15 @@ void PhjBuilder::Partition::Close(RowBatch* batch) { } } -Status PhjBuilder::Partition::Spill(BufferedTupleStream::UnpinMode mode) { +Status PhjBuilder::Partition::Spill(BufferedTupleStreamV2::UnpinMode mode) { DCHECK(!IsClosed()); - // Close the hash table as soon as possible to release memory. + RETURN_IF_ERROR(parent_->runtime_state_->StartSpilling(parent_->mem_tracker())); + // Close the hash table and unpin the stream backing it to free memory. if (hash_tbl() != NULL) { hash_tbl_->Close(); hash_tbl_.reset(); } - - // Unpin the stream as soon as possible to increase the chances that the - // SwitchToIoBuffers() call below will succeed. - RETURN_IF_ERROR(build_rows_->UnpinStream(mode)); - - if (build_rows_->using_small_buffers()) { - bool got_buffer; - RETURN_IF_ERROR(build_rows_->SwitchToIoBuffers(&got_buffer)); - if (!got_buffer) { - // We'll try again to get the buffers when the stream fills up the small buffers. - VLOG_QUERY << "Not enough memory to switch to IO-sized buffer for partition " - << this << " of join=" << parent_->join_node_id_ - << " build small buffers=" << build_rows_->using_small_buffers(); - VLOG_FILE << GetStackTrace(); - } - } - + build_rows_->UnpinStream(mode); if (!is_spilled_) { COUNTER_ADD(parent_->num_spilled_partitions_, 1); if (parent_->num_spilled_partitions_->value() == 1) { @@ -652,14 +627,14 @@ Status PhjBuilder::Partition::BuildHashTable(bool* built) { *built = false; // Before building the hash table, we need to pin the rows in memory. - RETURN_IF_ERROR(build_rows_->PinStream(false, built)); + RETURN_IF_ERROR(build_rows_->PinStream(built)); if (!*built) return Status::OK(); RuntimeState* state = parent_->runtime_state_; HashTableCtx* ctx = parent_->ht_ctx_.get(); ctx->set_level(level()); // Set the hash function for building the hash table. RowBatch batch(parent_->row_desc_, state->batch_size(), parent_->mem_tracker()); - vector<BufferedTupleStream::RowIdx> indices; + vector<BufferedTupleStreamV2::FlatRowPtr> flat_rows; bool eos = false; // Allocate the partition-local hash table. Initialize the number of buckets based on @@ -674,22 +649,22 @@ Status PhjBuilder::Partition::BuildHashTable(bool* built) { // // TODO: Try to allocate the hash table before pinning the stream to avoid needlessly // reading all of the spilled rows from disk when we won't succeed anyway. - int64_t estimated_num_buckets = build_rows()->RowConsumesMemory() ? 
- HashTable::EstimateNumBuckets(build_rows()->num_rows()) : - state->batch_size() * 2; - hash_tbl_.reset(HashTable::Create(state, parent_->block_mgr_client_, + int64_t estimated_num_buckets = HashTable::EstimateNumBuckets(build_rows()->num_rows()); + hash_tbl_.reset(HashTable::Create(parent_->ht_allocator_.get(), true /* store_duplicates */, parent_->row_desc_->tuple_descriptors().size(), build_rows(), 1 << (32 - NUM_PARTITIONING_BITS), estimated_num_buckets)); - if (!hash_tbl_->Init()) goto not_built; + bool success; + Status status = hash_tbl_->Init(&success); + if (!status.ok() || !success) goto not_built; + status = build_rows_->PrepareForRead(false, &success); + if (!status.ok()) goto not_built; + DCHECK(success) << "Stream was already pinned."; - bool got_read_buffer; - RETURN_IF_ERROR(build_rows_->PrepareForRead(false, &got_read_buffer)); - DCHECK(got_read_buffer) << "Stream was already pinned."; do { - RETURN_IF_ERROR(build_rows_->GetNext(&batch, &eos, &indices)); - DCHECK_EQ(batch.num_rows(), indices.size()); - DCHECK_LE(batch.num_rows(), hash_tbl_->EmptyBuckets()) - << build_rows()->RowConsumesMemory(); + status = build_rows_->GetNext(&batch, &eos, &flat_rows); + if (!status.ok()) goto not_built; + DCHECK_EQ(batch.num_rows(), flat_rows.size()); + DCHECK_LE(batch.num_rows(), hash_tbl_->EmptyBuckets()); TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode; if (parent_->insert_batch_fn_ != NULL) { InsertBatchFn insert_batch_fn; @@ -699,11 +674,12 @@ Status PhjBuilder::Partition::BuildHashTable(bool* built) { insert_batch_fn = parent_->insert_batch_fn_; } DCHECK(insert_batch_fn != NULL); - if (UNLIKELY(!insert_batch_fn(this, prefetch_mode, ctx, &batch, indices))) { + if (UNLIKELY( + !insert_batch_fn(this, prefetch_mode, ctx, &batch, flat_rows, &status))) { goto not_built; } - } else { - if (UNLIKELY(!InsertBatch(prefetch_mode, ctx, &batch, indices))) goto not_built; + } else if (UNLIKELY(!InsertBatch(prefetch_mode, ctx, &batch, flat_rows, &status))) { + goto not_built; } RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(state->GetQueryStatus()); @@ -725,7 +701,7 @@ not_built: hash_tbl_->Close(); hash_tbl_.reset(); } - return Status::OK(); + return status; } void PhjBuilder::Codegen(LlvmCodeGen* codegen) { @@ -774,7 +750,8 @@ string PhjBuilder::DebugString() const { DCHECK(partition->build_rows() != NULL); ss << endl << " Build Rows: " << partition->build_rows()->num_rows() - << " (Blocks pinned: " << partition->build_rows()->blocks_pinned() << ")" << endl; + << " (Bytes pinned: " << partition->build_rows()->BytesPinned(false) << ")" + << endl; if (partition->hash_tbl() != NULL) { ss << " Hash Table Rows: " << partition->hash_tbl()->size() << endl; }
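To make the revised MinRequiredBuffers() arithmetic concrete, here is a minimal standalone C++ sketch of how the minimum reservation asserted in Open() follows from it. The fanout and buffer-size constants below are illustrative assumptions, not values read out of this patch:

#include <cstdint>
#include <iostream>

// Illustrative stand-ins for PARTITION_FANOUT and
// resource_profile_.spillable_buffer_size (assumed values, not the patch's).
constexpr int kPartitionFanout = 16;
constexpr int64_t kSpillableBufferSize = 2LL << 20;  // assume 2 MB buffers

// Mirrors the new formula: one write buffer per partition, one read buffer
// for the stream being repartitioned, and one extra buffer for the serialize
// stream when the aggregate functions need Serialize().
int MinRequiredBuffers(bool is_streaming_preagg, bool needs_serialize) {
  if (is_streaming_preagg) return 0;  // streaming preaggs pass rows through
  return kPartitionFanout + 1 + (needs_serialize ? 1 : 0);
}

int main() {
  // Open() asserts min_reservation >= spillable_buffer_size * MinRequiredBuffers():
  // with these constants, 18 buffers * 2 MB = 36 MB for a serializing aggregation.
  std::cout << MinRequiredBuffers(false, true) * kSpillableBufferSize << "\n";
  std::cout << MinRequiredBuffers(false, false) * kSpillableBufferSize << "\n";
  return 0;
}

The drop from the old formula (2 * PARTITION_FANOUT + 1 + serialize) comes from sharing a single write buffer per partition between its aggregated and unaggregated streams, which halves the per-partition contribution to the minimum reservation.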

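For reference alongside the spilling-heuristic TODO above, the victim selection in PhjBuilder::SpillPartition() reduces to a max-scan over pinned build bytes. Below is a simplified sketch with hypothetical stand-in types (CandidatePartition and PickSpillVictim are not names from the patch), which ignores the hash-table memory the real loop also credits to a candidate:

#include <cstdint>
#include <vector>

struct CandidatePartition {   // hypothetical stand-in for PhjBuilder::Partition
  bool closed = false;        // IsClosed()
  bool spilled = false;       // is_spilled()
  int64_t bytes_pinned = 0;   // build_rows()->BytesPinned(false)
};

// Returns the index of the open, unspilled partition pinning the most memory,
// or -1 if none qualifies: the case the patch now reports as an internal error
// rather than a generic memory-limit error.
int PickSpillVictim(const std::vector<CandidatePartition>& partitions) {
  int64_t max_freed_mem = 0;
  int victim = -1;
  for (int i = 0; i < static_cast<int>(partitions.size()); ++i) {
    const CandidatePartition& candidate = partitions[i];
    if (candidate.closed || candidate.spilled) continue;
    if (candidate.bytes_pinned > max_freed_mem) {
      max_freed_mem = candidate.bytes_pinned;
      victim = i;
    }
  }
  return victim;
}

Choosing the partition with the most pinned bytes frees the largest amount of reservation per spill, at the cost of possibly evicting the partition that would have been most useful to keep in memory.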