Repository: incubator-impala
Updated Branches:
  refs/heads/master f14e68c72 -> 4456ead84
IMPALA-5715: (mitigation only) defer destruction of MemTrackers

One potential candidate for the bad MemTracker in IMPALA-5715 is the one owned by a join build sink. I haven't found a clear root cause, but we can reduce the probability of bugs like this by deferring teardown of the MemTrackers. This patch defers destruction of the fragment instance, ExecNode, DataSink and Codegen MemTrackers until query teardown. Instead, MemTracker::Close() is called at the point where the MemTracker would previously have been destroyed. Close() checks that all memory has been released and enforces that no more memory is consumed. The entire query MemTracker subtree is then disconnected as a whole from the global tree in QueryState::ReleaseResources(), instead of the MemTrackers being incrementally disconnected bottom-up. In cases where the MemTracker is owned by another object, this required deferring teardown of the owner until query teardown. For example, for LlvmCodeGen I added a Close() method to release resources and deferred calling the destructor. We want to make this change anyway - see IMPALA-5587.

Testing: Ran a core ASAN build.
Change-Id: I205abb0076d1ffd08cb93c0f1671c8b81e7fba0f Reviewed-on: http://gerrit.cloudera.org:8080/7492 Reviewed-by: Tim Armstrong <tarmstr...@cloudera.com> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/27dcc768 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/27dcc768 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/27dcc768 Branch: refs/heads/master Commit: 27dcc768d20e11f29a2d24059214dfc2bacef229 Parents: f14e68c Author: Tim Armstrong <tarmstr...@cloudera.com> Authored: Mon Jul 24 09:19:47 2017 -0700 Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org> Committed: Tue Aug 8 19:43:36 2017 +0000 ---------------------------------------------------------------------- be/src/codegen/llvm-codegen-test.cc | 11 ++++ be/src/codegen/llvm-codegen.cc | 17 ++++-- be/src/codegen/llvm-codegen.h | 10 ++-- be/src/exec/data-sink.cc | 10 +--- be/src/exec/exchange-node.cc | 1 - be/src/exec/exec-node.cc | 14 +++-- be/src/exec/nested-loop-join-node.cc | 5 +- be/src/exprs/expr-codegen-test.cc | 1 + .../bufferpool/reservation-tracker-test.cc | 12 ++--- be/src/runtime/coordinator.cc | 18 +++---- be/src/runtime/coordinator.h | 5 +- be/src/runtime/data-stream-recvr.cc | 3 +- be/src/runtime/disk-io-mgr.cc | 3 +- be/src/runtime/initial-reservations.cc | 9 ++-- be/src/runtime/initial-reservations.h | 2 + be/src/runtime/mem-tracker.cc | 22 ++++++-- be/src/runtime/mem-tracker.h | 57 ++++++++++++-------- be/src/runtime/query-state.cc | 9 +++- be/src/runtime/runtime-filter-bank.cc | 2 +- be/src/runtime/runtime-state.cc | 26 ++++----- be/src/runtime/runtime-state.h | 9 ++-- 21 files changed, 144 insertions(+), 102 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/codegen/llvm-codegen-test.cc 
---------------------------------------------------------------------- diff --git a/be/src/codegen/llvm-codegen-test.cc b/be/src/codegen/llvm-codegen-test.cc index 719a84e..61b942f 100644 --- a/be/src/codegen/llvm-codegen-test.cc +++ b/be/src/codegen/llvm-codegen-test.cc @@ -29,6 +29,7 @@ #include "util/cpu-info.h" #include "util/hash-util.h" #include "util/path-builder.h" +#include "util/scope-exit-trigger.h" #include "util/test-info.h" #include "common/names.h" @@ -65,6 +66,10 @@ class LlvmCodeGenTest : public testing:: Test { ASSERT_OK(object1.Init(unique_ptr<Module>(new Module("Test", object1.context())))); ASSERT_OK(object2.Init(unique_ptr<Module>(new Module("Test", object2.context())))); ASSERT_OK(object3.Init(unique_ptr<Module>(new Module("Test", object3.context())))); + + object1.Close(); + object2.Close(); + object3.Close(); } } @@ -113,6 +118,7 @@ TEST_F(LlvmCodeGenTest, BadIRFile) { string module_file = "NonExistentFile.ir"; scoped_ptr<LlvmCodeGen> codegen; EXPECT_FALSE(CreateFromFile(module_file.c_str(), &codegen).ok()); + codegen->Close(); } // IR for the generated linner loop @@ -236,6 +242,7 @@ TEST_F(LlvmCodeGenTest, ReplaceFnCall) { TestLoopFn new_loop_fn2 = reinterpret_cast<TestLoopFn>(new_loop2); new_loop_fn2(5); EXPECT_EQ(0, jitted_counter); + codegen->Close(); } // Test function for c++/ir interop for strings. 
Function will do: @@ -325,6 +332,7 @@ TEST_F(LlvmCodeGenTest, StringValue) { int32_t* bytes = reinterpret_cast<int32_t*>(&str_val); EXPECT_EQ(1, bytes[2]); // str_val.len EXPECT_EQ(0, bytes[3]); // padding + codegen->Close(); } // Test calling memcpy intrinsic @@ -362,6 +370,7 @@ TEST_F(LlvmCodeGenTest, MemcpyTest) { test_fn(dst, src, 4); EXPECT_EQ(memcmp(src, dst, 4), 0); + codegen->Close(); } // Test codegen for hash @@ -377,6 +386,8 @@ TEST_F(LlvmCodeGenTest, HashTest) { scoped_ptr<LlvmCodeGen> codegen; ASSERT_OK(LlvmCodeGen::CreateImpalaCodegen(runtime_state_, NULL, "test", &codegen)); ASSERT_TRUE(codegen.get() != NULL); + const auto close_codegen = + MakeScopeExitTrigger([&codegen]() { codegen->Close(); }); Value* llvm_data1 = codegen->CastPtrToLlvmPtr(codegen->ptr_type(), const_cast<char*>(data1)); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/codegen/llvm-codegen.cc ---------------------------------------------------------------------- diff --git a/be/src/codegen/llvm-codegen.cc b/be/src/codegen/llvm-codegen.cc index 13501fe..0e2235c 100644 --- a/be/src/codegen/llvm-codegen.cc +++ b/be/src/codegen/llvm-codegen.cc @@ -160,6 +160,7 @@ Status LlvmCodeGen::InitializeLlvm(bool load_backend) { // Initialize the global shared call graph. 
shared_call_graph_.Init(init_codegen->module_); + init_codegen->Close(); return Status::OK(); } @@ -168,7 +169,7 @@ LlvmCodeGen::LlvmCodeGen(RuntimeState* state, ObjectPool* pool, : state_(state), id_(id), profile_(pool, "CodeGen"), - mem_tracker_(new MemTracker(&profile_, -1, "CodeGen", parent_mem_tracker)), + mem_tracker_(pool->Add(new MemTracker(&profile_, -1, "CodeGen", parent_mem_tracker))), optimizations_enabled_(false), is_corrupt_(false), is_compiled_(false), @@ -405,13 +406,20 @@ void LlvmCodeGen::SetupJITListeners() { } LlvmCodeGen::~LlvmCodeGen() { - if (memory_manager_ != NULL) mem_tracker_->Release(memory_manager_->bytes_tracked()); - if (mem_tracker_->parent() != NULL) mem_tracker_->UnregisterFromParent(); - mem_tracker_.reset(); + DCHECK(execution_engine_ == nullptr) << "Must Close() before destruction"; +} + +void LlvmCodeGen::Close() { + if (memory_manager_ != nullptr) { + mem_tracker_->Release(memory_manager_->bytes_tracked()); + memory_manager_ = nullptr; + } + if (mem_tracker_ != nullptr) mem_tracker_->Close(); // Execution engine executes callback on event listener, so tear down engine first. 
execution_engine_.reset(); symbol_emitter_.reset(); + module_ = nullptr; } void LlvmCodeGen::EnableOptimizations(bool enable) { @@ -1242,6 +1250,7 @@ Status LlvmCodeGen::GetSymbols(const string& file, const string& module_id, for (const Function& fn: codegen->module_->functions()) { if (fn.isMaterializable()) symbols->insert(fn.getName()); } + codegen->Close(); return Status::OK(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/codegen/llvm-codegen.h ---------------------------------------------------------------------- diff --git a/be/src/codegen/llvm-codegen.h b/be/src/codegen/llvm-codegen.h index c41553b..805f447 100644 --- a/be/src/codegen/llvm-codegen.h +++ b/be/src/codegen/llvm-codegen.h @@ -154,9 +154,12 @@ class LlvmCodeGen { static Status CreateImpalaCodegen(RuntimeState* state, MemTracker* parent_mem_tracker, const std::string& id, boost::scoped_ptr<LlvmCodeGen>* codegen); - /// Removes all jit compiled dynamically linked functions from the process. ~LlvmCodeGen(); + /// Releases all resources associated with the codegen object. It is invalid to call + /// any other API methods after calling close. + void Close(); + RuntimeProfile* runtime_profile() { return &profile_; } RuntimeProfile::Counter* codegen_timer() { return codegen_timer_; } @@ -668,8 +671,9 @@ class LlvmCodeGen { RuntimeProfile profile_; /// MemTracker used for tracking memory consumed by codegen. Connected to a parent - /// MemTracker if one was provided during initialization. - boost::scoped_ptr<MemTracker> mem_tracker_; + /// MemTracker if one was provided during initialization. Owned by the ObjectPool + /// provided in the constructor. + MemTracker* mem_tracker_; /// Time spent reading the .ir file from the file system. 
RuntimeProfile::Counter* load_module_timer_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/exec/data-sink.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc index f7697c8..84015df 100644 --- a/be/src/exec/data-sink.cc +++ b/be/src/exec/data-sink.cc @@ -192,14 +192,8 @@ void DataSink::Close(RuntimeState* state) { ScalarExprEvaluator::Close(output_expr_evals_, state); ScalarExpr::Close(output_exprs_); if (expr_mem_pool() != nullptr) expr_mem_pool_->FreeAll(); - if (expr_mem_tracker_ != NULL) { - expr_mem_tracker_->UnregisterFromParent(); - expr_mem_tracker_.reset(); - } - if (mem_tracker_ != NULL) { - mem_tracker_->UnregisterFromParent(); - mem_tracker_.reset(); - } + if (expr_mem_tracker_ != nullptr) expr_mem_tracker_->Close(); + if (mem_tracker_ != nullptr) mem_tracker_->Close(); closed_ = true; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/exec/exchange-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc index 5fe5fb4..9d1383d 100644 --- a/be/src/exec/exchange-node.cc +++ b/be/src/exec/exchange-node.cc @@ -123,7 +123,6 @@ void ExchangeNode::Close(RuntimeState* state) { if (is_closed()) return; if (less_than_.get() != nullptr) less_than_->Close(state); if (stream_recvr_ != nullptr) stream_recvr_->Close(); - stream_recvr_.reset(); ScalarExpr::Close(ordering_exprs_); ExecNode::Close(state); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/exec/exec-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc index de92fbe..56a1f10 100644 --- a/be/src/exec/exec-node.cc +++ b/be/src/exec/exec-node.cc @@ -211,11 +211,15 @@ void ExecNode::Close(RuntimeState* state) { &buffer_pool_client_, 
resource_profile_.min_reservation); state->exec_env()->buffer_pool()->DeregisterClient(&buffer_pool_client_); } - if (mem_tracker() != NULL && mem_tracker()->consumption() != 0) { - LOG(WARNING) << "Query " << state->query_id() << " may have leaked memory." << endl - << state->instance_mem_tracker()->LogUsage(); - DCHECK_EQ(mem_tracker()->consumption(), 0) - << "Leaked memory." << endl << state->instance_mem_tracker()->LogUsage(); + if (expr_mem_tracker_ != nullptr) expr_mem_tracker_->Close(); + if (mem_tracker_ != nullptr) { + if (mem_tracker()->consumption() != 0) { + LOG(WARNING) << "Query " << state->query_id() << " may have leaked memory." << endl + << state->instance_mem_tracker()->LogUsage(); + DCHECK_EQ(mem_tracker()->consumption(), 0) + << "Leaked memory." << endl << state->instance_mem_tracker()->LogUsage(); + } + mem_tracker_->Close(); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/exec/nested-loop-join-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/nested-loop-join-node.cc b/be/src/exec/nested-loop-join-node.cc index d377a22..e80d230 100644 --- a/be/src/exec/nested-loop-join-node.cc +++ b/be/src/exec/nested-loop-join-node.cc @@ -138,10 +138,7 @@ void NestedLoopJoinNode::Close(RuntimeState* state) { if (is_closed()) return; ScalarExprEvaluator::Close(join_conjunct_evals_, state); ScalarExpr::Close(join_conjuncts_); - if (builder_ != NULL) { - builder_->Close(state); - builder_.reset(); - } + if (builder_ != NULL) builder_->Close(state); build_batches_ = NULL; if (matching_build_rows_ != NULL) { mem_tracker()->Release(matching_build_rows_->MemUsage()); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/exprs/expr-codegen-test.cc ---------------------------------------------------------------------- diff --git a/be/src/exprs/expr-codegen-test.cc b/be/src/exprs/expr-codegen-test.cc index b4f0378..24b897e 100644 --- 
a/be/src/exprs/expr-codegen-test.cc +++ b/be/src/exprs/expr-codegen-test.cc @@ -348,6 +348,7 @@ TEST_F(ExprCodegenTest, TestInlineConstFnAttrs) { EXPECT_EQ(result.is_null, false); EXPECT_EQ(result.val8, 100003); CheckFnAttr(); + codegen->Close(); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/runtime/bufferpool/reservation-tracker-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/reservation-tracker-test.cc b/be/src/runtime/bufferpool/reservation-tracker-test.cc index 4197340..a31d7a9 100644 --- a/be/src/runtime/bufferpool/reservation-tracker-test.cc +++ b/be/src/runtime/bufferpool/reservation-tracker-test.cc @@ -284,8 +284,8 @@ TEST_F(ReservationTrackerTest, MemTrackerIntegrationTwoLevel) { ASSERT_EQ(0, child_mem_tracker2.consumption()); ASSERT_EQ(0, root_mem_tracker.consumption()); ASSERT_EQ(0, root_.GetUsedReservation()); - child_mem_tracker1.UnregisterFromParent(); - child_mem_tracker2.UnregisterFromParent(); + child_mem_tracker1.Close(); + child_mem_tracker2.Close(); } TEST_F(ReservationTrackerTest, MemTrackerIntegrationMultiLevel) { @@ -364,7 +364,7 @@ TEST_F(ReservationTrackerTest, MemTrackerIntegrationMultiLevel) { for (int i = HIERARCHY_DEPTH - 1; i >= 0; --i) { reservations[i].Close(); - if (i != 0) mem_trackers[i]->UnregisterFromParent(); + if (i != 0) mem_trackers[i]->Close(); } } @@ -469,12 +469,12 @@ TEST_F(ReservationTrackerTest, TransferReservation) { EXPECT_EQ(GRANDPARENT_LIMIT, root_.GetReservation()); child->Close(); - child_mem_tracker->UnregisterFromParent(); + child_mem_tracker->Close(); aunt->Close(); parent->Close(); - parent_mem_tracker->UnregisterFromParent(); + parent_mem_tracker->Close(); grandparent->Close(); - grandparent_mem_tracker->UnregisterFromParent(); + grandparent_mem_tracker->Close(); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/runtime/coordinator.cc 
---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc index e2d9c4a..2ebbfdc 100644 --- a/be/src/runtime/coordinator.cc +++ b/be/src/runtime/coordinator.cc @@ -144,7 +144,7 @@ Status Coordinator::Exec() { lock_guard<mutex> l(lock_); query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(query_ctx_); - filter_mem_tracker_.reset(new MemTracker( + filter_mem_tracker_ = query_state_->obj_pool()->Add(new MemTracker( -1, "Runtime Filter (Coordinator)", query_state_->query_mem_tracker(), false)); InitFragmentStats(); @@ -1090,15 +1090,11 @@ void Coordinator::ReleaseResources() { lock_guard<SpinLock> l(filter_lock_); for (auto& filter : filter_routing_table_) { FilterState* state = &filter.second; - state->Disable(filter_mem_tracker_.get()); + state->Disable(filter_mem_tracker_); } } // This may be NULL while executing UDFs. - if (filter_mem_tracker_.get() != nullptr) { - // TODO: move this elsewhere, this isn't releasing resources (it's dismantling - // control structures) - filter_mem_tracker_->UnregisterFromParent(); - } + if (filter_mem_tracker_ != nullptr) filter_mem_tracker_->Close(); // Need to protect against failed Prepare(), where root_sink() would not be set. if (coord_sink_ != nullptr) { coord_sink_->CloseConsumer(); @@ -1171,7 +1167,7 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) { } // Filter is complete, and can be released. 
- state->Disable(filter_mem_tracker_.get()); + state->Disable(filter_mem_tracker_); DCHECK(state->bloom_filter() == nullptr); } @@ -1196,15 +1192,15 @@ void Coordinator::FilterState::ApplyUpdate(const TUpdateFilterParams& params, --pending_count_; if (params.bloom_filter.always_true) { - Disable(coord->filter_mem_tracker_.get()); + Disable(coord->filter_mem_tracker_); } else if (bloom_filter_.get() == nullptr) { int64_t heap_space = params.bloom_filter.directory.size(); - if (!coord->filter_mem_tracker_.get()->TryConsume(heap_space)) { + if (!coord->filter_mem_tracker_->TryConsume(heap_space)) { VLOG_QUERY << "Not enough memory to allocate filter: " << PrettyPrinter::Print(heap_space, TUnit::BYTES) << " (query: " << coord->query_id() << ")"; // Disable, as one missing update means a correct filter cannot be produced. - Disable(coord->filter_mem_tracker_.get()); + Disable(coord->filter_mem_tracker_); } else { bloom_filter_.reset(new TBloomFilter()); // Workaround for fact that parameters are const& for Thrift RPCs - yet we want to http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/runtime/coordinator.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h index 0401fa0..03d03df 100644 --- a/be/src/runtime/coordinator.h +++ b/be/src/runtime/coordinator.h @@ -249,8 +249,9 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save TRuntimeFilterMode::type filter_mode_; /// Tracks the memory consumed by runtime filters during aggregation. Child of - /// the query mem tracker in 'query_state_' and set in Exec(). - std::unique_ptr<MemTracker> filter_mem_tracker_; + /// the query mem tracker in 'query_state_' and set in Exec(). Stored in + /// query_state_->obj_pool() so it has same lifetime as other MemTrackers. + MemTracker* filter_mem_tracker_ = nullptr; /// Object pool owned by the coordinator. 
boost::scoped_ptr<ObjectPool> obj_pool_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/runtime/data-stream-recvr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/data-stream-recvr.cc b/be/src/runtime/data-stream-recvr.cc index 2acab8b..3828a3e 100644 --- a/be/src/runtime/data-stream-recvr.cc +++ b/be/src/runtime/data-stream-recvr.cc @@ -350,8 +350,7 @@ void DataStreamRecvr::Close() { sender_queues_[i]->Close(); } merger_.reset(); - mem_tracker_->UnregisterFromParent(); - mem_tracker_.reset(); + mem_tracker_->Close(); } DataStreamRecvr::~DataStreamRecvr() { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/runtime/disk-io-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc index 55042d8..b168cdb 100644 --- a/be/src/runtime/disk-io-mgr.cc +++ b/be/src/runtime/disk-io-mgr.cc @@ -362,8 +362,7 @@ DiskIoMgr::~DiskIoMgr() { delete disk_queues_[i]; } - if (free_buffer_mem_tracker_ != nullptr) free_buffer_mem_tracker_->UnregisterFromParent(); - + if (free_buffer_mem_tracker_ != nullptr) free_buffer_mem_tracker_->Close(); if (cached_read_options_ != nullptr) hadoopRzOptionsFree(cached_read_options_); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/runtime/initial-reservations.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/initial-reservations.cc b/be/src/runtime/initial-reservations.cc index 4987ec3..88bc920 100644 --- a/be/src/runtime/initial-reservations.cc +++ b/be/src/runtime/initial-reservations.cc @@ -40,11 +40,11 @@ namespace impala { InitialReservations::InitialReservations(ObjectPool* obj_pool, ReservationTracker* query_reservation, MemTracker* query_mem_tracker, int64_t initial_reservation_total_claims) - : 
remaining_initial_reservation_claims_(initial_reservation_total_claims) { - MemTracker* initial_reservation_tracker = obj_pool->Add( - new MemTracker(-1, "Unclaimed reservations", query_mem_tracker, false)); + : initial_reservation_mem_tracker_(obj_pool->Add( + new MemTracker(-1, "Unclaimed reservations", query_mem_tracker, false))), + remaining_initial_reservation_claims_(initial_reservation_total_claims) { initial_reservations_.InitChildTracker(nullptr, query_reservation, - initial_reservation_tracker, numeric_limits<int64_t>::max()); + initial_reservation_mem_tracker_, numeric_limits<int64_t>::max()); } Status InitialReservations::Init( @@ -86,5 +86,6 @@ void InitialReservations::Return(BufferPool::ClientHandle* src, int64_t bytes) { void InitialReservations::ReleaseResources() { initial_reservations_.Close(); + initial_reservation_mem_tracker_->Close(); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/runtime/initial-reservations.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/initial-reservations.h b/be/src/runtime/initial-reservations.h index dfcb114..e2fbb03 100644 --- a/be/src/runtime/initial-reservations.h +++ b/be/src/runtime/initial-reservations.h @@ -70,6 +70,8 @@ class InitialReservations { // Return() returns reservations to. ReservationTracker initial_reservations_; + MemTracker* const initial_reservation_mem_tracker_; + /// The total bytes of additional reservations that we expect to be claimed. /// initial_reservations_->GetReservation() <= remaining_initial_reservation_claims_. 
int64_t remaining_initial_reservation_claims_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/runtime/mem-tracker.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc index ebf5137..525d069 100644 --- a/be/src/runtime/mem-tracker.cc +++ b/be/src/runtime/mem-tracker.cc @@ -28,6 +28,7 @@ #include "util/debug-util.h" #include "util/mem-info.h" #include "util/pretty-printer.h" +#include "util/test-info.h" #include "util/uid-util.h" #include "common/names.h" @@ -109,8 +110,18 @@ void MemTracker::AddChildTracker(MemTracker* tracker) { tracker->child_tracker_it_ = child_trackers_.insert(child_trackers_.end(), tracker); } -void MemTracker::UnregisterFromParent() { - DCHECK(parent_ != NULL); +void MemTracker::Close() { + if (closed_) return; + if (consumption_metric_ == nullptr) { + DCHECK_EQ(consumption_->current_value(), 0) << label_ << "\n" + << GetStackTrace() << "\n" + << LogUsage(""); + } + closed_ = true; +} + +void MemTracker::CloseAndUnregisterFromParent() { + Close(); lock_guard<SpinLock> l(parent_->child_trackers_lock_); parent_->child_trackers_.erase(child_tracker_it_); child_tracker_it_ = parent_->child_trackers_.end(); @@ -187,9 +198,10 @@ MemTracker* MemTracker::CreateQueryMemTracker(const TUniqueId& id, } MemTracker::~MemTracker() { - DCHECK_EQ(consumption_->current_value(), 0) << label_ << "\n" - << GetStackTrace() << "\n" - << LogUsage(""); + // We should explicitly close MemTrackers in the context of a daemon process. It is ok + // if backend tests don't call Close() to make tests more concise. 
+ if (TestInfo::is_test()) Close(); + DCHECK(closed_) << label_; delete reservation_counters_.Load(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/runtime/mem-tracker.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h index a9e265a..8d3d645 100644 --- a/be/src/runtime/mem-tracker.h +++ b/be/src/runtime/mem-tracker.h @@ -95,8 +95,19 @@ class MemTracker { ~MemTracker(); - /// Removes this tracker from parent_->child_trackers_. - void UnregisterFromParent(); + /// Closes this MemTracker. After closing it is invalid to consume memory on this + /// tracker and the tracker's consumption counter (which may be owned by a + /// RuntimeProfile, not this MemTracker) can be safely destroyed. MemTrackers without + /// consumption metrics in the context of a daemon must always be closed. + /// Idempotent: calling multiple times has no effect. + void Close(); + + /// Closes the MemTracker and deregisters it from its parent. Can be called before + /// destruction to prevent other threads from getting a reference to the MemTracker + /// via its parent. Only used to deregister the query-level MemTracker from the + /// global hierarchy. + /// TODO: IMPALA-3200: this is also used by BufferedBlockMgr, which will be deleted. + void CloseAndUnregisterFromParent(); /// Include counters from a ReservationTracker in logs and other diagnostics. /// The counters should be owned by the fragment's RuntimeProfile. @@ -112,6 +123,7 @@ class MemTracker { /// Increases consumption of this tracker and its ancestors by 'bytes'. 
void Consume(int64_t bytes) { + DCHECK(!closed_) << label_; if (bytes <= 0) { if (bytes < 0) Release(-bytes); return; @@ -121,11 +133,10 @@ class MemTracker { RefreshConsumptionFromMetric(); return; } - for (std::vector<MemTracker*>::iterator tracker = all_trackers_.begin(); - tracker != all_trackers_.end(); ++tracker) { - (*tracker)->consumption_->Add(bytes); - if ((*tracker)->consumption_metric_ == NULL) { - DCHECK_GE((*tracker)->consumption_->current_value(), 0); + for (MemTracker* tracker : all_trackers_) { + tracker->consumption_->Add(bytes); + if (tracker->consumption_metric_ == NULL) { + DCHECK_GE(tracker->consumption_->current_value(), 0); } } } @@ -136,11 +147,13 @@ class MemTracker { /// to update tracking on a particular mem tracker but the consumption against /// the limit recorded in one of its ancestors already happened. void ConsumeLocal(int64_t bytes, MemTracker* end_tracker) { + DCHECK(!closed_) << label_; DCHECK(consumption_metric_ == NULL) << "Should not be called on root."; - for (int i = 0; i < all_trackers_.size(); ++i) { - if (all_trackers_[i] == end_tracker) return; - DCHECK(!all_trackers_[i]->has_limit()); - all_trackers_[i]->consumption_->Add(bytes); + for (MemTracker* tracker : all_trackers_) { + if (tracker == end_tracker) return; + DCHECK(!tracker->has_limit()); + DCHECK(!tracker->closed_) << tracker->label_; + tracker->consumption_->Add(bytes); } DCHECK(false) << "end_tracker is not an ancestor"; } @@ -155,6 +168,7 @@ class MemTracker { /// Returns true if the try succeeded. WARN_UNUSED_RESULT bool TryConsume(int64_t bytes) { + DCHECK(!closed_) << label_; if (consumption_metric_ != NULL) RefreshConsumptionFromMetric(); if (UNLIKELY(bytes <= 0)) return true; int i; @@ -195,6 +209,7 @@ class MemTracker { /// Decreases consumption of this tracker and its ancestors by 'bytes'. 
void Release(int64_t bytes) { + DCHECK(!closed_) << label_; if (bytes <= 0) { if (bytes < 0) Consume(-bytes); return; @@ -204,30 +219,26 @@ class MemTracker { RefreshConsumptionFromMetric(); return; } - for (std::vector<MemTracker*>::iterator tracker = all_trackers_.begin(); - tracker != all_trackers_.end(); ++tracker) { - (*tracker)->consumption_->Add(-bytes); + for (MemTracker* tracker : all_trackers_) { + tracker->consumption_->Add(-bytes); /// If a UDF calls FunctionContext::TrackAllocation() but allocates less than the /// reported amount, the subsequent call to FunctionContext::Free() may cause the /// process mem tracker to go negative until it is synced back to the tcmalloc /// metric. Don't blow up in this case. (Note that this doesn't affect non-process /// trackers since we can enforce that the reported memory usage is internally /// consistent.) - if ((*tracker)->consumption_metric_ == NULL) { - DCHECK_GE((*tracker)->consumption_->current_value(), 0) - << std::endl << (*tracker)->LogUsage(); + if (tracker->consumption_metric_ == NULL) { + DCHECK_GE(tracker->consumption_->current_value(), 0) + << std::endl << tracker->LogUsage(); } } - - /// TODO: Release brokered memory? } /// Returns true if a valid limit of this tracker or one of its ancestors is /// exceeded. bool AnyLimitExceeded() { - for (std::vector<MemTracker*>::iterator tracker = limit_trackers_.begin(); - tracker != limit_trackers_.end(); ++tracker) { - if ((*tracker)->LimitExceeded()) return true; + for (MemTracker* tracker : limit_trackers_) { + if (tracker->LimitExceeded()) return true; } return false; } @@ -403,6 +414,8 @@ class MemTracker { /// if consumption is 0. bool log_usage_if_zero_; + bool closed_ = false; + /// The number of times the GcFunctions were called. 
IntCounter* num_gcs_metric_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/runtime/query-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc index 64a8c5a..92a6b7b 100644 --- a/be/src/runtime/query-state.cc +++ b/be/src/runtime/query-state.cc @@ -94,8 +94,13 @@ void QueryState::ReleaseResources() { // Release any remaining reservation. if (initial_reservations_ != nullptr) initial_reservations_->ReleaseResources(); if (buffer_reservation_ != nullptr) buffer_reservation_->Close(); - // Avoid dangling reference from the parent of 'query_mem_tracker_'. - if (query_mem_tracker_ != nullptr) query_mem_tracker_->UnregisterFromParent(); + if (query_mem_tracker_ != nullptr) { + // No more tracked memory should be used by the query after this point, so we can + // close the MemTracker and remove the whole query subtree of MemTrackers from the + // global tree. After this point nothing should be touching this query's MemTrackers + // and they can be safely destroyed. 
+ query_mem_tracker_->CloseAndUnregisterFromParent(); + } if (desc_tbl_ != nullptr) desc_tbl_->ReleaseResources(); released_resources_ = true; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/runtime/runtime-filter-bank.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc index 2a47cac..2ae65c8 100644 --- a/be/src/runtime/runtime-filter-bank.cc +++ b/be/src/runtime/runtime-filter-bank.cc @@ -228,5 +228,5 @@ void RuntimeFilterBank::Close() { closed_ = true; obj_pool_.Clear(); filter_mem_tracker_->Release(memory_allocated_->value()); - filter_mem_tracker_->UnregisterFromParent(); + filter_mem_tracker_->Close(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/runtime/runtime-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc index ba8e75d..b986e8d 100644 --- a/be/src/runtime/runtime-state.cc +++ b/be/src/runtime/runtime-state.cc @@ -70,9 +70,7 @@ RuntimeState::RuntimeState(QueryState* query_state, const TPlanFragmentCtx& frag query_state->query_ctx().utc_timestamp_string))), exec_env_(exec_env), profile_(obj_pool(), "Fragment " + PrintId(instance_ctx.fragment_instance_id)), - instance_buffer_reservation_(new ReservationTracker), - is_cancelled_(false), - root_node_id_(-1) { + instance_buffer_reservation_(new ReservationTracker) { Init(); } @@ -85,10 +83,7 @@ RuntimeState::RuntimeState( now_(new TimestampValue(TimestampValue::Parse(qctx.now_string))), utc_timestamp_(new TimestampValue(TimestampValue::Parse(qctx.utc_timestamp_string))), exec_env_(exec_env), - profile_(obj_pool(), "<unnamed>"), - instance_buffer_reservation_(nullptr), - is_cancelled_(false), - root_node_id_(-1) { + profile_(obj_pool(), "<unnamed>") { if (query_ctx().request_pool.empty()) { 
const_cast<TQueryCtx&>(query_ctx()).request_pool = "test-pool"; } @@ -97,7 +92,7 @@ RuntimeState::RuntimeState( } RuntimeState::~RuntimeState() { - DCHECK(instance_mem_tracker_ == nullptr) << "Must call ReleaseResources()"; + DCHECK(released_resources_) << "Must call ReleaseResources()"; } void RuntimeState::Init() { @@ -233,33 +228,30 @@ void RuntimeState::UnregisterReaderContexts() { } void RuntimeState::ReleaseResources() { - // TODO: IMPALA-5587: control structures (e.g. MemTrackers) shouldn't be destroyed here. + DCHECK(!released_resources_); UnregisterReaderContexts(); if (filter_bank_ != nullptr) filter_bank_->Close(); if (resource_pool_ != nullptr) { exec_env_->thread_mgr()->UnregisterPool(resource_pool_); } - codegen_.reset(); // Release any memory associated with codegen. + // Release any memory associated with codegen. + if (codegen_ != nullptr) codegen_->Close(); // Release the reservation, which should be unused at the point. if (instance_buffer_reservation_ != nullptr) instance_buffer_reservation_->Close(); - // 'query_mem_tracker()' must be valid as long as 'instance_mem_tracker_' is so - // delete 'instance_mem_tracker_' first. - // LogUsage() walks the MemTracker tree top-down when the memory limit is exceeded, so - // break the link between 'instance_mem_tracker_' and its parent before - // 'instance_mem_tracker_' and its children are destroyed. - instance_mem_tracker_->UnregisterFromParent(); + // No more memory should be tracked for this instance at this point. if (instance_mem_tracker_->consumption() != 0) { LOG(WARNING) << "Query " << query_id() << " may have leaked memory." 
<< endl << instance_mem_tracker_->LogUsage(); } - instance_mem_tracker_.reset(); + instance_mem_tracker_->Close(); if (local_query_state_.get() != nullptr) { // if we created this QueryState, we must call ReleaseResources() local_query_state_->ReleaseResources(); } + released_resources_ = true; } const std::string& RuntimeState::GetEffectiveUser() const { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/27dcc768/be/src/runtime/runtime-state.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h index 12e7d8c..665de7b 100644 --- a/be/src/runtime/runtime-state.h +++ b/be/src/runtime/runtime-state.h @@ -302,7 +302,7 @@ class RuntimeState { /// Helper to call QueryState::StartSpilling(). Status StartSpilling(MemTracker* mem_tracker); - /// Release resources and prepare this object for destruction. + /// Release resources and prepare this object for destruction. Can only be called once. void ReleaseResources(); private: @@ -376,7 +376,10 @@ class RuntimeState { boost::scoped_ptr<ReservationTracker> instance_buffer_reservation_; /// if true, execution should stop with a CANCELLED status - bool is_cancelled_; + bool is_cancelled_ = false; + + /// if true, ReleaseResources() was called. + bool released_resources_ = false; /// Non-OK if an error has occurred and query execution should abort. Used only for /// asynchronously reporting such errors (e.g., when a UDF reports an error), so this @@ -397,7 +400,7 @@ class RuntimeState { /// 2) It is different between different fragments, so we do not run into hash /// collisions after data partitioning (across fragments). See IMPALA-219 for more /// details. - PlanNodeId root_node_id_; + PlanNodeId root_node_id_ = -1; /// Manages runtime filters that are either produced or consumed (or both!) by plan /// nodes that share this runtime state.