IMPALA-5350: Tidy up thread groups for finst exec threads

Put all per-finst threads (profile report, exec thread, async build
thread, scanner threads) together in one group "fragment-execution", and
add finst ID to all their names.

Remove -<tid> suffix from thread names and add a separate column with it
to /threadz web page.

Testing: eyeballed with a join query.

Change-Id: I6fabfcc923863e5f9db4bb8b016c08ff226c079f
Reviewed-on: http://gerrit.cloudera.org:8080/6951
Reviewed-by: Henry Robinson <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/9caea9bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/9caea9bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/9caea9bf

Branch: refs/heads/master
Commit: 9caea9bfad025274762642a03cb5483625d86a09
Parents: bbc3ce1
Author: Henry Robinson <[email protected]>
Authored: Mon May 22 09:56:48 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Thu Jun 1 22:45:53 2017 +0000

----------------------------------------------------------------------
 be/src/exec/blocking-join-node.cc         | 12 ++++++++----
 be/src/exec/hdfs-scan-node.cc             | 10 +++++++---
 be/src/exec/kudu-scan-node.cc             | 14 +++++++++-----
 be/src/runtime/fragment-instance-state.cc |  7 ++++---
 be/src/runtime/fragment-instance-state.h  |  2 ++
 be/src/runtime/query-state.cc             |  8 ++++----
 be/src/util/thread.cc                     |  9 +++------
 www/thread-group.tmpl                     |  4 +++-
 8 files changed, 40 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9caea9bf/be/src/exec/blocking-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/blocking-join-node.cc b/be/src/exec/blocking-join-node.cc
index 8fb0756..6e73f77 100644
--- a/be/src/exec/blocking-join-node.cc
+++ b/be/src/exec/blocking-join-node.cc
@@ -21,6 +21,7 @@
 
 #include "exec/data-sink.h"
 #include "exprs/expr.h"
