IMPALA-3647: track runtime filter memory in separate tracker

This change breaks out runtime filter memory consumption from the
query-wide tracker to improve debuggability of memory limit exceeded
errors.

Testing: ran exhaustive tests, ran local and cluster stress tests.

Change-Id: I9f28f3b55b5c62e6f0f9838c5947c9446d444d20
Reviewed-on: http://gerrit.cloudera.org:8080/3247
Reviewed-by: Tim Armstrong <[email protected]>
Reviewed-by: Michael Ho <[email protected]>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/585ee48d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/585ee48d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/585ee48d

Branch: refs/heads/master
Commit: 585ee48dc77489f371cf477df544178ddf787fde
Parents: 7ac341d
Author: Tim Armstrong <[email protected]>
Authored: Tue May 24 23:37:28 2016 -0700
Committer: Tim Armstrong <[email protected]>
Committed: Tue May 31 23:32:12 2016 -0700

----------------------------------------------------------------------
 be/src/runtime/plan-fragment-executor.cc |  1 +
 be/src/runtime/runtime-filter-bank.cc    | 10 +++++++---
 be/src/runtime/runtime-filter-bank.h     |  5 +++++
 be/src/runtime/runtime-state.cc          | 13 +++++++------
 be/src/runtime/runtime-state.h           |  3 +++
 5 files changed, 23 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/585ee48d/be/src/runtime/plan-fragment-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.cc 
