http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/partitioned-hash-join-builder.h ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h index e0393b5..912613d 100644 --- a/be/src/exec/partitioned-hash-join-builder.h +++ b/be/src/exec/partitioned-hash-join-builder.h @@ -26,8 +26,9 @@ #include "exec/data-sink.h" #include "exec/filter-context.h" #include "exec/hash-table.h" -#include "runtime/buffered-block-mgr.h" -#include "runtime/buffered-tuple-stream.h" +#include "runtime/buffered-tuple-stream-v2.h" +#include "runtime/bufferpool/buffer-pool.h" +#include "runtime/bufferpool/suballocator.h" #include "gen-cpp/PlanNodes_types.h" @@ -56,7 +57,7 @@ class ScalarExprEvaluator; /// RepartitionBuildInput() to repartition a level n partition into multiple level n + 1 /// partitions. /// -/// Both the PartitionedHashJoinNode and the builder share a BufferedBlockMgr client +/// Both the PartitionedHashJoinNode and the builder share a BufferPool client /// and the corresponding reservations. Different stages of the spilling algorithm /// require different mixes of build and probe buffers and hash tables, so we can /// share the reservation to minimize the combined memory requirement. Initial probe-side @@ -72,7 +73,8 @@ class PhjBuilder : public DataSink { class Partition; PhjBuilder(int join_node_id, TJoinOp::type join_op, const RowDescriptor* probe_row_desc, - const RowDescriptor* build_row_desc, RuntimeState* state); + const RowDescriptor* build_row_desc, RuntimeState* state, + BufferPool::ClientHandle* buffer_pool_client, int64_t spillable_buffer_size); Status InitExprsAndFilters(RuntimeState* state, const std::vector<TEqJoinCondition>& eq_join_conjuncts, @@ -101,7 +103,7 @@ class PhjBuilder : public DataSink { /// Transfer ownership of the probe streams to the caller. One stream was allocated per /// spilled partition in FlushFinal(). The probe streams are empty but prepared for /// writing with a write buffer allocated. - std::vector<std::unique_ptr<BufferedTupleStream>> TransferProbeStreams(); + std::vector<std::unique_ptr<BufferedTupleStreamV2>> TransferProbeStreams(); /// Clears the current list of hash partitions. Called after probing of the partitions /// is done. The partitions are not closed or destroyed, since they may be spilled or @@ -122,7 +124,7 @@ class PhjBuilder : public DataSink { /// 'input_probe_rows' for reading in "delete_on_read" mode, so that the probe phase /// has enough buffers preallocated to execute successfully. Status RepartitionBuildInput(Partition* input_partition, int level, - BufferedTupleStream* input_probe_rows) WARN_UNUSED_RESULT; + BufferedTupleStreamV2* input_probe_rows) WARN_UNUSED_RESULT; /// Returns the largest build row count out of the current hash partitions. int64_t LargestPartitionRows() const; @@ -132,7 +134,6 @@ class PhjBuilder : public DataSink { bool HashTableStoresNulls() const; /// Accessor functions, mainly required to expose state to PartitionedHashJoinNode. - inline BufferedBlockMgr::Client* block_mgr_client() const { return block_mgr_client_; } inline bool non_empty_build() const { return non_empty_build_; } inline const std::vector<bool>& is_not_distinct_from() const { return is_not_distinct_from_; @@ -200,24 +201,27 @@ class PhjBuilder : public DataSink { /// Spills this partition, the partition's stream is unpinned with 'mode' and /// its hash table is destroyed if it was built. - Status Spill(BufferedTupleStream::UnpinMode mode) WARN_UNUSED_RESULT; + Status Spill(BufferedTupleStreamV2::UnpinMode mode) WARN_UNUSED_RESULT; bool ALWAYS_INLINE IsClosed() const { return build_rows_ == NULL; } - BufferedTupleStream* ALWAYS_INLINE build_rows() { return build_rows_.get(); } + BufferedTupleStreamV2* ALWAYS_INLINE build_rows() { return build_rows_.get(); } HashTable* ALWAYS_INLINE hash_tbl() const { return hash_tbl_.get(); } bool ALWAYS_INLINE is_spilled() const { return is_spilled_; } int ALWAYS_INLINE level() const { return level_; } private: - /// Inserts each row in 'batch' into 'hash_tbl_' using 'ctx'. 'indices' is an array - /// containing the index of each row's index into the hash table's tuple stream. + /// Inserts each row in 'batch' into 'hash_tbl_' using 'ctx'. 'flat_rows' is an array + /// containing the rows in the hash table's tuple stream. /// 'prefetch_mode' is the prefetching mode in use. If it's not PREFETCH_NONE, hash /// table buckets which the rows hashes to will be prefetched. This parameter is /// replaced with a constant during codegen time. This function may be replaced with /// a codegen'd version. Returns true if all rows in 'batch' are successfully - /// inserted. + /// inserted and false otherwise. If inserting failed, 'status' indicates why it + /// failed: if 'status' is ok, inserting failed because not enough reservation + /// was available and if 'status' is an error, inserting failed because of that error. bool InsertBatch(TPrefetchMode::type prefetch_mode, HashTableCtx* ctx, - RowBatch* batch, const std::vector<BufferedTupleStream::RowIdx>& indices); + RowBatch* batch, const std::vector<BufferedTupleStreamV2::FlatRowPtr>& flat_rows, + Status* status); const PhjBuilder* parent_; @@ -235,16 +239,9 @@ class PhjBuilder : public DataSink { /// Stream of build tuples in this partition. Initially owned by this object but /// transferred to the parent exec node (via the row batch) when the partition /// is closed. If NULL, ownership has been transferred and the partition is closed. - std::unique_ptr<BufferedTupleStream> build_rows_; + std::unique_ptr<BufferedTupleStreamV2> build_rows_; }; - protected: - /// Init() function inherited from DataSink. Overridden to be a no-op for now. - /// TODO: Merge with InitExprsAndFilters() once this class becomes a true data sink. - virtual Status Init(const std::vector<TExpr>& thrift_output_exprs, - const TDataSink& tsink, RuntimeState* state) override; - - private: /// Computes the minimum number of buffers required to execute the spilling partitioned /// hash algorithm successfully for any input size (assuming enough disk space is /// available for spilled rows). The buffers are used for buffering both build and @@ -255,15 +252,22 @@ class PhjBuilder : public DataSink { /// For NAAJ, we need 3 additional buffers for 'null_aware_partition_', /// 'null_aware_probe_partition_' and 'null_probe_rows_'. int MinRequiredBuffers() const { - // Must be kept in sync with HashJoinNode.computeResourceProfile() in fe. + // Must be kept in sync with HashJoinNode.computeNodeResourceProfile() in fe. int num_reserved_buffers = PARTITION_FANOUT + 1; if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) num_reserved_buffers += 3; return num_reserved_buffers; } + protected: + /// Init() function inherited from DataSink. Overridden to be a no-op for now. + /// TODO: Merge with InitExprsAndFilters() once this class becomes a true data sink. + virtual Status Init(const std::vector<TExpr>& thrift_output_exprs, + const TDataSink& tsink, RuntimeState* state) override; + /// Free local allocations made from expr evaluators during hash table construction. void FreeLocalAllocations() const; + private: /// Create and initialize a set of hash partitions for partitioning level 'level'. /// The previous hash partitions must have been cleared with ClearHashPartitions(). /// After calling this, batches are added to the new partitions by calling Send(). @@ -284,19 +288,19 @@ class PhjBuilder : public DataSink { /// partitions. This odd return convention is used to avoid emitting unnecessary code /// for ~Status in perf-critical code. bool AppendRow( - BufferedTupleStream* stream, TupleRow* row, Status* status) WARN_UNUSED_RESULT; + BufferedTupleStreamV2* stream, TupleRow* row, Status* status) WARN_UNUSED_RESULT; /// Slow path for AppendRow() above. It is called when the stream has failed to append /// the row. We need to find more memory by either switching to IO-buffers, in case the /// stream still uses small buffers, or spilling a partition. Returns false and sets /// 'status' if it was unable to append the row, even after spilling partitions. - bool AppendRowStreamFull(BufferedTupleStream* stream, TupleRow* row, + bool AppendRowStreamFull(BufferedTupleStreamV2* stream, TupleRow* row, Status* status) noexcept WARN_UNUSED_RESULT; /// Frees memory by spilling one of the hash partitions. The 'mode' argument is passed /// to the Spill() call for the selected partition. The current policy is to spill the /// largest partition. Returns non-ok status if we couldn't spill a partition. - Status SpillPartition(BufferedTupleStream::UnpinMode mode) WARN_UNUSED_RESULT; + Status SpillPartition(BufferedTupleStreamV2::UnpinMode mode) WARN_UNUSED_RESULT; /// Tries to build hash tables for all unspilled hash partitions. Called after /// FlushFinal() when all build rows have been partitioned and added to the appropriate @@ -358,14 +362,20 @@ class PhjBuilder : public DataSink { /// Pool for objects with same lifetime as builder. ObjectPool pool_; - /// Client to the buffered block mgr, used to allocate build partition buffers and hash - /// tables. When probing, the spilling algorithm keeps some build partitions in memory - /// while using memory for probe buffers for spilled partitions. To support dynamically - /// dividing memory between build and probe, this client is owned by the builder but - /// shared with the PartitionedHashJoinNode. + /// Client to the buffer pool, used to allocate build partition buffers and hash tables. + /// When probing, the spilling algorithm keeps some build partitions in memory while + /// using memory for probe buffers for spilled partitions. To support dynamically + /// dividing memory between build and probe, this client is shared between the builder + /// and the PartitionedHashJoinNode. /// TODO: this approach to sharing will not work for spilling broadcast joins with a /// 1:N relationship from builders to join nodes. - BufferedBlockMgr::Client* block_mgr_client_; + BufferPool::ClientHandle* buffer_pool_client_; + + /// The size of buffers to use in the build and probe streams. + const int64_t spillable_buffer_size_; + + /// Allocator for hash table memory. + boost::scoped_ptr<Suballocator> ht_allocator_; /// If true, the build side has at least one row. bool non_empty_build_; @@ -454,7 +464,7 @@ class PhjBuilder : public DataSink { /// /// Because of this, at the end of the build phase, we always have sufficient memory /// to execute the probe phase of the algorithm without spilling more partitions. - std::vector<std::unique_ptr<BufferedTupleStream>> spilled_partition_probe_streams_; + std::vector<std::unique_ptr<BufferedTupleStreamV2>> spilled_partition_probe_streams_; /// END: Members that must be Reset() ///////////////////////////////////////// @@ -469,7 +479,7 @@ class PhjBuilder : public DataSink { ProcessBuildBatchFn process_build_batch_fn_level0_; typedef bool (*InsertBatchFn)(Partition*, TPrefetchMode::type, HashTableCtx*, RowBatch*, - const std::vector<BufferedTupleStream::RowIdx>&); + const std::vector<BufferedTupleStreamV2::FlatRowPtr>&, Status*); /// Jitted Partition::InsertBatch() function pointers. NULL if codegen is disabled. InsertBatchFn insert_batch_fn_; InsertBatchFn insert_batch_fn_level0_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/partitioned-hash-join-node-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-node-ir.cc b/be/src/exec/partitioned-hash-join-node-ir.cc index 2c951d1..b890eb9 100644 --- a/be/src/exec/partitioned-hash-join-node-ir.cc +++ b/be/src/exec/partitioned-hash-join-node-ir.cc @@ -313,7 +313,7 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::NextProbeRow( // The partition is not in memory, spill the probe row and move to the next row. // Skip the current row if we manage to append to the spilled partition's BTS. // Otherwise, we need to bail out and report the failure. - BufferedTupleStream* probe_rows = probe_partition->probe_rows(); + BufferedTupleStreamV2* probe_rows = probe_partition->probe_rows(); if (UNLIKELY(!AppendProbeRow(probe_rows, current_probe_row_, status))) { DCHECK(!status->ok()); return false; @@ -438,9 +438,8 @@ int PartitionedHashJoinNode::ProcessProbeBatch(TPrefetchMode::type prefetch_mode } inline bool PartitionedHashJoinNode::AppendProbeRow( - BufferedTupleStream* stream, TupleRow* row, Status* status) { - DCHECK(stream->has_write_block()); - DCHECK(!stream->using_small_buffers()); + BufferedTupleStreamV2* stream, TupleRow* row, Status* status) { + DCHECK(stream->has_write_iterator()); DCHECK(!stream->is_pinned()); return stream->AddRow(row, status); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/partitioned-hash-join-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc index a5c9897..2db9e00 100644 --- a/be/src/exec/partitioned-hash-join-node.cc +++ b/be/src/exec/partitioned-hash-join-node.cc @@ -27,8 +27,7 @@ #include "exprs/scalar-expr.h" #include "exprs/scalar-expr-evaluator.h" #include "exprs/slot-ref.h" -#include "runtime/buffered-block-mgr.h" -#include "runtime/buffered-tuple-stream.inline.h" +#include "runtime/buffered-tuple-stream-v2.inline.h" #include "runtime/mem-tracker.h" #include "runtime/row-batch.h" #include "runtime/runtime-state.h" @@ -47,9 +46,15 @@ static const string PREPARE_FOR_READ_FAILED_ERROR_MSG = "successfully."; using namespace impala; -using namespace llvm; -using namespace strings; -using std::unique_ptr; +using llvm::BasicBlock; +using llvm::ConstantInt; +using llvm::Function; +using llvm::GlobalValue; +using llvm::LLVMContext; +using llvm::PointerType; +using llvm::Type; +using llvm::Value; +using strings::Substitute; PartitionedHashJoinNode::PartitionedHashJoinNode( ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) @@ -77,8 +82,9 @@ Status PartitionedHashJoinNode::Init(const TPlanNode& tnode, RuntimeState* state // TODO: allow PhjBuilder to be the sink of a separate fragment. For now, PhjBuilder is // owned by this node, but duplicates some state (exprs, etc) in anticipation of it // being separated out further. - builder_.reset( - new PhjBuilder(id(), join_op_, child(0)->row_desc(), child(1)->row_desc(), state)); + builder_.reset(new PhjBuilder(id(), join_op_, child(0)->row_desc(), + child(1)->row_desc(), state, &buffer_pool_client_, + resource_profile_.spillable_buffer_size)); RETURN_IF_ERROR( builder_->InitExprsAndFilters(state, eq_join_conjuncts, tnode.runtime_filters)); @@ -177,6 +183,11 @@ Status PartitionedHashJoinNode::Open(RuntimeState* state) { } Status PartitionedHashJoinNode::AcquireResourcesForBuild(RuntimeState* state) { + DCHECK_GE(resource_profile_.min_reservation, + resource_profile_.spillable_buffer_size * builder_->MinRequiredBuffers()); + if (!buffer_pool_client_.is_registered()) { + RETURN_IF_ERROR(ClaimBufferReservation(state)); + } if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { // Initialize these partitions before doing the build so that the build does not // use the reservation intended for them. @@ -254,12 +265,10 @@ void PartitionedHashJoinNode::Close(RuntimeState* state) { PartitionedHashJoinNode::ProbePartition::ProbePartition(RuntimeState* state, PartitionedHashJoinNode* parent, PhjBuilder::Partition* build_partition, - unique_ptr<BufferedTupleStream> probe_rows) - : parent_(parent), - build_partition_(build_partition), + unique_ptr<BufferedTupleStreamV2> probe_rows) + : build_partition_(build_partition), probe_rows_(std::move(probe_rows)) { - DCHECK(probe_rows_->has_write_block()); - DCHECK(!probe_rows_->using_small_buffers()); + DCHECK(probe_rows_->has_write_iterator()); DCHECK(!probe_rows_->is_pinned()); } @@ -270,10 +279,7 @@ PartitionedHashJoinNode::ProbePartition::~ProbePartition() { Status PartitionedHashJoinNode::ProbePartition::PrepareForRead() { bool got_read_buffer; RETURN_IF_ERROR(probe_rows_->PrepareForRead(true, &got_read_buffer)); - if (!got_read_buffer) { - return parent_->mem_tracker()->MemLimitExceeded(parent_->runtime_state_, - Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, parent_->id_)); - } + DCHECK(got_read_buffer) << "Accounted in min reservation"; return Status::OK(); } @@ -322,7 +328,7 @@ Status PartitionedHashJoinNode::NextSpilledProbeRowBatch( probe_batch_pos_ = -1; return Status::OK(); } - BufferedTupleStream* probe_rows = input_partition_->probe_rows(); + BufferedTupleStreamV2* probe_rows = input_partition_->probe_rows(); if (LIKELY(probe_rows->rows_returned() < probe_rows->num_rows())) { // Continue from the current probe stream. bool eos = false; @@ -414,12 +420,11 @@ Status PartitionedHashJoinNode::PrepareSpilledPartitionForProbe( ht_ctx_->set_level(next_partition_level); // Spill to free memory from hash tables and pinned streams for use in new partitions. - RETURN_IF_ERROR(build_partition->Spill(BufferedTupleStream::UNPIN_ALL)); + RETURN_IF_ERROR(build_partition->Spill(BufferedTupleStreamV2::UNPIN_ALL)); // Temporarily free up the probe buffer to use when repartitioning. - RETURN_IF_ERROR( - input_partition_->probe_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL)); - DCHECK_EQ(build_partition->build_rows()->blocks_pinned(), 0) << NodeDebugString(); - DCHECK_EQ(input_partition_->probe_rows()->blocks_pinned(), 0) << NodeDebugString(); + input_partition_->probe_rows()->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL); + DCHECK_EQ(build_partition->build_rows()->BytesPinned(false), 0) << NodeDebugString(); + DCHECK_EQ(input_partition_->probe_rows()->BytesPinned(false), 0) << NodeDebugString(); int64_t num_input_rows = build_partition->build_rows()->num_rows(); RETURN_IF_ERROR(builder_->RepartitionBuildInput( build_partition, next_partition_level, input_partition_->probe_rows())); @@ -430,7 +435,8 @@ Status PartitionedHashJoinNode::PrepareSpilledPartitionForProbe( "more rows than the input"; if (UNLIKELY(num_input_rows == largest_partition_rows)) { return Status(TErrorCode::PARTITIONED_HASH_JOIN_REPARTITION_FAILS, id_, - next_partition_level, num_input_rows); + next_partition_level, num_input_rows, NodeDebugString(), + buffer_pool_client_.DebugString()); } RETURN_IF_ERROR(PrepareForProbe()); @@ -816,18 +822,18 @@ static Status NullAwareAntiJoinError(bool build) { Status PartitionedHashJoinNode::InitNullAwareProbePartition() { RuntimeState* state = runtime_state_; - unique_ptr<BufferedTupleStream> probe_rows = std::make_unique<BufferedTupleStream>( - state, child(0)->row_desc(), state->block_mgr(), builder_->block_mgr_client(), - false /* use_initial_small_buffers */, false /* read_write */); - Status status = probe_rows->Init(id(), runtime_profile(), false); + unique_ptr<BufferedTupleStreamV2> probe_rows = make_unique<BufferedTupleStreamV2>( + state, child(0)->row_desc(), &buffer_pool_client_, + resource_profile_.spillable_buffer_size, + resource_profile_.spillable_buffer_size); + // TODO: this should be pinned if spilling is disabled. + Status status = probe_rows->Init(id(), false); if (!status.ok()) goto error; bool got_buffer; status = probe_rows->PrepareForWrite(&got_buffer); if (!status.ok()) goto error; - if (!got_buffer) { - status = state->block_mgr()->MemLimitTooLowError(builder_->block_mgr_client(), id()); - goto error; - } + DCHECK(got_buffer) + << "Accounted in min reservation" << buffer_pool_client_.DebugString(); null_aware_probe_partition_.reset(new ProbePartition( state, this, builder_->null_aware_partition(), std::move(probe_rows))); return Status::OK(); @@ -841,15 +847,15 @@ error: Status PartitionedHashJoinNode::InitNullProbeRows() { RuntimeState* state = runtime_state_; - null_probe_rows_ = std::make_unique<BufferedTupleStream>(state, child(0)->row_desc(), - state->block_mgr(), builder_->block_mgr_client(), - false /* use_initial_small_buffers */, false /* read_write */); - RETURN_IF_ERROR(null_probe_rows_->Init(id(), runtime_profile(), false)); + null_probe_rows_ = make_unique<BufferedTupleStreamV2>(state, child(0)->row_desc(), + &buffer_pool_client_, resource_profile_.spillable_buffer_size, + resource_profile_.spillable_buffer_size); + // TODO: we shouldn't start with this unpinned if spilling is disabled. + RETURN_IF_ERROR(null_probe_rows_->Init(id(), false)); bool got_buffer; RETURN_IF_ERROR(null_probe_rows_->PrepareForWrite(&got_buffer)); - if (!got_buffer) { - return state->block_mgr()->MemLimitTooLowError(builder_->block_mgr_client(), id()); - } + DCHECK(got_buffer) + << "Accounted in min reservation" << buffer_pool_client_.DebugString(); return Status::OK(); } @@ -860,8 +866,8 @@ Status PartitionedHashJoinNode::PrepareNullAwarePartition() { DCHECK_EQ(probe_batch_pos_, -1); DCHECK_EQ(probe_batch_->num_rows(), 0); - BufferedTupleStream* build_stream = builder_->null_aware_partition()->build_rows(); - BufferedTupleStream* probe_stream = null_aware_probe_partition_->probe_rows(); + BufferedTupleStreamV2* build_stream = builder_->null_aware_partition()->build_rows(); + BufferedTupleStreamV2* probe_stream = null_aware_probe_partition_->probe_rows(); if (build_stream->num_rows() == 0) { // There were no build rows. Nothing to do. Just prepare to output the null @@ -874,7 +880,7 @@ Status PartitionedHashJoinNode::PrepareNullAwarePartition() { // Bring the entire spilled build stream into memory and read into a single batch. bool got_rows; - RETURN_IF_ERROR(build_stream->GetRows(&nulls_build_batch_, &got_rows)); + RETURN_IF_ERROR(build_stream->GetRows(mem_tracker(), &nulls_build_batch_, &got_rows)); if (!got_rows) return NullAwareAntiJoinError(true); // Initialize the streams for read. @@ -898,7 +904,7 @@ Status PartitionedHashJoinNode::OutputNullAwareProbeRows(RuntimeState* state, int num_join_conjuncts = other_join_conjuncts_.size(); DCHECK(probe_batch_ != NULL); - BufferedTupleStream* probe_stream = null_aware_probe_partition_->probe_rows(); + BufferedTupleStreamV2* probe_stream = null_aware_probe_partition_->probe_rows(); if (probe_batch_pos_ == probe_batch_->num_rows()) { probe_batch_pos_ = 0; probe_batch_->TransferResourceOwnership(out_batch); @@ -946,7 +952,8 @@ Status PartitionedHashJoinNode::PrepareForProbe() { DCHECK(probe_hash_partitions_.empty()); // Initialize the probe partitions, providing them with probe streams. - vector<unique_ptr<BufferedTupleStream>> probe_streams = builder_->TransferProbeStreams(); + vector<unique_ptr<BufferedTupleStreamV2>> probe_streams = + builder_->TransferProbeStreams(); probe_hash_partitions_.resize(PARTITION_FANOUT); for (int i = 0; i < PARTITION_FANOUT; ++i) { PhjBuilder::Partition* build_partition = builder_->hash_partition(i); @@ -982,16 +989,16 @@ Status PartitionedHashJoinNode::PrepareForProbe() { } void PartitionedHashJoinNode::CreateProbePartition( - int partition_idx, unique_ptr<BufferedTupleStream> probe_rows) { + int partition_idx, unique_ptr<BufferedTupleStreamV2> probe_rows) { DCHECK_GE(partition_idx, 0); DCHECK_LT(partition_idx, probe_hash_partitions_.size()); DCHECK(probe_hash_partitions_[partition_idx] == NULL); - probe_hash_partitions_[partition_idx] = std::make_unique<ProbePartition>(runtime_state_, + probe_hash_partitions_[partition_idx] = make_unique<ProbePartition>(runtime_state_, this, builder_->hash_partition(partition_idx), std::move(probe_rows)); } Status PartitionedHashJoinNode::EvaluateNullProbe( - RuntimeState* state, BufferedTupleStream* build) { + RuntimeState* state, BufferedTupleStreamV2* build) { if (null_probe_rows_ == NULL || null_probe_rows_->num_rows() == 0) { return Status::OK(); } @@ -1000,10 +1007,10 @@ Status PartitionedHashJoinNode::EvaluateNullProbe( // Bring both the build and probe side into memory and do a pairwise evaluation. bool got_rows; scoped_ptr<RowBatch> build_rows; - RETURN_IF_ERROR(build->GetRows(&build_rows, &got_rows)); + RETURN_IF_ERROR(build->GetRows(mem_tracker(), &build_rows, &got_rows)); if (!got_rows) return NullAwareAntiJoinError(true); scoped_ptr<RowBatch> probe_rows; - RETURN_IF_ERROR(null_probe_rows_->GetRows(&probe_rows, &got_rows)); + RETURN_IF_ERROR(null_probe_rows_->GetRows(mem_tracker(), &probe_rows, &got_rows)); if (!got_rows) return NullAwareAntiJoinError(false); ScalarExprEvaluator* const* join_conjunct_evals = other_join_conjunct_evals_.data(); @@ -1060,11 +1067,9 @@ Status PartitionedHashJoinNode::CleanUpHashPartitions( // can recurse the algorithm and create new hash partitions from spilled partitions. // TODO: we shouldn't need to unpin the build stream if we stop spilling // while probing. - RETURN_IF_ERROR( - build_partition->build_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL)); - DCHECK_EQ(build_partition->build_rows()->blocks_pinned(), 0); - RETURN_IF_ERROR( - probe_partition->probe_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL)); + build_partition->build_rows()->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL); + DCHECK_EQ(build_partition->build_rows()->BytesPinned(false), 0); + probe_partition->probe_rows()->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL); if (probe_partition->probe_rows()->num_rows() != 0 || NeedToProcessUnmatchedBuildRows()) { @@ -1102,9 +1107,9 @@ Status PartitionedHashJoinNode::CleanUpHashPartitions( // Just finished evaluating the null probe rows with all the non-spilled build // partitions. Unpin this now to free this memory for repartitioning. - if (null_probe_rows_ != NULL) - RETURN_IF_ERROR( - null_probe_rows_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT)); + if (null_probe_rows_ != NULL) { + null_probe_rows_->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL_EXCEPT_CURRENT); + } builder_->ClearHashPartitions(); probe_hash_partitions_.clear(); @@ -1165,10 +1170,10 @@ string PartitionedHashJoinNode::NodeDebugString() const { ss << " Probe hash partition " << i << ": "; if (probe_partition != NULL) { ss << "probe ptr=" << probe_partition; - BufferedTupleStream* probe_rows = probe_partition->probe_rows(); + BufferedTupleStreamV2* probe_rows = probe_partition->probe_rows(); if (probe_rows != NULL) { - ss << " Probe Rows: " << probe_rows->num_rows() - << " (Blocks pinned: " << probe_rows->blocks_pinned() << ")"; + ss << " Probe Rows: " << probe_rows->num_rows() + << " (Bytes pinned: " << probe_rows->BytesPinned(false) << ")"; } } ss << endl; @@ -1189,12 +1194,15 @@ string PartitionedHashJoinNode::NodeDebugString() const { } } if (input_partition_ != NULL) { - DCHECK(input_partition_->build_partition()->build_rows() != NULL); DCHECK(input_partition_->probe_rows() != NULL); - ss << "InputPartition: " << input_partition_.get() << endl - << " Spilled Build Rows: " - << input_partition_->build_partition()->build_rows()->num_rows() << endl - << " Spilled Probe Rows: " << input_partition_->probe_rows()->num_rows() << endl; + ss << "InputPartition: " << input_partition_.get() << endl; + PhjBuilder::Partition* build_partition = input_partition_->build_partition(); + if (build_partition->IsClosed()) { + ss << " Build Partition Closed" << endl; + } else { + ss << " Build Rows: " << build_partition->build_rows()->num_rows() << endl; + } + ss << " Probe Rows: " << input_partition_->probe_rows()->num_rows() << endl; } else { ss << "InputPartition: NULL" << endl; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/partitioned-hash-join-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h index 73e0dd5..b3f663e 100644 --- a/be/src/exec/partitioned-hash-join-node.h +++ b/be/src/exec/partitioned-hash-join-node.h @@ -15,28 +15,24 @@ // specific language governing permissions and limitations // under the License. - #ifndef IMPALA_EXEC_PARTITIONED_HASH_JOIN_NODE_H #define IMPALA_EXEC_PARTITIONED_HASH_JOIN_NODE_H -#include <boost/scoped_ptr.hpp> -#include <boost/thread.hpp> #include <list> #include <memory> #include <string> +#include <boost/scoped_ptr.hpp> +#include <boost/thread.hpp> #include "exec/blocking-join-node.h" #include "exec/exec-node.h" #include "exec/partitioned-hash-join-builder.h" -#include "runtime/buffered-block-mgr.h" #include "gen-cpp/Types_types.h" namespace impala { class BloomFilter; -class BufferedBlockMgr; -class BufferedTupleStream; class MemPool; class RowBatch; class RuntimeFilter; @@ -100,8 +96,6 @@ class TupleRow; /// NULLs into several different streams, which are processed in a separate step to /// produce additional output rows. The NAAJ algorithm is documented in more detail in /// header comments for the null aware functions and data structures. -/// -/// TODO: don't copy tuple rows so often. class PartitionedHashJoinNode : public BlockingJoinNode { public: PartitionedHashJoinNode(ObjectPool* pool, const TPlanNode& tnode, @@ -168,7 +162,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode { /// Creates an initialized probe partition at 'partition_idx' in /// 'probe_hash_partitions_'. void CreateProbePartition( - int partition_idx, std::unique_ptr<BufferedTupleStream> probe_rows); + int partition_idx, std::unique_ptr<BufferedTupleStreamV2> probe_rows); /// Append the probe row 'row' to 'stream'. The stream must be unpinned and must have /// a write buffer allocated, so this will succeed unless an error is encountered. @@ -176,7 +170,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode { /// return convention is used to avoid emitting unnecessary code for ~Status in perf- /// critical code. bool AppendProbeRow( - BufferedTupleStream* stream, TupleRow* row, Status* status) WARN_UNUSED_RESULT; + BufferedTupleStreamV2* stream, TupleRow* row, Status* status) WARN_UNUSED_RESULT; /// Probes the hash table for rows matching the current probe row and appends /// all the matching build rows (with probe row) to output batch. Returns true @@ -331,7 +325,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode { /// conjuncts pass (i.e. there is a match). /// This is used for NAAJ, when there are NULL probe rows. Status EvaluateNullProbe( - RuntimeState* state, BufferedTupleStream* build) WARN_UNUSED_RESULT; + RuntimeState* state, BufferedTupleStreamV2* build) WARN_UNUSED_RESULT; /// Prepares to output NULLs on the probe side for NAAJ. Before calling this, /// matched_null_probe_ should have been fully evaluated. @@ -478,7 +472,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode { /// For NAAJ, this stream contains all probe rows that had NULL on the hash table /// conjuncts. Must be unique_ptr so we can release it and transfer to output batches. - std::unique_ptr<BufferedTupleStream> null_probe_rows_; + std::unique_ptr<BufferedTupleStreamV2> null_probe_rows_; /// For each row in null_probe_rows_, true if this row has matched any build row /// (i.e. the resulting joined row passes other_join_conjuncts). @@ -510,7 +504,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode { /// that has been prepared for writing with an I/O-sized write buffer. ProbePartition(RuntimeState* state, PartitionedHashJoinNode* parent, PhjBuilder::Partition* build_partition, - std::unique_ptr<BufferedTupleStream> probe_rows); + std::unique_ptr<BufferedTupleStreamV2> probe_rows); ~ProbePartition(); /// Prepare to read the probe rows. Allocates the first read block, so reads will @@ -523,21 +517,19 @@ class PartitionedHashJoinNode : public BlockingJoinNode { /// resources if 'batch' is NULL. Idempotent. void Close(RowBatch* batch); - BufferedTupleStream* ALWAYS_INLINE probe_rows() { return probe_rows_.get(); } + BufferedTupleStreamV2* ALWAYS_INLINE probe_rows() { return probe_rows_.get(); } PhjBuilder::Partition* build_partition() { return build_partition_; } inline bool IsClosed() const { return probe_rows_ == NULL; } private: - PartitionedHashJoinNode* parent_; - /// The corresponding build partition. Not NULL. Owned by PhjBuilder. PhjBuilder::Partition* build_partition_; /// Stream of probe tuples in this partition. Initially owned by this object but /// transferred to the parent exec node (via the row batch) when the partition /// is complete. If NULL, ownership was transferred and the partition is closed. - std::unique_ptr<BufferedTupleStream> probe_rows_; + std::unique_ptr<BufferedTupleStreamV2> probe_rows_; }; /// For the below codegen'd functions, xxx_fn_level0_ uses CRC hashing when available http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/partitioned-hash-join-node.inline.h ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-node.inline.h b/be/src/exec/partitioned-hash-join-node.inline.h index a53b40e..3441aac 100644 --- a/be/src/exec/partitioned-hash-join-node.inline.h +++ b/be/src/exec/partitioned-hash-join-node.inline.h @@ -20,7 +20,7 @@ #include "exec/partitioned-hash-join-node.h" -#include "runtime/buffered-tuple-stream.inline.h" +#include "runtime/buffered-tuple-stream-v2.inline.h" namespace impala { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/sort-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/sort-node.cc b/be/src/exec/sort-node.cc index fd42124..440f809 100644 --- a/be/src/exec/sort-node.cc +++ b/be/src/exec/sort-node.cc @@ -52,9 +52,12 @@ Status SortNode::Prepare(RuntimeState* state) { SCOPED_TIMER(runtime_profile_->total_time_counter()); RETURN_IF_ERROR(ExecNode::Prepare(state)); less_than_.reset(new TupleRowComparator(ordering_exprs_, is_asc_order_, nulls_first_)); - sorter_.reset(new Sorter(*less_than_, sort_tuple_exprs_, - &row_descriptor_, mem_tracker(), runtime_profile(), state)); + sorter_.reset( + new Sorter(*less_than_, sort_tuple_exprs_, &row_descriptor_, mem_tracker(), + &buffer_pool_client_, resource_profile_.spillable_buffer_size, + runtime_profile(), state, id(), true)); RETURN_IF_ERROR(sorter_->Prepare(pool_, expr_mem_pool())); + DCHECK_GE(resource_profile_.min_reservation, sorter_->ComputeMinReservation()); AddCodegenDisabledMessage(state); return Status::OK(); } @@ -69,9 +72,13 @@ void SortNode::Codegen(RuntimeState* state) { Status SortNode::Open(RuntimeState* state) { SCOPED_TIMER(runtime_profile_->total_time_counter()); - // Open the child before consuming resources in this node. - RETURN_IF_ERROR(child(0)->Open(state)); RETURN_IF_ERROR(ExecNode::Open(state)); + RETURN_IF_ERROR(child(0)->Open(state)); + // Claim reservation after the child has been opened to reduce the peak reservation + // requirement. + if (!buffer_pool_client_.is_registered()) { + RETURN_IF_ERROR(ClaimBufferReservation(state)); + } RETURN_IF_ERROR(less_than_->Open(pool_, state, expr_mem_pool())); RETURN_IF_ERROR(sorter_->Open()); RETURN_IF_CANCELLED(state); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/sort-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/sort-node.h b/be/src/exec/sort-node.h index 8b3de11..a11d424 100644 --- a/be/src/exec/sort-node.h +++ b/be/src/exec/sort-node.h @@ -20,13 +20,12 @@ #include "exec/exec-node.h" #include "runtime/sorter.h" -#include "runtime/buffered-block-mgr.h" namespace impala { /// Node that implements a full sort of its input with a fixed memory budget, spilling /// to disk if the input is larger than available memory. -/// Uses Sorter and BufferedBlockMgr for the external sort implementation. +/// Uses Sorter for the external sort implementation. /// Input rows to SortNode are materialized by the Sorter into a single tuple /// using the expressions specified in sort_tuple_exprs_. /// In GetNext(), SortNode passes in the output batch to the sorter instance created http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index 2de0f2e..92af968 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -24,8 +24,6 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/runtime") set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/runtime") add_library(Runtime - buffered-block-mgr.cc - buffered-tuple-stream.cc buffered-tuple-stream-v2.cc client-cache.cc coordinator.cc @@ -45,6 +43,7 @@ add_library(Runtime hbase-table.cc hbase-table-factory.cc hdfs-fs-cache.cc + initial-reservations.cc lib-cache.cc mem-tracker.cc mem-pool.cc @@ -83,7 +82,6 @@ ADD_BE_TEST(string-buffer-test) ADD_BE_TEST(data-stream-test) ADD_BE_TEST(timestamp-test) ADD_BE_TEST(disk-io-mgr-test) -ADD_BE_TEST(buffered-block-mgr-test) ADD_BE_TEST(parallel-executor-test) ADD_BE_TEST(raw-value-test) ADD_BE_TEST(string-compare-test) @@ -93,7 +91,6 @@ ADD_BE_TEST(thread-resource-mgr-test) ADD_BE_TEST(mem-tracker-test) ADD_BE_TEST(multi-precision-test) ADD_BE_TEST(decimal-test) -ADD_BE_TEST(buffered-tuple-stream-test) ADD_BE_TEST(buffered-tuple-stream-v2-test) ADD_BE_TEST(hdfs-fs-cache-test) ADD_BE_TEST(tmp-file-mgr-test)