+#include "runtime/fragment-instance-state.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
@@ -147,7 +148,7 @@ void BlockingJoinNode::ProcessBuildInputAsync(RuntimeState* state, DataSink* bui
     Status* status) {
   DCHECK(status != nullptr);
   SCOPED_THREAD_COUNTER_MEASUREMENT(state->total_thread_statistics());
-  if  (build_sink == nullptr){
+  if (build_sink == nullptr){
     *status = ProcessBuildInput(state);
   } else {
     *status = SendBuildInputToSink<true>(state, build_sink);
@@ -189,9 +190,12 @@ Status BlockingJoinNode::ProcessBuildInputAndOpenProbe(
   if (!IsInSubplan() && state->resource_pool()->TryAcquireThreadToken()) {
     Status build_side_status;
     runtime_profile()->AppendExecOption("Join Build-Side Prepared 
Asynchronously");
-    Thread build_thread(
-        node_name_, "build thread", 
bind(&BlockingJoinNode::ProcessBuildInputAsync, this,
-                                        state, build_sink, 
&build_side_status));
+    string thread_name = Substitute("join-build-thread (finst:$0, 
plan-node-id:$1)",
+        PrintId(state->fragment_instance_id()), id());
+    Thread build_thread(FragmentInstanceState::FINST_THREAD_GROUP_NAME, 
thread_name,
+        [this, state, build_sink, status=&build_side_status]() {
+          ProcessBuildInputAsync(state, build_sink, status);
+        });
     // Open the left child so that it may perform any initialisation in 
parallel.
     // Don't exit even if we see an error, we still need to wait for the build 
thread
     // to finish.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9caea9bf/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index c3972b2..576ec55 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -23,6 +23,7 @@
 #include "exec/hdfs-scanner.h"
 #include "exec/scanner-context.h"
 #include "runtime/descriptors.h"
+#include "runtime/fragment-instance-state.h"
 #include "runtime/runtime-filter.inline.h"
 #include "runtime/runtime-state.h"
 #include "runtime/mem-tracker.h"
@@ -346,10 +347,13 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
 
     COUNTER_ADD(&active_scanner_thread_counter_, 1);
     COUNTER_ADD(num_scanner_threads_started_counter_, 1);
-    stringstream ss;
-    ss << "scanner-thread(" << num_scanner_threads_started_counter_->value() 
<< ")";
+    string name = Substitute("scanner-thread (finst:$0, plan-node-id:$1, 
thread-idx:$2)",
+        PrintId(runtime_state_->fragment_instance_id()), id(),
+        num_scanner_threads_started_counter_->value());
+
+    auto fn = [this]() { this->ScannerThread(); };
     scanner_threads_.AddThread(
-        new Thread("hdfs-scan-node", ss.str(), &HdfsScanNode::ScannerThread, 
this));
+        new Thread(FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn));
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9caea9bf/be/src/exec/kudu-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index c345748..beead44 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -22,6 +22,7 @@
 #include "exec/kudu-scanner.h"
 #include "exec/kudu-util.h"
 #include "gutil/gscoped_ptr.h"
+#include "runtime/fragment-instance-state.h"
 #include "runtime/mem-pool.h"
 #include "runtime/runtime-state.h"
 #include "runtime/row-batch.h"
@@ -152,14 +153,17 @@ void KuduScanNode::ThreadAvailableCb(ThreadResourceMgr::ResourcePool* pool) {
     ++num_active_scanners_;
     COUNTER_ADD(num_scanner_threads_started_counter_, 1);
 
-    // Reserve the first token so no other thread picks it up.
-    const string* token = GetNextScanToken();
-    string name = Substitute("scanner-thread($0)",
+    string name = Substitute(
+        "kudu-scanner-thread (finst:$0, plan-node-id:$1, thread-idx:$2)",
+        PrintId(runtime_state_->fragment_instance_id()), id(),
         num_scanner_threads_started_counter_->value());
 
+    // Reserve the first token so no other thread picks it up.
+    const string* token = GetNextScanToken();
+    auto fn = [this, token, name]() { this->RunScannerThread(name, token); };
     VLOG_RPC << "Thread started: " << name;
-    scanner_threads_.AddThread(new Thread("kudu-scan-node", name,
-        &KuduScanNode::RunScannerThread, this, name, token));
+    scanner_threads_.AddThread(
+        new Thread(FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn));
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9caea9bf/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index 9376767..5f109f5 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -54,6 +54,7 @@ using namespace impala;
 using namespace apache::thrift;
 
 const string FragmentInstanceState::PER_HOST_PEAK_MEM_COUNTER = 
"PerHostPeakMemUsage";
+const string FragmentInstanceState::FINST_THREAD_GROUP_NAME = 
"fragment-execution";
 
 static const string OPEN_TIMER_NAME = "OpenTime";
 static const string PREPARE_TIMER_NAME = "PrepareTime";
@@ -224,10 +225,11 @@ Status FragmentInstanceState::Prepare() {
   // We need to start the profile-reporting thread before calling Open(),
   // since it may block.
   if (FLAGS_status_report_interval > 0) {
+    string thread_name = Substitute("profile-report (finst:$0)", 
PrintId(instance_id()));
     unique_lock<mutex> l(report_thread_lock_);
     report_thread_.reset(
-        new Thread("plan-fragment-executor", "report-profile",
-            &FragmentInstanceState::ReportProfileThread, this));
+        new Thread(FragmentInstanceState::FINST_THREAD_GROUP_NAME, thread_name,
+            [this]() { this->ReportProfileThread(); }));
     // Make sure the thread started up, otherwise ReportProfileThread() might 
get into
     // a race with StopReportThread().
     while (!report_thread_active_) report_thread_started_cv_.wait(l);
@@ -456,4 +458,3 @@ void FragmentInstanceState::PrintVolumeIds() {
       << "Hdfs split stats (<volume id>:<# splits>/<split lengths>) for query="
       << query_id() << ":\n" << str.str();
 }
-

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9caea9bf/be/src/runtime/fragment-instance-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h
index 28b8a54..750f983 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -119,6 +119,8 @@ class FragmentInstanceState {
   const TNetworkAddress& coord_address() const { return 
query_ctx().coord_address; }
   ObjectPool* obj_pool();
 
+  static const std::string FINST_THREAD_GROUP_NAME;
+
  private:
   QueryState* query_state_;
   const TPlanFragmentCtx& fragment_ctx_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9caea9bf/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 8fec487..90f0185 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -298,10 +298,10 @@ void QueryState::StartFInstances() {
 
     // start new thread to execute instance
     refcnt_.Add(1);  // decremented in ExecFInstance()
-    Thread t("query-state",
-        Substitute(
-          "exec-query-finstance-$0", 
PrintId(instance_ctx.fragment_instance_id)),
-        &QueryState::ExecFInstance, this, fis);
+    string thread_name = Substitute(
+        "exec-finstance (finst:$0)", 
PrintId(instance_ctx.fragment_instance_id));
+    Thread t(FragmentInstanceState::FINST_THREAD_GROUP_NAME, thread_name,
+        [this, fis]() { this->ExecFInstance(fis); });
     t.Detach();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9caea9bf/be/src/util/thread.cc
----------------------------------------------------------------------
diff --git a/be/src/util/thread.cc b/be/src/util/thread.cc
index b59f4f9..b3c7d17 100644
--- a/be/src/util/thread.cc
+++ b/be/src/util/thread.cc
@@ -259,6 +259,7 @@ void ThreadMgr::ThreadGroupUrlCallback(const Webserver::ArgumentMap& args,
     for (const ThreadCategory::value_type& thread: *category) {
       Value val(kObjectType);
       val.AddMember("name", thread.second.name().c_str(), 
document->GetAllocator());
+      val.AddMember("id", thread.second.thread_id(), document->GetAllocator());
       ThreadStats stats;
       Status status = GetThreadStats(thread.second.thread_id(), &stats);
       if (!status.ok()) {
@@ -304,13 +305,9 @@ void Thread::SuperviseThread(const string& name, const string& category,
     LOG_EVERY_N(INFO, 100) << "Could not determine thread ID: " << error_msg;
   }
   // Make a copy, since we want to refer to these variables after the unsafe 
point below.
-  string category_copy = category;
+  string category_copy = category.empty() ? "no-category" : category;;
   shared_ptr<ThreadMgr> thread_mgr_ref = thread_manager;
-  stringstream ss;
-  ss << (name.empty() ? "thread" : name) << "-" << system_tid;
-  string name_copy = ss.str();
-
-  if (category_copy.empty()) category_copy = "no-category";
+  string name_copy = name.empty() ? Substitute("thread-$0", system_tid) : name;
 
   // Use boost's get_id rather than the system thread ID as the unique key for 
this thread
   // since the latter is more prone to being recycled.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9caea9bf/www/thread-group.tmpl
----------------------------------------------------------------------
diff --git a/www/thread-group.tmpl b/www/thread-group.tmpl
index 143a029..3e16993 100644
--- a/www/thread-group.tmpl
+++ b/www/thread-group.tmpl
@@ -25,6 +25,7 @@ under the License.
   <thead>
     <tr>
       <th>Thread name</th>
+      <th>Id</th>
       <th>Cumulative User CPU(s)</th>
       <th>Cumulative Kernel CPU(s)</th>
       <th>Cumulative IO-wait(s)</th>
@@ -34,6 +35,7 @@ under the License.
     {{#threads}}
     <tr>
       <td>{{name}}</td>
+      <td>{{id}}</td>
       <td>{{user_ns}}</td>
       <td>{{kernel_ns}}</td>
       <td>{{iowait_ns}}</td>
@@ -45,7 +47,7 @@ under the License.
 <script>
     $(document).ready(function() {
         $('#threads-tbl').DataTable({
-            "order": [[ 1, "desc" ]],
+            "order": [[ 2, "desc" ]],
             "pageLength": 100
         });
     });

Reply via email to