IMPALA-4674: Part 2: port backend exec to BufferPool

Always create global BufferPool at startup using 80% of memory and
limit reservations to 80% of query memory (same as BufferedBlockMgr).
The query's initial reservation is computed in the planner, claimed
centrally (managed by the InitialReservations class) and distributed
to query operators from there.
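As a self-contained illustration of that claim/distribute flow, here is a
minimal sketch. InitialReservationPool is a hypothetical stand-in for the
InitialReservations class this patch adds (see
be/src/runtime/initial-reservations.h in the diffstat); the real class
tracks reservations through ReservationTracker and BufferPool::ClientHandle
rather than a bare byte counter.

    #include <cassert>
    #include <cstdint>
    #include <mutex>

    // Toy model: the planner sizes each operator's minimum reservation,
    // the total is claimed once per query, and operators draw their share
    // from this central pool and return it when they close.
    class InitialReservationPool {
     public:
      explicit InitialReservationPool(std::int64_t total_claimed)
        : available_(total_claimed) {}

      // Called by an operator (e.g. from its Open()) to take its share.
      void Claim(std::int64_t bytes) {
        std::lock_guard<std::mutex> l(lock_);
        // The planner sized the total claim, so this should never fail.
        assert(bytes <= available_);
        available_ -= bytes;
      }

      // Called when the operator closes, so the bytes can be reused.
      void Return(std::int64_t bytes) {
        std::lock_guard<std::mutex> l(lock_);
        available_ += bytes;
      }

     private:
      std::mutex lock_;
      std::int64_t available_;
    };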
min_spillable_buffer_size and default_spillable_buffer_size query options
control the buffer size that the planner selects for spilling operators.

Port ExecNodes to use BufferPool:
* Each ExecNode has to claim its reservation during Open() (condensed
  sketch below, after this list).
* Port Sorter to use BufferPool.
* Switch from BufferedTupleStream to BufferedTupleStreamV2.
* Port HashTable to use BufferPool via a Suballocator. This also makes
  PAGG memory consumption more efficient (avoids wasting buffers) and
  improves the spilling algorithm:
  * Allow preaggs to execute with 0 reservation - if streams and hash
    tables cannot be allocated, they will pass through rows.
  * Halve the buffer requirement for spilling aggs - avoid allocating
    buffers for aggregated and unaggregated streams simultaneously.
  * Rebuild spilled partitions instead of repartitioning (IMPALA-2708).

TODO in follow-up patches:
* Rename BufferedTupleStreamV2 to BufferedTupleStream.
* Implement max_row_size query option.

Testing:
* Updated tests to reflect new memory requirements.
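The per-node claim/return pattern referenced in the first bullet above,
condensed from ExecNode::ClaimBufferReservation() and ExecNode::Close() as
they appear in the diff below (client registration arguments and error
handling are elided with /* ... */):

    // Called from a node's Open(): register a buffer pool client capped
    // at the planner-computed max reservation, then claim the node's
    // minimum reservation from the query-wide InitialReservations pool.
    Status ExecNode::ClaimBufferReservation(RuntimeState* state) {
      DCHECK(!buffer_pool_client_.is_registered());
      BufferPool* buffer_pool = ExecEnv::GetInstance()->buffer_pool();
      RETURN_IF_ERROR(
          buffer_pool->RegisterClient(/* ... */, &buffer_pool_client_));
      state->query_state()->initial_reservations()->Claim(
          &buffer_pool_client_, resource_profile_.min_reservation);
      return Status::OK();
    }

    // In ExecNode::Close(): return the reservation so later operators in
    // the query can reuse it, then deregister the client.
    state->query_state()->initial_reservations()->Return(
        &buffer_pool_client_, resource_profile_.min_reservation);
    state->exec_env()->buffer_pool()->DeregisterClient(&buffer_pool_client_);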
Change-Id: I7fc7fe1c04e9dfb1a0c749fb56a5e0f2bf9c6c3e
Reviewed-on: http://gerrit.cloudera.org:8080/5801
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Impala Public Jenkins

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/a98b90bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/a98b90bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/a98b90bd

Branch: refs/heads/master
Commit: a98b90bd3877886e97dc2385cfdf5e3f95245533
Parents: d5b0c6b
Author: Tim Armstrong <[email protected]>
Authored: Wed Mar 16 16:09:36 2016 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Sat Aug 5 01:03:02 2017 +0000

----------------------------------------------------------------------
 be/src/codegen/gen_ir_descriptions.py | 2 +-
 be/src/exec/analytic-eval-node.cc | 53 +-
 be/src/exec/analytic-eval-node.h | 25 +-
 be/src/exec/exec-node.cc | 45 +-
 be/src/exec/exec-node.h | 17 +
 be/src/exec/hash-table-test.cc | 251 ++-
 be/src/exec/hash-table.cc | 149 +-
 be/src/exec/hash-table.h | 140 +-
 be/src/exec/hash-table.inline.h | 20 +-
 be/src/exec/nested-loop-join-builder.cc | 3 +-
 be/src/exec/partial-sort-node.cc | 7 +-
 be/src/exec/partial-sort-node.h | 1 -
 be/src/exec/partitioned-aggregation-node-ir.cc | 20 +-
 be/src/exec/partitioned-aggregation-node.cc | 639 ++++----
 be/src/exec/partitioned-aggregation-node.h | 192 ++-
 be/src/exec/partitioned-hash-join-builder-ir.cc | 12 +-
 be/src/exec/partitioned-hash-join-builder.cc | 159 +-
 be/src/exec/partitioned-hash-join-builder.h | 76 +-
 be/src/exec/partitioned-hash-join-node-ir.cc | 7 +-
 be/src/exec/partitioned-hash-join-node.cc | 136 +-
 be/src/exec/partitioned-hash-join-node.h | 26 +-
 be/src/exec/partitioned-hash-join-node.inline.h | 2 +-
 be/src/exec/sort-node.cc | 15 +-
 be/src/exec/sort-node.h | 3 +-
 be/src/runtime/CMakeLists.txt | 5 +-
 be/src/runtime/buffered-block-mgr-test.cc | 1547 ------------------
 be/src/runtime/buffered-block-mgr.cc | 1254 --------------
 be/src/runtime/buffered-block-mgr.h | 606 -------
 be/src/runtime/buffered-tuple-stream-test.cc | 1264 --------------
 be/src/runtime/buffered-tuple-stream.cc | 903 ----------
 be/src/runtime/buffered-tuple-stream.h | 561 -------
 be/src/runtime/buffered-tuple-stream.inline.h | 59 -
 be/src/runtime/bufferpool/buffer-pool.cc | 12 +-
 be/src/runtime/bufferpool/buffer-pool.h | 8 +
 be/src/runtime/bufferpool/reservation-tracker.h | 4 +
 be/src/runtime/disk-io-mgr.cc | 7 +-
 be/src/runtime/exec-env.cc | 35 +-
 be/src/runtime/exec-env.h | 4 +-
 be/src/runtime/fragment-instance-state.cc | 2 -
 be/src/runtime/initial-reservations.cc | 90 +
 be/src/runtime/initial-reservations.h | 79 +
 be/src/runtime/query-exec-mgr.cc | 2 +
 be/src/runtime/query-state.cc | 91 +-
 be/src/runtime/query-state.h | 51 +-
 be/src/runtime/row-batch.cc | 19 -
 be/src/runtime/row-batch.h | 13 -
 be/src/runtime/runtime-filter.h | 1 +
 be/src/runtime/runtime-state.cc | 52 +-
 be/src/runtime/runtime-state.h | 32 +-
 be/src/runtime/sorter.cc | 1058 ++++++------
 be/src/runtime/sorter.h | 65 +-
 be/src/runtime/test-env.cc | 23 +-
 be/src/runtime/test-env.h | 9 +-
 be/src/runtime/tmp-file-mgr-test.cc | 10 +-
 be/src/runtime/tmp-file-mgr.h | 23 +-
 be/src/service/client-request-state.cc | 4 +-
 be/src/service/query-options.cc | 28 +-
 be/src/service/query-options.h | 6 +-
 be/src/util/bloom-filter.h | 2 +-
 be/src/util/static-asserts.cc | 2 -
 common/thrift/Frontend.thrift | 16 +-
 common/thrift/ImpalaInternalService.thrift | 22 +-
 common/thrift/ImpalaService.thrift | 8 +-
 common/thrift/PlanNodes.thrift | 18 +
 common/thrift/generate_error_codes.py | 10 +-
 .../org/apache/impala/common/RuntimeEnv.java | 10 -
 .../apache/impala/planner/AggregationNode.java | 11 +-
 .../apache/impala/planner/AnalyticEvalNode.java | 7 +-
 .../impala/planner/DataSourceScanNode.java | 2 +-
 .../apache/impala/planner/DataStreamSink.java | 2 +-
 .../org/apache/impala/planner/EmptySetNode.java | 2 +-
 .../org/apache/impala/planner/ExchangeNode.java | 2 +-
 .../apache/impala/planner/HBaseScanNode.java | 2 +-
 .../apache/impala/planner/HBaseTableSink.java | 2 +-
 .../org/apache/impala/planner/HashJoinNode.java | 8 +-
 .../org/apache/impala/planner/HdfsScanNode.java | 4 +-
 .../apache/impala/planner/HdfsTableSink.java | 2 +-
 .../apache/impala/planner/JoinBuildSink.java | 2 +-
 .../org/apache/impala/planner/KuduScanNode.java | 2 +-
 .../apache/impala/planner/KuduTableSink.java | 2 +-
 .../impala/planner/NestedLoopJoinNode.java | 5 +-
 .../org/apache/impala/planner/PlanNode.java | 14 +-
 .../org/apache/impala/planner/PlanRootSink.java | 2 +-
 .../java/org/apache/impala/planner/Planner.java | 12 +-
 .../apache/impala/planner/ResourceProfile.java | 72 +-
 .../org/apache/impala/planner/SelectNode.java | 2 +-
 .../impala/planner/SingularRowSrcNode.java | 2 +-
 .../org/apache/impala/planner/SortNode.java | 47 +-
 .../org/apache/impala/planner/SubplanNode.java | 2 +-
 .../org/apache/impala/planner/UnionNode.java | 2 +-
 .../org/apache/impala/planner/UnnestNode.java | 2 +-
 .../org/apache/impala/service/Frontend.java | 2 +-
 .../org/apache/impala/planner/PlannerTest.java | 2 -
 .../queries/PlannerTest/constant-folding.test | 32 +-
 .../queries/PlannerTest/disable-codegen.test | 2 +-
 .../PlannerTest/fk-pk-join-detection.test | 52 +-
 .../queries/PlannerTest/mt-dop-validation.test | 30 +-
 .../queries/PlannerTest/parquet-filtering.test | 6 +-
 .../PlannerTest/resource-requirements.test | 418 ++---
 .../PlannerTest/sort-expr-materialization.test | 30 +-
 .../PlannerTest/spillable-buffer-sizing.test | 112 +-
 .../queries/PlannerTest/tablesample.test | 4 +-
 .../queries/QueryTest/analytic-fns.test | 12 +-
 .../queries/QueryTest/explain-level0.test | 2 +-
 .../queries/QueryTest/explain-level1.test | 2 +-
 .../queries/QueryTest/explain-level2.test | 6 +-
 .../queries/QueryTest/explain-level3.test | 6 +-
 .../queries/QueryTest/nested-types-tpch.test | 24 +-
 .../QueryTest/runtime_row_filters_phj.test | 5 +-
 ...ingle-node-joins-with-limits-exhaustive.test | 2 +-
 .../QueryTest/single-node-large-sorts.test | 2 +-
 .../queries/QueryTest/spilling.test | 87 +-
 .../targeted-stress/queries/agg_stress.test | 2 +-
 .../workloads/tpch/queries/insert_parquet.test | 2 +
 tests/comparison/discrepancy_searcher.py | 4 +-
 tests/custom_cluster/test_scratch_disk.py | 12 +-
 tests/custom_cluster/test_spilling.py | 47 -
 tests/query_test/test_cancellation.py | 10 +-
 tests/query_test/test_mem_usage_scaling.py | 31 +-
 tests/query_test/test_nested_types.py | 1 -
 tests/query_test/test_scratch_limit.py | 12 +-
 tests/query_test/test_sort.py | 26 +-
 tests/query_test/test_spilling.py | 39 +
 123 files changed, 2885 insertions(+), 8366 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/codegen/gen_ir_descriptions.py ---------------------------------------------------------------------- diff --git a/be/src/codegen/gen_ir_descriptions.py b/be/src/codegen/gen_ir_descriptions.py index 94dc86a..be4be80 100755 --- a/be/src/codegen/gen_ir_descriptions.py +++ b/be/src/codegen/gen_ir_descriptions.py @@ -119,7 +119,7 @@ ir_functions = [ ["PHJ_PROCESS_PROBE_BATCH_FULL_OUTER_JOIN", "_ZN6impala23PartitionedHashJoinNode17ProcessProbeBatchILi8EEEiNS_13TPrefetchMode4typeEPNS_8RowBatchEPNS_12HashTableCtxEPNS_6StatusE"], ["PHJ_INSERT_BATCH", - "_ZN6impala10PhjBuilder9Partition11InsertBatchENS_13TPrefetchMode4typeEPNS_12HashTableCtxEPNS_8RowBatchERKSt6vectorINS_19BufferedTupleStream6RowIdxESaISA_EE"], + "_ZN6impala10PhjBuilder9Partition11InsertBatchENS_13TPrefetchMode4typeEPNS_12HashTableCtxEPNS_8RowBatchERKSt6vectorIPhSaIS9_EEPNS_6StatusE"], ["HASH_TABLE_GET_HASH_SEED", "_ZNK6impala12HashTableCtx11GetHashSeedEv"], ["HASH_TABLE_GET_BUILD_EXPR_EVALUATORS", http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/analytic-eval-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc index b789188..f6d96ae 100644 --- a/be/src/exec/analytic-eval-node.cc +++ b/be/src/exec/analytic-eval-node.cc @@ -23,9 +23,10 @@ #include "exprs/agg-fn-evaluator.h" #include "exprs/scalar-expr.h" #include "exprs/scalar-expr-evaluator.h" -#include "runtime/buffered-tuple-stream.inline.h" +#include "runtime/buffered-tuple-stream-v2.inline.h" #include "runtime/descriptors.h" #include "runtime/mem-tracker.h" +#include "runtime/query-state.h" #include "runtime/row-batch.h" #include "runtime/runtime-state.h" #include "udf/udf-internal.h" @@ -34,13 +35,14 @@ #include "common/names.h" static const int MAX_TUPLE_POOL_SIZE = 8 * 1024 * 1024; // 8MB +static const int MIN_REQUIRED_BUFFERS = 2; using namespace strings; namespace impala { -AnalyticEvalNode::AnalyticEvalNode(ObjectPool* pool, const TPlanNode& tnode, - const DescriptorTbl& descs) +AnalyticEvalNode::AnalyticEvalNode( + ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : ExecNode(pool, tnode, descs), window_(tnode.analytic_node.window), intermediate_tuple_desc_( @@ -51,7 +53,6 @@ AnalyticEvalNode::AnalyticEvalNode(ObjectPool* pool, const TPlanNode& tnode, rows_end_offset_(0), has_first_val_null_offset_(false), first_val_null_offset_(0), - client_(nullptr), child_tuple_cmp_row_(nullptr), last_result_idx_(-1), prev_pool_last_result_idx_(-1), @@ -110,6 +111,7 @@ AnalyticEvalNode::~AnalyticEvalNode() { Status AnalyticEvalNode::Init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ExecNode::Init(tnode, state)); DCHECK_EQ(conjunct_evals_.size(), 0); + state_ =
state; const TAnalyticNode& analytic_node = tnode.analytic_node; bool has_lead_fn = false; @@ -154,6 +156,8 @@ Status AnalyticEvalNode::Prepare(RuntimeState* state) { SCOPED_TIMER(runtime_profile_->total_time_counter()); RETURN_IF_ERROR(ExecNode::Prepare(state)); DCHECK(child(0)->row_desc()->IsPrefixOf(*row_desc())); + DCHECK_GE(resource_profile_.min_reservation, + resource_profile_.spillable_buffer_size * MIN_REQUIRED_BUFFERS); curr_tuple_pool_.reset(new MemPool(mem_tracker())); prev_tuple_pool_.reset(new MemPool(mem_tracker())); mem_pool_.reset(new MemPool(mem_tracker())); @@ -175,12 +179,6 @@ Status AnalyticEvalNode::Prepare(RuntimeState* state) { fn_pool_.get(), &order_by_eq_expr_eval_)); AddEvaluatorToFree(order_by_eq_expr_eval_); } - - // Must be kept in sync with AnalyticEvalNode.computeResourceProfile() in fe. - const int MIN_REQUIRED_BUFFERS = 2; - RETURN_IF_ERROR(state->block_mgr()->RegisterClient( - Substitute("AnalyticEvalNode id=$0 ptr=$1", id_, this), - MIN_REQUIRED_BUFFERS, false, mem_tracker(), state, &client_)); return Status::OK(); } @@ -190,22 +188,20 @@ Status AnalyticEvalNode::Open(RuntimeState* state) { RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(QueryMaintenance(state)); RETURN_IF_ERROR(child(0)->Open(state)); - DCHECK(client_ != nullptr); - DCHECK(input_stream_ == nullptr); - input_stream_.reset( - new BufferedTupleStream(state, child(0)->row_desc(), state->block_mgr(), client_, - false /* use_initial_small_buffers */, true /* read_write */)); - RETURN_IF_ERROR(input_stream_->Init(id(), runtime_profile(), true)); - bool got_write_buffer; - RETURN_IF_ERROR(input_stream_->PrepareForWrite(&got_write_buffer)); - if (!got_write_buffer) { - return state->block_mgr()->MemLimitTooLowError(client_, id()); - } - bool got_read_buffer; - RETURN_IF_ERROR(input_stream_->PrepareForRead(true, &got_read_buffer)); - if (!got_read_buffer) { - return state->block_mgr()->MemLimitTooLowError(client_, id()); + + // Claim reservation after the child has been opened to reduce the peak reservation + // requirement. + if (!buffer_pool_client_.is_registered()) { + RETURN_IF_ERROR(ClaimBufferReservation(state)); } + DCHECK(input_stream_ == nullptr); + input_stream_.reset(new BufferedTupleStreamV2(state, child(0)->row_desc(), + &buffer_pool_client_, resource_profile_.spillable_buffer_size, + resource_profile_.spillable_buffer_size)); + RETURN_IF_ERROR(input_stream_->Init(id(), true)); + bool success; + RETURN_IF_ERROR(input_stream_->PrepareForReadWrite(true, &success)); + DCHECK(success) << "Had reservation: " << buffer_pool_client_.DebugString(); for (int i = 0; i < analytic_fn_evals_.size(); ++i) { RETURN_IF_ERROR(analytic_fn_evals_[i]->Open(state)); @@ -366,8 +362,8 @@ inline Status AnalyticEvalNode::AddRow(int64_t stream_idx, TupleRow* row) { // the stream and continue writing/reading in unpinned mode. // TODO: Consider re-pinning later if the output stream is fully consumed. RETURN_IF_ERROR(status); - RETURN_IF_ERROR( - input_stream_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT)); + RETURN_IF_ERROR(state_->StartSpilling(mem_tracker())); + input_stream_->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL_EXCEPT_CURRENT); VLOG_FILE << id() << " Unpin input stream while adding row idx=" << stream_idx; if (!input_stream_->AddRow(row, &status)) { // Rows should be added in unpinned mode unless an error occurs. 
@@ -627,7 +623,7 @@ Status AnalyticEvalNode::ProcessChildBatch(RuntimeState* state) { << " tuple pool size:" << curr_tuple_pool_->total_allocated_bytes(); SCOPED_TIMER(evaluation_timer_); - // BufferedTupleStream::num_rows() returns the total number of rows that have been + // BufferedTupleStreamV2::num_rows() returns the total number of rows that have been // inserted into the stream (it does not decrease when we read rows), so the index of // the next input row that will be inserted will be the current size of the stream. int64_t stream_idx = input_stream_->num_rows(); @@ -857,7 +853,6 @@ Status AnalyticEvalNode::Reset(RuntimeState* state) { void AnalyticEvalNode::Close(RuntimeState* state) { if (is_closed()) return; - if (client_ != nullptr) state->block_mgr()->ClearReservations(client_); // We may need to clean up input_stream_ if an error occurred at some point. if (input_stream_ != nullptr) { input_stream_->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/analytic-eval-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/analytic-eval-node.h b/be/src/exec/analytic-eval-node.h index 89c5cf3..eab9198 100644 --- a/be/src/exec/analytic-eval-node.h +++ b/be/src/exec/analytic-eval-node.h @@ -19,8 +19,7 @@ #define IMPALA_EXEC_ANALYTIC_EVAL_NODE_H #include "exec/exec-node.h" -#include "runtime/buffered-block-mgr.h" -#include "runtime/buffered-tuple-stream.h" +#include "runtime/buffered-tuple-stream-v2.h" #include "runtime/tuple.h" namespace impala { @@ -189,6 +188,10 @@ class AnalyticEvalNode : public ExecNode { /// Debug string containing the window definition. std::string DebugWindowString() const; + /// The RuntimeState for the fragment instance containing this AnalyticEvalNode. Set + /// in Init(). + RuntimeState* state_; + /// Window over which the analytic functions are evaluated. Only used if fn_scope_ /// is ROWS or RANGE. /// TODO: fn_scope_ and window_ are candidates to be removed during codegen @@ -254,9 +257,6 @@ class AnalyticEvalNode : public ExecNode { boost::scoped_ptr<MemPool> curr_tuple_pool_; boost::scoped_ptr<MemPool> prev_tuple_pool_; - /// Block manager client used by input_stream_. Not owned. - BufferedBlockMgr::Client* client_ = nullptr; - ///////////////////////////////////////// /// BEGIN: Members that must be Reset() @@ -330,15 +330,16 @@ class AnalyticEvalNode : public ExecNode { /// Buffers input rows added in ProcessChildBatch() until enough rows are able to /// be returned by GetNextOutputBatch(), in which case row batches are returned from - /// the front of the stream and the underlying buffered blocks are deleted once read. + /// the front of the stream and the underlying buffers are deleted once read. /// The number of rows that must be buffered may vary from an entire partition (e.g. - /// no order by clause) to a single row (e.g. ROWS windows). When the amount of - /// buffered data exceeds the available memory in the underlying BufferedBlockMgr, - /// input_stream_ is unpinned (i.e., possibly spilled to disk if necessary). - /// The input stream owns tuple data backing rows returned in GetNext(). The blocks - /// with tuple data are attached to an output row batch on eos or ReachedLimit(). + /// no order by clause) to a single row (e.g. ROWS windows). 
If the amount of buffered + /// data in 'input_stream_' exceeds the ExecNode's buffer reservation and the stream + /// cannot increase the reservation, then 'input_stream_' is unpinned (i.e., spilled to + /// disk). The input stream owns tuple data backing rows returned in GetNext(). The + /// buffers with tuple data are attached to an output row batch on eos or + /// ReachedLimit(). /// TODO: Consider re-pinning unpinned streams when possible. - boost::scoped_ptr<BufferedTupleStream> input_stream_; + boost::scoped_ptr<BufferedTupleStreamV2> input_stream_; /// Pool used for O(1) allocations that live until Close() or Reset(). /// Does not own data backing tuples returned in GetNext(), so it does not http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/exec-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc index c3d9946..61c8d40 100644 --- a/be/src/exec/exec-node.cc +++ b/be/src/exec/exec-node.cc @@ -34,10 +34,10 @@ #include "exec/empty-set-node.h" #include "exec/exchange-node.h" #include "exec/hbase-scan-node.h" -#include "exec/hdfs-scan-node.h" #include "exec/hdfs-scan-node-mt.h" -#include "exec/kudu-scan-node.h" +#include "exec/hdfs-scan-node.h" #include "exec/kudu-scan-node-mt.h" +#include "exec/kudu-scan-node.h" #include "exec/kudu-util.h" #include "exec/nested-loop-join-node.h" #include "exec/partial-sort-node.h" @@ -50,9 +50,14 @@ #include "exec/topn-node.h" #include "exec/union-node.h" #include "exec/unnest-node.h" +#include "exprs/expr.h" +#include "gutil/strings/substitute.h" #include "runtime/descriptors.h" -#include "runtime/mem-tracker.h" +#include "runtime/exec-env.h" +#include "runtime/initial-reservations.h" #include "runtime/mem-pool.h" +#include "runtime/mem-tracker.h" +#include "runtime/query-state.h" #include "runtime/row-batch.h" #include "runtime/runtime-state.h" #include "util/debug-util.h" @@ -61,7 +66,10 @@ #include "common/names.h" using namespace llvm; +using strings::Substitute; +DECLARE_int32(be_port); +DECLARE_string(hostname); DEFINE_bool(enable_partitioned_hash_join, true, "Deprecated - has no effect"); DEFINE_bool(enable_partitioned_aggregation, true, "Deprecated - has no effect"); @@ -116,6 +124,7 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl type_(tnode.node_type), pool_(pool), row_descriptor_(descs, tnode.row_tuples, tnode.nullable_tuples), + resource_profile_(tnode.resource_profile), debug_phase_(TExecNodePhase::INVALID), debug_action_(TDebugAction::WAIT), limit_(tnode.limit), @@ -195,7 +204,12 @@ void ExecNode::Close(RuntimeState* state) { ScalarExprEvaluator::Close(conjunct_evals_, state); ScalarExpr::Close(conjuncts_); if (expr_mem_pool() != nullptr) expr_mem_pool_->FreeAll(); - + if (buffer_pool_client_.is_registered()) { + VLOG_FILE << id_ << " returning reservation " << resource_profile_.min_reservation; + state->query_state()->initial_reservations()->Return( + &buffer_pool_client_, resource_profile_.min_reservation); + state->exec_env()->buffer_pool()->DeregisterClient(&buffer_pool_client_); + } if (mem_tracker() != NULL && mem_tracker()->consumption() != 0) { LOG(WARNING) << "Query " << state->query_id() << " may have leaked memory." 
<< endl << state->instance_mem_tracker()->LogUsage(); @@ -204,6 +218,29 @@ void ExecNode::Close(RuntimeState* state) { } } +Status ExecNode::ClaimBufferReservation(RuntimeState* state) { + DCHECK(!buffer_pool_client_.is_registered()); + BufferPool* buffer_pool = ExecEnv::GetInstance()->buffer_pool(); + // Check the minimum buffer size in case the minimum buffer size used by the planner + // doesn't match this backend's. + if (resource_profile_.__isset.spillable_buffer_size && + resource_profile_.spillable_buffer_size < buffer_pool->min_buffer_len()) { + return Status(Substitute("Spillable buffer size for node $0 of $1 bytes is less " + "than the minimum buffer pool buffer size of $2 bytes", + id_, resource_profile_.spillable_buffer_size, buffer_pool->min_buffer_len())); + } + + RETURN_IF_ERROR(buffer_pool->RegisterClient( + Substitute("$0 id=$1 ptr=$2", PrintPlanNodeType(type_), id_, this), + state->query_state()->file_group(), state->instance_buffer_reservation(), + mem_tracker(), resource_profile_.max_reservation, runtime_profile(), + &buffer_pool_client_)); + VLOG_FILE << id_ << " claiming reservation " << resource_profile_.min_reservation; + state->query_state()->initial_reservations()->Claim( + &buffer_pool_client_, resource_profile_.min_reservation); + return Status::OK(); +} + Status ExecNode::CreateTree( RuntimeState* state, const TPlan& plan, const DescriptorTbl& descs, ExecNode** root) { if (plan.nodes.size() == 0) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/exec-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h index a107f62..60efff0 100644 --- a/be/src/exec/exec-node.h +++ b/be/src/exec/exec-node.h @@ -26,6 +26,8 @@ #include "common/status.h" #include "exprs/scalar-expr-evaluator.h" #include "gen-cpp/PlanNodes_types.h" +#include "runtime/bufferpool/buffer-pool.h" +#include "runtime/bufferpool/reservation-tracker.h" #include "runtime/descriptors.h" // for RowDescriptor #include "util/blocking-queue.h" #include "util/runtime-profile.h" @@ -227,6 +229,12 @@ class ExecNode { protected: friend class DataSink; + /// Initialize 'buffer_pool_client_' and claim the initial reservation for this + /// ExecNode. Only needs to be called by ExecNodes that will use the client. + /// The client is automatically cleaned up in Close(). Should not be called if + /// the client is already open. + Status ClaimBufferReservation(RuntimeState* state); + /// Extends blocking queue for row batches. Row batches have a property that /// they must be processed in the order they were produced, even in cancellation /// paths. Preceding row batches can contain ptrs to memory in subsequent row batches @@ -276,6 +284,9 @@ class ExecNode { std::vector<ExecNode*> children_; RowDescriptor row_descriptor_; + /// Resource information sent from the frontend. + const TBackendResourceProfile resource_profile_; + /// debug-only: if debug_action_ is not INVALID, node will perform action in /// debug_phase_ TExecNodePhase::type debug_phase_; @@ -298,6 +309,12 @@ class ExecNode { /// Created in Prepare(). boost::scoped_ptr<MemPool> expr_mem_pool_; + /// Buffer pool client for this node. Initialized with the node's minimum reservation + /// in ClaimBufferReservation(). After initialization, the client must hold onto at + /// least the minimum reservation so that it can be returned to the initial + /// reservations pool in Close(). 
+ BufferPool::ClientHandle buffer_pool_client_; + bool is_closed() const { return is_closed_; } /// Pointer to the containing SubplanNode or NULL if not inside a subplan. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/hash-table-test.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hash-table-test.cc b/be/src/exec/hash-table-test.cc index 42bc7e1..7a6ec9d 100644 --- a/be/src/exec/hash-table-test.cc +++ b/be/src/exec/hash-table-test.cc @@ -17,24 +17,27 @@ #include <boost/scoped_ptr.hpp> -#include <stdlib.h> #include <stdio.h> +#include <stdlib.h> #include <iostream> +#include <limits> #include <vector> -#include "testutil/gtest-util.h" #include "common/compiler-util.h" #include "common/init.h" #include "exec/hash-table.inline.h" #include "exprs/scalar-expr.h" #include "exprs/scalar-expr-evaluator.h" #include "exprs/slot-ref.h" +#include "runtime/bufferpool/buffer-pool.h" +#include "runtime/bufferpool/reservation-tracker.h" #include "runtime/mem-pool.h" #include "runtime/mem-tracker.h" #include "runtime/string-value.h" #include "runtime/test-env.h" #include "runtime/tuple-row.h" #include "service/fe-support.h" +#include "testutil/gtest-util.h" #include "util/cpu-info.h" #include "util/runtime-profile-counters.h" #include "util/test-info.h" @@ -51,9 +54,16 @@ class HashTableTest : public testing::Test { HashTableTest() : mem_pool_(&tracker_) {} protected: + /// Temporary runtime environment for the hash table. scoped_ptr<TestEnv> test_env_; RuntimeState* runtime_state_; + + /// Hash tables and associated clients - automatically closed in TearDown(). + vector<BufferPool::ClientHandle*> clients_; + vector<HashTable*> hash_tables_; + ObjectPool pool_; + /// A dummy MemTracker used for exprs and other things we don't need to have limits on. MemTracker tracker_; MemPool mem_pool_; vector<ScalarExpr*> build_exprs_; @@ -83,6 +93,8 @@ class HashTableTest : public testing::Test { ASSERT_OK(ScalarExprEvaluator::Create(probe_exprs_, nullptr, &pool_, &mem_pool_, &probe_expr_evals_)); ASSERT_OK(ScalarExprEvaluator::Open(probe_expr_evals_, nullptr)); + + CreateTestEnv(); } virtual void TearDown() { @@ -90,9 +102,34 @@ class HashTableTest : public testing::Test { ScalarExprEvaluator::Close(probe_expr_evals_, nullptr); ScalarExpr::Close(build_exprs_); ScalarExpr::Close(probe_exprs_); + + for (HashTable* hash_table : hash_tables_) hash_table->Close(); + hash_tables_.clear(); + + for (BufferPool::ClientHandle* client : clients_) { + test_env_->exec_env()->buffer_pool()->DeregisterClient(client); + } + clients_.clear(); + runtime_state_ = nullptr; test_env_.reset(); mem_pool_.FreeAll(); + pool_.Clear(); + } + + /// Initialize test_env_ and runtime_state_ with the given page size and capacity + /// for the given number of pages. If test_env_ was already created, then re-creates it. 
+ void CreateTestEnv(int64_t min_page_size = 64 * 1024, + int64_t buffer_bytes_limit = 4L * 1024 * 1024 * 1024) { + test_env_.reset(new TestEnv()); + test_env_->SetBufferPoolArgs(min_page_size, buffer_bytes_limit); + ASSERT_OK(test_env_->Init()); + + TQueryOptions query_options; + query_options.__set_default_spillable_buffer_size(min_page_size); + query_options.__set_min_spillable_buffer_size(min_page_size); + query_options.__set_buffer_pool_limit(buffer_bytes_limit); + ASSERT_OK(test_env_->CreateQueryState(0, &query_options, &runtime_state_)); } TupleRow* CreateTupleRow(int32_t val) { @@ -116,8 +153,9 @@ class HashTableTest : public testing::Test { // Wrapper to call private methods on HashTable // TODO: understand google testing, there must be a more natural way to do this - void ResizeTable(HashTable* table, int64_t new_size, HashTableCtx* ht_ctx) { - table->ResizeBuckets(new_size, ht_ctx); + Status ResizeTable( + HashTable* table, int64_t new_size, HashTableCtx* ht_ctx, bool* success) { + return table->ResizeBuckets(new_size, ht_ctx, success); } // Do a full table scan on table. All values should be between [min,max). If @@ -188,24 +226,41 @@ class HashTableTest : public testing::Test { } } - // Construct hash table with custom block manager. Returns result of HashTable::Init() - bool CreateHashTable(bool quadratic, int64_t initial_num_buckets, - scoped_ptr<HashTable>* table, int block_size = 8 * 1024 * 1024, - int max_num_blocks = 100, int reserved_blocks = 10) { - EXPECT_OK(test_env_->CreateQueryStateWithBlockMgr( - next_query_id_++, max_num_blocks, block_size, nullptr, &runtime_state_)); + /// Construct hash table and buffer pool client. + /// Returns true if HashTable::Init() was successful. Created objects + /// and resources (e.g. reservations) are automatically freed in TearDown(). + bool CreateHashTable(bool quadratic, int64_t initial_num_buckets, HashTable** table, + int64_t block_size = 8 * 1024 * 1024, int max_num_blocks = 100, + int initial_reserved_blocks = 10, int64_t suballocator_buffer_len = 64 * 1024) { + BufferPool* buffer_pool = test_env_->exec_env()->buffer_pool(); + RuntimeProfile* profile = pool_.Add(new RuntimeProfile(&pool_, "ht")); + + // Set up memory tracking for the hash table. MemTracker* client_tracker = pool_.Add(new MemTracker(-1, "client", runtime_state_->instance_mem_tracker())); - BufferedBlockMgr::Client* client; - EXPECT_OK(runtime_state_->block_mgr()->RegisterClient( - "", reserved_blocks, false, client_tracker, runtime_state_, &client)); + int64_t initial_reservation_bytes = block_size * initial_reserved_blocks; + int64_t max_reservation_bytes = block_size * max_num_blocks; + + // Set up the memory allocator. + BufferPool::ClientHandle* client = pool_.Add(new BufferPool::ClientHandle); + clients_.push_back(client); + EXPECT_OK(buffer_pool->RegisterClient("", nullptr, + runtime_state_->instance_buffer_reservation(), client_tracker, + max_reservation_bytes, profile, client)); + EXPECT_TRUE(client->IncreaseReservation(initial_reservation_bytes)); + Suballocator* allocator = + pool_.Add(new Suballocator(buffer_pool, client, suballocator_buffer_len)); // Initial_num_buckets must be a power of two. 
EXPECT_EQ(initial_num_buckets, BitUtil::RoundUpToPowerOfTwo(initial_num_buckets)); int64_t max_num_buckets = 1L << 31; - table->reset(new HashTable(quadratic, runtime_state_, client, true, 1, nullptr, - max_num_buckets, initial_num_buckets)); - return (*table)->Init(); + *table = pool_.Add(new HashTable( + quadratic, allocator, true, 1, nullptr, max_num_buckets, initial_num_buckets)); + hash_tables_.push_back(*table); + bool success; + Status status = (*table)->Init(&success); + EXPECT_OK(status); + return status.ok() && success; } // Constructs and closes a hash table. @@ -229,14 +284,12 @@ class HashTableTest : public testing::Test { EXPECT_EQ(*val_row4, 4); // Create and close the hash table. - scoped_ptr<HashTable> hash_table; + HashTable* hash_table; bool initialized = CreateHashTable(quadratic, initial_num_buckets, &hash_table); EXPECT_EQ(too_big, !initialized); if (initialized && initial_num_buckets > 0) { EXPECT_NE(hash_table->ByteSize(), 0); } - - hash_table->Close(); } // IMPALA-2897: Build rows that are equivalent (where nullptrs are counted as equivalent) @@ -246,7 +299,7 @@ class HashTableTest : public testing::Test { for (int i = 0; i < 2; ++i) build_rows[i] = CreateNullTupleRow(); // Create the hash table and insert the build rows - scoped_ptr<HashTable> hash_table; + HashTable* hash_table; ASSERT_TRUE(CreateHashTable(true, 1024, &hash_table)); scoped_ptr<HashTableCtx> ht_ctx; EXPECT_OK(HashTableCtx::Create(&pool_, runtime_state_, @@ -256,13 +309,15 @@ class HashTableTest : public testing::Test { for (int i = 0; i < 2; ++i) { if (!ht_ctx->EvalAndHashBuild(build_rows[i])) continue; - BufferedTupleStream::RowIdx dummy_row_idx; + BufferedTupleStreamV2::FlatRowPtr dummy_flat_row = nullptr; EXPECT_TRUE(hash_table->stores_tuples_); - bool inserted = hash_table->Insert(ht_ctx.get(), dummy_row_idx, build_rows[i]); + Status status; + bool inserted = + hash_table->Insert(ht_ctx.get(), dummy_flat_row, build_rows[i], &status); EXPECT_TRUE(inserted); + ASSERT_OK(status); } EXPECT_EQ(hash_table->num_buckets() - hash_table->EmptyBuckets(), 1); - hash_table->Close(); ht_ctx->Close(runtime_state_); } @@ -282,7 +337,7 @@ class HashTableTest : public testing::Test { } // Create the hash table and insert the build rows - scoped_ptr<HashTable> hash_table; + HashTable* hash_table; ASSERT_TRUE(CreateHashTable(quadratic, initial_num_buckets, &hash_table)); scoped_ptr<HashTableCtx> ht_ctx; Status status = HashTableCtx::Create(&pool_, runtime_state_, build_exprs_, @@ -290,52 +345,57 @@ class HashTableTest : public testing::Test { vector<bool>(build_exprs_.size(), false), 1, 0, 1, &mem_pool_, &ht_ctx); EXPECT_OK(status); EXPECT_OK(ht_ctx->Open(runtime_state_)); - bool success = hash_table->CheckAndResize(5, ht_ctx.get()); + bool success; + EXPECT_OK(hash_table->CheckAndResize(5, ht_ctx.get(), &success)); ASSERT_TRUE(success); for (int i = 0; i < 5; ++i) { if (!ht_ctx->EvalAndHashBuild(build_rows[i])) continue; - BufferedTupleStream::RowIdx dummy_row_idx; + BufferedTupleStreamV2::FlatRowPtr dummy_flat_row = nullptr; EXPECT_TRUE(hash_table->stores_tuples_); - bool inserted = hash_table->Insert(ht_ctx.get(), dummy_row_idx, build_rows[i]); + bool inserted = + hash_table->Insert(ht_ctx.get(), dummy_flat_row, build_rows[i], &status); EXPECT_TRUE(inserted); + ASSERT_OK(status); } EXPECT_EQ(hash_table->size(), 5); // Do a full table scan and validate returned pointers - FullScan(hash_table.get(), ht_ctx.get(), 0, 5, true, scan_rows, build_rows); - ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, 10, 
false); + FullScan(hash_table, ht_ctx.get(), 0, 5, true, scan_rows, build_rows); + ProbeTest(hash_table, ht_ctx.get(), probe_rows, 10, false); // Double the size of the hash table and scan again. - ResizeTable(hash_table.get(), 2048, ht_ctx.get()); + EXPECT_OK(ResizeTable(hash_table, 2048, ht_ctx.get(), &success)); + EXPECT_TRUE(success); EXPECT_EQ(hash_table->num_buckets(), 2048); EXPECT_EQ(hash_table->size(), 5); memset(scan_rows, 0, sizeof(scan_rows)); - FullScan(hash_table.get(), ht_ctx.get(), 0, 5, true, scan_rows, build_rows); - ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, 10, false); + FullScan(hash_table, ht_ctx.get(), 0, 5, true, scan_rows, build_rows); + ProbeTest(hash_table, ht_ctx.get(), probe_rows, 10, false); // Try to shrink and scan again. - ResizeTable(hash_table.get(), 64, ht_ctx.get()); + EXPECT_OK(ResizeTable(hash_table, 64, ht_ctx.get(), &success)); + EXPECT_TRUE(success); EXPECT_EQ(hash_table->num_buckets(), 64); EXPECT_EQ(hash_table->size(), 5); memset(scan_rows, 0, sizeof(scan_rows)); - FullScan(hash_table.get(), ht_ctx.get(), 0, 5, true, scan_rows, build_rows); - ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, 10, false); + FullScan(hash_table, ht_ctx.get(), 0, 5, true, scan_rows, build_rows); + ProbeTest(hash_table, ht_ctx.get(), probe_rows, 10, false); // Resize to 8, which is the smallest value to fit the number of filled buckets. - ResizeTable(hash_table.get(), 8, ht_ctx.get()); + EXPECT_OK(ResizeTable(hash_table, 8, ht_ctx.get(), &success)); + EXPECT_TRUE(success); EXPECT_EQ(hash_table->num_buckets(), 8); EXPECT_EQ(hash_table->size(), 5); memset(scan_rows, 0, sizeof(scan_rows)); - FullScan(hash_table.get(), ht_ctx.get(), 0, 5, true, scan_rows, build_rows); - ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, 10, false); + FullScan(hash_table, ht_ctx.get(), 0, 5, true, scan_rows, build_rows); + ProbeTest(hash_table, ht_ctx.get(), probe_rows, 10, false); - hash_table->Close(); ht_ctx->Close(runtime_state_); } - void ScanTest(bool quadratic, int initial_size, int rows_to_insert, - int additional_rows) { - scoped_ptr<HashTable> hash_table; + void ScanTest( + bool quadratic, int initial_size, int rows_to_insert, int additional_rows) { + HashTable* hash_table; ASSERT_TRUE(CreateHashTable(quadratic, initial_size, &hash_table)); int total_rows = rows_to_insert + additional_rows; @@ -347,19 +407,21 @@ class HashTableTest : public testing::Test { EXPECT_OK(ht_ctx->Open(runtime_state_)); // Add 1 row with val 1, 2 with val 2, etc. 
+ bool success; vector<TupleRow*> build_rows; ProbeTestData* probe_rows = new ProbeTestData[total_rows]; probe_rows[0].probe_row = CreateTupleRow(0); for (int val = 1; val <= rows_to_insert; ++val) { - bool success = hash_table->CheckAndResize(val, ht_ctx.get()); + EXPECT_OK(hash_table->CheckAndResize(val, ht_ctx.get(), &success)); EXPECT_TRUE(success) << " failed to resize: " << val; probe_rows[val].probe_row = CreateTupleRow(val); for (int i = 0; i < val; ++i) { TupleRow* row = CreateTupleRow(val); if (!ht_ctx->EvalAndHashBuild(row)) continue; - BufferedTupleStream::RowIdx dummy_row_idx; + BufferedTupleStreamV2::FlatRowPtr dummy_flat_row = nullptr; EXPECT_TRUE(hash_table->stores_tuples_); - hash_table->Insert(ht_ctx.get(), dummy_row_idx, row); + ASSERT_TRUE(hash_table->Insert(ht_ctx.get(), dummy_flat_row, row, &status)); + ASSERT_OK(status); build_rows.push_back(row); probe_rows[val].expected_build_rows.push_back(row); } @@ -371,21 +433,22 @@ class HashTableTest : public testing::Test { } // Test that all the builds were found. - ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, total_rows, true); + ProbeTest(hash_table, ht_ctx.get(), probe_rows, total_rows, true); // Resize and try again. int target_size = BitUtil::RoundUpToPowerOfTwo(2 * total_rows); - ResizeTable(hash_table.get(), target_size, ht_ctx.get()); + EXPECT_OK(ResizeTable(hash_table, target_size, ht_ctx.get(), &success)); + EXPECT_TRUE(success); EXPECT_EQ(hash_table->num_buckets(), target_size); - ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, total_rows, true); + ProbeTest(hash_table, ht_ctx.get(), probe_rows, total_rows, true); target_size = BitUtil::RoundUpToPowerOfTwo(total_rows + 1); - ResizeTable(hash_table.get(), target_size, ht_ctx.get()); + EXPECT_OK(ResizeTable(hash_table, target_size, ht_ctx.get(), &success)); + EXPECT_TRUE(success); EXPECT_EQ(hash_table->num_buckets(), target_size); - ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, total_rows, true); + ProbeTest(hash_table, ht_ctx.get(), probe_rows, total_rows, true); delete [] probe_rows; - hash_table->Close(); ht_ctx->Close(runtime_state_); } @@ -395,9 +458,11 @@ class HashTableTest : public testing::Test { uint64_t num_to_add = 4; int expected_size = 0; - MemTracker tracker(100 * 1024 * 1024); - scoped_ptr<HashTable> hash_table; - ASSERT_TRUE(CreateHashTable(quadratic, num_to_add, &hash_table)); + // Need enough memory for two hash table bucket directories during resize. + const int64_t mem_limit_mb = 128 + 64; + HashTable* hash_table; + ASSERT_TRUE( + CreateHashTable(quadratic, num_to_add, &hash_table, 1024 * 1024, mem_limit_mb)); scoped_ptr<HashTableCtx> ht_ctx; Status status = HashTableCtx::Create(&pool_, runtime_state_, build_exprs_, probe_exprs_, false /* !stores_nulls_ */, @@ -408,27 +473,32 @@ class HashTableTest : public testing::Test { // entries. When num_to_add == 4, then the total number of inserts is 4194300. int build_row_val = 0; for (int i = 0; i < 20; ++i) { - // Currently the mem used for the bucket is not being tracked by the mem tracker. - // Thus the resize is expected to be successful. - // TODO: Keep track of the mem used for the buckets and test cases where we actually - // hit OOM. - // TODO: Insert duplicates to also hit OOM. 
- bool success = hash_table->CheckAndResize(num_to_add, ht_ctx.get()); - EXPECT_TRUE(success) << " failed to resize: " << num_to_add; + bool success; + EXPECT_OK(hash_table->CheckAndResize(num_to_add, ht_ctx.get(), &success)); + EXPECT_TRUE(success) << " failed to resize: " << num_to_add << "\n" + << tracker_.LogUsage() << "\n" + << clients_.back()->DebugString(); for (int j = 0; j < num_to_add; ++build_row_val, ++j) { TupleRow* row = CreateTupleRow(build_row_val); if (!ht_ctx->EvalAndHashBuild(row)) continue; - BufferedTupleStream::RowIdx dummy_row_idx; + BufferedTupleStreamV2::FlatRowPtr dummy_flat_row = nullptr; EXPECT_TRUE(hash_table->stores_tuples_); - bool inserted = hash_table->Insert(ht_ctx.get(), dummy_row_idx, row); + bool inserted = hash_table->Insert(ht_ctx.get(), dummy_flat_row, row, &status); + ASSERT_OK(status); if (!inserted) goto done_inserting; } expected_size += num_to_add; num_to_add *= 2; } - done_inserting: - EXPECT_FALSE(tracker.LimitExceeded()); + done_inserting: EXPECT_EQ(hash_table->size(), 4194300); + + // The next allocation should put us over the limit, since we'll need 128MB for + // the old buckets and 256MB for the new buckets. + bool success; + EXPECT_OK(hash_table->CheckAndResize(num_to_add * 2, ht_ctx.get(), &success)); + EXPECT_FALSE(success); + // Validate that we can find the entries before we went over the limit for (int i = 0; i < expected_size * 5; i += 100000) { TupleRow* probe_row = CreateTupleRow(i); @@ -441,7 +511,34 @@ class HashTableTest : public testing::Test { EXPECT_TRUE(iter.AtEnd()) << " i: " << i; } } - hash_table->Close(); + + // Insert duplicates to also hit OOM. + int64_t num_duplicates_inserted = 0; + const int DUPLICATE_VAL = 1234; + while (true) { + TupleRow* duplicate_row = CreateTupleRow(DUPLICATE_VAL); + if (!ht_ctx->EvalAndHashBuild(duplicate_row)) continue; + BufferedTupleStreamV2::FlatRowPtr dummy_flat_row = nullptr; + bool inserted = + hash_table->Insert(ht_ctx.get(), dummy_flat_row, duplicate_row, &status); + ASSERT_OK(status); + if (!inserted) break; + ++num_duplicates_inserted; + } + + // Check that the duplicates that we successfully inserted are all present. + TupleRow* duplicate_row = CreateTupleRow(DUPLICATE_VAL); + ASSERT_TRUE(ht_ctx->EvalAndHashProbe(duplicate_row)); + HashTable::Iterator iter = hash_table->FindProbeRow(ht_ctx.get()); + ValidateMatch(duplicate_row, iter.GetRow()); + for (int64_t i = 0; i < num_duplicates_inserted; ++i) { + ASSERT_FALSE(iter.AtEnd()); + iter.NextDuplicate(); + ValidateMatch(duplicate_row, iter.GetRow()); + } + iter.NextDuplicate(); + EXPECT_TRUE(iter.AtEnd()); + ht_ctx->Close(runtime_state_); } @@ -450,7 +547,7 @@ class HashTableTest : public testing::Test { // enough space in the hash table (it is also expected to be slow). It also expects that // a probe for a N+1 element will return BUCKET_NOT_FOUND. void InsertFullTest(bool quadratic, int table_size) { - scoped_ptr<HashTable> hash_table; + HashTable* hash_table; ASSERT_TRUE(CreateHashTable(quadratic, table_size, &hash_table)); EXPECT_EQ(hash_table->EmptyBuckets(), table_size); scoped_ptr<HashTableCtx> ht_ctx; @@ -472,10 +569,11 @@ class HashTableTest : public testing::Test { // Insert using both Insert() and FindBucket() methods. 
if (build_row_val % 2 == 0) { - BufferedTupleStream::RowIdx dummy_row_idx; + BufferedTupleStreamV2::FlatRowPtr dummy_flat_row = nullptr; EXPECT_TRUE(hash_table->stores_tuples_); - bool inserted = hash_table->Insert(ht_ctx.get(), dummy_row_idx, row); + bool inserted = hash_table->Insert(ht_ctx.get(), dummy_flat_row, row, &status); EXPECT_TRUE(inserted); + ASSERT_OK(status); } else { iter = hash_table->FindBuildRowBucket(ht_ctx.get(), &found); EXPECT_FALSE(iter.AtEnd()); @@ -511,20 +609,20 @@ class HashTableTest : public testing::Test { EXPECT_TRUE(iter.AtEnd()); EXPECT_FALSE(found); - hash_table->Close(); ht_ctx->Close(runtime_state_); } // This test makes sure we can tolerate the low memory case where we do not have enough // memory to allocate the array of buckets for the hash table. void VeryLowMemTest(bool quadratic) { - const int block_size = 2 * 1024; + const int64_t block_size = 2 * 1024; const int max_num_blocks = 1; - const int reserved_blocks = 0; const int table_size = 1024; - scoped_ptr<HashTable> hash_table; - ASSERT_FALSE(CreateHashTable(quadratic, table_size, &hash_table, block_size, - max_num_blocks, reserved_blocks)); + CreateTestEnv(block_size, block_size * max_num_blocks); + + HashTable* hash_table; + ASSERT_FALSE(CreateHashTable( + quadratic, table_size, &hash_table, block_size, max_num_blocks, 0, 1024)); scoped_ptr<HashTableCtx> ht_ctx; Status status = HashTableCtx::Create(&pool_, runtime_state_, build_exprs_, probe_exprs_, false /* !stores_nulls_ */, vector<bool>(build_exprs_.size(), false), 1, 0, 1, @@ -532,7 +630,6 @@ class HashTableTest : public testing::Test { EXPECT_OK(status); HashTable::Iterator iter = hash_table->Begin(ht_ctx.get()); EXPECT_TRUE(iter.AtEnd()); - hash_table->Close(); ht_ctx->Close(runtime_state_); } }; @@ -612,8 +709,6 @@ TEST_F(HashTableTest, QuadraticInsertFullTest) { // Test that hashing empty string updates hash value. 
TEST_F(HashTableTest, HashEmpty) { - EXPECT_OK(test_env_->CreateQueryStateWithBlockMgr( - 0, 100, 8 * 1024 * 1024, nullptr, &runtime_state_)); scoped_ptr<HashTableCtx> ht_ctx; Status status = HashTableCtx::Create(&pool_, runtime_state_, build_exprs_, probe_exprs_, false /* !stores_nulls_ */, http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/hash-table.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hash-table.cc b/be/src/exec/hash-table.cc index a4856e9..aacedc2 100644 --- a/be/src/exec/hash-table.cc +++ b/be/src/exec/hash-table.cc @@ -26,7 +26,7 @@ #include "exprs/slot-ref.h" #include "exprs/scalar-expr.h" #include "exprs/scalar-expr-evaluator.h" -#include "runtime/buffered-block-mgr.h" +#include "runtime/bufferpool/reservation-tracker.h" #include "runtime/mem-tracker.h" #include "runtime/raw-value.inline.h" #include "runtime/runtime-state.h" @@ -37,8 +37,17 @@ #include "common/names.h" using namespace impala; -using namespace llvm; -using namespace strings; +using llvm::APFloat; +using llvm::ArrayRef; +using llvm::BasicBlock; +using llvm::ConstantFP; +using llvm::Function; +using llvm::LLVMContext; +using llvm::PHINode; +using llvm::PointerType; +using llvm::Type; +using llvm::Value; +using strings::Substitute; DEFINE_bool(enable_quadratic_probing, true, "Enable quadratic probing hash table"); @@ -85,12 +94,6 @@ static int64_t NULL_VALUE[] = { static_assert(sizeof(NULL_VALUE) >= ColumnType::MAX_CHAR_LENGTH, "NULL_VALUE must be at least as large as the largest possible slot"); -// The first NUM_SMALL_BLOCKS of nodes_ are made of blocks less than the IO size (of 8MB) -// to reduce the memory footprint of small queries. In particular, we always first use a -// 64KB and a 512KB block before starting using IO-sized blocks. 
-static const int64_t INITIAL_DATA_PAGE_SIZES[] = { 64 * 1024, 512 * 1024 }; -static const int NUM_SMALL_DATA_PAGES = sizeof(INITIAL_DATA_PAGE_SIZES) / sizeof(int64_t); - HashTableCtx::HashTableCtx(const std::vector<ScalarExpr*>& build_exprs, const std::vector<ScalarExpr*>& probe_exprs, bool stores_nulls, const std::vector<bool>& finds_nulls, int32_t initial_seed, @@ -378,21 +381,20 @@ void HashTableCtx::ExprValuesCache::ResetForRead() { ResetIterators(); } -const double HashTable::MAX_FILL_FACTOR = 0.75f; +constexpr double HashTable::MAX_FILL_FACTOR; +constexpr int64_t HashTable::DATA_PAGE_SIZE; -HashTable* HashTable::Create(RuntimeState* state, - BufferedBlockMgr::Client* client, bool stores_duplicates, int num_build_tuples, - BufferedTupleStream* tuple_stream, int64_t max_num_buckets, +HashTable* HashTable::Create(Suballocator* allocator, bool stores_duplicates, + int num_build_tuples, BufferedTupleStreamV2* tuple_stream, int64_t max_num_buckets, int64_t initial_num_buckets) { - return new HashTable(FLAGS_enable_quadratic_probing, state, client, stores_duplicates, + return new HashTable(FLAGS_enable_quadratic_probing, allocator, stores_duplicates, num_build_tuples, tuple_stream, max_num_buckets, initial_num_buckets); } -HashTable::HashTable(bool quadratic_probing, RuntimeState* state, - BufferedBlockMgr::Client* client, bool stores_duplicates, int num_build_tuples, - BufferedTupleStream* stream, int64_t max_num_buckets, int64_t num_buckets) - : state_(state), - block_mgr_client_(client), +HashTable::HashTable(bool quadratic_probing, Suballocator* allocator, + bool stores_duplicates, int num_build_tuples, BufferedTupleStreamV2* stream, + int64_t max_num_buckets, int64_t num_buckets) + : allocator_(allocator), tuple_stream_(stream), stores_tuples_(num_build_tuples == 1), stores_duplicates_(stores_duplicates), @@ -410,26 +412,23 @@ HashTable::HashTable(bool quadratic_probing, RuntimeState* state, has_matches_(false), num_probes_(0), num_failed_probes_(0), travel_length_(0), num_hash_collisions_(0), num_resizes_(0) { - DCHECK_EQ((num_buckets & (num_buckets-1)), 0) << "num_buckets must be a power of 2"; + DCHECK_EQ((num_buckets & (num_buckets - 1)), 0) << "num_buckets must be a power of 2"; DCHECK_GT(num_buckets, 0) << "num_buckets must be larger than 0"; DCHECK(stores_tuples_ || stream != NULL); - DCHECK(client != NULL); } -bool HashTable::Init() { +Status HashTable::Init(bool* got_memory) { int64_t buckets_byte_size = num_buckets_ * sizeof(Bucket); - if (!state_->block_mgr()->ConsumeMemory(block_mgr_client_, buckets_byte_size)) { - num_buckets_ = 0; - return false; - } - buckets_ = reinterpret_cast<Bucket*>(malloc(buckets_byte_size)); - if (buckets_ == NULL) { - state_->block_mgr()->ReleaseMemory(block_mgr_client_, buckets_byte_size); + RETURN_IF_ERROR(allocator_->Allocate(buckets_byte_size, &bucket_allocation_)); + if (bucket_allocation_ == nullptr) { num_buckets_ = 0; - return false; + *got_memory = false; + return Status::OK(); } + buckets_ = reinterpret_cast<Bucket*>(bucket_allocation_->data()); memset(buckets_, 0, buckets_byte_size); - return true; + *got_memory = true; + return Status::OK(); } void HashTable::Close() { @@ -439,36 +438,39 @@ void HashTable::Close() { const int64_t HEAVILY_USED = 1024 * 1024; // TODO: These statistics should go to the runtime profile as well. 
if ((num_buckets_ > LARGE_HT) || (num_probes_ > HEAVILY_USED)) VLOG(2) << PrintStats(); - for (int i = 0; i < data_pages_.size(); ++i) { - data_pages_[i]->Delete(); - } + for (auto& data_page : data_pages_) allocator_->Free(move(data_page)); + data_pages_.clear(); if (ImpaladMetrics::HASH_TABLE_TOTAL_BYTES != NULL) { ImpaladMetrics::HASH_TABLE_TOTAL_BYTES->Increment(-total_data_page_size_); } - data_pages_.clear(); - if (buckets_ != NULL) free(buckets_); - state_->block_mgr()->ReleaseMemory(block_mgr_client_, num_buckets_ * sizeof(Bucket)); + if (bucket_allocation_ != nullptr) allocator_->Free(move(bucket_allocation_)); } -bool HashTable::CheckAndResize(uint64_t buckets_to_fill, const HashTableCtx* ht_ctx) { +Status HashTable::CheckAndResize( + uint64_t buckets_to_fill, const HashTableCtx* ht_ctx, bool* got_memory) { uint64_t shift = 0; while (num_filled_buckets_ + buckets_to_fill > (num_buckets_ << shift) * MAX_FILL_FACTOR) { - // TODO: next prime instead of double? ++shift; } - if (shift > 0) return ResizeBuckets(num_buckets_ << shift, ht_ctx); - return true; + if (shift > 0) return ResizeBuckets(num_buckets_ << shift, ht_ctx, got_memory); + *got_memory = true; + return Status::OK(); } -bool HashTable::ResizeBuckets(int64_t num_buckets, const HashTableCtx* ht_ctx) { - DCHECK_EQ((num_buckets & (num_buckets-1)), 0) +Status HashTable::ResizeBuckets( + int64_t num_buckets, const HashTableCtx* ht_ctx, bool* got_memory) { + DCHECK_EQ((num_buckets & (num_buckets - 1)), 0) << "num_buckets=" << num_buckets << " must be a power of 2"; - DCHECK_GT(num_buckets, num_filled_buckets_) << "Cannot shrink the hash table to " - "smaller number of buckets than the number of filled buckets."; - VLOG(2) << "Resizing hash table from " - << num_buckets_ << " to " << num_buckets << " buckets."; - if (max_num_buckets_ != -1 && num_buckets > max_num_buckets_) return false; + DCHECK_GT(num_buckets, num_filled_buckets_) + << "Cannot shrink the hash table to smaller number of buckets than the number of " + << "filled buckets."; + VLOG(2) << "Resizing hash table from " << num_buckets_ << " to " << num_buckets + << " buckets."; + if (max_num_buckets_ != -1 && num_buckets > max_num_buckets_) { + *got_memory = false; + return Status::OK(); + } ++num_resizes_; // All memory that can grow proportional to the input should come from the block mgrs @@ -476,14 +478,16 @@ bool HashTable::ResizeBuckets(int64_t num_buckets, const HashTableCtx* ht_ctx) { // Note that while we copying over the contents of the old hash table, we need to have // allocated both the old and the new hash table. Once we finish, we return the memory // of the old hash table. - int64_t old_size = num_buckets_ * sizeof(Bucket); + // int64_t old_size = num_buckets_ * sizeof(Bucket); int64_t new_size = num_buckets * sizeof(Bucket); - if (!state_->block_mgr()->ConsumeMemory(block_mgr_client_, new_size)) return false; - Bucket* new_buckets = reinterpret_cast<Bucket*>(malloc(new_size)); - if (new_buckets == NULL) { - state_->block_mgr()->ReleaseMemory(block_mgr_client_, new_size); - return false; + + unique_ptr<Suballocation> new_allocation; + RETURN_IF_ERROR(allocator_->Allocate(new_size, &new_allocation)); + if (new_allocation == NULL) { + *got_memory = false; + return Status::OK(); } + Bucket* new_buckets = reinterpret_cast<Bucket*>(new_allocation->data()); memset(new_buckets, 0, new_size); // Walk the old table and copy all the filled buckets to the new (resized) table. 
@@ -503,28 +507,22 @@ bool HashTable::ResizeBuckets(int64_t num_buckets, const HashTableCtx* ht_ctx) { } num_buckets_ = num_buckets; - free(buckets_); - buckets_ = new_buckets; - state_->block_mgr()->ReleaseMemory(block_mgr_client_, old_size); - return true; + allocator_->Free(move(bucket_allocation_)); + bucket_allocation_ = move(new_allocation); + buckets_ = reinterpret_cast<Bucket*>(bucket_allocation_->data()); + *got_memory = true; + return Status::OK(); } -bool HashTable::GrowNodeArray() { - int64_t page_size = 0; - page_size = state_->block_mgr()->max_block_size(); - if (data_pages_.size() < NUM_SMALL_DATA_PAGES) { - page_size = min(page_size, INITIAL_DATA_PAGE_SIZES[data_pages_.size()]); - } - BufferedBlockMgr::Block* block = NULL; - Status status = state_->block_mgr()->GetNewBlock( - block_mgr_client_, NULL, &block, page_size); - DCHECK(status.ok() || block == NULL); - if (block == NULL) return false; - data_pages_.push_back(block); - next_node_ = block->Allocate<DuplicateNode>(page_size); - ImpaladMetrics::HASH_TABLE_TOTAL_BYTES->Increment(page_size); - node_remaining_current_page_ = page_size / sizeof(DuplicateNode); - total_data_page_size_ += page_size; +bool HashTable::GrowNodeArray(Status* status) { + unique_ptr<Suballocation> allocation; + *status = allocator_->Allocate(DATA_PAGE_SIZE, &allocation); + if (!status->ok() || allocation == nullptr) return false; + next_node_ = reinterpret_cast<DuplicateNode*>(allocation->data()); + data_pages_.push_back(move(allocation)); + ImpaladMetrics::HASH_TABLE_TOTAL_BYTES->Increment(DATA_PAGE_SIZE); + node_remaining_current_page_ = DATA_PAGE_SIZE / sizeof(DuplicateNode); + total_data_page_size_ += DATA_PAGE_SIZE; return true; } @@ -533,8 +531,7 @@ void HashTable::DebugStringTuple(stringstream& ss, HtData& htdata, if (stores_tuples_) { ss << "(" << htdata.tuple << ")"; } else { - ss << "(" << htdata.idx.block() << ", " << htdata.idx.idx() - << ", " << htdata.idx.offset() << ")"; + ss << "(" << htdata.flat_row << ")"; } if (desc != NULL) { Tuple* row[num_build_tuples_]; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/hash-table.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hash-table.h b/be/src/exec/hash-table.h index 9ba5b04..297e619 100644 --- a/be/src/exec/hash-table.h +++ b/be/src/exec/hash-table.h @@ -15,19 +15,21 @@ // specific language governing permissions and limitations // under the License. - #ifndef IMPALA_EXEC_HASH_TABLE_H #define IMPALA_EXEC_HASH_TABLE_H +#include <memory> #include <vector> #include <boost/cstdint.hpp> #include <boost/scoped_ptr.hpp> + #include "codegen/impala-ir.h" -#include "common/logging.h" #include "common/compiler-util.h" -#include "runtime/buffered-block-mgr.h" -#include "runtime/buffered-tuple-stream.h" -#include "runtime/buffered-tuple-stream.inline.h" +#include "common/logging.h" +#include "runtime/buffered-tuple-stream-v2.h" +#include "runtime/buffered-tuple-stream-v2.inline.h" +#include "runtime/bufferpool/buffer-pool.h" +#include "runtime/bufferpool/suballocator.h" #include "runtime/tuple-row.h" #include "util/bitmap.h" #include "util/hash-util.h" @@ -101,7 +103,6 @@ class HashTable; /// Inserts(). We may want to optimize joins more heavily for Inserts() (in particular /// growing). /// TODO: Batched interface for inserts and finds. -/// TODO: Do we need to check mem limit exceeded so often. Check once per batch? /// TODO: as an optimization, compute variable-length data size for the agg node. 
@@ -525,13 +526,15 @@ class HashTableCtx {
 /// nodes do not contain the hash value, because all the linked nodes have the same hash
 /// value, the one in the bucket. The data is either a tuple stream index or a Tuple*.
 /// This array of buckets is sparse; we are shooting for up to 3/4 fill factor (75%). The
-/// data allocated by the hash table comes from the BufferedBlockMgr.
+/// data allocated by the hash table comes from the BufferPool.
 class HashTable {
  private:
-
-  /// Either the row in the tuple stream or a pointer to the single tuple of this row.
+  /// Rows are represented as pointers into the BufferedTupleStream data with one
+  /// of two formats, depending on the number of tuples in the row.
   union HtData {
-    BufferedTupleStream::RowIdx idx;
+    // For rows with multiple tuples per row, a pointer to the flattened TupleRow.
+    BufferedTupleStreamV2::FlatRowPtr flat_row;
+    // For rows with one tuple per row, a pointer to the Tuple itself.
     Tuple* tuple;
   };
 
@@ -584,7 +587,7 @@ class HashTable {
   /// Returns a newly allocated HashTable. The probing algorithm is set by the
   /// FLAG_enable_quadratic_probing.
-  ///  - client: block mgr client to allocate data pages from.
+  ///  - allocator: allocator to allocate bucket directory and data pages from.
   ///  - stores_duplicates: true if rows with duplicate keys may be inserted into the
   ///    hash table.
   ///  - num_build_tuples: number of Tuples in the build tuple row.
@@ -596,31 +599,35 @@ class HashTable {
   ///    -1, if it is unlimited.
   ///  - initial_num_buckets: number of buckets that the hash table should be initialized
   ///    with.
-  static HashTable* Create(RuntimeState* state, BufferedBlockMgr::Client* client,
-      bool stores_duplicates, int num_build_tuples, BufferedTupleStream* tuple_stream,
-      int64_t max_num_buckets, int64_t initial_num_buckets);
+  static HashTable* Create(Suballocator* allocator, bool stores_duplicates,
+      int num_build_tuples, BufferedTupleStreamV2* tuple_stream, int64_t max_num_buckets,
+      int64_t initial_num_buckets);
 
-  /// Allocates the initial bucket structure. Returns false if OOM.
-  bool Init();
+  /// Allocates the initial bucket structure. Returns a non-OK status if an error is
+  /// encountered. If an OK status is returned, 'got_memory' is set to indicate whether
+  /// enough memory for the initial buckets was allocated from the Suballocator.
+  Status Init(bool* got_memory) WARN_UNUSED_RESULT;
 
   /// Call to cleanup any resources. Must be called once.
   void Close();
 
-  /// Inserts the row to the hash table. Returns true if the insertion was successful.
-  /// Always returns true if the table has free buckets and the key is not a duplicate.
-  /// The caller is responsible for ensuring that the table has free buckets
-  /// 'idx' is the index into tuple_stream_ for this row. If the row contains more than
-  /// one tuple, the 'idx' is stored instead of the 'row'. The 'row' is not copied by the
-  /// hash table and the caller must guarantee it stays in memory. This will not grow the
-  /// hash table. In the case that there is a need to insert a duplicate node, instead of
-  /// filling a new bucket, and there is not enough memory to insert a duplicate node,
-  /// the insert fails and this function returns false.
-  /// Used during the build phase of hash joins.
+  /// Inserts the row into the hash table. The caller is responsible for ensuring that
+  /// the table has free buckets. Returns true if the insertion was successful.
+  /// Always returns true if the table has free buckets and the key is not a duplicate.
+  /// If the key was a duplicate and memory could not be allocated for the new duplicate
+  /// node, returns false. If an error is encountered while creating a duplicate node,
+  /// returns false and sets 'status' to the error.
+  ///
+  /// 'flat_row' is a pointer to the flattened row in 'tuple_stream_'. If the row
+  /// contains only one tuple, a pointer to that tuple is stored. Otherwise the
+  /// 'flat_row' pointer is stored. The 'row' is not copied by the hash table and the
+  /// caller must guarantee it stays in memory. This will not grow the hash table.
   bool IR_ALWAYS_INLINE Insert(HashTableCtx* ht_ctx,
-      const BufferedTupleStream::RowIdx& idx, TupleRow* row);
+      BufferedTupleStreamV2::FlatRowPtr flat_row, TupleRow* row,
+      Status* status) WARN_UNUSED_RESULT;
 
   /// Prefetch the hash table bucket which the given hash value 'hash' maps to.
-  template<const bool READ>
+  template <const bool READ>
   void IR_ALWAYS_INLINE PrefetchBucket(uint32_t hash);
 
   /// Returns an iterator to the bucket that matches the probe expression results that
@@ -680,12 +687,17 @@ class HashTable {
   /// Calculates the fill factor if 'buckets_to_fill' additional buckets were to be
   /// filled and resizes the hash table so that the projected fill factor is below the
   /// max fill factor.
-  /// If it returns true, then it is guaranteed at least 'rows_to_add' rows can be
-  /// inserted without need to resize.
-  bool CheckAndResize(uint64_t buckets_to_fill, const HashTableCtx* ht_ctx);
+  /// If 'got_memory' is true, then it is guaranteed at least 'buckets_to_fill' rows
+  /// can be inserted without needing to resize. If there is not enough memory available
+  /// to resize the hash table, Status::OK() is returned and 'got_memory' is false. If
+  /// another error occurs, an error status may be returned.
+  Status CheckAndResize(uint64_t buckets_to_fill, const HashTableCtx* ht_ctx,
+      bool* got_memory) WARN_UNUSED_RESULT;
 
   /// Returns the number of bytes allocated to the hash table from the block manager.
-  int64_t ByteSize() const { return num_buckets_ * sizeof(Bucket) + total_data_page_size_; }
+  int64_t ByteSize() const {
+    return num_buckets_ * sizeof(Bucket) + total_data_page_size_;
+  }
 
   /// Returns an iterator at the beginning of the hash table. Advancing this iterator
   /// will traverse all elements.
@@ -792,7 +804,6 @@ class HashTable {
     TupleRow* scratch_row_;
 
     /// Current bucket idx.
-    /// TODO: Use uint32_t?
     int64_t bucket_idx_;
 
     /// Pointer to the current duplicate node.
@@ -807,9 +818,9 @@ class HashTable {
   /// of calling this constructor directly.
   ///  - quadratic_probing: set to true when the probing algorithm is quadratic, as
   ///    opposed to linear.
-  HashTable(bool quadratic_probing, RuntimeState* state, BufferedBlockMgr::Client* client,
-      bool stores_duplicates, int num_build_tuples, BufferedTupleStream* tuple_stream,
-      int64_t max_num_buckets, int64_t initial_num_buckets);
+  HashTable(bool quadratic_probing, Suballocator* allocator, bool stores_duplicates,
+      int num_build_tuples, BufferedTupleStreamV2* tuple_stream, int64_t max_num_buckets,
+      int64_t initial_num_buckets);
 
   /// Performs the probing operation according to the probing algorithm (linear or
   /// quadratic). Returns one of the following:
@@ -839,8 +850,10 @@ class HashTable {
       HashTableCtx* ht_ctx, uint32_t hash, bool* found);
 
   /// Performs the insert logic. Returns the HtData* of the bucket or duplicate node
-  /// where the data should be inserted. Returns NULL if the insert was not successful.
-  HtData* IR_ALWAYS_INLINE InsertInternal(HashTableCtx* ht_ctx);
+  /// where the data should be inserted. Returns NULL if the insert was not successful,
+  /// setting 'status' to OK if it failed because not enough reservation was available,
+  /// or to the error if an error was encountered.
+  HtData* IR_ALWAYS_INLINE InsertInternal(HashTableCtx* ht_ctx, Status* status);
 
   /// Updates 'bucket_idx' to the index of the next non-empty bucket. If the bucket has
   /// duplicates, 'node' will be pointing to the head of the linked list of duplicates.
@@ -848,8 +861,8 @@ class HashTable {
   /// 'bucket_idx' to BUCKET_NOT_FOUND.
   void NextFilledBucket(int64_t* bucket_idx, DuplicateNode** node);
 
-  /// Resize the hash table to 'num_buckets'. Returns false on OOM.
-  bool ResizeBuckets(int64_t num_buckets, const HashTableCtx* ht_ctx);
+  /// Resize the hash table to 'num_buckets'. 'got_memory' is false on OOM.
+  Status ResizeBuckets(int64_t num_buckets, const HashTableCtx* ht_ctx, bool* got_memory);
 
   /// Appends the DuplicateNode pointed to by next_node_ to 'bucket' and moves the next_node_
   /// pointer to the next DuplicateNode in the page, updating the remaining node counter.
@@ -862,9 +875,10 @@ class HashTable {
   /// the bucket is converted to a DuplicateNode. That is, the contents of 'data' of the
   /// bucket are copied to a DuplicateNode and 'data' is updated to point to a
   /// DuplicateNode.
-  /// Returns NULL if the node array could not grow, i.e. there was not enough memory to
-  /// allocate a new DuplicateNode.
-  DuplicateNode* IR_ALWAYS_INLINE InsertDuplicateNode(int64_t bucket_idx);
+  /// Returns NULL and sets 'status' to OK if the node array could not grow, i.e. there
+  /// was not enough memory to allocate a new DuplicateNode. Returns NULL and sets
+  /// 'status' to an error if another error was encountered.
+  DuplicateNode* IR_ALWAYS_INLINE InsertDuplicateNode(int64_t bucket_idx, Status* status);
 
   /// Resets the contents of the empty bucket with index 'bucket_idx', in preparation for
   /// an insert. Sets all the fields of the bucket other than 'data'.
@@ -877,8 +891,10 @@ class HashTable {
   /// returns the content of the first chained duplicate node of the bucket.
   TupleRow* GetRow(Bucket* bucket, TupleRow* row) const;
 
-  /// Grow the node array. Returns false on OOM.
-  bool GrowNodeArray();
+  /// Grow the node array. Returns true and sets 'status' to OK on success. Returns false
+  /// and sets 'status' to OK if we can't get sufficient reservation to allocate the next
+  /// data page. Returns false and sets 'status' if another error is encountered.
+  bool GrowNodeArray(Status* status);
 
   /// Functions to be replaced by codegen to specialize the hash table.
   bool IR_NO_INLINE stores_tuples() const { return stores_tuples_; }
@@ -887,20 +903,26 @@ class HashTable {
 
   /// Load factor that will trigger growing the hash table on insert. This is
   /// defined as the number of non-empty buckets / total_buckets
-  static const double MAX_FILL_FACTOR;
+  static constexpr double MAX_FILL_FACTOR = 0.75;
+
+  /// The size in bytes of each page of duplicate nodes. Should be large enough to fit
+  /// enough DuplicateNodes to amortise the overhead of allocating each page and low
+  /// enough to not waste excessive memory on internal fragmentation.
+  static constexpr int64_t DATA_PAGE_SIZE = 64L * 1024;
 
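
To make the DATA_PAGE_SIZE trade-off concrete: if a DuplicateNode occupies 24 bytes (an
assumed size for illustration; the patch does not state it), each 64 KB page amortises
one suballocation across roughly 2730 nodes and leaves only 16 bytes of slack:

    constexpr int64_t DATA_PAGE_SIZE = 64L * 1024;
    constexpr int64_t kAssumedNodeSize = 24;  // Illustrative assumption.
    constexpr int64_t kNodesPerPage = DATA_PAGE_SIZE / kAssumedNodeSize;  // 2730
    constexpr int64_t kSlackBytes = DATA_PAGE_SIZE % kAssumedNodeSize;    // 16
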
   RuntimeState* state_;
 
-  /// Client to allocate data pages with.
-  BufferedBlockMgr::Client* block_mgr_client_;
+  /// Suballocator to allocate data pages and hash table buckets with.
+  Suballocator* allocator_;
 
   /// Stream contains the rows referenced by the hash table. Can be NULL if the
   /// row only contains a single tuple, in which case the TupleRow indirection
   /// is removed by the hash table.
-  BufferedTupleStream* tuple_stream_;
+  BufferedTupleStreamV2* tuple_stream_;
 
-  /// Constants on how the hash table should behave. Joins and aggs have slightly
-  /// different behavior.
+  /// Constants on how the hash table should behave.
+
+  /// True if the HtData uses the Tuple* representation, or false if it uses FlatRowPtr.
   const bool stores_tuples_;
 
   /// True if duplicates may be inserted into hash table.
@@ -909,8 +931,9 @@ class HashTable {
   /// Quadratic probing enabled (as opposed to linear).
   const bool quadratic_probing_;
 
-  /// Data pages for all nodes. These are always pinned.
-  std::vector<BufferedBlockMgr::Block*> data_pages_;
+  /// Data pages for all nodes. Allocated from suballocator to reduce memory
+  /// consumption of small tables.
+  std::vector<std::unique_ptr<Suballocation>> data_pages_;
 
   /// Byte size of all buffers in data_pages_.
   int64_t total_data_page_size_;
 
@@ -926,8 +949,10 @@ class HashTable {
 
   const int64_t max_num_buckets_;
 
-  /// Array of all buckets. Owned by this node. Using c-style array to control
-  /// control memory footprint.
+  /// Allocation containing all buckets.
+  std::unique_ptr<Suballocation> bucket_allocation_;
+
+  /// Pointer to the 'buckets_' array from 'bucket_allocation_'.
   Bucket* buckets_;
 
   /// Total number of buckets (filled and empty).
@@ -943,9 +968,8 @@ class HashTable {
   /// Number of build tuples, used for constructing temp row* for probes.
   const int num_build_tuples_;
 
-  /// Flag used to disable spilling hash tables that already had matches in case of
-  /// right joins (IMPALA-1488).
-  /// TODO: Not fail when spilling hash tables with matches in right joins
+  /// Flag used to check that we don't lose stored matches when spilling hash tables
+  /// (IMPALA-1488).
   bool has_matches_;
 
   /// The stats below can be used for debugging perf.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/hash-table.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.inline.h b/be/src/exec/hash-table.inline.h
index aff7c14..ce2f784 100644
--- a/be/src/exec/hash-table.inline.h
+++ b/be/src/exec/hash-table.inline.h
@@ -90,7 +90,8 @@ inline int64_t HashTable::Probe(Bucket* buckets, int64_t num_buckets,
   return Iterator::BUCKET_NOT_FOUND;
 }
 
-inline HashTable::HtData* HashTable::InsertInternal(HashTableCtx* ht_ctx) {
+inline HashTable::HtData* HashTable::InsertInternal(
+    HashTableCtx* ht_ctx, Status* status) {
   ++num_probes_;
   bool found = false;
   uint32_t hash = ht_ctx->expr_values_cache()->CurExprValuesHash();
@@ -98,7 +99,7 @@ inline HashTable::HtData* HashTable::InsertInternal(HashTableCtx* ht_ctx) {
   DCHECK_NE(bucket_idx, Iterator::BUCKET_NOT_FOUND);
   if (found) {
     // We need to insert a duplicate node, note that this may fail to allocate memory.
-    DuplicateNode* new_node = InsertDuplicateNode(bucket_idx);
+    DuplicateNode* new_node = InsertDuplicateNode(bucket_idx, status);
     if (UNLIKELY(new_node == NULL)) return NULL;
     return &new_node->htdata;
   } else {
@@ -108,14 +109,14 @@ inline HashTable::HtData* HashTable::InsertInternal(HashTableCtx* ht_ctx) {
 }
 
 inline bool HashTable::Insert(HashTableCtx* ht_ctx,
-    const BufferedTupleStream::RowIdx& idx, TupleRow* row) {
-  HtData* htdata = InsertInternal(ht_ctx);
+    BufferedTupleStreamV2::FlatRowPtr flat_row, TupleRow* row, Status* status) {
+  HtData* htdata = InsertInternal(ht_ctx, status);
   // If successful insert, update the contents of the newly inserted entry with 'flat_row'.
   if (LIKELY(htdata != NULL)) {
     if (stores_tuples()) {
       htdata->tuple = row->GetTuple(0);
     } else {
-      htdata->idx = idx;
+      htdata->flat_row = flat_row;
     }
     return true;
   }
@@ -213,7 +214,8 @@ inline HashTable::DuplicateNode* HashTable::AppendNextNode(Bucket* bucket) {
   return next_node_++;
 }
 
-inline HashTable::DuplicateNode* HashTable::InsertDuplicateNode(int64_t bucket_idx) {
+inline HashTable::DuplicateNode* HashTable::InsertDuplicateNode(
+    int64_t bucket_idx, Status* status) {
   DCHECK_GE(bucket_idx, 0);
   DCHECK_LT(bucket_idx, num_buckets_);
   Bucket* bucket = &buckets_[bucket_idx];
@@ -222,12 +224,12 @@ inline HashTable::DuplicateNode* HashTable::InsertDuplicateNode(int64_t bucket_i
   // Allocate one duplicate node for the new data and one for the preexisting data,
   // if needed.
   while (node_remaining_current_page_ < 1 + !bucket->hasDuplicates) {
-    if (UNLIKELY(!GrowNodeArray())) return NULL;
+    if (UNLIKELY(!GrowNodeArray(status))) return NULL;
   }
   if (!bucket->hasDuplicates) {
     // This is the first duplicate in this bucket. It means that we need to convert
     // the current entry in the bucket to a node and link it from the bucket.
-    next_node_->htdata.idx = bucket->bucketData.htdata.idx;
+    next_node_->htdata.flat_row = bucket->bucketData.htdata.flat_row;
     DCHECK(!bucket->matched);
     next_node_->matched = false;
     next_node_->next = NULL;
@@ -246,7 +248,7 @@ inline TupleRow* IR_ALWAYS_INLINE HashTable::GetRow(HtData& htdata, TupleRow* ro
     return reinterpret_cast<TupleRow*>(&htdata.tuple);
   } else {
     // TODO: GetTupleRow() has interpreted code that iterates over the row's descriptor.
-    tuple_stream_->GetTupleRow(htdata.idx, row);
+    tuple_stream_->GetTupleRow(htdata.flat_row, row);
     return row;
   }
 }
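
The Insert() path above leaves it to the caller to separate the two failure modes. The
aggregation changes later in this commit do so roughly as follows; this is a simplified
sketch reusing names that appear in the diff (SpillPartition, AppendSpilledRow), not the
exact control flow of partitioned-aggregation-node.cc:

    Status status;
    if (UNLIKELY(!hash_tbl->Insert(ht_ctx, flat_row, row, &status))) {
      // Non-OK status: a genuine error, propagate it. OK status: we ran out of
      // reservation for a duplicate node, so spill and append to the stream.
      RETURN_IF_ERROR(status);
      RETURN_IF_ERROR(SpillPartition(AGGREGATED_ROWS));
      return AppendSpilledRow<AGGREGATED_ROWS>(partition, row);
    }
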
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/nested-loop-join-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/nested-loop-join-builder.cc b/be/src/exec/nested-loop-join-builder.cc
index 67e6ed6..fdd94ee 100644
--- a/be/src/exec/nested-loop-join-builder.cc
+++ b/be/src/exec/nested-loop-join-builder.cc
@@ -45,8 +45,7 @@ Status NljBuilder::Send(RuntimeState* state, RowBatch* batch) {
   build_batch->AcquireState(batch);
   AddBuildBatch(build_batch);
 
-  if (build_batch->needs_deep_copy() || build_batch->num_blocks() > 0
-      || build_batch->num_buffers() > 0) {
+  if (build_batch->needs_deep_copy() || build_batch->num_buffers() > 0) {
     // This batch and earlier batches may refer to resources passed from the child
     // that aren't owned by the row batch itself. Deep copying ensures that the row
     // batches are backed by memory owned by this node that is safe to hold on to.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/partial-sort-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partial-sort-node.cc b/be/src/exec/partial-sort-node.cc
index 4f485d5..88b2f26 100644
--- a/be/src/exec/partial-sort-node.cc
+++ b/be/src/exec/partial-sort-node.cc
@@ -58,8 +58,10 @@ Status PartialSortNode::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(ExecNode::Prepare(state));
   less_than_.reset(new TupleRowComparator(ordering_exprs_, is_asc_order_, nulls_first_));
   sorter_.reset(new Sorter(*less_than_, sort_tuple_exprs_, &row_descriptor_,
-      mem_tracker(), runtime_profile(), state, false));
+      mem_tracker(), &buffer_pool_client_, resource_profile_.spillable_buffer_size,
+      runtime_profile(), state, id(), false));
   RETURN_IF_ERROR(sorter_->Prepare(pool_, expr_mem_pool()));
+  DCHECK_GE(resource_profile_.min_reservation, sorter_->ComputeMinReservation());
   AddCodegenDisabledMessage(state);
   input_batch_.reset(
       new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker()));
@@ -81,6 +83,9 @@ Status PartialSortNode::Open(RuntimeState* state) {
   RETURN_IF_CANCELLED(state);
   RETURN_IF_ERROR(QueryMaintenance(state));
   RETURN_IF_ERROR(child(0)->Open(state));
+  if (!buffer_pool_client_.is_registered()) {
+    RETURN_IF_ERROR(ClaimBufferReservation(state));
+  }
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/partial-sort-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partial-sort-node.h b/be/src/exec/partial-sort-node.h
index ab4c547..d40d653 100644
--- a/be/src/exec/partial-sort-node.h
+++ b/be/src/exec/partial-sort-node.h
@@ -19,7 +19,6 @@
 #define IMPALA_EXEC_PARTIAL_SORT_NODE_H
 
 #include "exec/exec-node.h"
-#include "runtime/buffered-block-mgr.h"
 #include "runtime/sorter.h"
 
 namespace impala {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/partitioned-aggregation-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node-ir.cc b/be/src/exec/partitioned-aggregation-node-ir.cc
index cd5d336..126a2a5 100644
--- a/be/src/exec/partitioned-aggregation-node-ir.cc
+++ b/be/src/exec/partitioned-aggregation-node-ir.cc
@@ -21,7 +21,7 @@
 #include "exprs/agg-fn-evaluator.h"
 #include "exprs/scalar-expr.h"
 #include "exprs/scalar-expr-evaluator.h"
-#include "runtime/buffered-tuple-stream.inline.h"
+#include "runtime/buffered-tuple-stream-v2.inline.h"
 #include "runtime/row-batch.h"
 #include "runtime/tuple-row.h"
 
@@ -46,7 +46,8 @@ Status PartitionedAggregationNode::ProcessBatch(RowBatch* batch,
   // will end up in the same partition.
   // TODO: Once we have a histogram with the number of rows per partition, we will have
   // accurate resize calls.
-  RETURN_IF_ERROR(CheckAndResizeHashPartitions(batch->num_rows(), ht_ctx));
+  RETURN_IF_ERROR(
+      CheckAndResizeHashPartitions(AGGREGATED_ROWS, batch->num_rows(), ht_ctx));
 
   HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
   const int cache_size = expr_vals_cache->capacity();
@@ -108,6 +109,7 @@ Status PartitionedAggregationNode::ProcessRow(TupleRow* __restrict__ row,
   // so we can try again to insert the row.
   HashTable* hash_tbl = GetHashTable(partition_idx);
   Partition* dst_partition = hash_partitions_[partition_idx];
+  DCHECK(dst_partition != nullptr);
   DCHECK_EQ(dst_partition->is_spilled(), hash_tbl == NULL);
   if (hash_tbl == NULL) {
     // This partition is already spilled, just append the row.
@@ -155,24 +157,13 @@ Status PartitionedAggregationNode::AddIntermediateTuple(Partition* __restrict__
     }
 
     // We did not have enough memory to add intermediate_tuple to the stream.
-    RETURN_IF_ERROR(SpillPartition());
+    RETURN_IF_ERROR(SpillPartition(AGGREGATED_ROWS));
     if (partition->is_spilled()) {
       return AppendSpilledRow<AGGREGATED_ROWS>(partition, row);
    }
  }
 }
 
-template<bool AGGREGATED_ROWS>
-Status PartitionedAggregationNode::AppendSpilledRow(Partition* __restrict__ partition,
-    TupleRow* __restrict__ row) {
-  DCHECK(!is_streaming_preagg_);
-  DCHECK(partition->is_spilled());
-  BufferedTupleStream* stream = AGGREGATED_ROWS ?
-      partition->aggregated_row_stream.get() :
-      partition->unaggregated_row_stream.get();
-  return AppendSpilledRow(stream, row);
-}
-
 Status PartitionedAggregationNode::ProcessBatchStreaming(bool needs_serialize,
     TPrefetchMode::type prefetch_mode, RowBatch* in_batch, RowBatch* out_batch,
     HashTableCtx* __restrict__ ht_ctx, int remaining_capacity[PARTITION_FANOUT]) {
@@ -230,6 +221,7 @@ bool PartitionedAggregationNode::TryAddToHashTable(
   DCHECK(remaining_capacity != NULL);
   DCHECK_EQ(hash_tbl, partition->hash_tbl.get());
   DCHECK_GE(*remaining_capacity, 0);
+  if (hash_tbl == nullptr) return false; // Hash table was not created - pass through.
   bool found;
   // This is called from ProcessBatchStreaming() so the rows are not aggregated.
   HashTable::Iterator it = hash_tbl->FindBuildRowBucket(ht_ctx, &found);
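
The nullptr check added to TryAddToHashTable() above is what lets a streaming preagg run
with zero reservation: when no hash table could be allocated, every row is routed to the
pass-through path instead of failing the query. The per-row decision reduces to something
like the following distillation (illustrative, not code from the patch):

    enum class RowRoute { kAggregate, kPassThrough };

    // 'has_table' is false when no hash table could be allocated for the partition;
    // 'remaining_capacity' is how many inserts the table can take before resizing.
    RowRoute Route(bool has_table, int remaining_capacity) {
      if (!has_table || remaining_capacity == 0) return RowRoute::kPassThrough;
      return RowRoute::kAggregate;
    }
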