b/be/src/runtime/plan-fragment-executor.cc
index ef39206..1d828ef 100644
--- a/be/src/runtime/plan-fragment-executor.cc
+++ b/be/src/runtime/plan-fragment-executor.cc
@@ -180,6 +180,7 @@ Status PlanFragmentExecutor::Prepare(const 
TExecPlanFragmentParams& request) {
   runtime_state_->InitMemTrackers(query_id_, 
&fragment_instance_ctx.request_pool,
       bytes_limit, rm_reservation_size_bytes);
   RETURN_IF_ERROR(runtime_state_->CreateBlockMgr());
+  runtime_state_->InitFilterBank();
 
   // Reserve one main thread from the pool
   runtime_state_->resource_pool()->AcquireThreadToken();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/585ee48d/be/src/runtime/runtime-filter-bank.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter-bank.cc 
b/be/src/runtime/runtime-filter-bank.cc
index 91f12c9..36f7b5f 100644
--- a/be/src/runtime/runtime-filter-bank.cc
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -62,6 +62,9 @@ RuntimeFilterBank::RuntimeFilterBank(const TQueryCtx& 
query_ctx, RuntimeState* s
   default_filter_size_ = max<int64_t>(default_filter_size_, min_filter_size_);
   default_filter_size_ =
       BitUtil::RoundUpToPowerOfTwo(min<int64_t>(default_filter_size_, 
max_filter_size_));
+
+  filter_mem_tracker_.reset(new MemTracker(-1, -1, "Runtime Filter Bank",
+      state->instance_mem_tracker(), false));
 }
 
 RuntimeFilter* RuntimeFilterBank::RegisterFilter(const TRuntimeFilterDesc& 
filter_desc,
@@ -166,7 +169,7 @@ void RuntimeFilterBank::PublishGlobalFilter(int32_t 
filter_id,
         BloomFilter::GetExpectedHeapSpaceUsed(thrift_filter.log_heap_space);
     // Silently fail to publish the filter (replacing it with a 0-byte 
complete one) if
     // there's not enough memory for it.
-    if (!state_->query_mem_tracker()->TryConsume(required_space)) {
+    if (!filter_mem_tracker_->TryConsume(required_space)) {
       VLOG_QUERY << "No memory for global filter: " << filter_id
                  << " (fragment instance: " << state_->fragment_instance_id() 
<< ")";
       it->second->SetBloomFilter(BloomFilter::ALWAYS_TRUE_FILTER);
@@ -191,7 +194,7 @@ BloomFilter* 
RuntimeFilterBank::AllocateScratchBloomFilter(int32_t filter_id) {
   // Track required space
   int64_t log_filter_size = Bits::Log2Ceiling64(it->second->filter_size());
   int64_t required_space = 
BloomFilter::GetExpectedHeapSpaceUsed(log_filter_size);
-  if (!state_->query_mem_tracker()->TryConsume(required_space)) return NULL;
+  if (!filter_mem_tracker_->TryConsume(required_space)) return NULL;
   BloomFilter* bloom_filter = obj_pool_.Add(new BloomFilter(log_filter_size));
   DCHECK_EQ(required_space, bloom_filter->GetHeapSpaceUsed());
   memory_allocated_->Add(bloom_filter->GetHeapSpaceUsed());
@@ -217,6 +220,7 @@ void RuntimeFilterBank::Close() {
   lock_guard<mutex> l(runtime_filter_lock_);
   closed_ = true;
   obj_pool_.Clear();
-  state_->query_mem_tracker()->Release(memory_allocated_->value());
+  filter_mem_tracker_->Release(memory_allocated_->value());
+  filter_mem_tracker_->UnregisterFromParent();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/585ee48d/be/src/runtime/runtime-filter-bank.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter-bank.h 
b/be/src/runtime/runtime-filter-bank.h
index 4703a0f..1bf3870 100644
--- a/be/src/runtime/runtime-filter-bank.h
+++ b/be/src/runtime/runtime-filter-bank.h
@@ -19,12 +19,14 @@
 #include "runtime/types.h"
 #include "util/runtime-profile.h"
 
+#include <boost/scoped_ptr.hpp>
 #include <boost/thread/lock_guard.hpp>
 #include <boost/unordered_map.hpp>
 
 namespace impala {
 
 class BloomFilter;
+class MemTracker;
 class RuntimeFilter;
 class RuntimeState;
 class TBloomFilter;
@@ -127,6 +129,9 @@ class RuntimeFilterBank {
   /// Object pool to track allocated Bloom filters.
   ObjectPool obj_pool_;
 
+  /// MemTracker to track Bloom filter memory.
+  boost::scoped_ptr<MemTracker> filter_mem_tracker_;
+
   /// True iff Close() has been called. Used to prevent races between
   /// AllocateScratchBloomFilter() and Close().
   bool closed_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/585ee48d/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 3701907..afbe3b6 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -76,8 +76,7 @@ RuntimeState::RuntimeState(const TExecPlanFragmentParams& 
fragment_params,
         "Fragment " + PrintId(fragment_ctx().fragment_instance_id)),
     is_cancelled_(false),
     query_resource_mgr_(NULL),
-    root_node_id_(-1),
-    filter_bank_(new RuntimeFilterBank(query_ctx(), this)) {
+    root_node_id_(-1) {
   Status status = Init(exec_env);
   DCHECK(status.ok()) << status.GetDetail();
 }
@@ -91,11 +90,9 @@ RuntimeState::RuntimeState(const TQueryCtx& query_ctx)
     profile_(obj_pool_.get(), "<unnamed>"),
     is_cancelled_(false),
     query_resource_mgr_(NULL),
-    root_node_id_(-1),
-    filter_bank_(new RuntimeFilterBank(query_ctx, this)) {
+    root_node_id_(-1) {
   fragment_params_.__set_query_ctx(query_ctx);
-  fragment_params_.query_ctx.request.query_options
-      .__set_batch_size(DEFAULT_BATCH_SIZE);
+  
fragment_params_.query_ctx.request.query_options.__set_batch_size(DEFAULT_BATCH_SIZE);
 }
 
 RuntimeState::~RuntimeState() {
@@ -161,6 +158,10 @@ void RuntimeState::InitMemTrackers(const TUniqueId& 
query_id, const string* pool
       runtime_profile()->name(), query_mem_tracker_.get()));
 }
 
+void RuntimeState::InitFilterBank() {
+  filter_bank_.reset(new RuntimeFilterBank(query_ctx(), this));
+}
+
 Status RuntimeState::CreateBlockMgr() {
   DCHECK(block_mgr_.get() == NULL);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/585ee48d/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 22cecaf..f104724 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -81,6 +81,9 @@ class RuntimeState {
   void InitMemTrackers(const TUniqueId& query_id, const std::string* 
request_pool,
       int64_t query_bytes_limit, int64_t query_rm_reservation_limit_bytes = 
-1);
 
+  /// Initializes the runtime filter bank. Must be called after 
InitMemTrackers().
+  void InitFilterBank();
+
   /// Gets/Creates the query wide block mgr.
   Status CreateBlockMgr();
 

Reply via email to