http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/util/runtime-profile-test.cc ---------------------------------------------------------------------- diff --git a/be/src/util/runtime-profile-test.cc b/be/src/util/runtime-profile-test.cc index fe9f3ae..1bc7911 100644 --- a/be/src/util/runtime-profile-test.cc +++ b/be/src/util/runtime-profile-test.cc @@ -34,17 +34,17 @@ namespace impala { TEST(CountersTest, Basic) { ObjectPool pool; - RuntimeProfile profile_a(&pool, "ProfileA"); - RuntimeProfile profile_a1(&pool, "ProfileA1"); - RuntimeProfile profile_a2(&pool, "ProfileAb"); + RuntimeProfile* profile_a = RuntimeProfile::Create(&pool, "ProfileA"); + RuntimeProfile* profile_a1 = RuntimeProfile::Create(&pool, "ProfileA1"); + RuntimeProfile* profile_a2 = RuntimeProfile::Create(&pool, "ProfileAb"); TRuntimeProfileTree thrift_profile; - profile_a.AddChild(&profile_a1); - profile_a.AddChild(&profile_a2); + profile_a->AddChild(profile_a1); + profile_a->AddChild(profile_a2); // Test Empty - profile_a.ToThrift(&thrift_profile.nodes); + profile_a->ToThrift(&thrift_profile.nodes); EXPECT_EQ(thrift_profile.nodes.size(), 3); thrift_profile.nodes.clear(); @@ -53,7 +53,7 @@ TEST(CountersTest, Basic) { RuntimeProfile::Counter* counter_merged; // Updating/setting counter - counter_a = profile_a.AddCounter("A", TUnit::UNIT); + counter_a = profile_a->AddCounter("A", TUnit::UNIT); EXPECT_TRUE(counter_a != NULL); counter_a->Add(10); counter_a->Add(-5); @@ -61,40 +61,40 @@ TEST(CountersTest, Basic) { counter_a->Set(1); EXPECT_EQ(counter_a->value(), 1); - counter_b = profile_a2.AddCounter("B", TUnit::BYTES); + counter_b = profile_a2->AddCounter("B", TUnit::BYTES); EXPECT_TRUE(counter_b != NULL); // Serialize/deserialize - profile_a.ToThrift(&thrift_profile.nodes); + profile_a->ToThrift(&thrift_profile.nodes); RuntimeProfile* from_thrift = RuntimeProfile::CreateFromThrift(&pool, thrift_profile); counter_merged = from_thrift->GetCounter("A"); 
EXPECT_EQ(counter_merged->value(), 1); EXPECT_TRUE(from_thrift->GetCounter("Not there") == NULL); // Averaged - RuntimeProfile averaged_profile(&pool, "Merged", true); - averaged_profile.UpdateAverage(from_thrift); - counter_merged = averaged_profile.GetCounter("A"); + RuntimeProfile* averaged_profile = RuntimeProfile::Create(&pool, "Merged", true); + averaged_profile->UpdateAverage(from_thrift); + counter_merged = averaged_profile->GetCounter("A"); EXPECT_EQ(counter_merged->value(), 1); // UpdateAverage again, there should be no change. - averaged_profile.UpdateAverage(from_thrift); + averaged_profile->UpdateAverage(from_thrift); EXPECT_EQ(counter_merged->value(), 1); - counter_a = profile_a2.AddCounter("A", TUnit::UNIT); + counter_a = profile_a2->AddCounter("A", TUnit::UNIT); counter_a->Set(3); - averaged_profile.UpdateAverage(&profile_a2); + averaged_profile->UpdateAverage(profile_a2); EXPECT_EQ(counter_merged->value(), 2); // Update - RuntimeProfile updated_profile(&pool, "Updated"); - updated_profile.Update(thrift_profile); - RuntimeProfile::Counter* counter_updated = updated_profile.GetCounter("A"); + RuntimeProfile* updated_profile = RuntimeProfile::Create(&pool, "Updated"); + updated_profile->Update(thrift_profile); + RuntimeProfile::Counter* counter_updated = updated_profile->GetCounter("A"); EXPECT_EQ(counter_updated->value(), 1); // Update 2 more times, counters should stay the same - updated_profile.Update(thrift_profile); - updated_profile.Update(thrift_profile); + updated_profile->Update(thrift_profile); + updated_profile->Update(thrift_profile); EXPECT_EQ(counter_updated->value(), 1); } @@ -110,27 +110,27 @@ TEST(CountersTest, MergeAndUpdate) { // children, with the counters from the shared child aggregated. 
ObjectPool pool; - RuntimeProfile profile1(&pool, "Parent1"); - RuntimeProfile p1_child1(&pool, "Child1"); - RuntimeProfile p1_child2(&pool, "Child2"); - profile1.AddChild(&p1_child1); - profile1.AddChild(&p1_child2); - - RuntimeProfile profile2(&pool, "Parent2"); - RuntimeProfile p2_child1(&pool, "Child1"); - RuntimeProfile p2_child3(&pool, "Child3"); - profile2.AddChild(&p2_child1); - profile2.AddChild(&p2_child3); + RuntimeProfile* profile1 = RuntimeProfile::Create(&pool, "Parent1"); + RuntimeProfile* p1_child1 = RuntimeProfile::Create(&pool, "Child1"); + RuntimeProfile* p1_child2 = RuntimeProfile::Create(&pool, "Child2"); + profile1->AddChild(p1_child1); + profile1->AddChild(p1_child2); + + RuntimeProfile* profile2 = RuntimeProfile::Create(&pool, "Parent2"); + RuntimeProfile* p2_child1 = RuntimeProfile::Create(&pool, "Child1"); + RuntimeProfile* p2_child3 = RuntimeProfile::Create(&pool, "Child3"); + profile2->AddChild(p2_child1); + profile2->AddChild(p2_child3); // Create parent level counters RuntimeProfile::Counter* parent1_shared = - profile1.AddCounter("Parent Shared", TUnit::UNIT); + profile1->AddCounter("Parent Shared", TUnit::UNIT); RuntimeProfile::Counter* parent2_shared = - profile2.AddCounter("Parent Shared", TUnit::UNIT); + profile2->AddCounter("Parent Shared", TUnit::UNIT); RuntimeProfile::Counter* parent1_only = - profile1.AddCounter("Parent 1 Only", TUnit::UNIT); + profile1->AddCounter("Parent 1 Only", TUnit::UNIT); RuntimeProfile::Counter* parent2_only = - profile2.AddCounter("Parent 2 Only", TUnit::UNIT); + profile2->AddCounter("Parent 2 Only", TUnit::UNIT); parent1_shared->Add(1); parent2_shared->Add(3); parent1_only->Add(2); @@ -138,17 +138,17 @@ TEST(CountersTest, MergeAndUpdate) { // Create child level counters RuntimeProfile::Counter* p1_c1_shared = - p1_child1.AddCounter("Child1 Shared", TUnit::UNIT); + p1_child1->AddCounter("Child1 Shared", TUnit::UNIT); RuntimeProfile::Counter* p1_c1_only = - p1_child1.AddCounter("Child1 Parent 1 Only", 
TUnit::UNIT); + p1_child1->AddCounter("Child1 Parent 1 Only", TUnit::UNIT); RuntimeProfile::Counter* p1_c2 = - p1_child2.AddCounter("Child2", TUnit::UNIT); + p1_child2->AddCounter("Child2", TUnit::UNIT); RuntimeProfile::Counter* p2_c1_shared = - p2_child1.AddCounter("Child1 Shared", TUnit::UNIT); + p2_child1->AddCounter("Child1 Shared", TUnit::UNIT); RuntimeProfile::Counter* p2_c1_only = - p1_child1.AddCounter("Child1 Parent 2 Only", TUnit::UNIT); + p1_child1->AddCounter("Child1 Parent 2 Only", TUnit::UNIT); RuntimeProfile::Counter* p2_c3 = - p2_child3.AddCounter("Child3", TUnit::UNIT); + p2_child3->AddCounter("Child3", TUnit::UNIT); p1_c1_shared->Add(10); p1_c1_only->Add(50); p2_c1_shared->Add(20); @@ -158,17 +158,17 @@ TEST(CountersTest, MergeAndUpdate) { // Merge the two and validate TRuntimeProfileTree tprofile1; - profile1.ToThrift(&tprofile1); - RuntimeProfile averaged_profile(&pool, "merged", true); - averaged_profile.UpdateAverage(&profile1); - averaged_profile.UpdateAverage(&profile2); - EXPECT_EQ(5, averaged_profile.num_counters()); - ValidateCounter(&averaged_profile, "Parent Shared", 2); - ValidateCounter(&averaged_profile, "Parent 1 Only", 2); - ValidateCounter(&averaged_profile, "Parent 2 Only", 5); + profile1->ToThrift(&tprofile1); + RuntimeProfile* averaged_profile = RuntimeProfile::Create(&pool, "merged", true); + averaged_profile->UpdateAverage(profile1); + averaged_profile->UpdateAverage(profile2); + EXPECT_EQ(5, averaged_profile->num_counters()); + ValidateCounter(averaged_profile, "Parent Shared", 2); + ValidateCounter(averaged_profile, "Parent 1 Only", 2); + ValidateCounter(averaged_profile, "Parent 2 Only", 5); vector<RuntimeProfile*> children; - averaged_profile.GetChildren(&children); + averaged_profile->GetChildren(&children); EXPECT_EQ(children.size(), 3); for (int i = 0; i < 3; ++i) { @@ -191,16 +191,16 @@ TEST(CountersTest, MergeAndUpdate) { // make sure we can print stringstream dummy; - averaged_profile.PrettyPrint(&dummy); + 
averaged_profile->PrettyPrint(&dummy); // Update profile2 w/ profile1 and validate - profile2.Update(tprofile1); - EXPECT_EQ(5, profile2.num_counters()); - ValidateCounter(&profile2, "Parent Shared", 1); - ValidateCounter(&profile2, "Parent 1 Only", 2); - ValidateCounter(&profile2, "Parent 2 Only", 5); + profile2->Update(tprofile1); + EXPECT_EQ(5, profile2->num_counters()); + ValidateCounter(profile2, "Parent Shared", 1); + ValidateCounter(profile2, "Parent 1 Only", 2); + ValidateCounter(profile2, "Parent 2 Only", 5); - profile2.GetChildren(&children); + profile2->GetChildren(&children); EXPECT_EQ(children.size(), 3); for (int i = 0; i < 3; ++i) { @@ -222,14 +222,14 @@ TEST(CountersTest, MergeAndUpdate) { } // make sure we can print - profile2.PrettyPrint(&dummy); + profile2->PrettyPrint(&dummy); } TEST(CountersTest, HighWaterMarkCounters) { ObjectPool pool; - RuntimeProfile profile(&pool, "Profile"); + RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile"); RuntimeProfile::HighWaterMarkCounter* bytes_counter = - profile.AddHighWaterMarkCounter("bytes", TUnit::BYTES); + profile->AddHighWaterMarkCounter("bytes", TUnit::BYTES); bytes_counter->Set(10); EXPECT_EQ(bytes_counter->current_value(), 10); @@ -260,9 +260,9 @@ TEST(CountersTest, HighWaterMarkCounters) { TEST(CountersTest, SummaryStatsCounters) { ObjectPool pool; - RuntimeProfile profile1(&pool, "Profile 1"); + RuntimeProfile* profile1 = RuntimeProfile::Create(&pool, "Profile 1"); RuntimeProfile::SummaryStatsCounter* summary_stats_counter_1 = - profile1.AddSummaryStatsCounter("summary_stats", TUnit::UNIT); + profile1->AddSummaryStatsCounter("summary_stats", TUnit::UNIT); EXPECT_EQ(summary_stats_counter_1->value(), 0); EXPECT_EQ(summary_stats_counter_1->MinValue(), numeric_limits<int64_t>::max()); @@ -297,9 +297,9 @@ TEST(CountersTest, SummaryStatsCounters) { EXPECT_EQ(summary_stats_counter_1->MinValue(), -40); EXPECT_EQ(summary_stats_counter_1->MaxValue(), 40); - RuntimeProfile profile2(&pool, 
"Profile 2"); + RuntimeProfile* profile2 = RuntimeProfile::Create(&pool, "Profile 2"); RuntimeProfile::SummaryStatsCounter* summary_stats_counter_2 = - profile2.AddSummaryStatsCounter("summary_stats", TUnit::UNIT); + profile2->AddSummaryStatsCounter("summary_stats", TUnit::UNIT); summary_stats_counter_2->UpdateCounter(100); EXPECT_EQ(summary_stats_counter_2->value(), 100); @@ -307,10 +307,10 @@ TEST(CountersTest, SummaryStatsCounters) { EXPECT_EQ(summary_stats_counter_2->MaxValue(), 100); TRuntimeProfileTree tprofile1; - profile1.ToThrift(&tprofile1); + profile1->ToThrift(&tprofile1); // Merge profile1 and profile2 and check that profile2 is overwritten. - profile2.Update(tprofile1); + profile2->Update(tprofile1); EXPECT_EQ(summary_stats_counter_2->value(), 4); EXPECT_EQ(summary_stats_counter_2->MinValue(), -40); EXPECT_EQ(summary_stats_counter_2->MaxValue(), 40); @@ -319,16 +319,16 @@ TEST(CountersTest, SummaryStatsCounters) { TEST(CountersTest, DerivedCounters) { ObjectPool pool; - RuntimeProfile profile(&pool, "Profile"); + RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile"); RuntimeProfile::Counter* bytes_counter = - profile.AddCounter("bytes", TUnit::BYTES); + profile->AddCounter("bytes", TUnit::BYTES); RuntimeProfile::Counter* ticks_counter = - profile.AddCounter("ticks", TUnit::TIME_NS); + profile->AddCounter("ticks", TUnit::TIME_NS); // set to 1 sec ticks_counter->Set(1000L * 1000L * 1000L); RuntimeProfile::DerivedCounter* throughput_counter = - profile.AddDerivedCounter("throughput", TUnit::BYTES, + profile->AddDerivedCounter("throughput", TUnit::BYTES, bind<int64_t>(&RuntimeProfile::UnitsPerSecond, bytes_counter, ticks_counter)); bytes_counter->Set(10); @@ -341,11 +341,11 @@ TEST(CountersTest, DerivedCounters) { TEST(CountersTest, AverageSetCounters) { ObjectPool pool; - RuntimeProfile profile(&pool, "Profile"); + RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile"); RuntimeProfile::Counter* bytes_1_counter = - 
profile.AddCounter("bytes 1", TUnit::BYTES); + profile->AddCounter("bytes 1", TUnit::BYTES); RuntimeProfile::Counter* bytes_2_counter = - profile.AddCounter("bytes 2", TUnit::BYTES); + profile->AddCounter("bytes 2", TUnit::BYTES); bytes_1_counter->Set(10); RuntimeProfile::AveragedCounter bytes_avg(TUnit::BYTES); @@ -366,9 +366,9 @@ TEST(CountersTest, AverageSetCounters) { EXPECT_EQ(bytes_avg.value(), 25); RuntimeProfile::Counter* double_1_counter = - profile.AddCounter("double 1", TUnit::DOUBLE_VALUE); + profile->AddCounter("double 1", TUnit::DOUBLE_VALUE); RuntimeProfile::Counter* double_2_counter = - profile.AddCounter("double 2", TUnit::DOUBLE_VALUE); + profile->AddCounter("double 2", TUnit::DOUBLE_VALUE); double_1_counter->Set(1.0f); RuntimeProfile::AveragedCounter double_avg(TUnit::DOUBLE_VALUE); double_avg.UpdateCounter(double_1_counter); @@ -390,17 +390,17 @@ TEST(CountersTest, AverageSetCounters) { TEST(CountersTest, InfoStringTest) { ObjectPool pool; - RuntimeProfile profile(&pool, "Profile"); - EXPECT_TRUE(profile.GetInfoString("Key") == NULL); + RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile"); + EXPECT_TRUE(profile->GetInfoString("Key") == NULL); - profile.AddInfoString("Key", "Value"); - const string* value = profile.GetInfoString("Key"); + profile->AddInfoString("Key", "Value"); + const string* value = profile->GetInfoString("Key"); EXPECT_TRUE(value != NULL); EXPECT_EQ(*value, "Value"); // Convert it to thrift TRuntimeProfileTree tprofile; - profile.ToThrift(&tprofile); + profile->ToThrift(&tprofile); // Convert it back RuntimeProfile* from_thrift = RuntimeProfile::CreateFromThrift( @@ -410,34 +410,34 @@ TEST(CountersTest, InfoStringTest) { EXPECT_EQ(*value, "Value"); // Test update. 
- RuntimeProfile update_dst_profile(&pool, "Profile2"); - update_dst_profile.Update(tprofile); - value = update_dst_profile.GetInfoString("Key"); + RuntimeProfile* update_dst_profile = RuntimeProfile::Create(&pool, "Profile2"); + update_dst_profile->Update(tprofile); + value = update_dst_profile->GetInfoString("Key"); EXPECT_TRUE(value != NULL); EXPECT_EQ(*value, "Value"); // Update the original profile, convert it to thrift and update from the dst // profile - profile.AddInfoString("Key", "NewValue"); - profile.AddInfoString("Foo", "Bar"); - EXPECT_EQ(*profile.GetInfoString("Key"), "NewValue"); - EXPECT_EQ(*profile.GetInfoString("Foo"), "Bar"); - profile.ToThrift(&tprofile); - - update_dst_profile.Update(tprofile); - EXPECT_EQ(*update_dst_profile.GetInfoString("Key"), "NewValue"); - EXPECT_EQ(*update_dst_profile.GetInfoString("Foo"), "Bar"); + profile->AddInfoString("Key", "NewValue"); + profile->AddInfoString("Foo", "Bar"); + EXPECT_EQ(*profile->GetInfoString("Key"), "NewValue"); + EXPECT_EQ(*profile->GetInfoString("Foo"), "Bar"); + profile->ToThrift(&tprofile); + + update_dst_profile->Update(tprofile); + EXPECT_EQ(*update_dst_profile->GetInfoString("Key"), "NewValue"); + EXPECT_EQ(*update_dst_profile->GetInfoString("Foo"), "Bar"); } TEST(CountersTest, RateCounters) { ObjectPool pool; - RuntimeProfile profile(&pool, "Profile"); + RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile"); RuntimeProfile::Counter* bytes_counter = - profile.AddCounter("bytes", TUnit::BYTES); + profile->AddCounter("bytes", TUnit::BYTES); RuntimeProfile::Counter* rate_counter = - profile.AddRateCounter("RateCounter", bytes_counter); + profile->AddRateCounter("RateCounter", bytes_counter); EXPECT_TRUE(rate_counter->unit() == TUnit::BYTES_PER_SECOND); EXPECT_EQ(rate_counter->value(), 0); @@ -449,8 +449,8 @@ TEST(CountersTest, RateCounters) { int64_t rate = rate_counter->value(); - // Remove the counter so it no longer gets updates - 
PeriodicCounterUpdater::StopRateCounter(rate_counter); + // Stop the counter so it no longer gets updates + profile->StopPeriodicCounters(); // The rate counter is not perfectly accurate. Currently updated at 500ms intervals, // we should have seen somewhere between 1 and 3 updates (33 - 200 MB/s) @@ -468,44 +468,42 @@ TEST(CountersTest, RateCounters) { TEST(CountersTest, BucketCounters) { ObjectPool pool; - RuntimeProfile profile(&pool, "Profile"); + RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile"); RuntimeProfile::Counter* unit_counter = - profile.AddCounter("unit", TUnit::UNIT); + profile->AddCounter("unit", TUnit::UNIT); // Set the unit to 1 before sampling unit_counter->Set(1); // Create the bucket counters and start sampling - vector<RuntimeProfile::Counter*> buckets; - buckets.push_back(pool.Add(new RuntimeProfile::Counter(TUnit::DOUBLE_VALUE, 0))); - buckets.push_back(pool.Add(new RuntimeProfile::Counter(TUnit::DOUBLE_VALUE, 0))); - profile.RegisterBucketingCounters(unit_counter, &buckets); + vector<RuntimeProfile::Counter*>* buckets = + profile->AddBucketingCounters(unit_counter, 2); // Wait two seconds. sleep(2); // Stop sampling - PeriodicCounterUpdater::StopBucketingCounters(&buckets, true); + profile->StopPeriodicCounters(); // TODO: change the value to double // The value of buckets[0] should be zero and buckets[1] should be 1. - double val0 = buckets[0]->double_value(); - double val1 = buckets[1]->double_value(); + double val0 = (*buckets)[0]->double_value(); + double val1 = (*buckets)[1]->double_value(); EXPECT_EQ(0, val0); EXPECT_EQ(100, val1); // Wait another two seconds. The counter has been removed. So the value should not be // changed (much). 
sleep(2); - EXPECT_EQ(val0, buckets[0]->double_value()); - EXPECT_EQ(val1, buckets[1]->double_value()); + EXPECT_EQ(val0, (*buckets)[0]->double_value()); + EXPECT_EQ(val1, (*buckets)[1]->double_value()); } TEST(CountersTest, EventSequences) { ObjectPool pool; - RuntimeProfile profile(&pool, "Profile"); - RuntimeProfile::EventSequence* seq = profile.AddEventSequence("event sequence"); + RuntimeProfile* profile = RuntimeProfile::Create(&pool, "Profile"); + RuntimeProfile::EventSequence* seq = profile->AddEventSequence("event sequence"); seq->MarkEvent("aaaa"); seq->MarkEvent("bbbb"); seq->MarkEvent("cccc"); @@ -524,7 +522,7 @@ TEST(CountersTest, EventSequences) { } TRuntimeProfileTree thrift_profile; - profile.ToThrift(&thrift_profile); + profile->ToThrift(&thrift_profile); EXPECT_TRUE(thrift_profile.nodes[0].__isset.event_sequences); EXPECT_EQ(1, thrift_profile.nodes[0].event_sequences.size());
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/util/runtime-profile.cc ---------------------------------------------------------------------- diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc index 12f4e25..4254b9c 100644 --- a/be/src/util/runtime-profile.cc +++ b/be/src/util/runtime-profile.cc @@ -53,10 +53,14 @@ const string RuntimeProfile::TOTAL_TIME_COUNTER_NAME = "TotalTime"; const string RuntimeProfile::LOCAL_TIME_COUNTER_NAME = "LocalTime"; const string RuntimeProfile::INACTIVE_TIME_COUNTER_NAME = "InactiveTotalTime"; +RuntimeProfile* RuntimeProfile::Create(ObjectPool* pool, const string& name, + bool is_averaged_profile) { + return pool->Add(new RuntimeProfile(pool, name, is_averaged_profile)); +} + RuntimeProfile::RuntimeProfile(ObjectPool* pool, const string& name, bool is_averaged_profile) : pool_(pool), - own_pool_(false), name_(name), metadata_(-1), is_averaged_profile_(is_averaged_profile), @@ -78,27 +82,25 @@ RuntimeProfile::RuntimeProfile(ObjectPool* pool, const string& name, } RuntimeProfile::~RuntimeProfile() { - map<string, Counter*>::const_iterator iter; - for (iter = counter_map_.begin(); iter != counter_map_.end(); ++iter) { - PeriodicCounterUpdater::StopRateCounter(iter->second); - PeriodicCounterUpdater::StopSamplingCounter(iter->second); - } + DCHECK(!has_active_periodic_counters_); +} - set<vector<Counter*>* >::const_iterator buckets_iter; - for (buckets_iter = bucketing_counters_.begin(); - buckets_iter != bucketing_counters_.end(); ++buckets_iter) { - // This is just a clean up. No need to perform conversion. Also, the underlying - // counters might be gone already. 
- PeriodicCounterUpdater::StopBucketingCounters(*buckets_iter, false); +void RuntimeProfile::StopPeriodicCounters() { + lock_guard<SpinLock> l(counter_map_lock_); + if (!has_active_periodic_counters_) return; + for (Counter* sampling_counter : sampling_counters_) { + PeriodicCounterUpdater::StopSamplingCounter(sampling_counter); } - - TimeSeriesCounterMap::const_iterator time_series_it; - for (time_series_it = time_series_counter_map_.begin(); - time_series_it != time_series_counter_map_.end(); ++time_series_it) { - PeriodicCounterUpdater::StopTimeSeriesCounter(time_series_it->second); + for (Counter* rate_counter : rate_counters_) { + PeriodicCounterUpdater::StopRateCounter(rate_counter); } - - if (own_pool_) delete pool_; + for (vector<Counter*>* counters : bucketing_counters_) { + PeriodicCounterUpdater::StopBucketingCounters(counters); + } + for (auto& time_series_counter_entry : time_series_counter_map_) { + PeriodicCounterUpdater::StopTimeSeriesCounter(time_series_counter_entry.second); + } + has_active_periodic_counters_ = false; } RuntimeProfile* RuntimeProfile::CreateFromThrift(ObjectPool* pool, @@ -113,7 +115,7 @@ RuntimeProfile* RuntimeProfile::CreateFromThrift(ObjectPool* pool, DCHECK_LT(*idx, nodes.size()); const TRuntimeProfileNode& node = nodes[*idx]; - RuntimeProfile* profile = pool->Add(new RuntimeProfile(pool, node.name)); + RuntimeProfile* profile = Create(pool, node.name); profile->metadata_ = node.metadata; for (int i = 0; i < node.counters.size(); ++i) { const TCounter& counter = node.counters[i]; @@ -208,7 +210,7 @@ void RuntimeProfile::UpdateAverage(RuntimeProfile* other) { if (j != child_map_.end()) { child = j->second; } else { - child = pool_->Add(new RuntimeProfile(pool_, other_child->name_, true)); + child = Create(pool_, other_child->name_, true); child->metadata_ = other_child->metadata_; bool indent_other_child = other->children_[i].second; child_map_[child->name_] = child; @@ -282,7 +284,7 @@ void RuntimeProfile::Update(const 
vector<TRuntimeProfileNode>& nodes, int* idx) } { - lock_guard<SpinLock> l(time_series_counter_map_lock_); + lock_guard<SpinLock> l(counter_map_lock_); for (int i = 0; i < node.time_series_counters.size(); ++i) { const TTimeSeriesCounter& c = node.time_series_counters[i]; TimeSeriesCounterMap::iterator it = time_series_counter_map_.find(c.name); @@ -322,7 +324,7 @@ void RuntimeProfile::Update(const vector<TRuntimeProfileNode>& nodes, int* idx) if (j != child_map_.end()) { child = j->second; } else { - child = pool_->Add(new RuntimeProfile(pool_, tchild.name)); + child = Create(pool_, tchild.name); child->metadata_ = tchild.metadata; child_map_[tchild.name] = child; children_.push_back(make_pair(child, tchild.indent)); @@ -439,7 +441,7 @@ RuntimeProfile* RuntimeProfile::CreateChild(const string& name, bool indent, bool prepend) { lock_guard<SpinLock> l(children_lock_); DCHECK(child_map_.find(name) == child_map_.end()); - RuntimeProfile* child = pool_->Add(new RuntimeProfile(pool_, name)); + RuntimeProfile* child = Create(pool_, name); AddChildLocked(child, indent, prepend ? 
children_.begin() : children_.end()); return child; } @@ -504,9 +506,16 @@ void RuntimeProfile::AddCodegenMsg( #define ADD_COUNTER_IMPL(NAME, T) \ RuntimeProfile::T* RuntimeProfile::NAME( \ const string& name, TUnit::type unit, const string& parent_counter_name) { \ - DCHECK_EQ(is_averaged_profile_, false); \ lock_guard<SpinLock> l(counter_map_lock_); \ + bool dummy; \ + return NAME##Locked(name, unit, parent_counter_name, &dummy); \ + } \ + RuntimeProfile::T* RuntimeProfile::NAME##Locked( const string& name, \ + TUnit::type unit, const string& parent_counter_name, bool* created) { \ + counter_map_lock_.DCheckLocked(); \ + DCHECK_EQ(is_averaged_profile_, false); \ if (counter_map_.find(name) != counter_map_.end()) { \ + *created = false; \ return reinterpret_cast<T*>(counter_map_[name]); \ } \ DCHECK(parent_counter_name == ROOT_COUNTER \ @@ -516,6 +525,7 @@ void RuntimeProfile::AddCodegenMsg( set<string>* child_counters = \ FindOrInsert(&child_counter_map_, parent_counter_name, set<string>()); \ child_counters->insert(name); \ + *created = true; \ return counter; \ } @@ -668,7 +678,7 @@ void RuntimeProfile::PrettyPrint(ostream* s, const string& prefix) const { // <Name> (<period>): <val1>, <val2>, <etc> SpinLock* lock; int num, period; - lock_guard<SpinLock> l(time_series_counter_map_lock_); + lock_guard<SpinLock> l(counter_map_lock_); for (const TimeSeriesCounterMap::value_type& v: time_series_counter_map_) { const int64_t* samples = v.second->samples_.GetSamples(&num, &period, &lock); if (num > 0) { @@ -813,7 +823,7 @@ void RuntimeProfile::ToThrift(vector<TRuntimeProfileNode>* nodes) const { } { - lock_guard<SpinLock> l(time_series_counter_map_lock_); + lock_guard<SpinLock> l(counter_map_lock_); if (time_series_counter_map_.size() != 0) { node.__set_time_series_counters( vector<TTimeSeriesCounter>(time_series_counter_map_.size())); @@ -884,44 +894,71 @@ RuntimeProfile::Counter* RuntimeProfile::AddRateCounter( DCHECK(false) << "Unsupported src counter unit: " << 
src_counter->unit(); return NULL; } - Counter* dst_counter = AddCounter(name, dst_unit); - PeriodicCounterUpdater::RegisterPeriodicCounter(src_counter, NULL, dst_counter, - PeriodicCounterUpdater::RATE_COUNTER); - return dst_counter; + { + lock_guard<SpinLock> l(counter_map_lock_); + bool created; + Counter* dst_counter = AddCounterLocked(name, dst_unit, "", &created); + if (!created) return dst_counter; + rate_counters_.push_back(dst_counter); + PeriodicCounterUpdater::RegisterPeriodicCounter(src_counter, NULL, dst_counter, + PeriodicCounterUpdater::RATE_COUNTER); + has_active_periodic_counters_ = true; + return dst_counter; + } } RuntimeProfile::Counter* RuntimeProfile::AddRateCounter( const string& name, DerivedCounterFunction fn, TUnit::type dst_unit) { - Counter* dst_counter = AddCounter(name, dst_unit); + lock_guard<SpinLock> l(counter_map_lock_); + bool created; + Counter* dst_counter = AddCounterLocked(name, dst_unit, "", &created); + if (!created) return dst_counter; + rate_counters_.push_back(dst_counter); PeriodicCounterUpdater::RegisterPeriodicCounter(NULL, fn, dst_counter, PeriodicCounterUpdater::RATE_COUNTER); + has_active_periodic_counters_ = true; return dst_counter; } RuntimeProfile::Counter* RuntimeProfile::AddSamplingCounter( const string& name, Counter* src_counter) { DCHECK(src_counter->unit() == TUnit::UNIT); - Counter* dst_counter = AddCounter(name, TUnit::DOUBLE_VALUE); + lock_guard<SpinLock> l(counter_map_lock_); + bool created; + Counter* dst_counter = AddCounterLocked(name, TUnit::DOUBLE_VALUE, "", &created); + if (!created) return dst_counter; + sampling_counters_.push_back(dst_counter); PeriodicCounterUpdater::RegisterPeriodicCounter(src_counter, NULL, dst_counter, PeriodicCounterUpdater::SAMPLING_COUNTER); + has_active_periodic_counters_ = true; return dst_counter; } RuntimeProfile::Counter* RuntimeProfile::AddSamplingCounter( const string& name, DerivedCounterFunction sample_fn) { - Counter* dst_counter = AddCounter(name, 
TUnit::DOUBLE_VALUE); + lock_guard<SpinLock> l(counter_map_lock_); + bool created; + Counter* dst_counter = AddCounterLocked(name, TUnit::DOUBLE_VALUE, "", &created); + if (!created) return dst_counter; + sampling_counters_.push_back(dst_counter); PeriodicCounterUpdater::RegisterPeriodicCounter(NULL, sample_fn, dst_counter, PeriodicCounterUpdater::SAMPLING_COUNTER); + has_active_periodic_counters_ = true; return dst_counter; } -void RuntimeProfile::RegisterBucketingCounters(Counter* src_counter, - vector<Counter*>* buckets) { - { - lock_guard<SpinLock> l(counter_map_lock_); - bucketing_counters_.insert(buckets); +vector<RuntimeProfile::Counter*>* RuntimeProfile::AddBucketingCounters( + Counter* src_counter, int num_buckets) { + lock_guard<SpinLock> l(counter_map_lock_); + vector<RuntimeProfile::Counter*>* buckets = pool_->Add(new vector<Counter*>); + for (int i = 0; i < num_buckets; ++i) { + buckets->push_back( + pool_->Add(new RuntimeProfile::Counter(TUnit::DOUBLE_VALUE, 0))); } + bucketing_counters_.insert(buckets); + has_active_periodic_counters_ = true; PeriodicCounterUpdater::RegisterBucketingCounters(src_counter, buckets); + return buckets; } RuntimeProfile::EventSequence* RuntimeProfile::AddEventSequence(const string& name) { @@ -978,16 +1015,14 @@ RuntimeProfile::SummaryStatsCounter* RuntimeProfile::AddSummaryStatsCounter( RuntimeProfile::TimeSeriesCounter* RuntimeProfile::AddTimeSeriesCounter( const string& name, TUnit::type unit, DerivedCounterFunction fn) { - DCHECK(fn != NULL); - TimeSeriesCounter* counter = NULL; - { - lock_guard<SpinLock> l(time_series_counter_map_lock_); - TimeSeriesCounterMap::iterator it = time_series_counter_map_.find(name); - if (it != time_series_counter_map_.end()) return it->second; - counter = pool_->Add(new TimeSeriesCounter(name, unit, fn)); - time_series_counter_map_[name] = counter; - } + DCHECK(fn != nullptr); + lock_guard<SpinLock> l(counter_map_lock_); + TimeSeriesCounterMap::iterator it = 
time_series_counter_map_.find(name); + if (it != time_series_counter_map_.end()) return it->second; + TimeSeriesCounter* counter = pool_->Add(new TimeSeriesCounter(name, unit, fn)); + time_series_counter_map_[name] = counter; PeriodicCounterUpdater::RegisterTimeSeriesCounter(counter); + has_active_periodic_counters_ = true; return counter; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/util/runtime-profile.h ---------------------------------------------------------------------- diff --git a/be/src/util/runtime-profile.h b/be/src/util/runtime-profile.h index 298c214..8348161 100644 --- a/be/src/util/runtime-profile.h +++ b/be/src/util/runtime-profile.h @@ -101,13 +101,13 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s typedef boost::function<int64_t ()> DerivedCounterFunction; - /// Create a runtime profile object with 'name'. Counters and merged profile are - /// allocated from pool. - /// If is_averaged_profile is true, the counters in this profile will be derived + /// Create a runtime profile object with 'name'. The profile, counters and any other + /// structures owned by the profile are allocated from 'pool'. + /// If 'is_averaged_profile' is true, the counters in this profile will be derived /// averages (of unit AveragedCounter) from other profiles, so the counter map will /// be left empty. Otherwise, the counter map is initialized with a single entry for /// TotalTime. - RuntimeProfile(ObjectPool* pool, const std::string& name, + static RuntimeProfile* Create(ObjectPool* pool, const std::string& name, bool is_averaged_profile = false); ~RuntimeProfile(); @@ -247,6 +247,12 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s /// the key does not exist. const std::string* GetInfoString(const std::string& key) const; + /// Stops updating all counters in this profile that are periodically updated by a + /// background thread (i.e. 
sampling, rate, bucketing and time series counters). + /// Must be called before the profile is destroyed if any such counters are active. + /// Does not stop counters on descendant profiles. + void StopPeriodicCounters(); + /// Returns the counter for the total elapsed time. Counter* total_time_counter() { return counter_map_[TOTAL_TIME_COUNTER_NAME]; } Counter* inactive_timer() { return counter_map_[INACTIVE_TIME_COUNTER_NAME]; } @@ -299,8 +305,9 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s /// Add a rate counter to the current profile based on src_counter with name. /// The rate counter is updated periodically based on the src counter. /// The rate counter has units in src_counter unit per second. - /// Rate counters should be stopped (by calling PeriodicCounterUpdater::StopRateCounter) - /// as soon as the src_counter stops changing. + /// StopPeriodicCounters() must be called to stop the periodic updating before this + /// profile is destroyed. The periodic updating can be stopped earlier by calling + /// PeriodicCounterUpdater::StopRateCounter() if 'src_counter' stops changing. Counter* AddRateCounter(const std::string& name, Counter* src_counter); /// Same as 'AddRateCounter' above except values are taken by calling fn. @@ -312,27 +319,40 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s /// The sampling counter is updated periodically based on the src counter by averaging /// the samples taken from the src counter. /// The sampling counter has the same unit as src_counter unit. - /// Sampling counters should be stopped (by calling - /// PeriodicCounterUpdater::StopSamplingCounter) as soon as the src_counter stops - /// changing. + /// StopPeriodicCounters() must be called to stop the periodic updating before this + /// profile is destroyed. The periodic updating can be stopped earlier by calling + /// PeriodicCounterUpdater::StopSamplingCounter() if 'src_counter' stops changing. 
Counter* AddSamplingCounter(const std::string& name, Counter* src_counter); /// Same as 'AddSamplingCounter' above except the samples are taken by calling fn. Counter* AddSamplingCounter(const std::string& name, DerivedCounterFunction fn); - /// Register a bucket of counters to store the sampled value of src_counter. - /// The src_counter is sampled periodically and the buckets are updated. - void RegisterBucketingCounters(Counter* src_counter, std::vector<Counter*>* buckets); + /// Create a set of counters, one per bucket, to store the sampled value of src_counter. + /// The 'src_counter' is sampled periodically to obtain the index of the bucket to + /// increment. E.g. if the value of 'src_counter' is 3, the bucket at index 3 is + /// updated. If the index exceeds the index of the last bucket, the last bucket is + /// updated. + /// + /// The created counters do not appear in the profile when serialized or + /// pretty-printed. The caller must do its own processing of the counter value + /// (e.g. converting it to an info string). + /// TODO: make this interface more consistent and sane. + /// + /// StopPeriodicCounters() must be called to stop the periodic updating before this + /// profile is destroyed. The periodic updating can be stopped earlier by calling + /// PeriodicCounterUpdater::StopBucketingCounters() if 'buckets' stops changing. + std::vector<Counter*>* AddBucketingCounters(Counter* src_counter, int num_buckets); /// Create a time series counter. This begins sampling immediately. This counter /// contains a number of samples that are collected periodically by calling sample_fn(). + /// StopPeriodicCounters() must be called to stop the periodic updating before this + /// profile is destroyed. The periodic updating can be stopped earlier by calling + /// PeriodicCounterUpdater::StopTimeSeriesCounter() if the input stops changing. 
/// Note: these counters don't get merged (to make average profiles) TimeSeriesCounter* AddTimeSeriesCounter(const std::string& name, TUnit::type unit, DerivedCounterFunction sample_fn); - /// Create a time series counter that samples the source counter. Sampling begins - /// immediately. - /// Note: these counters don't get merged (to make average profiles) + /// Same as above except the samples are collected from 'src_counter'. TimeSeriesCounter* AddTimeSeriesCounter(const std::string& name, Counter* src_counter); /// Recursively compute the fraction of the 'total_time' spent in this profile and @@ -345,9 +365,6 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s /// object, but occasionally allocated in the constructor. ObjectPool* pool_; - /// True if we have to delete the pool_ on destruction. - bool own_pool_; - /// Name for this runtime profile. std::string name_; @@ -369,9 +386,27 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s ChildCounterMap child_counter_map_; /// A set of bucket counters registered in this runtime profile. - std::set<std::vector<Counter*>* > bucketing_counters_; + std::set<std::vector<Counter*>*> bucketing_counters_; + + /// Rate counters, which also appear in 'counter_map_'. Tracked separately to enable + /// stopping the counters. + std::vector<Counter*> rate_counters_; + + /// Sampling counters, which also appear in 'counter_map_'. Tracked separately to enable + /// stopping the counters. + std::vector<Counter*> sampling_counters_; + + /// Time series counters. These do not appear in 'counter_map_'. Tracked separately + /// because they are displayed separately in the profile and need to be stopped. + typedef std::map<std::string, TimeSeriesCounter*> TimeSeriesCounterMap; + TimeSeriesCounterMap time_series_counter_map_; - /// Protects counter_map_, counter_child_map_ and bucketing_counters_. 
+ /// True if this profile has active periodic counters, including bucketing, rate, + /// sampling and time series counters. + bool has_active_periodic_counters_ = false; + + /// Protects counter_map_, child_counter_map_, bucketing_counters_, rate_counters_, + /// sampling_counters_, time_series_counter_map_, and has_active_periodic_counters_. mutable SpinLock counter_map_lock_; /// Child profiles. Does not own memory. @@ -403,12 +438,6 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s /// Protects event_sequence_map_. mutable SpinLock event_sequence_lock_; - typedef std::map<std::string, TimeSeriesCounter*> TimeSeriesCounterMap; - TimeSeriesCounterMap time_series_counter_map_; - - /// Protects time_series_counter_map_. - mutable SpinLock time_series_counter_map_lock_; - typedef std::map<std::string, SummaryStatsCounter*> SummaryStatsCounterMap; SummaryStatsCounterMap summary_stats_map_; @@ -430,6 +459,9 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s /// ComputeTimeInProfile() int64_t local_time_ns_; + /// Constructor used by Create(). + RuntimeProfile(ObjectPool* pool, const std::string& name, bool is_averaged_profile); + /// Update a subtree of profiles from nodes, rooted at *idx. /// On return, *idx points to the node immediately following this subtree. void Update(const std::vector<TRuntimeProfileNode>& nodes, int* idx); @@ -454,6 +486,16 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s static RuntimeProfile* CreateFromThrift( ObjectPool* pool, const std::vector<TRuntimeProfileNode>& nodes, int* node_idx); + /// Internal implementations of the Add*Counter() functions for use when the caller + /// holds counter_map_lock_. Also returns 'created', which is true if a new counter was + /// created and false if a counter with the given name already existed. 
+ Counter* AddCounterLocked(const std::string& name, TUnit::type unit, + const std::string& parent_counter_name, bool* created); + HighWaterMarkCounter* AddHighWaterMarkCounterLocked(const std::string& name, + TUnit::type unit, const std::string& parent_counter_name, bool* created); + ConcurrentTimerCounter* AddConcurrentTimerCounterLocked(const std::string& name, + TUnit::type unit, const std::string& parent_counter_name, bool* created); + /// Inserts 'child' before the iterator 'insert_pos' in 'children_'. /// 'children_lock_' must be held by the caller. void AddChildLocked(
