IMPALA-5350: Tidy up thread groups for finst exec threads

Put all per-finst (per-fragment-instance) threads (profile report, exec thread, async build thread, scanner threads) together in one group, "fragment-execution", and add the finst ID to all of their names.
Remove -<tid> suffix from thread names and add a separate column with it to /threadz web page. Testing: eyeballed with a join query. Change-Id: I6fabfcc923863e5f9db4bb8b016c08ff226c079f Reviewed-on: http://gerrit.cloudera.org:8080/6951 Reviewed-by: Henry Robinson <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/9caea9bf Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/9caea9bf Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/9caea9bf Branch: refs/heads/master Commit: 9caea9bfad025274762642a03cb5483625d86a09 Parents: bbc3ce1 Author: Henry Robinson <[email protected]> Authored: Mon May 22 09:56:48 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Thu Jun 1 22:45:53 2017 +0000 ---------------------------------------------------------------------- be/src/exec/blocking-join-node.cc | 12 ++++++++---- be/src/exec/hdfs-scan-node.cc | 10 +++++++--- be/src/exec/kudu-scan-node.cc | 14 +++++++++----- be/src/runtime/fragment-instance-state.cc | 7 ++++--- be/src/runtime/fragment-instance-state.h | 2 ++ be/src/runtime/query-state.cc | 8 ++++---- be/src/util/thread.cc | 9 +++------ www/thread-group.tmpl | 4 +++- 8 files changed, 40 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9caea9bf/be/src/exec/blocking-join-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/blocking-join-node.cc b/be/src/exec/blocking-join-node.cc index 8fb0756..6e73f77 100644 --- a/be/src/exec/blocking-join-node.cc +++ b/be/src/exec/blocking-join-node.cc @@ -21,6 +21,7 @@ #include "exec/data-sink.h" #include "exprs/expr.h" +#include "runtime/fragment-instance-state.h" #include "runtime/mem-tracker.h" #include "runtime/row-batch.h" #include 
"runtime/runtime-state.h" @@ -147,7 +148,7 @@ void BlockingJoinNode::ProcessBuildInputAsync(RuntimeState* state, DataSink* bui Status* status) { DCHECK(status != nullptr); SCOPED_THREAD_COUNTER_MEASUREMENT(state->total_thread_statistics()); - if (build_sink == nullptr){ + if (build_sink == nullptr){ *status = ProcessBuildInput(state); } else { *status = SendBuildInputToSink<true>(state, build_sink); @@ -189,9 +190,12 @@ Status BlockingJoinNode::ProcessBuildInputAndOpenProbe( if (!IsInSubplan() && state->resource_pool()->TryAcquireThreadToken()) { Status build_side_status; runtime_profile()->AppendExecOption("Join Build-Side Prepared Asynchronously"); - Thread build_thread( - node_name_, "build thread", bind(&BlockingJoinNode::ProcessBuildInputAsync, this, - state, build_sink, &build_side_status)); + string thread_name = Substitute("join-build-thread (finst:$0, plan-node-id:$1)", + PrintId(state->fragment_instance_id()), id()); + Thread build_thread(FragmentInstanceState::FINST_THREAD_GROUP_NAME, thread_name, + [this, state, build_sink, status=&build_side_status]() { + ProcessBuildInputAsync(state, build_sink, status); + }); // Open the left child so that it may perform any initialisation in parallel. // Don't exit even if we see an error, we still need to wait for the build thread // to finish. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9caea9bf/be/src/exec/hdfs-scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc index c3972b2..576ec55 100644 --- a/be/src/exec/hdfs-scan-node.cc +++ b/be/src/exec/hdfs-scan-node.cc @@ -23,6 +23,7 @@ #include "exec/hdfs-scanner.h" #include "exec/scanner-context.h" #include "runtime/descriptors.h" +#include "runtime/fragment-instance-state.h" #include "runtime/runtime-filter.inline.h" #include "runtime/runtime-state.h" #include "runtime/mem-tracker.h" @@ -346,10 +347,13 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool) COUNTER_ADD(&active_scanner_thread_counter_, 1); COUNTER_ADD(num_scanner_threads_started_counter_, 1); - stringstream ss; - ss << "scanner-thread(" << num_scanner_threads_started_counter_->value() << ")"; + string name = Substitute("scanner-thread (finst:$0, plan-node-id:$1, thread-idx:$2)", + PrintId(runtime_state_->fragment_instance_id()), id(), + num_scanner_threads_started_counter_->value()); + + auto fn = [this]() { this->ScannerThread(); }; scanner_threads_.AddThread( - new Thread("hdfs-scan-node", ss.str(), &HdfsScanNode::ScannerThread, this)); + new Thread(FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn)); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9caea9bf/be/src/exec/kudu-scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc index c345748..beead44 100644 --- a/be/src/exec/kudu-scan-node.cc +++ b/be/src/exec/kudu-scan-node.cc @@ -22,6 +22,7 @@ #include "exec/kudu-scanner.h" #include "exec/kudu-util.h" #include "gutil/gscoped_ptr.h" +#include "runtime/fragment-instance-state.h" #include "runtime/mem-pool.h" #include "runtime/runtime-state.h" #include "runtime/row-batch.h" @@ -152,14 +153,17 @@ void 
KuduScanNode::ThreadAvailableCb(ThreadResourceMgr::ResourcePool* pool) { ++num_active_scanners_; COUNTER_ADD(num_scanner_threads_started_counter_, 1); - // Reserve the first token so no other thread picks it up. - const string* token = GetNextScanToken(); - string name = Substitute("scanner-thread($0)", + string name = Substitute( + "kudu-scanner-thread (finst:$0, plan-node-id:$1, thread-idx:$2)", + PrintId(runtime_state_->fragment_instance_id()), id(), num_scanner_threads_started_counter_->value()); + // Reserve the first token so no other thread picks it up. + const string* token = GetNextScanToken(); + auto fn = [this, token, name]() { this->RunScannerThread(name, token); }; VLOG_RPC << "Thread started: " << name; - scanner_threads_.AddThread(new Thread("kudu-scan-node", name, - &KuduScanNode::RunScannerThread, this, name, token)); + scanner_threads_.AddThread( + new Thread(FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn)); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9caea9bf/be/src/runtime/fragment-instance-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc index 9376767..5f109f5 100644 --- a/be/src/runtime/fragment-instance-state.cc +++ b/be/src/runtime/fragment-instance-state.cc @@ -54,6 +54,7 @@ using namespace impala; using namespace apache::thrift; const string FragmentInstanceState::PER_HOST_PEAK_MEM_COUNTER = "PerHostPeakMemUsage"; +const string FragmentInstanceState::FINST_THREAD_GROUP_NAME = "fragment-execution"; static const string OPEN_TIMER_NAME = "OpenTime"; static const string PREPARE_TIMER_NAME = "PrepareTime"; @@ -224,10 +225,11 @@ Status FragmentInstanceState::Prepare() { // We need to start the profile-reporting thread before calling Open(), // since it may block. 
if (FLAGS_status_report_interval > 0) { + string thread_name = Substitute("profile-report (finst:$0)", PrintId(instance_id())); unique_lock<mutex> l(report_thread_lock_); report_thread_.reset( - new Thread("plan-fragment-executor", "report-profile", - &FragmentInstanceState::ReportProfileThread, this)); + new Thread(FragmentInstanceState::FINST_THREAD_GROUP_NAME, thread_name, + [this]() { this->ReportProfileThread(); })); // Make sure the thread started up, otherwise ReportProfileThread() might get into // a race with StopReportThread(). while (!report_thread_active_) report_thread_started_cv_.wait(l); @@ -456,4 +458,3 @@ void FragmentInstanceState::PrintVolumeIds() { << "Hdfs split stats (<volume id>:<# splits>/<split lengths>) for query=" << query_id() << ":\n" << str.str(); } - http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9caea9bf/be/src/runtime/fragment-instance-state.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h index 28b8a54..750f983 100644 --- a/be/src/runtime/fragment-instance-state.h +++ b/be/src/runtime/fragment-instance-state.h @@ -119,6 +119,8 @@ class FragmentInstanceState { const TNetworkAddress& coord_address() const { return query_ctx().coord_address; } ObjectPool* obj_pool(); + static const std::string FINST_THREAD_GROUP_NAME; + private: QueryState* query_state_; const TPlanFragmentCtx& fragment_ctx_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9caea9bf/be/src/runtime/query-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc index 8fec487..90f0185 100644 --- a/be/src/runtime/query-state.cc +++ b/be/src/runtime/query-state.cc @@ -298,10 +298,10 @@ void QueryState::StartFInstances() { // start new thread to execute instance refcnt_.Add(1); // decremented in ExecFInstance() - Thread 
t("query-state", - Substitute( - "exec-query-finstance-$0", PrintId(instance_ctx.fragment_instance_id)), - &QueryState::ExecFInstance, this, fis); + string thread_name = Substitute( + "exec-finstance (finst:$0)", PrintId(instance_ctx.fragment_instance_id)); + Thread t(FragmentInstanceState::FINST_THREAD_GROUP_NAME, thread_name, + [this, fis]() { this->ExecFInstance(fis); }); t.Detach(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9caea9bf/be/src/util/thread.cc ---------------------------------------------------------------------- diff --git a/be/src/util/thread.cc b/be/src/util/thread.cc index b59f4f9..b3c7d17 100644 --- a/be/src/util/thread.cc +++ b/be/src/util/thread.cc @@ -259,6 +259,7 @@ void ThreadMgr::ThreadGroupUrlCallback(const Webserver::ArgumentMap& args, for (const ThreadCategory::value_type& thread: *category) { Value val(kObjectType); val.AddMember("name", thread.second.name().c_str(), document->GetAllocator()); + val.AddMember("id", thread.second.thread_id(), document->GetAllocator()); ThreadStats stats; Status status = GetThreadStats(thread.second.thread_id(), &stats); if (!status.ok()) { @@ -304,13 +305,9 @@ void Thread::SuperviseThread(const string& name, const string& category, LOG_EVERY_N(INFO, 100) << "Could not determine thread ID: " << error_msg; } // Make a copy, since we want to refer to these variables after the unsafe point below. - string category_copy = category; + string category_copy = category.empty() ? "no-category" : category;; shared_ptr<ThreadMgr> thread_mgr_ref = thread_manager; - stringstream ss; - ss << (name.empty() ? "thread" : name) << "-" << system_tid; - string name_copy = ss.str(); - - if (category_copy.empty()) category_copy = "no-category"; + string name_copy = name.empty() ? Substitute("thread-$0", system_tid) : name; // Use boost's get_id rather than the system thread ID as the unique key for this thread // since the latter is more prone to being recycled. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9caea9bf/www/thread-group.tmpl ---------------------------------------------------------------------- diff --git a/www/thread-group.tmpl b/www/thread-group.tmpl index 143a029..3e16993 100644 --- a/www/thread-group.tmpl +++ b/www/thread-group.tmpl @@ -25,6 +25,7 @@ under the License. <thead> <tr> <th>Thread name</th> + <th>Id</th> <th>Cumulative User CPU(s)</th> <th>Cumulative Kernel CPU(s)</th> <th>Cumulative IO-wait(s)</th> @@ -34,6 +35,7 @@ under the License. {{#threads}} <tr> <td>{{name}}</td> + <td>{{id}}</td> <td>{{user_ns}}</td> <td>{{kernel_ns}}</td> <td>{{iowait_ns}}</td> @@ -45,7 +47,7 @@ under the License. <script> $(document).ready(function() { $('#threads-tbl').DataTable({ - "order": [[ 1, "desc" ]], + "order": [[ 2, "desc" ]], "pageLength": 100 }); });
