IMPALA-5895: clean up runtime profile lifecycle Require callers to explicitly stop counter updating instead of doing it in the destructor. This replaces ad-hoc logic to stop individual counters.
Track which counters need to be stopped in separate lists instead of stopping everything. Force all RuntimeProfiles to use ObjectPools in a uniform way - the profile, its counters and its children are all allocated from the same pool. This is done via a new Create() method. Consolidate 'time_series_counter_map_lock_' and 'counter_map_lock_' to reduce complexity of the locking scheme. I didn't see any benefit to sharding the locks in this way - there are only two time series counters per fragment instance, which is a small fraction of the total number of counters. Change-Id: I45c39ac36c8e3c277213d32f5ae5f14be6b7f0df Reviewed-on: http://gerrit.cloudera.org:8080/8069 Reviewed-by: Tim Armstrong <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/7866eec5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/7866eec5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/7866eec5 Branch: refs/heads/master Commit: 7866eec5bdcbf9194a4aad2c87c354cbaad7b802 Parents: 741b052 Author: Tim Armstrong <[email protected]> Authored: Tue Sep 5 15:51:49 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Wed Sep 20 08:48:38 2017 +0000 ---------------------------------------------------------------------- be/src/benchmarks/hash-benchmark.cc | 8 +- be/src/codegen/llvm-codegen.cc | 32 +-- be/src/codegen/llvm-codegen.h | 4 +- be/src/exec/data-sink.cc | 2 +- be/src/exec/data-source-scan-node.cc | 4 +- be/src/exec/exec-node.cc | 15 +- be/src/exec/exec-node.h | 7 +- be/src/exec/hash-table-test.cc | 2 +- be/src/exec/hbase-scan-node.cc | 2 +- be/src/exec/hdfs-scan-node-base.cc | 26 +-- be/src/exec/hdfs-scan-node-base.h | 11 + be/src/exec/kudu-scan-node-base.cc | 15 -- be/src/exec/kudu-scan-node-base.h | 13 -- be/src/exec/kudu-scan-node-mt.cc | 4 +- be/src/exec/scan-node.cc | 8 + be/src/exec/scan-node.h | 18 
+- be/src/experiments/data-provider-test.cc | 6 +- be/src/experiments/tuple-splitter-test.cc | 8 +- be/src/runtime/buffered-tuple-stream-test.cc | 2 +- .../runtime/bufferpool/buffer-allocator-test.cc | 2 +- be/src/runtime/bufferpool/buffer-pool-test.cc | 2 +- .../bufferpool/reservation-tracker-test.cc | 2 +- be/src/runtime/bufferpool/suballocator-test.cc | 8 +- be/src/runtime/coordinator-backend-state.cc | 8 +- be/src/runtime/coordinator.cc | 4 +- be/src/runtime/coordinator.h | 6 +- be/src/runtime/data-stream-recvr.cc | 32 ++- be/src/runtime/data-stream-recvr.h | 7 +- be/src/runtime/data-stream-test.cc | 7 +- be/src/runtime/fragment-instance-state.cc | 12 +- be/src/runtime/query-state.cc | 2 +- be/src/runtime/runtime-state.cc | 11 +- be/src/runtime/runtime-state.h | 4 +- be/src/runtime/tmp-file-mgr-test.cc | 2 +- be/src/service/client-request-state.cc | 95 ++++---- be/src/service/client-request-state.h | 12 +- be/src/service/impala-server.cc | 12 +- be/src/util/dummy-runtime-profile.h | 6 +- be/src/util/periodic-counter-updater.cc | 15 +- be/src/util/periodic-counter-updater.h | 9 +- be/src/util/runtime-profile-test.cc | 220 +++++++++---------- be/src/util/runtime-profile.cc | 131 +++++++---- be/src/util/runtime-profile.h | 94 +++++--- 43 files changed, 460 insertions(+), 430 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/benchmarks/hash-benchmark.cc ---------------------------------------------------------------------- diff --git a/be/src/benchmarks/hash-benchmark.cc b/be/src/benchmarks/hash-benchmark.cc index dadad25..b183915 100644 --- a/be/src/benchmarks/hash-benchmark.cc +++ b/be/src/benchmarks/hash-benchmark.cc @@ -440,10 +440,10 @@ int main(int argc, char **argv) { MemTracker tracker; MemPool mem_pool(&tracker); - RuntimeProfile int_profile(state->obj_pool(), "IntGen"); - RuntimeProfile mixed_profile(state->obj_pool(), "MixedGen"); - DataProvider 
int_provider(&mem_pool, &int_profile); - DataProvider mixed_provider(&mem_pool, &mixed_profile); + RuntimeProfile* int_profile = RuntimeProfile::Create(state->obj_pool(), "IntGen"); + RuntimeProfile* mixed_profile = RuntimeProfile::Create(state->obj_pool(), "MixedGen"); + DataProvider int_provider(&mem_pool, int_profile); + DataProvider mixed_provider(&mem_pool, mixed_profile); scoped_ptr<LlvmCodeGen> codegen; status = LlvmCodeGen::CreateImpalaCodegen(state, NULL, "test", &codegen); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/codegen/llvm-codegen.cc ---------------------------------------------------------------------- diff --git a/be/src/codegen/llvm-codegen.cc b/be/src/codegen/llvm-codegen.cc index 6f8b156..5503969 100644 --- a/be/src/codegen/llvm-codegen.cc +++ b/be/src/codegen/llvm-codegen.cc @@ -170,8 +170,8 @@ LlvmCodeGen::LlvmCodeGen(RuntimeState* state, ObjectPool* pool, MemTracker* parent_mem_tracker, const string& id) : state_(state), id_(id), - profile_(pool, "CodeGen"), - mem_tracker_(pool->Add(new MemTracker(&profile_, -1, "CodeGen", parent_mem_tracker))), + profile_(RuntimeProfile::Create(pool, "CodeGen")), + mem_tracker_(pool->Add(new MemTracker(profile_, -1, "CodeGen", parent_mem_tracker))), optimizations_enabled_(false), is_corrupt_(false), is_compiled_(false), @@ -181,21 +181,21 @@ LlvmCodeGen::LlvmCodeGen(RuntimeState* state, ObjectPool* pool, loaded_functions_(IRFunction::FN_END, NULL) { DCHECK(llvm_initialized_) << "Must call LlvmCodeGen::InitializeLlvm first."; - load_module_timer_ = ADD_TIMER(&profile_, "LoadTime"); - prepare_module_timer_ = ADD_TIMER(&profile_, "PrepareTime"); - module_bitcode_size_ = ADD_COUNTER(&profile_, "ModuleBitcodeSize", TUnit::BYTES); - codegen_timer_ = ADD_TIMER(&profile_, "CodegenTime"); - optimization_timer_ = ADD_TIMER(&profile_, "OptimizationTime"); - compile_timer_ = ADD_TIMER(&profile_, "CompileTime"); - num_functions_ = ADD_COUNTER(&profile_, "NumFunctions", TUnit::UNIT); - 
num_instructions_ = ADD_COUNTER(&profile_, "NumInstructions", TUnit::UNIT); + load_module_timer_ = ADD_TIMER(profile_, "LoadTime"); + prepare_module_timer_ = ADD_TIMER(profile_, "PrepareTime"); + module_bitcode_size_ = ADD_COUNTER(profile_, "ModuleBitcodeSize", TUnit::BYTES); + codegen_timer_ = ADD_TIMER(profile_, "CodegenTime"); + optimization_timer_ = ADD_TIMER(profile_, "OptimizationTime"); + compile_timer_ = ADD_TIMER(profile_, "CompileTime"); + num_functions_ = ADD_COUNTER(profile_, "NumFunctions", TUnit::UNIT); + num_instructions_ = ADD_COUNTER(profile_, "NumInstructions", TUnit::UNIT); } Status LlvmCodeGen::CreateFromFile(RuntimeState* state, ObjectPool* pool, MemTracker* parent_mem_tracker, const string& file, const string& id, scoped_ptr<LlvmCodeGen>* codegen) { codegen->reset(new LlvmCodeGen(state, pool, parent_mem_tracker, id)); - SCOPED_TIMER((*codegen)->profile_.total_time_counter()); + SCOPED_TIMER((*codegen)->profile_->total_time_counter()); unique_ptr<Module> loaded_module; RETURN_IF_ERROR((*codegen)->LoadModuleFromFile(file, &loaded_module)); @@ -206,7 +206,7 @@ Status LlvmCodeGen::CreateFromFile(RuntimeState* state, ObjectPool* pool, Status LlvmCodeGen::CreateFromMemory(RuntimeState* state, ObjectPool* pool, MemTracker* parent_mem_tracker, const string& id, scoped_ptr<LlvmCodeGen>* codegen) { codegen->reset(new LlvmCodeGen(state, pool, parent_mem_tracker, id)); - SCOPED_TIMER((*codegen)->profile_.total_time_counter()); + SCOPED_TIMER((*codegen)->profile_->total_time_counter()); // Select the appropriate IR version. We cannot use LLVM IR with SSE4.2 instructions on // a machine without SSE4.2 support. 
@@ -276,7 +276,7 @@ Status LlvmCodeGen::LoadModuleFromMemory(unique_ptr<MemoryBuffer> module_ir_buf, Status LlvmCodeGen::LinkModule(const string& file) { if (linked_modules_.find(file) != linked_modules_.end()) return Status::OK(); - SCOPED_TIMER(profile_.total_time_counter()); + SCOPED_TIMER(profile_->total_time_counter()); unique_ptr<Module> new_module; RETURN_IF_ERROR(LoadModuleFromFile(file, &new_module)); @@ -324,7 +324,7 @@ Status LlvmCodeGen::CreateImpalaCodegen(RuntimeState* state, LlvmCodeGen* codegen = codegen_ret->get(); // Parse module for cross compiled functions and types - SCOPED_TIMER(codegen->profile_.total_time_counter()); + SCOPED_TIMER(codegen->profile_->total_time_counter()); SCOPED_TIMER(codegen->prepare_module_timer_); // Get type for StringValue @@ -620,7 +620,7 @@ Status LlvmCodeGen::MaterializeFunctionHelper(Function *fn) { } Status LlvmCodeGen::MaterializeFunction(Function *fn) { - SCOPED_TIMER(profile_.total_time_counter()); + SCOPED_TIMER(profile_->total_time_counter()); SCOPED_TIMER(prepare_module_timer_); return MaterializeFunctionHelper(fn); } @@ -1037,7 +1037,7 @@ Status LlvmCodeGen::FinalizeModule() { } if (is_corrupt_) return Status("Module is corrupt."); - SCOPED_TIMER(profile_.total_time_counter()); + SCOPED_TIMER(profile_->total_time_counter()); // Don't waste time optimizing module if there are no functions to JIT. This can happen // if the codegen object is created but no functions are successfully codegen'd. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/codegen/llvm-codegen.h ---------------------------------------------------------------------- diff --git a/be/src/codegen/llvm-codegen.h b/be/src/codegen/llvm-codegen.h index 8dce330..8aa9f2b 100644 --- a/be/src/codegen/llvm-codegen.h +++ b/be/src/codegen/llvm-codegen.h @@ -159,7 +159,7 @@ class LlvmCodeGen { /// any other API methods after calling close. 
void Close(); - RuntimeProfile* runtime_profile() { return &profile_; } + RuntimeProfile* runtime_profile() { return profile_; } RuntimeProfile::Counter* codegen_timer() { return codegen_timer_; } /// Turns on/off optimization passes @@ -669,7 +669,7 @@ class LlvmCodeGen { std::string id_; /// Codegen counters - RuntimeProfile profile_; + RuntimeProfile* const profile_; /// MemTracker used for tracking memory consumed by codegen. Connected to a parent /// MemTracker if one was provided during initialization. Owned by the ObjectPool http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/data-sink.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc index fe23694..c173ed5 100644 --- a/be/src/exec/data-sink.cc +++ b/be/src/exec/data-sink.cc @@ -175,7 +175,7 @@ string DataSink::OutputDmlStats(const PartitionStatusMap& stats, Status DataSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) { DCHECK(parent_mem_tracker != NULL); - profile_ = state->obj_pool()->Add(new RuntimeProfile(state->obj_pool(), GetName())); + profile_ = RuntimeProfile::Create(state->obj_pool(), GetName()); const string& name = GetName(); mem_tracker_.reset(new MemTracker(profile_, -1, name, parent_mem_tracker)); expr_mem_tracker_.reset( http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/data-source-scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/data-source-scan-node.cc b/be/src/exec/data-source-scan-node.cc index 01e0dbe..78ba492 100644 --- a/be/src/exec/data-source-scan-node.cc +++ b/be/src/exec/data-source-scan-node.cc @@ -365,15 +365,13 @@ Status DataSourceScanNode::Reset(RuntimeState* state) { void DataSourceScanNode::Close(RuntimeState* state) { if (is_closed()) return; SCOPED_TIMER(runtime_profile_->total_time_counter()); - 
PeriodicCounterUpdater::StopRateCounter(total_throughput_counter()); - PeriodicCounterUpdater::StopTimeSeriesCounter(bytes_read_timeseries_counter_); input_batch_.reset(); TCloseParams params; params.__set_scan_handle(scan_handle_); TCloseResult result; Status status = data_source_executor_->Close(params, &result); if (!status.ok()) state->LogError(status.msg()); - ExecNode::Close(state); + ScanNode::Close(state); } void DataSourceScanNode::DebugString(int indentation_level, stringstream* out) const { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/exec-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc index 94e9ed1..b656d2b 100644 --- a/be/src/exec/exec-node.cc +++ b/be/src/exec/exec-node.cc @@ -130,12 +130,14 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl debug_action_(TDebugAction::WAIT), limit_(tnode.limit), num_rows_returned_(0), + runtime_profile_(RuntimeProfile::Create(pool_, + Substitute("$0 (id=$1)", PrintPlanNodeType(tnode.node_type), id_))), rows_returned_counter_(NULL), rows_returned_rate_(NULL), containing_subplan_(NULL), disable_codegen_(tnode.disable_codegen), is_closed_(false) { - InitRuntimeProfile(PrintPlanNodeType(tnode.node_type)); + runtime_profile_->set_metadata(id_); } ExecNode::~ExecNode() { @@ -149,8 +151,8 @@ Status ExecNode::Init(const TPlanNode& tnode, RuntimeState* state) { Status ExecNode::Prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::PREPARE, state)); - DCHECK(runtime_profile_.get() != NULL); - mem_tracker_.reset(new MemTracker(runtime_profile_.get(), -1, runtime_profile_->name(), + DCHECK(runtime_profile_ != NULL); + mem_tracker_.reset(new MemTracker(runtime_profile_, -1, runtime_profile_->name(), state->instance_mem_tracker())); expr_mem_tracker_.reset(new MemTracker(-1, "Exprs", mem_tracker_.get(), false)); expr_mem_pool_.reset(new 
MemPool(expr_mem_tracker_.get())); @@ -462,13 +464,6 @@ void ExecNode::CollectScanNodes(vector<ExecNode*>* nodes) { CollectNodes(TPlanNodeType::KUDU_SCAN_NODE, nodes); } -void ExecNode::InitRuntimeProfile(const string& name) { - stringstream ss; - ss << name << " (id=" << id_ << ")"; - runtime_profile_.reset(new RuntimeProfile(pool_, ss.str())); - runtime_profile_->set_metadata(id_); -} - Status ExecNode::ExecDebugActionImpl(TExecNodePhase::type phase, RuntimeState* state) { DCHECK_EQ(debug_phase_, phase); if (debug_action_ == TDebugAction::FAIL) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/exec-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h index 2f3f714..7cba6ac 100644 --- a/be/src/exec/exec-node.h +++ b/be/src/exec/exec-node.h @@ -208,7 +208,7 @@ class ExecNode { int64_t limit() const { return limit_; } bool ReachedLimit() { return limit_ != -1 && num_rows_returned_ >= limit_; } - RuntimeProfile* runtime_profile() { return runtime_profile_.get(); } + RuntimeProfile* runtime_profile() { return runtime_profile_; } MemTracker* mem_tracker() { return mem_tracker_.get(); } MemTracker* expr_mem_tracker() { return expr_mem_tracker_.get(); } MemPool* expr_mem_pool() { return expr_mem_pool_.get(); } @@ -310,7 +310,8 @@ class ExecNode { int64_t limit_; // -1: no limit int64_t num_rows_returned_; - boost::scoped_ptr<RuntimeProfile> runtime_profile_; + /// Runtime profile for this node. Owned by the QueryState's ObjectPool. + RuntimeProfile* const runtime_profile_; RuntimeProfile::Counter* rows_returned_counter_; RuntimeProfile::Counter* rows_returned_rate_; @@ -354,8 +355,6 @@ class ExecNode { virtual bool IsScanNode() const { return false; } - void InitRuntimeProfile(const std::string& name); - /// Executes 'debug_action_' if 'phase' matches 'debug_phase_'. /// 'phase' must not be INVALID. 
Status ExecDebugAction( http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/hash-table-test.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hash-table-test.cc b/be/src/exec/hash-table-test.cc index 816e6cf..0b99cbd 100644 --- a/be/src/exec/hash-table-test.cc +++ b/be/src/exec/hash-table-test.cc @@ -233,7 +233,7 @@ class HashTableTest : public testing::Test { int64_t block_size = 8 * 1024 * 1024, int max_num_blocks = 100, int initial_reserved_blocks = 10, int64_t suballocator_buffer_len = 64 * 1024) { BufferPool* buffer_pool = test_env_->exec_env()->buffer_pool(); - RuntimeProfile* profile = pool_.Add(new RuntimeProfile(&pool_, "ht")); + RuntimeProfile* profile = RuntimeProfile::Create(&pool_, "ht"); // Set up memory tracking for the hash table. MemTracker* client_tracker = http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/hbase-scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hbase-scan-node.cc b/be/src/exec/hbase-scan-node.cc index e783731..a74d4b3 100644 --- a/be/src/exec/hbase-scan-node.cc +++ b/be/src/exec/hbase-scan-node.cc @@ -279,7 +279,7 @@ void HBaseScanNode::Close(RuntimeState* state) { JNIEnv* env = getJNIEnv(); hbase_scanner_->Close(env); } - ExecNode::Close(state); + ScanNode::Close(state); } void HBaseScanNode::DebugString(int indentation_level, stringstream* out) const { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/hdfs-scan-node-base.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc index ca71201..b5169a8 100644 --- a/be/src/exec/hdfs-scan-node-base.cc +++ b/be/src/exec/hdfs-scan-node-base.cc @@ -97,7 +97,8 @@ HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode, thrift_dict_filter_conjuncts_map_( 
tnode.hdfs_scan_node.__isset.dictionary_filter_conjuncts ? &tnode.hdfs_scan_node.dictionary_filter_conjuncts : nullptr), - disks_accessed_bitmap_(TUnit::UNIT, 0) { + disks_accessed_bitmap_(TUnit::UNIT, 0), + active_hdfs_read_thread_counter_(TUnit::UNIT, 0) { } HdfsScanNodeBase::~HdfsScanNodeBase() { @@ -142,8 +143,8 @@ Status HdfsScanNodeBase::Init(const TPlanNode& tnode, RuntimeState* state) { filter_ctx.filter = state->filter_bank()->RegisterFilter(filter_desc, false); string filter_profile_title = Substitute("Filter $0 ($1)", filter_desc.filter_id, PrettyPrinter::Print(filter_ctx.filter->filter_size(), TUnit::BYTES)); - RuntimeProfile* profile = state->obj_pool()->Add( - new RuntimeProfile(state->obj_pool(), filter_profile_title)); + RuntimeProfile* profile = + RuntimeProfile::Create(state->obj_pool(), filter_profile_title); runtime_profile_->AddChild(profile); filter_ctx.stats = state->obj_pool()->Add(new FilterStats(profile, target.is_bound_by_partition_columns)); @@ -441,12 +442,8 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) { max_compressed_text_file_length_ = runtime_profile()->AddHighWaterMarkCounter( "MaxCompressedTextFileLength", TUnit::BYTES); - for (int i = 0; i < state->io_mgr()->num_total_disks() + 1; ++i) { - hdfs_read_thread_concurrency_bucket_.push_back( - pool_->Add(new RuntimeProfile::Counter(TUnit::DOUBLE_VALUE, 0))); - } - runtime_profile()->RegisterBucketingCounters(&active_hdfs_read_thread_counter_, - &hdfs_read_thread_concurrency_bucket_); + hdfs_read_thread_concurrency_bucket_ = runtime_profile()->AddBucketingCounters( + &active_hdfs_read_thread_counter_, state->io_mgr()->num_total_disks() + 1); counters_running_ = true; @@ -851,18 +848,13 @@ void HdfsScanNodeBase::StopAndFinalizeCounters() { if (!counters_running_) return; counters_running_ = false; - PeriodicCounterUpdater::StopTimeSeriesCounter(bytes_read_timeseries_counter_); - PeriodicCounterUpdater::StopRateCounter(total_throughput_counter()); - 
PeriodicCounterUpdater::StopSamplingCounter(average_scanner_thread_concurrency_); - PeriodicCounterUpdater::StopSamplingCounter(average_hdfs_read_thread_concurrency_); - PeriodicCounterUpdater::StopBucketingCounters(&hdfs_read_thread_concurrency_bucket_, - true); + runtime_profile_->StopPeriodicCounters(); // Output hdfs read thread concurrency into info string stringstream ss; - for (int i = 0; i < hdfs_read_thread_concurrency_bucket_.size(); ++i) { + for (int i = 0; i < hdfs_read_thread_concurrency_bucket_->size(); ++i) { ss << i << ":" << setprecision(4) - << hdfs_read_thread_concurrency_bucket_[i]->double_value() << "% "; + << (*hdfs_read_thread_concurrency_bucket_)[i]->double_value() << "% "; } runtime_profile_->AddInfoString("Hdfs Read Thread Concurrency Bucket", ss.str()); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/hdfs-scan-node-base.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h index e1c431f..e33de5a 100644 --- a/be/src/exec/hdfs-scan-node-base.h +++ b/be/src/exec/hdfs-scan-node-base.h @@ -469,6 +469,17 @@ class HdfsScanNodeBase : public ScanNode { /// Total number of file handle opens where the file handle was not in the cache RuntimeProfile::Counter* cached_file_handles_miss_count_ = nullptr; + /// The number of active hdfs reading threads reading for this node. + RuntimeProfile::Counter active_hdfs_read_thread_counter_; + + /// Average number of active hdfs reading threads + /// This should be created in Open() and stopped when all the scanner threads are done. + RuntimeProfile::Counter* average_hdfs_read_thread_concurrency_ = nullptr; + + /// HDFS read thread concurrency bucket: bucket[i] refers to the number of sample + /// taken where there are i concurrent hdfs read thread running. Created in Open(). 
+ std::vector<RuntimeProfile::Counter*>* hdfs_read_thread_concurrency_bucket_ = nullptr; + /// Pool for allocating some amounts of memory that is shared between scanners. /// e.g. partition key tuple and their string buffers boost::scoped_ptr<MemPool> scan_node_pool_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/kudu-scan-node-base.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-scan-node-base.cc b/be/src/exec/kudu-scan-node-base.cc index d587660..526374c 100644 --- a/be/src/exec/kudu-scan-node-base.cc +++ b/be/src/exec/kudu-scan-node-base.cc @@ -52,7 +52,6 @@ KuduScanNodeBase::KuduScanNodeBase(ObjectPool* pool, const TPlanNode& tnode, : ScanNode(pool, tnode, descs), tuple_id_(tnode.kudu_scan_node.tuple_id), client_(nullptr), - counters_running_(false), next_scan_token_idx_(0) { DCHECK(KuduIsAvailable()); } @@ -69,7 +68,6 @@ Status KuduScanNodeBase::Prepare(RuntimeState* state) { ADD_COUNTER(runtime_profile(), SCAN_RANGES_COMPLETE_COUNTER, TUnit::UNIT); kudu_round_trips_ = ADD_COUNTER(runtime_profile(), KUDU_ROUND_TRIPS, TUnit::UNIT); kudu_remote_tokens_ = ADD_COUNTER(runtime_profile(), KUDU_REMOTE_TOKENS, TUnit::UNIT); - counters_running_ = true; DCHECK(state->desc_tbl().GetTupleDescriptor(tuple_id_) != NULL); tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tuple_id_); @@ -108,12 +106,6 @@ Status KuduScanNodeBase::Open(RuntimeState* state) { return Status::OK(); } -void KuduScanNodeBase::Close(RuntimeState* state) { - if (is_closed()) return; - StopAndFinalizeCounters(); - ExecNode::Close(state); -} - void KuduScanNodeBase::DebugString(int indentation_level, stringstream* out) const { string indent(indentation_level * 2, ' '); *out << indent << "KuduScanNode(tupleid=" << tuple_id_ << ")"; @@ -129,12 +121,5 @@ const string* KuduScanNodeBase::GetNextScanToken() { return token; } -void KuduScanNodeBase::StopAndFinalizeCounters() { - if (!counters_running_) return; - 
counters_running_ = false; - - PeriodicCounterUpdater::StopRateCounter(total_throughput_counter()); - PeriodicCounterUpdater::StopTimeSeriesCounter(bytes_read_timeseries_counter_); -} } // namespace impala http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/kudu-scan-node-base.h ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-scan-node-base.h b/be/src/exec/kudu-scan-node-base.h index 49af13c..08289e1 100644 --- a/be/src/exec/kudu-scan-node-base.h +++ b/be/src/exec/kudu-scan-node-base.h @@ -42,7 +42,6 @@ class KuduScanNodeBase : public ScanNode { virtual Status Prepare(RuntimeState* state); virtual Status Open(RuntimeState* state); virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) = 0; - virtual void Close(RuntimeState* state); protected: virtual void DebugString(int indentation_level, std::stringstream* out) const; @@ -59,14 +58,6 @@ class KuduScanNodeBase : public ScanNode { RuntimeState* runtime_state_; - /// Stops periodic counters and aggregates counter values for the entire scan node. - /// This should be called as soon as the scan node is complete to get the most accurate - /// counter values. - /// This can be called multiple times, subsequent calls will be ignored. - /// This must be called on Close() to unregister counters. - /// Scan nodes with a RowBatch queue may have to synchronize calls to this function. - void StopAndFinalizeCounters(); - private: friend class KuduScanner; @@ -83,10 +74,6 @@ class KuduScanNodeBase : public ScanNode { /// Kudu table reference. Shared between scanner threads for KuduScanNode. kudu::client::sp::shared_ptr<kudu::client::KuduTable> table_; - /// If true, counters are actively running and need to be reported in the runtime - /// profile. - bool counters_running_; - /// Set of scan tokens to be deserialized into Kudu scanners. 
std::vector<std::string> scan_tokens_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/kudu-scan-node-mt.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-scan-node-mt.cc b/be/src/exec/kudu-scan-node-mt.cc index 8723daa..2cb7619 100644 --- a/be/src/exec/kudu-scan-node-mt.cc +++ b/be/src/exec/kudu-scan-node-mt.cc @@ -62,7 +62,7 @@ Status KuduScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e if (scan_token_ == nullptr) { scan_token_ = GetNextScanToken(); if (scan_token_ == nullptr) { - StopAndFinalizeCounters(); + runtime_profile_->StopPeriodicCounters(); scanner_->Close(); scanner_.reset(); *eos = true; @@ -85,7 +85,7 @@ Status KuduScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e row_batch->set_num_rows(row_batch->num_rows() - num_rows_over); num_rows_returned_ -= num_rows_over; scan_token_ = nullptr; - StopAndFinalizeCounters(); + runtime_profile_->StopPeriodicCounters(); scanner_->Close(); scanner_.reset(); *eos = true; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc index 0df0c3f..d09bb6b 100644 --- a/be/src/exec/scan-node.cc +++ b/be/src/exec/scan-node.cc @@ -68,4 +68,12 @@ Status ScanNode::Prepare(RuntimeState* state) { return Status::OK(); } +void ScanNode::Close(RuntimeState* state) { + if (is_closed()) return; + // ScanNode::Prepare() started periodic counters including 'total_throughput_counter_' + // and 'bytes_read_timeseries_counter_'. Subclasses may also have started counters. 
+ runtime_profile_->StopPeriodicCounters(); + ExecNode::Close(state); +} + } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/exec/scan-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/scan-node.h b/be/src/exec/scan-node.h index 0f73c2b..4b11361 100644 --- a/be/src/exec/scan-node.h +++ b/be/src/exec/scan-node.h @@ -82,11 +82,14 @@ class ScanNode : public ExecNode { ScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : ExecNode(pool, tnode, descs), scan_range_params_(NULL), - active_scanner_thread_counter_(TUnit::UNIT, 0), - active_hdfs_read_thread_counter_(TUnit::UNIT, 0) {} + active_scanner_thread_counter_(TUnit::UNIT, 0) {} virtual Status Prepare(RuntimeState* state) WARN_UNUSED_RESULT; + /// Stops all periodic counters and calls ExecNode::Close(). Subclasses of ScanNode can + /// start periodic counters and rely on this function stopping them. + virtual void Close(RuntimeState* state); + /// This should be called before Prepare(), and the argument must be not destroyed until /// after Prepare(). void SetScanRanges(const std::vector<TScanRangeParams>& scan_range_params) { @@ -172,18 +175,7 @@ class ScanNode : public ExecNode { /// This should be created in Open and stopped when all the scanner threads are done. RuntimeProfile::Counter* average_scanner_thread_concurrency_; - /// The number of active hdfs reading threads reading for this node. - RuntimeProfile::Counter active_hdfs_read_thread_counter_; - - /// Average number of active hdfs reading threads - /// This should be created in Open and stopped when all the scanner threads are done. 
- RuntimeProfile::Counter* average_hdfs_read_thread_concurrency_; - RuntimeProfile::Counter* num_scanner_threads_started_counter_; - - /// HDFS read thread concurrency bucket: bucket[i] refers to the number of sample - /// taken where there are i concurrent hdfs read thread running - std::vector<RuntimeProfile::Counter*> hdfs_read_thread_concurrency_bucket_; }; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/experiments/data-provider-test.cc ---------------------------------------------------------------------- diff --git a/be/src/experiments/data-provider-test.cc b/be/src/experiments/data-provider-test.cc index 9f9e0cb..3c5a92f 100644 --- a/be/src/experiments/data-provider-test.cc +++ b/be/src/experiments/data-provider-test.cc @@ -55,11 +55,11 @@ int main(int argc, char **argv) { cols.push_back(DataProvider::ColDesc::Create<StringValue>(min_str, max_str)); ObjectPool obj_pool; - RuntimeProfile profile(&obj_pool, "DataGenTest"); + RuntimeProfile* profile = RuntimeProfile::Create(&obj_pool, "DataGenTest"); MemTracker tracker; MemPool pool(&tracker); - DataProvider provider(&pool, &profile); + DataProvider provider(&pool, profile); provider.Reset(20, 2, cols); int rows; void* data; @@ -70,7 +70,7 @@ int main(int argc, char **argv) { provider.Print(&cout, reinterpret_cast<char*>(data), rows); } - profile.PrettyPrint(&cout); + profile->PrettyPrint(&cout); cout << endl << "Done." 
<< endl; return 0; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/experiments/tuple-splitter-test.cc ---------------------------------------------------------------------- diff --git a/be/src/experiments/tuple-splitter-test.cc b/be/src/experiments/tuple-splitter-test.cc index 87a52bd..7d68f8e 100644 --- a/be/src/experiments/tuple-splitter-test.cc +++ b/be/src/experiments/tuple-splitter-test.cc @@ -380,14 +380,14 @@ int main(int argc, char **argv) { MemTracker tracker; MemPool pool(&tracker); ObjectPool obj_pool; - RuntimeProfile profile(&obj_pool, "PartitioningTest"); + RuntimeProfile* profile = RuntimeProfile::Create(&obj_pool, "PartitioningTest"); - DataProvider provider(&pool, &profile); + DataProvider provider(&pool, profile); provider.Reset(50*1024*1024, 1024, cols); //provider.Reset(100*1024, 1024, cols); //provider.Reset(100, 1024, cols); - DataPartitioner partitioner(&pool, &profile, provider.row_size(), 0); + DataPartitioner partitioner(&pool, profile, provider.row_size(), 0); cout << "Begin processing: " << provider.total_rows() << endl; int rows; @@ -437,7 +437,7 @@ int main(int argc, char **argv) { cout << "Largest Partition: " << largest_partition << endl;; cout << endl; - profile.PrettyPrint(&cout); + profile->PrettyPrint(&cout); LOG(ERROR) << "Done."; return 0; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/buffered-tuple-stream-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc index 0b89498..08ce7c3 100644 --- a/be/src/runtime/buffered-tuple-stream-test.cc +++ b/be/src/runtime/buffered-tuple-stream-test.cc @@ -132,7 +132,7 @@ class SimpleTupleStreamTest : public testing::Test { ASSERT_OK(test_env_->CreateQueryState(0, nullptr, &runtime_state_)); query_state_ = runtime_state_->query_state(); - RuntimeProfile* client_profile = pool_.Add(new 
RuntimeProfile(&pool_, "client")); + RuntimeProfile* client_profile = RuntimeProfile::Create(&pool_, "client"); MemTracker* client_tracker = pool_.Add(new MemTracker(-1, "client", runtime_state_->instance_mem_tracker())); ASSERT_OK(test_env_->exec_env()->buffer_pool()->RegisterClient("", http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/bufferpool/buffer-allocator-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/buffer-allocator-test.cc b/be/src/runtime/bufferpool/buffer-allocator-test.cc index 6427648..21a9c08 100644 --- a/be/src/runtime/bufferpool/buffer-allocator-test.cc +++ b/be/src/runtime/bufferpool/buffer-allocator-test.cc @@ -42,7 +42,7 @@ class BufferAllocatorTest : public ::testing::Test { dummy_pool_ = obj_pool_.Add(new BufferPool(1, 0, 0)); dummy_reservation_.InitRootTracker(nullptr, 0); ASSERT_OK(dummy_pool_->RegisterClient("", nullptr, &dummy_reservation_, nullptr, 0, - obj_pool_.Add(new RuntimeProfile(&obj_pool_, "")), &dummy_client_)); + RuntimeProfile::Create(&obj_pool_, ""), &dummy_client_)); } virtual void TearDown() { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/bufferpool/buffer-pool-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc index 06ff827..720dc13 100644 --- a/be/src/runtime/bufferpool/buffer-pool-test.cc +++ b/be/src/runtime/bufferpool/buffer-pool-test.cc @@ -139,7 +139,7 @@ class BufferPoolTest : public ::testing::Test { } RuntimeProfile* NewProfile() { - return obj_pool_.Add(new RuntimeProfile(&obj_pool_, "test profile")); + return RuntimeProfile::Create(&obj_pool_, "test profile"); } /// Create a new file group with the default configs. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/bufferpool/reservation-tracker-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/reservation-tracker-test.cc b/be/src/runtime/bufferpool/reservation-tracker-test.cc index 0d57488..3fc0e0b 100644 --- a/be/src/runtime/bufferpool/reservation-tracker-test.cc +++ b/be/src/runtime/bufferpool/reservation-tracker-test.cc @@ -44,7 +44,7 @@ class ReservationTrackerTest : public ::testing::Test { protected: RuntimeProfile* NewProfile() { - return obj_pool_.Add(new RuntimeProfile(&obj_pool_, "test profile")); + return RuntimeProfile::Create(&obj_pool_, "test profile"); } ObjectPool obj_pool_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/bufferpool/suballocator-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/suballocator-test.cc b/be/src/runtime/bufferpool/suballocator-test.cc index 32acfaf..6cd53fb 100644 --- a/be/src/runtime/bufferpool/suballocator-test.cc +++ b/be/src/runtime/bufferpool/suballocator-test.cc @@ -45,7 +45,7 @@ class SuballocatorTest : public ::testing::Test { public: virtual void SetUp() override { RandTestUtil::SeedRng("SUBALLOCATOR_TEST_SEED", &rng_); - profile_.reset(new RuntimeProfile(&obj_pool_, "test profile")); + profile_ = RuntimeProfile::Create(&obj_pool_, "test profile"); } virtual void TearDown() override { @@ -55,7 +55,6 @@ class SuballocatorTest : public ::testing::Test { clients_.clear(); buffer_pool_.reset(); global_reservation_.Close(); - profile_.reset(); obj_pool_.Clear(); } @@ -78,7 +77,7 @@ class SuballocatorTest : public ::testing::Test { clients_.push_back(make_unique<BufferPool::ClientHandle>()); *client = clients_.back().get(); ASSERT_OK(buffer_pool_->RegisterClient("test client", NULL, parent_reservation, NULL, - numeric_limits<int64_t>::max(), profile(), *client)); + 
numeric_limits<int64_t>::max(), profile_, *client)); } /// Assert that the memory for all of the suballocations is writable and disjoint by @@ -97,7 +96,6 @@ class SuballocatorTest : public ::testing::Test { EXPECT_EQ(client->GetUsedReservation(), 0) << client->DebugString(); } - RuntimeProfile* profile() { return profile_.get(); } BufferPool* buffer_pool() { return buffer_pool_.get(); } /// Pool for objects with per-test lifetime. Cleared after every test. @@ -114,7 +112,7 @@ class SuballocatorTest : public ::testing::Test { vector<unique_ptr<BufferPool::ClientHandle>> clients_; /// Global profile - recreated for every test. - scoped_ptr<RuntimeProfile> profile_; + RuntimeProfile* profile_; /// Per-test random number generator. Seeded before every test. mt19937 rng_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/coordinator-backend-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc index 1b7fd20..0ee4bd7 100644 --- a/be/src/runtime/coordinator-backend-state.cc +++ b/be/src/runtime/coordinator-backend-state.cc @@ -433,7 +433,7 @@ Coordinator::BackendState::InstanceStats::InstanceStats( total_ranges_complete_(0) { const string& profile_name = Substitute("Instance $0 (host=$1)", PrintId(exec_params.instance_id), lexical_cast<string>(exec_params.host)); - profile_ = obj_pool->Add(new RuntimeProfile(obj_pool, profile_name)); + profile_ = RuntimeProfile::Create(obj_pool, profile_name); fragment_stats->root_profile()->AddChild(profile_); // add total split size to fragment_stats->bytes_assigned() @@ -514,10 +514,8 @@ void Coordinator::BackendState::InstanceStats::Update( Coordinator::FragmentStats::FragmentStats(const string& avg_profile_name, const string& root_profile_name, int num_instances, ObjectPool* obj_pool) - : avg_profile_( - obj_pool->Add(new RuntimeProfile(obj_pool, avg_profile_name, 
true))), - root_profile_( - obj_pool->Add(new RuntimeProfile(obj_pool, root_profile_name))), + : avg_profile_(RuntimeProfile::Create(obj_pool, avg_profile_name, true)), + root_profile_(RuntimeProfile::Create(obj_pool, root_profile_name)), num_instances_(num_instances) { } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/coordinator.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc index c8df1f5..e022a21 100644 --- a/be/src/runtime/coordinator.cc +++ b/be/src/runtime/coordinator.cc @@ -123,8 +123,8 @@ Status Coordinator::Exec() { query_ctx_.__set_desc_tbl(request.desc_tbl); query_ctx_.__set_request_pool(schedule_.request_pool()); - query_profile_.reset( - new RuntimeProfile(obj_pool(), "Execution Profile " + PrintId(query_id()))); + query_profile_ = + RuntimeProfile::Create(obj_pool(), "Execution Profile " + PrintId(query_id())); finalization_timer_ = ADD_TIMER(query_profile_, "FinalizationTimer"); filter_updates_received_ = ADD_COUNTER(query_profile_, "FiltersReceived", TUnit::UNIT); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/coordinator.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h index e67ef13..5802c83 100644 --- a/be/src/runtime/coordinator.h +++ b/be/src/runtime/coordinator.h @@ -150,7 +150,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save /// Get cumulative profile aggregated over all fragments of the query. /// This is a snapshot of the current state of execution and will change in /// the future if not all fragments have finished execution. 
- RuntimeProfile* query_profile() const { return query_profile_.get(); } + RuntimeProfile* query_profile() const { return query_profile_; } const TUniqueId& query_id() const; @@ -278,8 +278,8 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save ExecSummary exec_summary_; - /// Aggregate counters for the entire query. - boost::scoped_ptr<RuntimeProfile> query_profile_; + /// Aggregate counters for the entire query. Lives in 'obj_pool_'. + RuntimeProfile* query_profile_ = nullptr; /// Protects all fields below. This is held while making RPCs, so this lock should /// only be acquired if the acquiring thread is prepared to wait for a significant http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/data-stream-recvr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/data-stream-recvr.cc b/be/src/runtime/data-stream-recvr.cc index 35076f5..d8150eb 100644 --- a/be/src/runtime/data-stream-recvr.cc +++ b/be/src/runtime/data-stream-recvr.cc @@ -37,7 +37,7 @@ namespace impala { // rows from all senders are placed in the same queue. class DataStreamRecvr::SenderQueue { public: - SenderQueue(DataStreamRecvr* parent_recvr, int num_senders, RuntimeProfile* profile); + SenderQueue(DataStreamRecvr* parent_recvr, int num_senders); // Return the next batch from this sender queue. Sets the returned batch in cur_batch_. 
// A returned batch that is not filled to capacity does *not* indicate @@ -102,8 +102,7 @@ class DataStreamRecvr::SenderQueue { bool received_first_batch_; }; -DataStreamRecvr::SenderQueue::SenderQueue(DataStreamRecvr* parent_recvr, int num_senders, - RuntimeProfile* profile) +DataStreamRecvr::SenderQueue::SenderQueue(DataStreamRecvr* parent_recvr, int num_senders) : recvr_(parent_recvr), is_cancelled_(false), num_remaining_senders_(num_senders), @@ -242,8 +241,6 @@ void DataStreamRecvr::SenderQueue::Cancel() { // notice that the stream is cancelled and handle it. data_arrival_cv_.notify_all(); data_removal__cv_.notify_all(); - PeriodicCounterUpdater::StopTimeSeriesCounter( - recvr_->bytes_received_time_series_counter_); } void DataStreamRecvr::SenderQueue::Close() { @@ -286,7 +283,7 @@ void DataStreamRecvr::TransferAllResources(RowBatch* transfer_batch) { DataStreamRecvr::DataStreamRecvr(DataStreamMgr* stream_mgr, MemTracker* parent_tracker, const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders, bool is_merging, int64_t total_buffer_limit, - RuntimeProfile* profile) + RuntimeProfile* parent_profile) : mgr_(stream_mgr), fragment_instance_id_(fragment_instance_id), dest_node_id_(dest_node_id), @@ -294,30 +291,28 @@ DataStreamRecvr::DataStreamRecvr(DataStreamMgr* stream_mgr, MemTracker* parent_t row_desc_(row_desc), is_merging_(is_merging), num_buffered_bytes_(0), - profile_(profile) { + profile_(parent_profile->CreateChild("DataStreamReceiver")) { // Create one queue per sender if is_merging is true. int num_queues = is_merging ? num_senders : 1; sender_queues_.reserve(num_queues); int num_sender_per_queue = is_merging ? 
1 : num_senders; for (int i = 0; i < num_queues; ++i) { SenderQueue* queue = sender_queue_pool_.Add(new SenderQueue(this, - num_sender_per_queue, profile)); + num_sender_per_queue)); sender_queues_.push_back(queue); } - RuntimeProfile* child_profile = profile_->CreateChild("DataStreamReceiver"); - mem_tracker_.reset( - new MemTracker(child_profile, -1, "DataStreamRecvr", parent_tracker)); + mem_tracker_.reset(new MemTracker(profile_, -1, "DataStreamRecvr", parent_tracker)); // Initialize the counters - bytes_received_counter_ = ADD_COUNTER(child_profile, "BytesReceived", TUnit::BYTES); + bytes_received_counter_ = ADD_COUNTER(profile_, "BytesReceived", TUnit::BYTES); bytes_received_time_series_counter_ = - ADD_TIME_SERIES_COUNTER(child_profile, "BytesReceived", bytes_received_counter_); - deserialize_row_batch_timer_ = ADD_TIMER(child_profile, "DeserializeRowBatchTimer"); - buffer_full_wall_timer_ = ADD_TIMER(child_profile, "SendersBlockedTimer"); - buffer_full_total_timer_ = ADD_TIMER(child_profile, "SendersBlockedTotalTimer(*)"); - data_arrival_timer_ = child_profile->inactive_timer(); - first_batch_wait_total_timer_ = ADD_TIMER(child_profile, "FirstBatchArrivalWaitTime"); + ADD_TIME_SERIES_COUNTER(profile_, "BytesReceived", bytes_received_counter_); + deserialize_row_batch_timer_ = ADD_TIMER(profile_, "DeserializeRowBatchTimer"); + buffer_full_wall_timer_ = ADD_TIMER(profile_, "SendersBlockedTimer"); + buffer_full_total_timer_ = ADD_TIMER(profile_, "SendersBlockedTotalTimer(*)"); + data_arrival_timer_ = profile_->inactive_timer(); + first_batch_wait_total_timer_ = ADD_TIMER(profile_, "FirstBatchArrivalWaitTime"); } Status DataStreamRecvr::GetNext(RowBatch* output_batch, bool* eos) { @@ -354,6 +349,7 @@ void DataStreamRecvr::Close() { } merger_.reset(); mem_tracker_->Close(); + profile_->StopPeriodicCounters(); } DataStreamRecvr::~DataStreamRecvr() { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/data-stream-recvr.h 
---------------------------------------------------------------------- diff --git a/be/src/runtime/data-stream-recvr.h b/be/src/runtime/data-stream-recvr.h index fad588d..9545f82 100644 --- a/be/src/runtime/data-stream-recvr.h +++ b/be/src/runtime/data-stream-recvr.h @@ -105,7 +105,7 @@ class DataStreamRecvr : public DataStreamRecvrBase { DataStreamRecvr(DataStreamMgr* stream_mgr, MemTracker* parent_tracker, const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders, bool is_merging, - int64_t total_buffer_limit, RuntimeProfile* profile); + int64_t total_buffer_limit, RuntimeProfile* parent_profile); /// Add a new batch of rows to the appropriate sender queue, blocking if the queue is /// full. Called from DataStreamMgr. @@ -161,8 +161,9 @@ class DataStreamRecvr : public DataStreamRecvrBase { /// Pool of sender queues. ObjectPool sender_queue_pool_; - /// Runtime profile storing the counters below. - RuntimeProfile* profile_; + /// Runtime profile storing the counters below. Child of 'parent_profile' passed into + /// constructor. 
+ RuntimeProfile* const profile_; /// Number of bytes received RuntimeProfile::Counter* bytes_received_counter_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/data-stream-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc index 5ea6756..8e85894 100644 --- a/be/src/runtime/data-stream-test.cc +++ b/be/src/runtime/data-stream-test.cc @@ -337,8 +337,7 @@ class DataStreamTest : public testing::Test { void StartReceiver(TPartitionType::type stream_type, int num_senders, int receiver_num, int buffer_size, bool is_merging, TUniqueId* out_id = nullptr) { VLOG_QUERY << "start receiver"; - RuntimeProfile* profile = - obj_pool_.Add(new RuntimeProfile(&obj_pool_, "TestReceiver")); + RuntimeProfile* profile = RuntimeProfile::Create(&obj_pool_, "TestReceiver"); TUniqueId instance_id; GetNextInstanceId(&instance_id); receiver_info_.push_back(ReceiverInfo(stream_type, num_senders, receiver_num)); @@ -607,13 +606,13 @@ TEST_F(DataStreamTest, BasicTest) { // TODO: Make lifecycle requirements more explicit. TEST_F(DataStreamTest, CloseRecvrWhileReferencesRemain) { scoped_ptr<RuntimeState> runtime_state(new RuntimeState(TQueryCtx(), &exec_env_)); - scoped_ptr<RuntimeProfile> profile(new RuntimeProfile(&obj_pool_, "TestReceiver")); + RuntimeProfile* profile = RuntimeProfile::Create(&obj_pool_, "TestReceiver"); // Start just one receiver. 
TUniqueId instance_id; GetNextInstanceId(&instance_id); shared_ptr<DataStreamRecvrBase> stream_recvr = stream_mgr_->CreateRecvr( - runtime_state.get(), row_desc_, instance_id, DEST_NODE_ID, 1, 1, profile.get(), + runtime_state.get(), row_desc_, instance_id, DEST_NODE_ID, 1, 1, profile, false); // Perform tear down, but keep a reference to the receiver so that it is deleted last http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/fragment-instance-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc index 74f5495..a7d3c86 100644 --- a/be/src/runtime/fragment-instance-state.cc +++ b/be/src/runtime/fragment-instance-state.cc @@ -123,8 +123,8 @@ Status FragmentInstanceState::Prepare() { // total_time_counter() is in the runtime_state_ so start it up now. SCOPED_TIMER(profile()->total_time_counter()); - timings_profile_ = obj_pool()->Add( - new RuntimeProfile(obj_pool(), "Fragment Instance Lifecycle Timings")); + timings_profile_ = + RuntimeProfile::Create(obj_pool(), "Fragment Instance Lifecycle Timings"); profile()->AddChild(timings_profile_); SCOPED_TIMER(ADD_TIMER(timings_profile_, PREPARE_TIMER_NAME)); @@ -289,12 +289,8 @@ void FragmentInstanceState::Close() { // guard against partially-finished Prepare() if (sink_ != nullptr) sink_->Close(runtime_state_); - // disconnect mem_usage_sampled_counter_ from the periodic updater before - // RuntimeState::ReleaseResources(), it references the instance memtracker - if (mem_usage_sampled_counter_ != nullptr) { - PeriodicCounterUpdater::StopTimeSeriesCounter(mem_usage_sampled_counter_); - mem_usage_sampled_counter_ = nullptr; - } + // Stop updating profile counters in background. + profile()->StopPeriodicCounters(); // We need to delete row_batch_ here otherwise we can't delete the instance_mem_tracker_ // in runtime_state_->ReleaseResources(). 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/query-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc index 4c5eb17..ea24411 100644 --- a/be/src/runtime/query-state.cc +++ b/be/src/runtime/query-state.cc @@ -169,7 +169,7 @@ Status QueryState::InitBufferPoolState() { // TODO: once there's a mechanism for reporting non-fragment-local profiles, // should make sure to report this profile so it's not going into a black hole. - RuntimeProfile* dummy_profile = obj_pool_.Add(new RuntimeProfile(&obj_pool_, "dummy")); + RuntimeProfile* dummy_profile = RuntimeProfile::Create(&obj_pool_, "dummy"); // Only create file group if spilling is enabled. if (query_options().scratch_limit != 0 && !query_ctx_.disable_spilling) { file_group_ = obj_pool_.Add( http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/runtime-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc index 8f48439..0565cf5 100644 --- a/be/src/runtime/runtime-state.cc +++ b/be/src/runtime/runtime-state.cc @@ -69,7 +69,8 @@ RuntimeState::RuntimeState(QueryState* query_state, const TPlanFragmentCtx& frag utc_timestamp_(new TimestampValue(TimestampValue::Parse( query_state->query_ctx().utc_timestamp_string))), exec_env_(exec_env), - profile_(obj_pool(), "Fragment " + PrintId(instance_ctx.fragment_instance_id)), + profile_(RuntimeProfile::Create( + obj_pool(), "Fragment " + PrintId(instance_ctx.fragment_instance_id))), instance_buffer_reservation_(new ReservationTracker) { Init(); } @@ -83,7 +84,7 @@ RuntimeState::RuntimeState( now_(new TimestampValue(TimestampValue::Parse(qctx.now_string))), utc_timestamp_(new TimestampValue(TimestampValue::Parse(qctx.utc_timestamp_string))), exec_env_(exec_env), - profile_(obj_pool(), "<unnamed>") { + 
profile_(RuntimeProfile::Create(obj_pool(), "<unnamed>")) { if (query_ctx().request_pool.empty()) { const_cast<TQueryCtx&>(query_ctx()).request_pool = "test-pool"; } @@ -96,7 +97,7 @@ RuntimeState::~RuntimeState() { } void RuntimeState::Init() { - SCOPED_TIMER(profile_.total_time_counter()); + SCOPED_TIMER(profile_->total_time_counter()); // Register with the thread mgr resource_pool_ = exec_env_->thread_mgr()->RegisterPool(); @@ -111,7 +112,7 @@ void RuntimeState::Init() { runtime_profile(), -1, runtime_profile()->name(), query_mem_tracker())); if (instance_buffer_reservation_ != nullptr) { - instance_buffer_reservation_->InitChildTracker(&profile_, + instance_buffer_reservation_->InitChildTracker(profile_, query_state_->buffer_reservation(), instance_mem_tracker_.get(), numeric_limits<int64_t>::max()); } @@ -127,7 +128,7 @@ Status RuntimeState::CreateCodegen() { RETURN_IF_ERROR(LlvmCodeGen::CreateImpalaCodegen(this, instance_mem_tracker_.get(), PrintId(fragment_instance_id()), &codegen_)); codegen_->EnableOptimizations(true); - profile_.AddChild(codegen_->runtime_profile()); + profile_->AddChild(codegen_->runtime_profile()); return Status::OK(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/runtime-state.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h index 49ea9a6..3da9f8c 100644 --- a/be/src/runtime/runtime-state.h +++ b/be/src/runtime/runtime-state.h @@ -149,7 +149,7 @@ class RuntimeState { PartitionStatusMap* per_partition_status() { return &per_partition_status_; } /// Returns runtime state profile - RuntimeProfile* runtime_profile() { return &profile_; } + RuntimeProfile* runtime_profile() { return profile_; } /// Returns the LlvmCodeGen object for this fragment instance. 
LlvmCodeGen* codegen() { return codegen_.get(); } @@ -354,7 +354,7 @@ class RuntimeState { /// Records summary statistics for the results of inserts into Hdfs partitions. PartitionStatusMap per_partition_status_; - RuntimeProfile profile_; + RuntimeProfile* const profile_; /// Total time waiting in storage (across all threads) RuntimeProfile::Counter* total_storage_wait_timer_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/runtime/tmp-file-mgr-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc index 1a5eb58..5f482ba 100644 --- a/be/src/runtime/tmp-file-mgr-test.cc +++ b/be/src/runtime/tmp-file-mgr-test.cc @@ -51,7 +51,7 @@ class TmpFileMgrTest : public ::testing::Test { public: virtual void SetUp() { metrics_.reset(new MetricGroup("tmp-file-mgr-test")); - profile_ = obj_pool_.Add(new RuntimeProfile(&obj_pool_, "tmp-file-mgr-test")); + profile_ = RuntimeProfile::Create(&obj_pool_, "tmp-file-mgr-test"); test_env_.reset(new TestEnv); ASSERT_OK(test_env_->Init()); cb_counter_ = 0; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/service/client-request-state.cc ---------------------------------------------------------------------- diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc index 2a5b379..ef6a69d 100644 --- a/be/src/service/client-request-state.cc +++ b/be/src/service/client-request-state.cc @@ -74,9 +74,10 @@ ClientRequestState::ClientRequestState( schedule_(NULL), coord_(NULL), result_cache_max_size_(-1), - profile_(&profile_pool_, "Query"), // assign name w/ id after planning - server_profile_(&profile_pool_, "ImpalaServer"), - summary_profile_(&profile_pool_, "Summary"), + // Profile is assigned name w/ id after planning + profile_(RuntimeProfile::Create(&profile_pool_, "Query")), + server_profile_(RuntimeProfile::Create(&profile_pool_, 
"ImpalaServer")), + summary_profile_(RuntimeProfile::Create(&profile_pool_, "Summary")), is_cancelled_(false), eos_(false), query_state_(beeswax::QueryState::CREATED), @@ -89,36 +90,36 @@ ClientRequestState::ClientRequestState( parent_server_(server), start_time_(TimestampValue::LocalTime()) { #ifndef NDEBUG - profile_.AddInfoString("DEBUG MODE WARNING", "Query profile created while running a " + profile_->AddInfoString("DEBUG MODE WARNING", "Query profile created while running a " "DEBUG build of Impala. Use RELEASE builds to measure query performance."); #endif - row_materialization_timer_ = ADD_TIMER(&server_profile_, "RowMaterializationTimer"); - client_wait_timer_ = ADD_TIMER(&server_profile_, "ClientFetchWaitTimer"); - query_events_ = summary_profile_.AddEventSequence("Query Timeline"); + row_materialization_timer_ = ADD_TIMER(server_profile_, "RowMaterializationTimer"); + client_wait_timer_ = ADD_TIMER(server_profile_, "ClientFetchWaitTimer"); + query_events_ = summary_profile_->AddEventSequence("Query Timeline"); query_events_->Start(); - profile_.AddChild(&summary_profile_); + profile_->AddChild(summary_profile_); - profile_.set_name("Query (id=" + PrintId(query_id()) + ")"); - summary_profile_.AddInfoString("Session ID", PrintId(session_id())); - summary_profile_.AddInfoString("Session Type", PrintTSessionType(session_type())); + profile_->set_name("Query (id=" + PrintId(query_id()) + ")"); + summary_profile_->AddInfoString("Session ID", PrintId(session_id())); + summary_profile_->AddInfoString("Session Type", PrintTSessionType(session_type())); if (session_type() == TSessionType::HIVESERVER2) { - summary_profile_.AddInfoString("HiveServer2 Protocol Version", + summary_profile_->AddInfoString("HiveServer2 Protocol Version", Substitute("V$0", 1 + session->hs2_version)); } - summary_profile_.AddInfoString("Start Time", start_time().ToString()); - summary_profile_.AddInfoString("End Time", ""); - summary_profile_.AddInfoString("Query Type", "N/A"); - 
summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_)); - summary_profile_.AddInfoString("Query Status", "OK"); - summary_profile_.AddInfoString("Impala Version", GetVersionString(/* compact */ true)); - summary_profile_.AddInfoString("User", effective_user()); - summary_profile_.AddInfoString("Connected User", connected_user()); - summary_profile_.AddInfoString("Delegated User", do_as_user()); - summary_profile_.AddInfoString("Network Address", + summary_profile_->AddInfoString("Start Time", start_time().ToString()); + summary_profile_->AddInfoString("End Time", ""); + summary_profile_->AddInfoString("Query Type", "N/A"); + summary_profile_->AddInfoString("Query State", PrintQueryState(query_state_)); + summary_profile_->AddInfoString("Query Status", "OK"); + summary_profile_->AddInfoString("Impala Version", GetVersionString(/* compact */ true)); + summary_profile_->AddInfoString("User", effective_user()); + summary_profile_->AddInfoString("Connected User", connected_user()); + summary_profile_->AddInfoString("Delegated User", do_as_user()); + summary_profile_->AddInfoString("Network Address", lexical_cast<string>(session_->network_address)); - summary_profile_.AddInfoString("Default Db", default_db()); - summary_profile_.AddInfoString("Sql Statement", query_ctx_.client_request.stmt); - summary_profile_.AddInfoString("Coordinator", + summary_profile_->AddInfoString("Default Db", default_db()); + summary_profile_->AddInfoString("Sql Statement", query_ctx_.client_request.stmt); + summary_profile_->AddInfoString("Coordinator", TNetworkAddressToString(exec_env->backend_address())); } @@ -144,11 +145,11 @@ Status ClientRequestState::Exec(TExecRequest* exec_request) { MarkActive(); exec_request_ = *exec_request; - profile_.AddChild(&server_profile_); - summary_profile_.AddInfoString("Query Type", PrintTStmtType(stmt_type())); - summary_profile_.AddInfoString("Query Options (set by configuration)", + profile_->AddChild(server_profile_); + 
summary_profile_->AddInfoString("Query Type", PrintTStmtType(stmt_type())); + summary_profile_->AddInfoString("Query Options (set by configuration)", DebugQueryOptions(query_ctx_.client_request.query_options)); - summary_profile_.AddInfoString("Query Options (set by configuration and planner)", + summary_profile_->AddInfoString("Query Options (set by configuration and planner)", DebugQueryOptions(exec_request_.query_options)); switch (exec_request->stmt_type) { @@ -182,7 +183,7 @@ Status ClientRequestState::Exec(TExecRequest* exec_request) { reset_req.reset_metadata_params.__set_table_name( exec_request_.load_data_request.table_name); catalog_op_executor_.reset( - new CatalogOpExecutor(exec_env_, frontend_, &server_profile_)); + new CatalogOpExecutor(exec_env_, frontend_, server_profile_)); RETURN_IF_ERROR(catalog_op_executor_->Exec(reset_req)); RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult( *catalog_op_executor_->update_catalog_result(), @@ -298,7 +299,7 @@ Status ClientRequestState::ExecLocalCatalogOp( // Verify the user has privileges to perform this operation by checking against // the Sentry Service (via the Catalog Server). catalog_op_executor_.reset(new CatalogOpExecutor(exec_env_, frontend_, - &server_profile_)); + server_profile_)); TSentryAdminCheckRequest req; req.__set_header(TCatalogServiceRequestHeader()); @@ -319,7 +320,7 @@ Status ClientRequestState::ExecLocalCatalogOp( // Verify the user has privileges to perform this operation by checking against // the Sentry Service (via the Catalog Server). 
catalog_op_executor_.reset(new CatalogOpExecutor(exec_env_, frontend_, - &server_profile_)); + server_profile_)); TSentryAdminCheckRequest req; req.__set_header(TCatalogServiceRequestHeader()); @@ -392,13 +393,13 @@ Status ClientRequestState::ExecQueryOrDmlRequest( plan_ss << "\n----------------\n" << query_exec_request.query_plan << "----------------"; - summary_profile_.AddInfoString("Plan", plan_ss.str()); + summary_profile_->AddInfoString("Plan", plan_ss.str()); } // Add info strings consumed by CM: Estimated mem and tables missing stats. if (query_exec_request.__isset.per_host_mem_estimate) { stringstream ss; ss << query_exec_request.per_host_mem_estimate; - summary_profile_.AddInfoString(PER_HOST_MEM_KEY, ss.str()); + summary_profile_->AddInfoString(PER_HOST_MEM_KEY, ss.str()); } if (!query_exec_request.query_ctx.__isset.parent_query_id && query_exec_request.query_ctx.__isset.tables_missing_stats && @@ -409,7 +410,7 @@ Status ClientRequestState::ExecQueryOrDmlRequest( if (i != 0) ss << ","; ss << tbls[i].db_name << "." << tbls[i].table_name; } - summary_profile_.AddInfoString(TABLES_MISSING_STATS_KEY, ss.str()); + summary_profile_->AddInfoString(TABLES_MISSING_STATS_KEY, ss.str()); } if (!query_exec_request.query_ctx.__isset.parent_query_id && @@ -422,7 +423,7 @@ Status ClientRequestState::ExecQueryOrDmlRequest( if (i != 0) ss << ","; ss << tbls[i].db_name << "." << tbls[i].table_name; } - summary_profile_.AddInfoString(TABLES_WITH_CORRUPT_STATS_KEY, ss.str()); + summary_profile_->AddInfoString(TABLES_WITH_CORRUPT_STATS_KEY, ss.str()); } if (query_exec_request.query_ctx.__isset.tables_missing_diskids && @@ -434,7 +435,7 @@ Status ClientRequestState::ExecQueryOrDmlRequest( if (i != 0) ss << ","; ss << tbls[i].db_name << "." 
<< tbls[i].table_name; } - summary_profile_.AddInfoString(TABLES_WITH_MISSING_DISK_IDS_KEY, ss.str()); + summary_profile_->AddInfoString(TABLES_WITH_MISSING_DISK_IDS_KEY, ss.str()); } { @@ -442,7 +443,7 @@ Status ClientRequestState::ExecQueryOrDmlRequest( // Don't start executing the query if Cancel() was called concurrently with Exec(). if (is_cancelled_) return Status::CANCELLED; schedule_.reset(new QuerySchedule(query_id(), query_exec_request, - exec_request_.query_options, &summary_profile_, query_events_)); + exec_request_.query_options, summary_profile_, query_events_)); } Status status = exec_env_->scheduler()->Schedule(schedule_.get()); { @@ -465,14 +466,14 @@ Status ClientRequestState::ExecQueryOrDmlRequest( RETURN_IF_ERROR(UpdateQueryStatus(status)); } - profile_.AddChild(coord_->query_profile()); + profile_->AddChild(coord_->query_profile()); return Status::OK(); } Status ClientRequestState::ExecDdlRequest() { string op_type = catalog_op_type() == TCatalogOpType::DDL ? PrintTDdlType(ddl_type()) : PrintTCatalogOpType(catalog_op_type()); - summary_profile_.AddInfoString("DDL Type", op_type); + summary_profile_->AddInfoString("DDL Type", op_type); if (catalog_op_type() != TCatalogOpType::DDL && catalog_op_type() != TCatalogOpType::RESET_METADATA) { @@ -502,7 +503,7 @@ Status ClientRequestState::ExecDdlRequest() { } catalog_op_executor_.reset(new CatalogOpExecutor(exec_env_, frontend_, - &server_profile_)); + server_profile_)); Status status = catalog_op_executor_->Exec(exec_request_.catalog_op_request); { lock_guard<mutex> l(lock_); @@ -564,7 +565,7 @@ void ClientRequestState::Done() { unique_lock<mutex> l(lock_); end_time_ = TimestampValue::LocalTime(); - summary_profile_.AddInfoString("End Time", end_time().ToString()); + summary_profile_->AddInfoString("End Time", end_time().ToString()); query_events_->MarkEvent("Unregister query"); // Update result set cache metrics, and update mem limit accounting before tearing @@ -586,7 +587,7 @@ void 
ClientRequestState::Done() { Status ClientRequestState::Exec(const TMetadataOpRequest& exec_request) { TResultSet metadata_op_result; // Like the other Exec(), fill out as much profile information as we're able to. - summary_profile_.AddInfoString("Query Type", PrintTStmtType(TStmtType::DDL)); + summary_profile_->AddInfoString("Query Type", PrintTStmtType(TStmtType::DDL)); RETURN_IF_ERROR(frontend_->ExecHiveServer2MetadataOp(exec_request, &metadata_op_result)); result_metadata_ = metadata_op_result.schema; @@ -722,7 +723,7 @@ Status ClientRequestState::UpdateQueryStatus(const Status& status) { if (!status.ok() && query_status_.ok()) { UpdateQueryState(beeswax::QueryState::EXCEPTION); query_status_ = status; - summary_profile_.AddInfoString("Query Status", query_status_.GetDetail()); + summary_profile_->AddInfoString("Query Status", query_status_.GetDetail()); } return status; @@ -898,7 +899,7 @@ Status ClientRequestState::UpdateCatalog() { } query_events_->MarkEvent("DML data written"); - SCOPED_TIMER(ADD_TIMER(&server_profile_, "MetastoreUpdateTimer")); + SCOPED_TIMER(ADD_TIMER(server_profile_, "MetastoreUpdateTimer")); TQueryExecRequest query_exec_request = exec_request().query_exec_request; if (query_exec_request.__isset.finalize_params) { @@ -1031,7 +1032,7 @@ Status ClientRequestState::UpdateTableAndColumnStats( DCHECK_GE(child_queries.size(), 1); DCHECK_LE(child_queries.size(), 2); catalog_op_executor_.reset( - new CatalogOpExecutor(exec_env_, frontend_, &server_profile_)); + new CatalogOpExecutor(exec_env_, frontend_, server_profile_)); // If there was no column stats query, pass in empty thrift structures to // ExecComputeStats(). Otherwise pass in the column stats result. 
@@ -1078,7 +1079,7 @@ void ClientRequestState::ClearResultCache() { void ClientRequestState::UpdateQueryState( beeswax::QueryState::type query_state) { query_state_ = query_state; - summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_)); + summary_profile_->AddInfoString("Query State", PrintQueryState(query_state_)); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/service/client-request-state.h ---------------------------------------------------------------------- diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h index 6846165..1c015c3 100644 --- a/be/src/service/client-request-state.h +++ b/be/src/service/client-request-state.h @@ -184,8 +184,8 @@ class ClientRequestState { void set_user_profile_access(bool user_has_profile_access) { user_has_profile_access_ = user_has_profile_access; } - const RuntimeProfile& profile() const { return profile_; } - const RuntimeProfile& summary_profile() const { return summary_profile_; } + const RuntimeProfile* profile() const { return profile_; } + const RuntimeProfile* summary_profile() const { return summary_profile_; } const TimestampValue& start_time() const { return start_time_; } const TimestampValue& end_time() const { return end_time_; } const std::string& sql_stmt() const { return query_ctx_.client_request.stmt; } @@ -211,7 +211,7 @@ class ClientRequestState { } RuntimeProfile::EventSequence* query_events() const { return query_events_; } - RuntimeProfile* summary_profile() { return &summary_profile_; } + RuntimeProfile* summary_profile() { return summary_profile_; } private: const TQueryCtx query_ctx_; @@ -299,9 +299,9 @@ class ClientRequestState { /// There's a fourth profile which is not built here (but is a /// child of profile_); the execution profile which tracks the /// actual fragment execution. 
- RuntimeProfile profile_; - RuntimeProfile server_profile_; - RuntimeProfile summary_profile_; + RuntimeProfile* const profile_; + RuntimeProfile* const server_profile_; + RuntimeProfile* const summary_profile_; RuntimeProfile::Counter* row_materialization_timer_; /// Tracks how long we are idle waiting for a client to fetch rows. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/service/impala-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index 8ba2894..051cfaf 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -623,9 +623,9 @@ Status ImpalaServer::GetRuntimeProfileStr(const TUniqueId& query_id, RETURN_IF_ERROR(CheckProfileAccess(user, request_state->effective_user(), request_state->user_has_profile_access())); if (base64_encoded) { - RETURN_IF_ERROR(request_state->profile().SerializeToArchiveString(output)); + RETURN_IF_ERROR(request_state->profile()->SerializeToArchiveString(output)); } else { - request_state->profile().PrettyPrint(output); + request_state->profile()->PrettyPrint(output); } return Status::OK(); } @@ -741,7 +741,7 @@ Status ImpalaServer::GetExecSummary(const TUniqueId& query_id, const string& use void ImpalaServer::ArchiveQuery(const ClientRequestState& query) { string encoded_profile_str; - Status status = query.profile().SerializeToArchiveString(&encoded_profile_str); + Status status = query.profile()->SerializeToArchiveString(&encoded_profile_str); if (!status.ok()) { // Didn't serialize the string. Continue with empty string. 
LOG_EVERY_N(WARNING, 1000) << "Could not serialize profile to archive string " @@ -1677,7 +1677,7 @@ ImpalaServer::QueryStateRecord::QueryStateRecord(const ClientRequestState& reque id = request_state.query_id(); const TExecRequest& request = request_state.exec_request(); - const string* plan_str = request_state.summary_profile().GetInfoString("Plan"); + const string* plan_str = request_state.summary_profile()->GetInfoString("Plan"); if (plan_str != nullptr) plan = *plan_str; stmt = request_state.sql_stmt(); stmt_type = request.stmt_type; @@ -1701,11 +1701,11 @@ ImpalaServer::QueryStateRecord::QueryStateRecord(const ClientRequestState& reque if (copy_profile) { stringstream ss; - request_state.profile().PrettyPrint(&ss); + request_state.profile()->PrettyPrint(&ss); profile_str = ss.str(); if (encoded_profile.empty()) { Status status = - request_state.profile().SerializeToArchiveString(&encoded_profile_str); + request_state.profile()->SerializeToArchiveString(&encoded_profile_str); if (!status.ok()) { LOG_EVERY_N(WARNING, 1000) << "Could not serialize profile to archive string " << status.GetDetail(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/util/dummy-runtime-profile.h ---------------------------------------------------------------------- diff --git a/be/src/util/dummy-runtime-profile.h b/be/src/util/dummy-runtime-profile.h index 83bccbf..1642d4e 100644 --- a/be/src/util/dummy-runtime-profile.h +++ b/be/src/util/dummy-runtime-profile.h @@ -28,12 +28,12 @@ namespace impala { /// but not always so that the object can still allocate counters in the same way. 
class DummyProfile { public: - DummyProfile() : pool_(), profile_(&pool_, "dummy", false) {} - RuntimeProfile* profile() { return &profile_; } + DummyProfile() : pool_(), profile_(RuntimeProfile::Create(&pool_, "dummy", false)) {} + RuntimeProfile* profile() { return profile_; } private: ObjectPool pool_; - RuntimeProfile profile_; + RuntimeProfile* const profile_; }; } #endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/util/periodic-counter-updater.cc ---------------------------------------------------------------------- diff --git a/be/src/util/periodic-counter-updater.cc b/be/src/util/periodic-counter-updater.cc index b1c755a..098e683 100644 --- a/be/src/util/periodic-counter-updater.cc +++ b/be/src/util/periodic-counter-updater.cc @@ -93,21 +93,20 @@ void PeriodicCounterUpdater::RegisterBucketingCounters( } void PeriodicCounterUpdater::StopBucketingCounters( - vector<RuntimeProfile::Counter*>* buckets, bool convert) { + vector<RuntimeProfile::Counter*>* buckets) { int64_t num_sampled = 0; { lock_guard<SpinLock> bucketinglock(instance_->bucketing_lock_); BucketCountersMap::iterator itr = instance_->bucketing_counters_.find(buckets); - if (itr != instance_->bucketing_counters_.end()) { - num_sampled = itr->second.num_sampled; - instance_->bucketing_counters_.erase(itr); - } + // If not registered, we have nothing to do. 
+ if (itr == instance_->bucketing_counters_.end()) return; + num_sampled = itr->second.num_sampled; + instance_->bucketing_counters_.erase(itr); } - if (convert && num_sampled > 0) { - for (int i = 0; i < buckets->size(); ++i) { - RuntimeProfile::Counter* counter = (*buckets)[i]; + if (num_sampled > 0) { + for (RuntimeProfile::Counter* counter : *buckets) { double perc = 100 * counter->value() / (double)num_sampled; counter->Set(perc); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7866eec5/be/src/util/periodic-counter-updater.h ---------------------------------------------------------------------- diff --git a/be/src/util/periodic-counter-updater.h b/be/src/util/periodic-counter-updater.h index c603522..762f372 100644 --- a/be/src/util/periodic-counter-updater.h +++ b/be/src/util/periodic-counter-updater.h @@ -70,12 +70,11 @@ class PeriodicCounterUpdater { /// Stops updating the value of 'counter'. static void StopSamplingCounter(RuntimeProfile::Counter* counter); - /// Stops updating the bucket counter. - /// If convert is true, convert the buckets from count to percentage. - /// Sampling counters are updated periodically so should be removed as soon as the + /// If the bucketing counters 'buckets' are registered, stops updating the counters and + /// converts the buckets from count to percentage. If not registered, has no effect. + /// Periodic counters are updated periodically so should be removed as soon as the /// underlying counter is no longer going to change. - static void StopBucketingCounters(std::vector<RuntimeProfile::Counter*>* buckets, - bool convert); + static void StopBucketingCounters(std::vector<RuntimeProfile::Counter*>* buckets); /// Stops 'counter' from receiving any more samples. static void StopTimeSeriesCounter(RuntimeProfile::TimeSeriesCounter* counter);
