IMPALA-3090: always log memory limit errors. Consistently log memory limit errors so that the error message contains a dump of the query memory trackers at the time the memory limit was hit (instead of after the fact, when the query is already partially cleaned up).
Testing: Exhaustive build passed. Ran local stress test for a bit. Change-Id: If5ec5572b0e26898da352b7e6b11eb01c6edb2e5 Reviewed-on: http://gerrit.cloudera.org:8080/4049 Reviewed-by: Tim Armstrong <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/7eb30309 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/7eb30309 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/7eb30309 Branch: refs/heads/master Commit: 7eb30309f3847f416f204bd5f7d6925102e94b67 Parents: d113205 Author: Tim Armstrong <[email protected]> Authored: Thu Aug 18 12:17:27 2016 -0700 Committer: Internal Jenkins <[email protected]> Committed: Fri Aug 19 05:36:02 2016 +0000 ---------------------------------------------------------------------- be/src/exec/analytic-eval-node.cc | 12 +++++++----- be/src/exec/partitioned-hash-join-node.cc | 20 ++++++++------------ be/src/runtime/buffered-block-mgr.cc | 20 +++++++++----------- be/src/runtime/mem-tracker.h | 4 +++- be/src/runtime/plan-fragment-executor.cc | 3 --- be/src/runtime/row-batch.cc | 1 - be/src/runtime/runtime-state.h | 2 ++ be/src/runtime/sorter.cc | 21 +++++++++------------ 8 files changed, 38 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7eb30309/be/src/exec/analytic-eval-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc index d401909..c9e35d8 100644 --- a/be/src/exec/analytic-eval-node.cc +++ b/be/src/exec/analytic-eval-node.cc @@ -22,6 +22,7 @@ #include "exprs/agg-fn-evaluator.h" #include "runtime/buffered-tuple-stream.inline.h" #include "runtime/descriptors.h" +#include "runtime/mem-tracker.h" #include "runtime/row-batch.h" #include 
"runtime/runtime-state.h" #include "udf/udf-internal.h" @@ -31,6 +32,11 @@ static const int MAX_TUPLE_POOL_SIZE = 8 * 1024 * 1024; // 8MB +const string PREPARE_FOR_READ_FAILED_ERROR_MSG = + "Failed to acquire initial read buffer for analytic function evaluation. Reducing " + "query concurrency or increasing the memory limit may help this query to complete " + "successfully."; + using namespace strings; namespace impala { @@ -197,11 +203,7 @@ Status AnalyticEvalNode::Open(RuntimeState* state) { bool got_read_buffer; RETURN_IF_ERROR(input_stream_->PrepareForRead(true, &got_read_buffer)); if (!got_read_buffer) { - Status status = Status::MemLimitExceeded(); - status.AddDetail("Failed to acquire initial read buffer for analytic function " - "evaluation. Reducing query concurrency or increasing the memory limit may " - "help this query to complete successfully."); - return status; + return mem_tracker()->MemLimitExceeded(state, PREPARE_FOR_READ_FAILED_ERROR_MSG); } DCHECK_EQ(evaluators_.size(), fn_ctxs_.size()); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7eb30309/be/src/exec/partitioned-hash-join-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc index a2b5001..d3aaf3d 100644 --- a/be/src/exec/partitioned-hash-join-node.cc +++ b/be/src/exec/partitioned-hash-join-node.cc @@ -659,9 +659,8 @@ Status PartitionedHashJoinNode::ProcessBuildInput(RuntimeState* state, int level RETURN_IF_ERROR( input_partition_->build_rows()->PrepareForRead(true, &got_read_buffer)); if (!got_read_buffer) { - Status status = Status::MemLimitExceeded(); - status.AddDetail(Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_)); - return status; + return mem_tracker()->MemLimitExceeded( + state, Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_)); } } @@ -829,9 +828,8 @@ Status PartitionedHashJoinNode::PrepareNextPartition(RuntimeState* state) { bool 
got_read_buffer; RETURN_IF_ERROR(input_partition_->probe_rows()->PrepareForRead(true, &got_read_buffer)); if (!got_read_buffer) { - Status status = Status::MemLimitExceeded(); - status.AddDetail(Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_)); - return status; + return mem_tracker()->MemLimitExceeded( + state, Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_)); } ht_ctx_->set_level(input_partition_->level_); @@ -1130,9 +1128,8 @@ Status PartitionedHashJoinNode::PrepareNullAwareNullProbe() { bool got_read_buffer; RETURN_IF_ERROR(null_probe_rows_->PrepareForRead(true, &got_read_buffer)); if (!got_read_buffer) { - Status status = Status::MemLimitExceeded(); - status.AddDetail(Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_)); - return status; + return mem_tracker()->MemLimitExceeded( + runtime_state_, Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_)); } DCHECK_EQ(probe_batch_->num_rows(), 0); probe_batch_pos_ = 0; @@ -1211,9 +1208,8 @@ Status PartitionedHashJoinNode::PrepareNullAwarePartition() { bool got_read_buffer; RETURN_IF_ERROR(probe_stream->PrepareForRead(true, &got_read_buffer)); if (!got_read_buffer) { - Status status = Status::MemLimitExceeded(); - status.AddDetail(Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_)); - return status; + return mem_tracker()->MemLimitExceeded( + runtime_state_, Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_)); } probe_batch_pos_ = 0; return Status::OK(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7eb30309/be/src/runtime/buffered-block-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-block-mgr.cc b/be/src/runtime/buffered-block-mgr.cc index db62922..90c1041 100644 --- a/be/src/runtime/buffered-block-mgr.cc +++ b/be/src/runtime/buffered-block-mgr.cc @@ -403,16 +403,15 @@ bool BufferedBlockMgr::IsCancelled() { } Status BufferedBlockMgr::MemLimitTooLowError(Client* client, int node_id) { - // TODO: what to print here. 
We can't know the value of the entire query here. - Status status = Status::MemLimitExceeded(); - status.AddDetail(Substitute("The memory limit is set too low to initialize spilling" - " operator (id=$0). The minimum required memory to spill this operator is $1.", - node_id, PrettyPrinter::Print(client->num_reserved_buffers_ * max_block_size(), - TUnit::BYTES))); VLOG_QUERY << "Query: " << query_id_ << ". Node=" << node_id << " ran out of memory: " << endl << DebugInternal() << endl << client->DebugString(); - return status; + int64_t min_memory = client->num_reserved_buffers_ * max_block_size(); + string msg = Substitute( + "The memory limit is set too low to initialize spilling operator (id=$0). The " + "minimum required memory to spill this operator is $1.", + node_id, PrettyPrinter::Print(min_memory, TUnit::BYTES)); + return client->tracker_->MemLimitExceeded(client->state_, msg); } Status BufferedBlockMgr::GetNewBlock(Client* client, Block* unpin_block, Block** block, @@ -1064,10 +1063,9 @@ Status BufferedBlockMgr::FindBufferForBlock(Block* block, bool* in_mem) { << endl << DebugInternal() << endl << client->DebugString(); VLOG_QUERY << ss.str(); } - Status status = Status::MemLimitExceeded(); - status.AddDetail("Query did not have enough memory to get the minimum required " - "buffers in the block manager."); - return status; + return client->tracker_->MemLimitExceeded(client->state_, + "Query did not have enough memory to get the minimum required buffers in the " + "block manager."); } DCHECK(buffer_desc != NULL); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7eb30309/be/src/runtime/mem-tracker.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h index 687ca18..2c8d0a9 100644 --- a/be/src/runtime/mem-tracker.h +++ b/be/src/runtime/mem-tracker.h @@ -344,8 +344,10 @@ class MemTracker { /// Log the memory usage when memory limit is exceeded and return a 
status object with /// details of the allocation which caused the limit to be exceeded. + /// If 'failed_allocation_size' is greater than zero, logs the allocation size. If + /// 'failed_allocation_size' is zero, nothing about the allocation size is logged. Status MemLimitExceeded(RuntimeState* state, const std::string& details, - int64_t failed_allocation); + int64_t failed_allocation = 0); static const std::string COUNTER_NAME; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7eb30309/be/src/runtime/plan-fragment-executor.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc index 35ac757..c7bc3a6 100644 --- a/be/src/runtime/plan-fragment-executor.cc +++ b/be/src/runtime/plan-fragment-executor.cc @@ -523,9 +523,6 @@ void PlanFragmentExecutor::UpdateStatus(const Status& status) { { lock_guard<mutex> l(status_lock_); if (status_.ok()) { - // TODO: remove this once all locations which exceed query or process memory limit - // will log query memory usages with MemTracker::MemLimitExceeded(). 
- if (status.IsMemLimitExceeded()) runtime_state_->LogMemLimitExceeded(NULL, 0); status_ = status; } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7eb30309/be/src/runtime/row-batch.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc index e602293..3d076bf 100644 --- a/be/src/runtime/row-batch.cc +++ b/be/src/runtime/row-batch.cc @@ -448,7 +448,6 @@ Status RowBatch::ResizeAndAllocateTupleBuffer(RuntimeState* state, *tuple_buffer_size = static_cast<int64_t>(row_size) * capacity_; *buffer = tuple_data_pool_.TryAllocate(*tuple_buffer_size); if (*buffer == NULL) { - Status status = Status::MemLimitExceeded(); return mem_tracker_->MemLimitExceeded(state, "Failed to allocate tuple buffer", *tuple_buffer_size); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7eb30309/be/src/runtime/runtime-state.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h index 4d425b6..f86fa77 100644 --- a/be/src/runtime/runtime-state.h +++ b/be/src/runtime/runtime-state.h @@ -240,6 +240,8 @@ class RuntimeState { } /// Function for logging memory usages to the error log when memory limit is exceeded. + /// If 'failed_allocation_size' is greater than zero, logs the allocation size. If + /// 'failed_allocation_size' is zero, nothing about the allocation size is logged. void LogMemLimitExceeded(const MemTracker* tracker, int64_t failed_allocation_size); /// Sets query_status_ to MEM_LIMIT_EXCEEDED and logs all the registered trackers. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7eb30309/be/src/runtime/sorter.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc index d04389a..6757be0 100644 --- a/be/src/runtime/sorter.cc +++ b/be/src/runtime/sorter.cc @@ -22,6 +22,7 @@ #include <gutil/strings/substitute.h> #include "runtime/buffered-block-mgr.h" +#include "runtime/mem-tracker.h" #include "runtime/row-batch.h" #include "runtime/runtime-state.h" #include "runtime/sorted-run-merger.h" @@ -493,18 +494,16 @@ Status Sorter::Run::Init() { RETURN_IF_ERROR( sorter_->block_mgr_->GetNewBlock(sorter_->block_mgr_client_, NULL, &block)); if (block == NULL) { - Status status = Status::MemLimitExceeded(); - status.AddDetail(Substitute(MEM_ALLOC_FAILED_ERROR_MSG, "fixed")); - return status; + return sorter_->mem_tracker_->MemLimitExceeded( + sorter_->state_, Substitute(MEM_ALLOC_FAILED_ERROR_MSG, "fixed")); } fixed_len_blocks_.push_back(block); if (has_var_len_slots_) { RETURN_IF_ERROR( sorter_->block_mgr_->GetNewBlock(sorter_->block_mgr_client_, NULL, &block)); if (block == NULL) { - Status status = Status::MemLimitExceeded(); - status.AddDetail(Substitute(MEM_ALLOC_FAILED_ERROR_MSG, "variable")); - return status; + return sorter_->mem_tracker_->MemLimitExceeded( + sorter_->state_, Substitute(MEM_ALLOC_FAILED_ERROR_MSG, "variable")); } var_len_blocks_.push_back(block); if (initial_run_) { @@ -512,9 +511,8 @@ Status Sorter::Run::Init() { RETURN_IF_ERROR(sorter_->block_mgr_->GetNewBlock( sorter_->block_mgr_client_, NULL, &var_len_copy_block_)); if (var_len_copy_block_ == NULL) { - Status status = Status::MemLimitExceeded(); - status.AddDetail(Substitute(MEM_ALLOC_FAILED_ERROR_MSG, "variable")); - return status; + return sorter_->mem_tracker_->MemLimitExceeded( + sorter_->state_, Substitute(MEM_ALLOC_FAILED_ERROR_MSG, "variable")); } } } @@ -1549,9 +1547,8 @@ Status Sorter::CreateMerger(int max_num_runs) { // 
TODO: IMPALA-3200: we should not need this logic once we have reliable // reservations (IMPALA-3200). if (merging_runs_.size() < 2) { - Status status = Status::MemLimitExceeded(); - status.AddDetail(Substitute(MERGE_FAILED_ERROR_MSG, merging_runs_.size())); - return status; + return mem_tracker_->MemLimitExceeded( + state_, Substitute(MERGE_FAILED_ERROR_MSG, merging_runs_.size())); } // Merge the runs that we were able to prepare. break;
