Repository: incubator-impala
Updated Branches:
  refs/heads/master f14e68c72 -> 4456ead84


IMPALA-5715: (mitigation only) defer destruction of MemTrackers

One potential candidate for the bad MemTracker in IMPALA-5715 is one
owned by a join build sink. I haven't found a clear root cause, but we
can reduce the probability of bugs like this by deferring teardown of
the MemTrackers.

This patch defers destruction of the fragment instance, ExecNode,
DataSink and Codegen MemTrackers until query teardown. Instead,
MemTracker::Close() is called at the place where the MemTracker
would previously have been destroyed, to check that all memory has been
released and to enforce that no more memory is consumed. The query
MemTracker subtree is then disconnected as a whole from the global
tree in QueryState::ReleaseResources(), instead of the MemTrackers
being incrementally disconnected bottom-up.

In cases where the MemTracker is owned by another object, this
required deferring teardown of the owner until query teardown as well.
For example, for LlvmCodeGen I added a Close() method that releases its
resources, and deferred calling the destructor.

We want to make this change anyway - see IMPALA-5587.

Testing:
Ran a core ASAN build.

Change-Id: I205abb0076d1ffd08cb93c0f1671c8b81e7fba0f
Reviewed-on: http://gerrit.cloudera.org:8080/7492
Reviewed-by: Tim Armstrong <tarmstr...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/27dcc768
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/27dcc768
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/27dcc768

Branch: refs/heads/master
Commit: 27dcc768d20e11f29a2d24059214dfc2bacef229
Parents: f14e68c
Author: Tim Armstrong <tarmstr...@cloudera.com>
Authored: Mon Jul 24 09:19:47 2017 -0700
Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org>
Committed: Tue Aug 8 19:43:36 2017 +0000

----------------------------------------------------------------------
 be/src/codegen/llvm-codegen-test.cc             | 11 ++++
 be/src/codegen/llvm-codegen.cc                  | 17 ++++--
 be/src/codegen/llvm-codegen.h                   | 10 ++--
 be/src/exec/data-sink.cc                        | 10 +---
 be/src/exec/exchange-node.cc                    |  1 -
 be/src/exec/exec-node.cc                        | 14 +++--
 be/src/exec/nested-loop-join-node.cc            |  5 +-
 be/src/exprs/expr-codegen-test.cc               |  1 +
 .../bufferpool/reservation-tracker-test.cc      | 12 ++---
 be/src/runtime/coordinator.cc                   | 18 +++----
 be/src/runtime/coordinator.h                    |  5 +-
 be/src/runtime/data-stream-recvr.cc             |  3 +-
 be/src/runtime/disk-io-mgr.cc                   |  3 +-
 be/src/runtime/initial-reservations.cc          |  9 ++--
 be/src/runtime/initial-reservations.h           |  2 +
 be/src/runtime/mem-tracker.cc                   | 22 ++++++--
 be/src/runtime/mem-tracker.h                    | 57 ++++++++++++--------
 be/src/runtime/query-state.cc                   |  9 +++-
 be/src/runtime/runtime-filter-bank.cc           |  2 +-
 be/src/runtime/runtime-state.cc                 | 26 ++++-----
 be/src/runtime/runtime-state.h                  |  9 ++--
 21 files changed, 144 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/codegen/llvm-codegen-test.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/llvm-codegen-test.cc 
b/be/src/codegen/llvm-codegen-test.cc
index 719a84e..61b942f 100644
--- a/be/src/codegen/llvm-codegen-test.cc
+++ b/be/src/codegen/llvm-codegen-test.cc
@@ -29,6 +29,7 @@
 #include "util/cpu-info.h"
 #include "util/hash-util.h"
 #include "util/path-builder.h"
+#include "util/scope-exit-trigger.h"
 #include "util/test-info.h"
 
 #include "common/names.h"
@@ -65,6 +66,10 @@ class LlvmCodeGenTest : public testing:: Test {
       ASSERT_OK(object1.Init(unique_ptr<Module>(new Module("Test", 
object1.context()))));
       ASSERT_OK(object2.Init(unique_ptr<Module>(new Module("Test", 
object2.context()))));
       ASSERT_OK(object3.Init(unique_ptr<Module>(new Module("Test", 
object3.context()))));
+
+      object1.Close();
+      object2.Close();
+      object3.Close();
     }
   }
 
@@ -113,6 +118,7 @@ TEST_F(LlvmCodeGenTest, BadIRFile) {
   string module_file = "NonExistentFile.ir";
   scoped_ptr<LlvmCodeGen> codegen;
   EXPECT_FALSE(CreateFromFile(module_file.c_str(), &codegen).ok());
+  codegen->Close();
 }
 
 // IR for the generated linner loop
