Repository: incubator-impala Updated Branches: refs/heads/master 23100102c -> d0152d424
IMPALA-5130: fix race in MemTracker::EnableReservationReporting() MemTracker::LogUsage() and MemTracker::EnableReservationReporting() could race, with LogUsage() seeing a partially-constructed 'reservation_counters_' value and crashing. This patch fixes that issue by atomically swapping in 'reservation_counters_' so that no threads see a partially-constructed value. While we're here, swap boost::mutex for the higher-performance SpinLock for 'child_trackers_lock_'. Change-Id: I2434c952d97c46040e29fca2327c244dd30599d2 Reviewed-on: http://gerrit.cloudera.org:8080/6502 Reviewed-by: Dan Hecht <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/d0152d42 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/d0152d42 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/d0152d42 Branch: refs/heads/master Commit: d0152d424ad1aa21a91122ca874a81793d497720 Parents: 2310010 Author: Tim Armstrong <[email protected]> Authored: Tue Mar 28 13:54:34 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Thu Mar 30 00:17:24 2017 +0000 ---------------------------------------------------------------------- be/src/common/atomic.h | 17 +++++++++++++++++ be/src/runtime/mem-tracker.cc | 23 +++++++++++++---------- be/src/runtime/mem-tracker.h | 14 ++++++++------ 3 files changed, 38 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d0152d42/be/src/common/atomic.h ---------------------------------------------------------------------- diff --git a/be/src/common/atomic.h b/be/src/common/atomic.h index 784be11..791f3b2 100644 --- a/be/src/common/atomic.h +++ b/be/src/common/atomic.h @@ -116,6 +116,23 @@ class AtomicInt { typedef internal::AtomicInt<int32_t> AtomicInt32; typedef internal::AtomicInt<int64_t> AtomicInt64; +/// Atomic pointer. Operations have the same semantics as AtomicInt. +template<typename T> +class AtomicPtr { + public: + AtomicPtr(T* initial = nullptr) : ptr_(reinterpret_cast<intptr_t>(initial)) {} + + /// Atomic load with "acquire" memory-ordering semantic. + ALWAYS_INLINE T* Load() const { return reinterpret_cast<T*>(ptr_.Load()); } + + /// Atomic store with "release" memory-ordering semantic. + ALWAYS_INLINE void Store(T* val) { ptr_.Store(reinterpret_cast<intptr_t>(val)); } + + private: + internal::AtomicInt<intptr_t> ptr_; +}; + + } #endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d0152d42/be/src/runtime/mem-tracker.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc index f4073b5..a48ede9 100644 --- a/be/src/runtime/mem-tracker.cc +++ b/be/src/runtime/mem-tracker.cc @@ -108,20 +108,20 @@ void MemTracker::Init() { } void MemTracker::AddChildTracker(MemTracker* tracker) { - lock_guard<mutex> l(child_trackers_lock_); + lock_guard<SpinLock> l(child_trackers_lock_); tracker->child_tracker_it_ = child_trackers_.insert(child_trackers_.end(), tracker); } void MemTracker::UnregisterFromParent() { DCHECK(parent_ != NULL); - lock_guard<mutex> l(parent_->child_trackers_lock_); + lock_guard<SpinLock> l(parent_->child_trackers_lock_); parent_->child_trackers_.erase(child_tracker_it_); child_tracker_it_ = parent_->child_trackers_.end(); } void MemTracker::EnableReservationReporting(const ReservationTrackerCounters& counters) { - reservation_counters_.reset(new ReservationTrackerCounters); - *reservation_counters_ = counters; + ReservationTrackerCounters* new_counters = new ReservationTrackerCounters(counters); + reservation_counters_.Store(new_counters); } int64_t MemTracker::GetPoolMemReserved() const { @@ -130,7 +130,7 @@ int64_t MemTracker::GetPoolMemReserved() const { DCHECK_EQ(limit_, -1) << LogUsage(""); int64_t mem_reserved = 0L; - lock_guard<mutex> l(child_trackers_lock_); + lock_guard<SpinLock> l(child_trackers_lock_); for (list<MemTracker*>::const_iterator it = child_trackers_.begin(); it != child_trackers_.end(); ++it) { int64_t child_limit = (*it)->limit(); @@ -194,6 +194,7 @@ MemTracker::~MemTracker() { DCHECK_EQ(consumption_->current_value(), 0) << label_ << "\n" << GetStackTrace() << "\n" << LogUsage(""); + delete reservation_counters_.Load(); } void MemTracker::RegisterMetrics(MetricGroup* metrics, const string& prefix) { @@ -246,11 +247,13 @@ string MemTracker::LogUsage(const string& prefix) const { int64_t total = consumption(); int64_t peak = consumption_->value(); - if (reservation_counters_ != NULL) { - int64_t reservation = reservation_counters_->peak_reservation->current_value(); + + ReservationTrackerCounters* reservation_counters = reservation_counters_.Load(); + if (reservation_counters != nullptr) { + int64_t reservation = reservation_counters->peak_reservation->current_value(); int64_t used_reservation = - reservation_counters_->peak_used_reservation->current_value(); - int64_t reservation_limit = reservation_counters_->reservation_limit->value(); + reservation_counters->peak_used_reservation->current_value(); + int64_t reservation_limit = reservation_counters->reservation_limit->value(); ss << " BufferPoolUsed/Reservation=" << PrettyPrinter::Print(used_reservation, TUnit::BYTES) << "/" << PrettyPrinter::Print(reservation, TUnit::BYTES); @@ -265,7 +268,7 @@ string MemTracker::LogUsage(const string& prefix) const { stringstream prefix_ss; prefix_ss << prefix << " "; string new_prefix = prefix_ss.str(); - lock_guard<mutex> l(child_trackers_lock_); + lock_guard<SpinLock> l(child_trackers_lock_); string child_trackers_usage = LogUsage(new_prefix, child_trackers_); if (!child_trackers_usage.empty()) ss << "\n" << child_trackers_usage; return ss.str(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d0152d42/be/src/runtime/mem-tracker.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h index 8fd40b5..acf3b46 100644 --- a/be/src/runtime/mem-tracker.h +++ b/be/src/runtime/mem-tracker.h @@ -65,6 +65,8 @@ class TQueryOptions; /// tracker has a GcFunction that releases any unused memory still held by tcmalloc, so /// this will be called before the process limit is reported as exceeded. GcFunctions are /// called in the order they are added, so expensive functions should be added last. +/// GcFunctions are called with a global lock held, so should be non-blocking and not +/// call back into MemTrackers, except to release memory. // /// This class is thread-safe. class MemTracker { @@ -293,6 +295,7 @@ class MemTracker { MemTracker* parent() const { return parent_; } /// Signature for function that can be called to free some memory after limit is reached. + /// See the class header for further details on what these functions should do. typedef boost::function<void ()> GcFunction; /// Add a function 'f' to be called if the limit is reached. @@ -324,8 +327,7 @@ class MemTracker { bool CheckLimitExceeded() const { return limit_ >= 0 && limit_ < consumption(); } /// If consumption is higher than max_consumption, attempts to free memory by calling - /// any - /// added GC functions. Returns true if max_consumption is still exceeded. Takes + /// any added GC functions. Returns true if max_consumption is still exceeded. Takes /// gc_lock. Updates metrics if initialized. bool GcMemory(int64_t max_consumption); @@ -384,9 +386,9 @@ class MemTracker { UIntGauge* consumption_metric_; /// If non-NULL, counters from a corresponding ReservationTracker that should be - /// reported in logs and other diagnostics. The counters are owned by the fragment's - /// RuntimeProfile. - boost::scoped_ptr<ReservationTrackerCounters> reservation_counters_; + /// reported in logs and other diagnostics. Owned by this MemTracker. The counters + /// are owned by the fragment's RuntimeProfile. + AtomicPtr<ReservationTrackerCounters> reservation_counters_; std::vector<MemTracker*> all_trackers_; // this tracker plus all of its ancestors std::vector<MemTracker*> limit_trackers_; // all_trackers_ with valid limits @@ -394,7 +396,7 @@ class MemTracker { /// All the child trackers of this tracker. Used only for computing resource pool mem /// reserved and error reporting, i.e., updating a parent tracker does not update its /// children. - mutable boost::mutex child_trackers_lock_; + mutable SpinLock child_trackers_lock_; std::list<MemTracker*> child_trackers_; /// Iterator into parent_->child_trackers_ for this object. Stored to have O(1)