@@ -236,6 +242,7 @@ TEST_F(LlvmCodeGenTest, ReplaceFnCall) {
   TestLoopFn new_loop_fn2 = reinterpret_cast<TestLoopFn>(new_loop2);
   new_loop_fn2(5);
   EXPECT_EQ(0, jitted_counter);
+  codegen->Close();
 }
 
 // Test function for c++/ir interop for strings.  Function will do:
@@ -325,6 +332,7 @@ TEST_F(LlvmCodeGenTest, StringValue) {
   int32_t* bytes = reinterpret_cast<int32_t*>(&str_val);
   EXPECT_EQ(1, bytes[2]);   // str_val.len
   EXPECT_EQ(0, bytes[3]);   // padding
+  codegen->Close();
 }
 
 // Test calling memcpy intrinsic
@@ -362,6 +370,7 @@ TEST_F(LlvmCodeGenTest, MemcpyTest) {
   test_fn(dst, src, 4);
 
   EXPECT_EQ(memcmp(src, dst, 4), 0);
+  codegen->Close();
 }
 
 // Test codegen for hash
@@ -377,6 +386,8 @@ TEST_F(LlvmCodeGenTest, HashTest) {
     scoped_ptr<LlvmCodeGen> codegen;
     ASSERT_OK(LlvmCodeGen::CreateImpalaCodegen(runtime_state_, NULL, "test", 
&codegen));
     ASSERT_TRUE(codegen.get() != NULL);
+    const auto close_codegen =
+        MakeScopeExitTrigger([&codegen]() { codegen->Close(); });
 
     Value* llvm_data1 =
         codegen->CastPtrToLlvmPtr(codegen->ptr_type(), 
const_cast<char*>(data1));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/codegen/llvm-codegen.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/llvm-codegen.cc b/be/src/codegen/llvm-codegen.cc
index 13501fe..0e2235c 100644
--- a/be/src/codegen/llvm-codegen.cc
+++ b/be/src/codegen/llvm-codegen.cc
@@ -160,6 +160,7 @@ Status LlvmCodeGen::InitializeLlvm(bool load_backend) {
 
   // Initialize the global shared call graph.
   shared_call_graph_.Init(init_codegen->module_);
+  init_codegen->Close();
   return Status::OK();
 }
 
@@ -168,7 +169,7 @@ LlvmCodeGen::LlvmCodeGen(RuntimeState* state, ObjectPool* 
pool,
   : state_(state),
     id_(id),
     profile_(pool, "CodeGen"),
-    mem_tracker_(new MemTracker(&profile_, -1, "CodeGen", parent_mem_tracker)),
+    mem_tracker_(pool->Add(new MemTracker(&profile_, -1, "CodeGen", 
parent_mem_tracker))),
     optimizations_enabled_(false),
     is_corrupt_(false),
     is_compiled_(false),
@@ -405,13 +406,20 @@ void LlvmCodeGen::SetupJITListeners() {
 }
 
 LlvmCodeGen::~LlvmCodeGen() {
-  if (memory_manager_ != NULL) 
mem_tracker_->Release(memory_manager_->bytes_tracked());
-  if (mem_tracker_->parent() != NULL) mem_tracker_->UnregisterFromParent();
-  mem_tracker_.reset();
+  DCHECK(execution_engine_ == nullptr) << "Must Close() before destruction";
+}
+
+void LlvmCodeGen::Close() {
+  if (memory_manager_ != nullptr) {
+    mem_tracker_->Release(memory_manager_->bytes_tracked());
+    memory_manager_ = nullptr;
+  }
+  if (mem_tracker_ != nullptr) mem_tracker_->Close();
 
   // Execution engine executes callback on event listener, so tear down engine 
first.
   execution_engine_.reset();
   symbol_emitter_.reset();
+  module_ = nullptr;
 }
 
 void LlvmCodeGen::EnableOptimizations(bool enable) {
@@ -1242,6 +1250,7 @@ Status LlvmCodeGen::GetSymbols(const string& file, const 
string& module_id,
   for (const Function& fn: codegen->module_->functions()) {
     if (fn.isMaterializable()) symbols->insert(fn.getName());
   }
+  codegen->Close();
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/codegen/llvm-codegen.h
----------------------------------------------------------------------
diff --git a/be/src/codegen/llvm-codegen.h b/be/src/codegen/llvm-codegen.h
index c41553b..805f447 100644
--- a/be/src/codegen/llvm-codegen.h
+++ b/be/src/codegen/llvm-codegen.h
@@ -154,9 +154,12 @@ class LlvmCodeGen {
   static Status CreateImpalaCodegen(RuntimeState* state, MemTracker* 
parent_mem_tracker,
       const std::string& id, boost::scoped_ptr<LlvmCodeGen>* codegen);
 
-  /// Removes all jit compiled dynamically linked functions from the process.
   ~LlvmCodeGen();
 
+  /// Releases all resources associated with the codegen object. It is invalid 
to call
+  /// any other API methods after calling close.
+  void Close();
+
   RuntimeProfile* runtime_profile() { return &profile_; }
   RuntimeProfile::Counter* codegen_timer() { return codegen_timer_; }
 
@@ -668,8 +671,9 @@ class LlvmCodeGen {
   RuntimeProfile profile_;
 
   /// MemTracker used for tracking memory consumed by codegen. Connected to a 
parent
-  /// MemTracker if one was provided during initialization.
-  boost::scoped_ptr<MemTracker> mem_tracker_;
+  /// MemTracker if one was provided during initialization. Owned by the 
ObjectPool
+  /// provided in the constructor.
+  MemTracker* mem_tracker_;
 
   /// Time spent reading the .ir file from the file system.
   RuntimeProfile::Counter* load_module_timer_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/exec/data-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index f7697c8..84015df 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -192,14 +192,8 @@ void DataSink::Close(RuntimeState* state) {
   ScalarExprEvaluator::Close(output_expr_evals_, state);
   ScalarExpr::Close(output_exprs_);
   if (expr_mem_pool() != nullptr) expr_mem_pool_->FreeAll();
-  if (expr_mem_tracker_ != NULL) {
-    expr_mem_tracker_->UnregisterFromParent();
-    expr_mem_tracker_.reset();
-  }
-  if (mem_tracker_ != NULL) {
-    mem_tracker_->UnregisterFromParent();
-    mem_tracker_.reset();
-  }
+  if (expr_mem_tracker_ != nullptr) expr_mem_tracker_->Close();
+  if (mem_tracker_ != nullptr) mem_tracker_->Close();
   closed_ = true;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/exec/exchange-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index 5fe5fb4..9d1383d 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -123,7 +123,6 @@ void ExchangeNode::Close(RuntimeState* state) {
   if (is_closed()) return;
   if (less_than_.get() != nullptr) less_than_->Close(state);
   if (stream_recvr_ != nullptr) stream_recvr_->Close();
-  stream_recvr_.reset();
   ScalarExpr::Close(ordering_exprs_);
   ExecNode::Close(state);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index de92fbe..56a1f10 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -211,11 +211,15 @@ void ExecNode::Close(RuntimeState* state) {
         &buffer_pool_client_, resource_profile_.min_reservation);
     state->exec_env()->buffer_pool()->DeregisterClient(&buffer_pool_client_);
   }
-  if (mem_tracker() != NULL && mem_tracker()->consumption() != 0) {
-    LOG(WARNING) << "Query " << state->query_id() << " may have leaked 
memory." << endl
-                 << state->instance_mem_tracker()->LogUsage();
-    DCHECK_EQ(mem_tracker()->consumption(), 0)
-        << "Leaked memory." << endl << 
state->instance_mem_tracker()->LogUsage();
+  if (expr_mem_tracker_ != nullptr) expr_mem_tracker_->Close();
+  if (mem_tracker_ != nullptr) {
+    if (mem_tracker()->consumption() != 0) {
+      LOG(WARNING) << "Query " << state->query_id() << " may have leaked 
memory." << endl
+                   << state->instance_mem_tracker()->LogUsage();
+      DCHECK_EQ(mem_tracker()->consumption(), 0)
+          << "Leaked memory." << endl << 
state->instance_mem_tracker()->LogUsage();
+    }
+    mem_tracker_->Close();
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/exec/nested-loop-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/nested-loop-join-node.cc 
b/be/src/exec/nested-loop-join-node.cc
index d377a22..e80d230 100644
--- a/be/src/exec/nested-loop-join-node.cc
+++ b/be/src/exec/nested-loop-join-node.cc
@@ -138,10 +138,7 @@ void NestedLoopJoinNode::Close(RuntimeState* state) {
   if (is_closed()) return;
   ScalarExprEvaluator::Close(join_conjunct_evals_, state);
   ScalarExpr::Close(join_conjuncts_);
-  if (builder_ != NULL) {
-    builder_->Close(state);
-    builder_.reset();
-  }
+  if (builder_ != NULL) builder_->Close(state);
   build_batches_ = NULL;
   if (matching_build_rows_ != NULL) {
     mem_tracker()->Release(matching_build_rows_->MemUsage());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/exprs/expr-codegen-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-codegen-test.cc 
b/be/src/exprs/expr-codegen-test.cc
index b4f0378..24b897e 100644
--- a/be/src/exprs/expr-codegen-test.cc
+++ b/be/src/exprs/expr-codegen-test.cc
@@ -348,6 +348,7 @@ TEST_F(ExprCodegenTest, TestInlineConstFnAttrs) {
   EXPECT_EQ(result.is_null, false);
   EXPECT_EQ(result.val8, 100003);
   CheckFnAttr();
+  codegen->Close();
 }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/runtime/bufferpool/reservation-tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker-test.cc 
b/be/src/runtime/bufferpool/reservation-tracker-test.cc
index 4197340..a31d7a9 100644
--- a/be/src/runtime/bufferpool/reservation-tracker-test.cc
+++ b/be/src/runtime/bufferpool/reservation-tracker-test.cc
@@ -284,8 +284,8 @@ TEST_F(ReservationTrackerTest, 
MemTrackerIntegrationTwoLevel) {
   ASSERT_EQ(0, child_mem_tracker2.consumption());
   ASSERT_EQ(0, root_mem_tracker.consumption());
   ASSERT_EQ(0, root_.GetUsedReservation());
-  child_mem_tracker1.UnregisterFromParent();
-  child_mem_tracker2.UnregisterFromParent();
+  child_mem_tracker1.Close();
+  child_mem_tracker2.Close();
 }
 
 TEST_F(ReservationTrackerTest, MemTrackerIntegrationMultiLevel) {
@@ -364,7 +364,7 @@ TEST_F(ReservationTrackerTest, 
MemTrackerIntegrationMultiLevel) {
 
   for (int i = HIERARCHY_DEPTH - 1; i >= 0; --i) {
     reservations[i].Close();
-    if (i != 0) mem_trackers[i]->UnregisterFromParent();
+    if (i != 0) mem_trackers[i]->Close();
   }
 }
 
@@ -469,12 +469,12 @@ TEST_F(ReservationTrackerTest, TransferReservation) {
   EXPECT_EQ(GRANDPARENT_LIMIT, root_.GetReservation());
 
   child->Close();
-  child_mem_tracker->UnregisterFromParent();
+  child_mem_tracker->Close();
   aunt->Close();
   parent->Close();
-  parent_mem_tracker->UnregisterFromParent();
+  parent_mem_tracker->Close();
   grandparent->Close();
-  grandparent_mem_tracker->UnregisterFromParent();
+  grandparent_mem_tracker->Close();
 }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index e2d9c4a..2ebbfdc 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -144,7 +144,7 @@ Status Coordinator::Exec() {
   lock_guard<mutex> l(lock_);
 
   query_state_ = 
ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(query_ctx_);
-  filter_mem_tracker_.reset(new MemTracker(
+  filter_mem_tracker_ = query_state_->obj_pool()->Add(new MemTracker(
       -1, "Runtime Filter (Coordinator)", query_state_->query_mem_tracker(), 
false));
 
   InitFragmentStats();
@@ -1090,15 +1090,11 @@ void Coordinator::ReleaseResources() {
     lock_guard<SpinLock> l(filter_lock_);
     for (auto& filter : filter_routing_table_) {
       FilterState* state = &filter.second;
-      state->Disable(filter_mem_tracker_.get());
+      state->Disable(filter_mem_tracker_);
     }
   }
   // This may be NULL while executing UDFs.
-  if (filter_mem_tracker_.get() != nullptr) {
-    // TODO: move this elsewhere, this isn't releasing resources (it's 
dismantling
-    // control structures)
-    filter_mem_tracker_->UnregisterFromParent();
-  }
+  if (filter_mem_tracker_ != nullptr) filter_mem_tracker_->Close();
   // Need to protect against failed Prepare(), where root_sink() would not be 
set.
   if (coord_sink_ != nullptr) {
     coord_sink_->CloseConsumer();
@@ -1171,7 +1167,7 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& 
params) {
     }
 
     // Filter is complete, and can be released.
-    state->Disable(filter_mem_tracker_.get());
+    state->Disable(filter_mem_tracker_);
     DCHECK(state->bloom_filter() == nullptr);
   }
 
@@ -1196,15 +1192,15 @@ void Coordinator::FilterState::ApplyUpdate(const 
TUpdateFilterParams& params,
 
   --pending_count_;
   if (params.bloom_filter.always_true) {
-    Disable(coord->filter_mem_tracker_.get());
+    Disable(coord->filter_mem_tracker_);
   } else if (bloom_filter_.get() == nullptr) {
     int64_t heap_space = params.bloom_filter.directory.size();
-    if (!coord->filter_mem_tracker_.get()->TryConsume(heap_space)) {
+    if (!coord->filter_mem_tracker_->TryConsume(heap_space)) {
       VLOG_QUERY << "Not enough memory to allocate filter: "
                  << PrettyPrinter::Print(heap_space, TUnit::BYTES)
                  << " (query: " << coord->query_id() << ")";
       // Disable, as one missing update means a correct filter cannot be 
produced.
-      Disable(coord->filter_mem_tracker_.get());
+      Disable(coord->filter_mem_tracker_);
     } else {
       bloom_filter_.reset(new TBloomFilter());
       // Workaround for fact that parameters are const& for Thrift RPCs - yet 
we want to

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 0401fa0..03d03df 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -249,8 +249,9 @@ class Coordinator { // NOLINT: The member variables could 
be re-ordered to save
   TRuntimeFilterMode::type filter_mode_;
 
   /// Tracks the memory consumed by runtime filters during aggregation. Child 
of
-  /// the query mem tracker in 'query_state_' and set in Exec().
-  std::unique_ptr<MemTracker> filter_mem_tracker_;
+  /// the query mem tracker in 'query_state_' and set in Exec(). Stored in
+  /// query_state_->obj_pool() so it has same lifetime as other MemTrackers.
+  MemTracker* filter_mem_tracker_ = nullptr;
 
   /// Object pool owned by the coordinator.
   boost::scoped_ptr<ObjectPool> obj_pool_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/runtime/data-stream-recvr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-recvr.cc 
b/be/src/runtime/data-stream-recvr.cc
index 2acab8b..3828a3e 100644
--- a/be/src/runtime/data-stream-recvr.cc
+++ b/be/src/runtime/data-stream-recvr.cc
@@ -350,8 +350,7 @@ void DataStreamRecvr::Close() {
     sender_queues_[i]->Close();
   }
   merger_.reset();
-  mem_tracker_->UnregisterFromParent();
-  mem_tracker_.reset();
+  mem_tracker_->Close();
 }
 
 DataStreamRecvr::~DataStreamRecvr() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index 55042d8..b168cdb 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -362,8 +362,7 @@ DiskIoMgr::~DiskIoMgr() {
     delete disk_queues_[i];
   }
 
-  if (free_buffer_mem_tracker_ != nullptr) 
free_buffer_mem_tracker_->UnregisterFromParent();
-
+  if (free_buffer_mem_tracker_ != nullptr) free_buffer_mem_tracker_->Close();
   if (cached_read_options_ != nullptr) 
hadoopRzOptionsFree(cached_read_options_);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/runtime/initial-reservations.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/initial-reservations.cc 
b/be/src/runtime/initial-reservations.cc
index 4987ec3..88bc920 100644
--- a/be/src/runtime/initial-reservations.cc
+++ b/be/src/runtime/initial-reservations.cc
@@ -40,11 +40,11 @@ namespace impala {
 InitialReservations::InitialReservations(ObjectPool* obj_pool,
     ReservationTracker* query_reservation, MemTracker* query_mem_tracker,
     int64_t initial_reservation_total_claims)
-  : remaining_initial_reservation_claims_(initial_reservation_total_claims) {
-  MemTracker* initial_reservation_tracker = obj_pool->Add(
-      new MemTracker(-1, "Unclaimed reservations", query_mem_tracker, false));
+  : initial_reservation_mem_tracker_(obj_pool->Add(
+      new MemTracker(-1, "Unclaimed reservations", query_mem_tracker, false))),
+      remaining_initial_reservation_claims_(initial_reservation_total_claims) {
   initial_reservations_.InitChildTracker(nullptr, query_reservation,
-      initial_reservation_tracker, numeric_limits<int64_t>::max());
+      initial_reservation_mem_tracker_, numeric_limits<int64_t>::max());
 }
 
 Status InitialReservations::Init(
@@ -86,5 +86,6 @@ void InitialReservations::Return(BufferPool::ClientHandle* 
src, int64_t bytes) {
 
 void InitialReservations::ReleaseResources() {
   initial_reservations_.Close();
+  initial_reservation_mem_tracker_->Close();
 }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/runtime/initial-reservations.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/initial-reservations.h 
b/be/src/runtime/initial-reservations.h
index dfcb114..e2fbb03 100644
--- a/be/src/runtime/initial-reservations.h
+++ b/be/src/runtime/initial-reservations.h
@@ -70,6 +70,8 @@ class InitialReservations {
   // Return() returns reservations to.
   ReservationTracker initial_reservations_;
 
+  MemTracker* const initial_reservation_mem_tracker_;
+
   /// The total bytes of additional reservations that we expect to be claimed.
   /// initial_reservations_->GetReservation() <= 
remaining_initial_reservation_claims_.
   int64_t remaining_initial_reservation_claims_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/runtime/mem-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc
index ebf5137..525d069 100644
--- a/be/src/runtime/mem-tracker.cc
+++ b/be/src/runtime/mem-tracker.cc
@@ -28,6 +28,7 @@
 #include "util/debug-util.h"
 #include "util/mem-info.h"
 #include "util/pretty-printer.h"
+#include "util/test-info.h"
 #include "util/uid-util.h"
 
 #include "common/names.h"
@@ -109,8 +110,18 @@ void MemTracker::AddChildTracker(MemTracker* tracker) {
   tracker->child_tracker_it_ = child_trackers_.insert(child_trackers_.end(), 
tracker);
 }
 
-void MemTracker::UnregisterFromParent() {
-  DCHECK(parent_ != NULL);
+void MemTracker::Close() {
+  if (closed_) return;
+  if (consumption_metric_ == nullptr) {
+    DCHECK_EQ(consumption_->current_value(), 0) << label_ << "\n"
+                                                << GetStackTrace() << "\n"
+                                                << LogUsage("");
+  }
+  closed_ = true;
+}
+
+void MemTracker::CloseAndUnregisterFromParent() {
+  Close();
   lock_guard<SpinLock> l(parent_->child_trackers_lock_);
   parent_->child_trackers_.erase(child_tracker_it_);
   child_tracker_it_ = parent_->child_trackers_.end();
@@ -187,9 +198,10 @@ MemTracker* MemTracker::CreateQueryMemTracker(const 
TUniqueId& id,
 }
 
 MemTracker::~MemTracker() {
-  DCHECK_EQ(consumption_->current_value(), 0) << label_ << "\n"
-                                              << GetStackTrace() << "\n"
-                                              << LogUsage("");
+  // We should explicitly close MemTrackers in the context of a daemon 
process. It is ok
+  // if backend tests don't call Close() to make tests more concise.
+  if (TestInfo::is_test()) Close();
+  DCHECK(closed_) << label_;
   delete reservation_counters_.Load();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index a9e265a..8d3d645 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -95,8 +95,19 @@ class MemTracker {
 
   ~MemTracker();
 
-  /// Removes this tracker from parent_->child_trackers_.
-  void UnregisterFromParent();
+  /// Closes this MemTracker. After closing it is invalid to consume memory on 
this
+  /// tracker and the tracker's consumption counter (which may be owned by a
+  /// RuntimeProfile, not this MemTracker) can be safely destroyed. 
MemTrackers without
+  /// consumption metrics in the context of a daemon must always be closed.
+  /// Idempotent: calling multiple times has no effect.
+  void Close();
+
+  /// Closes the MemTracker and deregisters it from its parent. Can be called 
before
+  /// destruction to prevent other threads from getting a reference to the 
MemTracker
+  /// via its parent. Only used to deregister the query-level MemTracker from 
the
+  /// global hierarchy.
+  /// TODO: IMPALA-3200: this is also used by BufferedBlockMgr, which will be 
deleted.
+  void CloseAndUnregisterFromParent();
 
   /// Include counters from a ReservationTracker in logs and other diagnostics.
   /// The counters should be owned by the fragment's RuntimeProfile.
@@ -112,6 +123,7 @@ class MemTracker {
 
   /// Increases consumption of this tracker and its ancestors by 'bytes'.
   void Consume(int64_t bytes) {
+    DCHECK(!closed_) << label_;
     if (bytes <= 0) {
       if (bytes < 0) Release(-bytes);
       return;
@@ -121,11 +133,10 @@ class MemTracker {
       RefreshConsumptionFromMetric();
       return;
     }
-    for (std::vector<MemTracker*>::iterator tracker = all_trackers_.begin();
-         tracker != all_trackers_.end(); ++tracker) {
-      (*tracker)->consumption_->Add(bytes);
-      if ((*tracker)->consumption_metric_ == NULL) {
-        DCHECK_GE((*tracker)->consumption_->current_value(), 0);
+    for (MemTracker* tracker : all_trackers_) {
+      tracker->consumption_->Add(bytes);
+      if (tracker->consumption_metric_ == NULL) {
+        DCHECK_GE(tracker->consumption_->current_value(), 0);
       }
     }
   }
@@ -136,11 +147,13 @@ class MemTracker {
   /// to update tracking on a particular mem tracker but the consumption 
against
   /// the limit recorded in one of its ancestors already happened.
   void ConsumeLocal(int64_t bytes, MemTracker* end_tracker) {
+    DCHECK(!closed_) << label_;
     DCHECK(consumption_metric_ == NULL) << "Should not be called on root.";
-    for (int i = 0; i < all_trackers_.size(); ++i) {
-      if (all_trackers_[i] == end_tracker) return;
-      DCHECK(!all_trackers_[i]->has_limit());
-      all_trackers_[i]->consumption_->Add(bytes);
+    for (MemTracker* tracker : all_trackers_) {
+      if (tracker == end_tracker) return;
+      DCHECK(!tracker->has_limit());
+      DCHECK(!tracker->closed_) << tracker->label_;
+      tracker->consumption_->Add(bytes);
     }
     DCHECK(false) << "end_tracker is not an ancestor";
   }
@@ -155,6 +168,7 @@ class MemTracker {
   /// Returns true if the try succeeded.
   WARN_UNUSED_RESULT
   bool TryConsume(int64_t bytes) {
+    DCHECK(!closed_) << label_;
     if (consumption_metric_ != NULL) RefreshConsumptionFromMetric();
     if (UNLIKELY(bytes <= 0)) return true;
     int i;
@@ -195,6 +209,7 @@ class MemTracker {
 
   /// Decreases consumption of this tracker and its ancestors by 'bytes'.
   void Release(int64_t bytes) {
+    DCHECK(!closed_) << label_;
     if (bytes <= 0) {
       if (bytes < 0) Consume(-bytes);
       return;
@@ -204,30 +219,26 @@ class MemTracker {
       RefreshConsumptionFromMetric();
       return;
     }
-    for (std::vector<MemTracker*>::iterator tracker = all_trackers_.begin();
-         tracker != all_trackers_.end(); ++tracker) {
-      (*tracker)->consumption_->Add(-bytes);
+    for (MemTracker* tracker : all_trackers_) {
+      tracker->consumption_->Add(-bytes);
       /// If a UDF calls FunctionContext::TrackAllocation() but allocates less 
than the
       /// reported amount, the subsequent call to FunctionContext::Free() may 
cause the
       /// process mem tracker to go negative until it is synced back to the 
tcmalloc
       /// metric. Don't blow up in this case. (Note that this doesn't affect 
non-process
       /// trackers since we can enforce that the reported memory usage is 
internally
       /// consistent.)
-      if ((*tracker)->consumption_metric_ == NULL) {
-        DCHECK_GE((*tracker)->consumption_->current_value(), 0)
-          << std::endl << (*tracker)->LogUsage();
+      if (tracker->consumption_metric_ == NULL) {
+        DCHECK_GE(tracker->consumption_->current_value(), 0)
+          << std::endl << tracker->LogUsage();
       }
     }
-
-    /// TODO: Release brokered memory?
   }
 
   /// Returns true if a valid limit of this tracker or one of its ancestors is
   /// exceeded.
   bool AnyLimitExceeded() {
-    for (std::vector<MemTracker*>::iterator tracker = limit_trackers_.begin();
-         tracker != limit_trackers_.end(); ++tracker) {
-      if ((*tracker)->LimitExceeded()) return true;
+    for (MemTracker* tracker : limit_trackers_) {
+      if (tracker->LimitExceeded()) return true;
     }
     return false;
   }
@@ -403,6 +414,8 @@ class MemTracker {
   /// if consumption is 0.
   bool log_usage_if_zero_;
 
+  bool closed_ = false;
+
   /// The number of times the GcFunctions were called.
   IntCounter* num_gcs_metric_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 64a8c5a..92a6b7b 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -94,8 +94,13 @@ void QueryState::ReleaseResources() {
   // Release any remaining reservation.
   if (initial_reservations_ != nullptr) 
initial_reservations_->ReleaseResources();
   if (buffer_reservation_ != nullptr) buffer_reservation_->Close();
-  // Avoid dangling reference from the parent of 'query_mem_tracker_'.
-  if (query_mem_tracker_ != nullptr) 
query_mem_tracker_->UnregisterFromParent();
+  if (query_mem_tracker_ != nullptr) {
+    // No more tracked memory should be used by the query after this point, so 
we can
+    // close the MemTracker and remove the whole query subtree of MemTrackers 
from the
+    // global tree. After this point nothing should be touching this query's 
MemTrackers
+    // and they can be safely destroyed.
+    query_mem_tracker_->CloseAndUnregisterFromParent();
+  }
   if (desc_tbl_ != nullptr) desc_tbl_->ReleaseResources();
   released_resources_ = true;
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/runtime/runtime-filter-bank.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter-bank.cc 
b/be/src/runtime/runtime-filter-bank.cc
index 2a47cac..2ae65c8 100644
--- a/be/src/runtime/runtime-filter-bank.cc
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -228,5 +228,5 @@ void RuntimeFilterBank::Close() {
   closed_ = true;
   obj_pool_.Clear();
   filter_mem_tracker_->Release(memory_allocated_->value());
-  filter_mem_tracker_->UnregisterFromParent();
+  filter_mem_tracker_->Close();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index ba8e75d..b986e8d 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -70,9 +70,7 @@ RuntimeState::RuntimeState(QueryState* query_state, const TPlanFragmentCtx& frag
         query_state->query_ctx().utc_timestamp_string))),
     exec_env_(exec_env),
     profile_(obj_pool(), "Fragment " + PrintId(instance_ctx.fragment_instance_id)),
-    instance_buffer_reservation_(new ReservationTracker),
-    is_cancelled_(false),
-    root_node_id_(-1) {
+    instance_buffer_reservation_(new ReservationTracker) {
   Init();
 }
 
@@ -85,10 +83,7 @@ RuntimeState::RuntimeState(
     now_(new TimestampValue(TimestampValue::Parse(qctx.now_string))),
     utc_timestamp_(new TimestampValue(TimestampValue::Parse(qctx.utc_timestamp_string))),
     exec_env_(exec_env),
-    profile_(obj_pool(), "<unnamed>"),
-    instance_buffer_reservation_(nullptr),
-    is_cancelled_(false),
-    root_node_id_(-1) {
+    profile_(obj_pool(), "<unnamed>") {
   if (query_ctx().request_pool.empty()) {
     const_cast<TQueryCtx&>(query_ctx()).request_pool = "test-pool";
   }
@@ -97,7 +92,7 @@ RuntimeState::RuntimeState(
 }
 
 RuntimeState::~RuntimeState() {
-  DCHECK(instance_mem_tracker_ == nullptr) << "Must call ReleaseResources()";
+  DCHECK(released_resources_) << "Must call ReleaseResources()";
 }
 
 void RuntimeState::Init() {
@@ -233,33 +228,30 @@ void RuntimeState::UnregisterReaderContexts() {
 }
 
 void RuntimeState::ReleaseResources() {
-  // TODO: IMPALA-5587: control structures (e.g. MemTrackers) shouldn't be destroyed here.
+  DCHECK(!released_resources_);
   UnregisterReaderContexts();
   if (filter_bank_ != nullptr) filter_bank_->Close();
   if (resource_pool_ != nullptr) {
     exec_env_->thread_mgr()->UnregisterPool(resource_pool_);
   }
-  codegen_.reset(); // Release any memory associated with codegen.
+  // Release any memory associated with codegen.
+  if (codegen_ != nullptr) codegen_->Close();
 
   // Release the reservation, which should be unused at the point.
   if (instance_buffer_reservation_ != nullptr) instance_buffer_reservation_->Close();
 
-  // 'query_mem_tracker()' must be valid as long as 'instance_mem_tracker_' is so
-  // delete 'instance_mem_tracker_' first.
-  // LogUsage() walks the MemTracker tree top-down when the memory limit is exceeded, so
-  // break the link between 'instance_mem_tracker_' and its parent before
-  // 'instance_mem_tracker_' and its children are destroyed.
-  instance_mem_tracker_->UnregisterFromParent();
+  // No more memory should be tracked for this instance at this point.
   if (instance_mem_tracker_->consumption() != 0) {
     LOG(WARNING) << "Query " << query_id() << " may have leaked memory." << endl
                  << instance_mem_tracker_->LogUsage();
   }
-  instance_mem_tracker_.reset();
+  instance_mem_tracker_->Close();
 
   if (local_query_state_.get() != nullptr) {
     // if we created this QueryState, we must call ReleaseResources()
     local_query_state_->ReleaseResources();
   }
+  released_resources_ = true;
 }
 
 const std::string& RuntimeState::GetEffectiveUser() const {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 12e7d8c..665de7b 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -302,7 +302,7 @@ class RuntimeState {
   /// Helper to call QueryState::StartSpilling().
   Status StartSpilling(MemTracker* mem_tracker);
 
-  /// Release resources and prepare this object for destruction.
+  /// Release resources and prepare this object for destruction. Can only be called once.
   void ReleaseResources();
 
  private:
@@ -376,7 +376,10 @@ class RuntimeState {
   boost::scoped_ptr<ReservationTracker> instance_buffer_reservation_;
 
   /// if true, execution should stop with a CANCELLED status
-  bool is_cancelled_;
+  bool is_cancelled_ = false;
+
+  /// if true, ReleaseResources() was called.
+  bool released_resources_ = false;
 
   /// Non-OK if an error has occurred and query execution should abort. Used only for
   /// asynchronously reporting such errors (e.g., when a UDF reports an error), so this
@@ -397,7 +400,7 @@ class RuntimeState {
   /// 2) It is different between different fragments, so we do not run into hash
   /// collisions after data partitioning (across fragments). See IMPALA-219 for more
   /// details.
-  PlanNodeId root_node_id_;
+  PlanNodeId root_node_id_ = -1;
 
   /// Manages runtime filters that are either produced or consumed (or both!) by plan
   /// nodes that share this runtime state.

Reply via email to