http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/thread.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/thread.cc b/be/src/kudu/util/thread.cc
new file mode 100644
index 0000000..4abc7c1
--- /dev/null
+++ b/be/src/kudu/util/thread.cc
@@ -0,0 +1,628 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Copied from Impala and adapted to Kudu.
+
+#include "kudu/util/thread.h"
+
+#if defined(__linux__)
+#include <sys/prctl.h>
+#endif // defined(__linux__)
+#include <sys/resource.h>
+#include <unistd.h>
+
+#include <algorithm>
+#include <cerrno>
+#include <cstring>
+#include <map>
+#include <memory>
+#include <sstream>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include <boost/bind.hpp>
+#include <boost/smart_ptr/shared_ptr.hpp>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/gutil/bind.h"
+#include "kudu/gutil/bind_helpers.h"
+#include "kudu/gutil/dynamic_annotations.h"
+#include "kudu/gutil/mathlimits.h"
+#include "kudu/gutil/once.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/kernel_stack_watchdog.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/os-util.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/trace.h"
+#include "kudu/util/url-coding.h"
+#include "kudu/util/web_callback_registry.h"
+
+using boost::bind;
+using boost::mem_fn;
+using std::endl;
+using std::map;
+using std::ostringstream;
+using std::shared_ptr;
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+METRIC_DEFINE_gauge_uint64(server, threads_started,
+                           "Threads Started",
+                           kudu::MetricUnit::kThreads,
+                           "Total number of threads started on this server",
+                           kudu::EXPOSE_AS_COUNTER);
+
+METRIC_DEFINE_gauge_uint64(server, threads_running,
+                           "Threads Running",
+                           kudu::MetricUnit::kThreads,
+                           "Current number of running threads");
+
+METRIC_DEFINE_gauge_uint64(server, cpu_utime,
+                           "User CPU Time",
+                           kudu::MetricUnit::kMilliseconds,
+                           "Total user CPU time of the process",
+                           kudu::EXPOSE_AS_COUNTER);
+
+METRIC_DEFINE_gauge_uint64(server, cpu_stime,
+                           "System CPU Time",
+                           kudu::MetricUnit::kMilliseconds,
+                           "Total system CPU time of the process",
+                           kudu::EXPOSE_AS_COUNTER);
+
+METRIC_DEFINE_gauge_uint64(server, voluntary_context_switches,
+                           "Voluntary Context Switches",
+                           kudu::MetricUnit::kContextSwitches,
+                           "Total voluntary context switches",
+                           kudu::EXPOSE_AS_COUNTER);
+
+METRIC_DEFINE_gauge_uint64(server, involuntary_context_switches,
+                           "Involuntary Context Switches",
+                           kudu::MetricUnit::kContextSwitches,
+                           "Total involuntary context switches",
+                           kudu::EXPOSE_AS_COUNTER);
+
+DEFINE_int32(thread_inject_start_latency_ms, 0,
+             "Number of ms to sleep when starting a new thread. (For tests).");
+TAG_FLAG(thread_inject_start_latency_ms, hidden);
+TAG_FLAG(thread_inject_start_latency_ms, unsafe);
+
+namespace kudu {
+
+// Returns the user-mode CPU time reported by getrusage(RUSAGE_SELF),
+// converted to milliseconds.
+static uint64_t GetCpuUTime() {
+  struct rusage usage;
+  CHECK_ERR(getrusage(RUSAGE_SELF, &usage));
+  const auto& user_time = usage.ru_utime;
+  return user_time.tv_sec * 1000UL + user_time.tv_usec / 1000UL;
+}
+
+// Returns the system-mode (kernel) CPU time reported by
+// getrusage(RUSAGE_SELF), converted to milliseconds.
+static uint64_t GetCpuSTime() {
+  struct rusage usage;
+  CHECK_ERR(getrusage(RUSAGE_SELF, &usage));
+  const auto& system_time = usage.ru_stime;
+  return system_time.tv_sec * 1000UL + system_time.tv_usec / 1000UL;
+}
+
+// Returns the voluntary context switch count (ru_nvcsw) reported by
+// getrusage(RUSAGE_SELF).
+static uint64_t GetVoluntaryContextSwitches() {
+  struct rusage usage;
+  CHECK_ERR(getrusage(RUSAGE_SELF, &usage));
+  const uint64_t switches = usage.ru_nvcsw;
+  return switches;
+}
+
+// Returns the involuntary context switch count (ru_nivcsw) reported by
+// getrusage(RUSAGE_SELF).
+static uint64_t GetInVoluntaryContextSwitches() {
+  struct rusage usage;
+  CHECK_ERR(getrusage(RUSAGE_SELF, &usage));
+  const uint64_t switches = usage.ru_nivcsw;
+  return switches;
+}
+
+class ThreadMgr;
+
+// Per-thread pointer to the Thread object that is running on the current
+// thread. It is assigned in Thread::SuperviseThread() and keeps its initial
+// NULL value on threads that were not started through Thread.
+__thread Thread* Thread::tls_ = NULL;
+
+// Singleton instance of ThreadMgr. Only visible in this file, used only by
+// Thread. The Thread class adds a reference to thread_manager while it is
+// supervising a thread so that a race between the end of the process's main
+// thread (and therefore the destruction of thread_manager) and the end of a
+// thread that tries to remove itself from the manager after the destruction
+// can be avoided.
+static shared_ptr<ThreadMgr> thread_manager;
+
+// Controls the single (lazy) initialization of thread_manager.
+static GoogleOnceType once = GOOGLE_ONCE_INIT;
+
+// A singleton class that tracks all live threads, and groups them together
+// for easy auditing. Used only by Thread.
+//
+// Threads are grouped by a category name; inside a category each thread is
+// keyed by its pthread_t. All mutable state is protected by lock_.
+class ThreadMgr {
+ public:
+  // Starts with no registered threads and both counters at zero.
+  ThreadMgr()
+      : threads_started_metric_(0),
+        threads_running_metric_(0) {
+  }
+
+  // Drops all bookkeeping while holding lock_.
+  ~ThreadMgr() {
+    MutexLock l(lock_);
+    thread_categories_.clear();
+  }
+
+  // Sets the OS-level name of the calling thread to 'name'. Does nothing
+  // when 'tid' equals the process ID, i.e. for the main thread.
+  static void SetThreadName(const std::string& name, int64_t tid);
+
+  // Registers the thread/CPU gauges on 'metrics' and, if 'web' is non-null,
+  // the "/threadz" debug page.
+  Status StartInstrumentation(const scoped_refptr<MetricEntity>& metrics, 
WebCallbackRegistry* web);
+
+  // Registers a thread to the supplied category. The key is a pthread_t,
+  // not the system TID, since pthread_t is less prone to being recycled.
+  void AddThread(const pthread_t& pthread_id, const string& name, const 
string& category,
+      int64_t tid);
+
+  // Removes a thread from the supplied category. If the thread has
+  // already been removed, this is a no-op.
+  void RemoveThread(const pthread_t& pthread_id, const string& category);
+
+ private:
+  // Container class for any details we want to capture about a thread
+  // TODO: Add start-time.
+  // TODO: Track fragment ID.
+  class ThreadDescriptor {
+   public:
+    // Default constructor; needed because descriptors are stored as map
+    // values and inserted through operator[].
+    ThreadDescriptor() { }
+    ThreadDescriptor(string category, string name, int64_t thread_id)
+        : name_(std::move(name)),
+          category_(std::move(category)),
+          thread_id_(thread_id) {}
+
+    const string& name() const { return name_; }
+    const string& category() const { return category_; }
+    // The system TID that was passed to AddThread().
+    int64_t thread_id() const { return thread_id_; }
+
+   private:
+    string name_;
+    string category_;
+    int64_t thread_id_;
+  };
+
+  // A ThreadCategory is a set of threads that are logically related.
+  // TODO: unordered_map is incompatible with pthread_t, but would be more
+  // efficient here.
+  typedef map<const pthread_t, ThreadDescriptor> ThreadCategory;
+
+  // All thread categories, keyed on the category name.
+  typedef map<string, ThreadCategory> ThreadCategoryMap;
+
+  // Protects thread_categories_ and thread metrics.
+  Mutex lock_;
+
+  // All thread categories that ever contained a thread, even if empty
+  ThreadCategoryMap thread_categories_;
+
+  // Counters to track all-time total number of threads, and the
+  // current number of running threads.
+  uint64_t threads_started_metric_;
+  uint64_t threads_running_metric_;
+
+  // Metric callbacks.
+  uint64_t ReadThreadsStarted();
+  uint64_t ReadThreadsRunning();
+
+  // Webpage callback; prints all threads by category.
+  void ThreadPathHandler(const WebCallbackRegistry::WebRequest& req,
+                         WebCallbackRegistry::PrerenderedWebResponse* resp);
+  // Writes one HTML table row per thread in 'category' to 'output'.
+  void PrintThreadCategoryRows(const ThreadCategory& category, ostringstream* 
output);
+};
+
+// Sets the OS-level name of the calling thread to 'name'. 'tid' is only used
+// to recognise the main thread (tid == pid), which is never renamed.
+void ThreadMgr::SetThreadName(const string& name, int64_t tid) {
+  // Renaming the main thread would rename the whole process, causing tools
+  // like killall to stop working, so leave it alone.
+  const bool is_main_thread = (tid == getpid());
+  if (is_main_thread) {
+    return;
+  }
+
+  // On Linux the name is set for the LWP, which makes it show up in the
+  // debugger (see http://0pointer.de/blog/projects/name-your-threads.html);
+  // it gets truncated to 15 characters. glibc also has a
+  // 'pthread_setname_np' api, but it may not be available everywhere and its
+  // only benefit over using prctl directly is that it can set the name of
+  // threads other than the current thread.
+  //
+  // NOTE(review): the non-Linux pthread_setname_np() presumably reports
+  // failure through a positive error number rather than -1/errno, in which
+  // case the check below never fires there - confirm against the platform docs.
+#if defined(__linux__)
+  const int rc = prctl(PR_SET_NAME, name.c_str());
+#else
+  const int rc = pthread_setname_np(name.c_str());
+#endif // defined(__linux__)
+
+  // We expect EPERM failures in sandboxed processes, just ignore those.
+  if (rc >= 0 || errno == EPERM) {
+    return;
+  }
+  PLOG(ERROR) << "SetThreadName";
+}
+
+// Registers the thread-count and process CPU/context-switch gauges on
+// 'metrics' and, when 'web' is non-null, the "/threadz" page on 'web'.
+// Always returns Status::OK().
+Status ThreadMgr::StartInstrumentation(const scoped_refptr<MetricEntity>& metrics,
+                                       WebCallbackRegistry* web) {
+  MutexLock guard(lock_);
+
+  // Function gauges are used so that a unique copy of these metrics can be
+  // registered in multiple tservers, even though the ThreadMgr is itself a
+  // singleton.
+  metrics->NeverRetire(METRIC_threads_started.InstantiateFunctionGauge(
+      metrics, Bind(&ThreadMgr::ReadThreadsStarted, Unretained(this))));
+  metrics->NeverRetire(METRIC_threads_running.InstantiateFunctionGauge(
+      metrics, Bind(&ThreadMgr::ReadThreadsRunning, Unretained(this))));
+  metrics->NeverRetire(METRIC_cpu_utime.InstantiateFunctionGauge(
+      metrics, Bind(&GetCpuUTime)));
+  metrics->NeverRetire(METRIC_cpu_stime.InstantiateFunctionGauge(
+      metrics, Bind(&GetCpuSTime)));
+  metrics->NeverRetire(METRIC_voluntary_context_switches.InstantiateFunctionGauge(
+      metrics, Bind(&GetVoluntaryContextSwitches)));
+  metrics->NeverRetire(METRIC_involuntary_context_switches.InstantiateFunctionGauge(
+      metrics, Bind(&GetInVoluntaryContextSwitches)));
+
+  // The web page is optional.
+  if (!web) {
+    return Status::OK();
+  }
+
+  WebCallbackRegistry::PrerenderedPathHandlerCallback handler =
+      bind<void>(mem_fn(&ThreadMgr::ThreadPathHandler), this, _1, _2);
+  DCHECK_NOTNULL(web)->RegisterPrerenderedPathHandler("/threadz", "Threads", handler,
+                                                      true /* is_styled */,
+                                                      true /* is_on_nav_bar */);
+  return Status::OK();
+}
+
+// Metric callback: returns the all-time number of threads registered through
+// AddThread(), read under lock_.
+uint64_t ThreadMgr::ReadThreadsStarted() {
+  MutexLock guard(lock_);
+  const uint64_t started = threads_started_metric_;
+  return started;
+}
+
+// Metric callback: returns the current value of the running-thread counter,
+// read under lock_.
+uint64_t ThreadMgr::ReadThreadsRunning() {
+  MutexLock guard(lock_);
+  const uint64_t running = threads_running_metric_;
+  return running;
+}
+
+// Records a thread in 'category' under the key 'pthread_id' (replacing any
+// existing entry with that key) and increments both the started and the
+// running thread counters.
+void ThreadMgr::AddThread(const pthread_t& pthread_id, const string& name,
+    const string& category, int64_t tid) {
+  // These annotations cause TSAN to ignore the synchronization on lock_
+  // without causing the subsequent mutations to be treated as data races
+  // in and of themselves (that's what IGNORE_READS_AND_WRITES does).
+  //
+  // Why do we need them here and in SuperviseThread()? TSAN operates by
+  // observing synchronization events and using them to establish "happens
+  // before" relationships between threads. Where these relationships are
+  // not built, shared state access constitutes a data race. The
+  // synchronization events here, in RemoveThread(), and in
+  // SuperviseThread() may cause TSAN to establish a "happens before"
+  // relationship between thread functors, ignoring potential data races.
+  // The annotations prevent this from happening.
+  ANNOTATE_IGNORE_SYNC_BEGIN();
+  ANNOTATE_IGNORE_READS_AND_WRITES_BEGIN();
+  {
+    MutexLock guard(lock_);
+    // operator[] creates the category on first use.
+    ThreadCategory& members = thread_categories_[category];
+    members[pthread_id] = ThreadDescriptor(category, name, tid);
+    ++threads_started_metric_;
+    ++threads_running_metric_;
+  }
+  ANNOTATE_IGNORE_SYNC_END();
+  ANNOTATE_IGNORE_READS_AND_WRITES_END();
+}
+
+// Unregisters the thread keyed by 'pthread_id' from 'category' and decrements
+// the running-thread counter. If no such thread is registered (for example
+// because it was already removed), neither the category nor the counter is
+// modified.
+void ThreadMgr::RemoveThread(const pthread_t& pthread_id, const string& category) {
+  // The TSAN annotations mirror the ones in AddThread(); see the comment
+  // there for the rationale.
+  ANNOTATE_IGNORE_SYNC_BEGIN();
+  ANNOTATE_IGNORE_READS_AND_WRITES_BEGIN();
+  {
+    MutexLock l(lock_);
+    auto category_it = thread_categories_.find(category);
+    DCHECK(category_it != thread_categories_.end());
+    // The DCHECK above is compiled out of release builds, so the iterator
+    // must still be checked before it is dereferenced. The counter is only
+    // decremented when an entry was actually erased; otherwise a repeated
+    // removal would wrap the unsigned counter around.
+    if (category_it != thread_categories_.end() &&
+        category_it->second.erase(pthread_id) > 0) {
+      threads_running_metric_--;
+    }
+  }
+  ANNOTATE_IGNORE_SYNC_END();
+  ANNOTATE_IGNORE_READS_AND_WRITES_END();
+}
+
+// Appends one HTML table row per thread in 'category' to 'output': the thread
+// name followed by its user, kernel and IO-wait times, each divided by 1e9
+// (nanoseconds to seconds).
+void ThreadMgr::PrintThreadCategoryRows(const ThreadCategory& category,
+    ostringstream* output) {
+  ostringstream& out = *output;
+  for (const auto& entry : category) {
+    const ThreadDescriptor& descriptor = entry.second;
+
+    ThreadStats stats;
+    const Status status = GetThreadStats(descriptor.thread_id(), &stats);
+    if (!status.ok()) {
+      // A failed lookup is only logged (rate-limited); the row is still
+      // written with whatever 'stats' holds.
+      KLOG_EVERY_N(INFO, 100) << "Could not get per-thread statistics: "
+                              << status.ToString();
+    }
+
+    const double user_secs = static_cast<double>(stats.user_ns) / 1e9;
+    const double kernel_secs = static_cast<double>(stats.kernel_ns) / 1e9;
+    const double iowait_secs = static_cast<double>(stats.iowait_ns) / 1e9;
+    out << "<tr><td>" << descriptor.name() << "</td><td>"
+        << user_secs << "</td><td>"
+        << kernel_secs << "</td><td>"
+        << iowait_secs << "</td></tr>";
+  }
+}
+
+// Renders the "/threadz" debug page into 'resp->output' while holding lock_.
+//
+// Without a "group" query argument the page is an index: the running-thread
+// count plus one link per thread category. With "group=<name>" it prints a
+// table of the threads in that category, and with "group=all" a table of the
+// threads in every category.
+void ThreadMgr::ThreadPathHandler(const WebCallbackRegistry::WebRequest& req,
+                                  WebCallbackRegistry::PrerenderedWebResponse* resp) {
+  ostringstream* output = resp->output;
+  MutexLock l(lock_);
+  auto category_name = req.parsed_args.find("group");
+
+  if (category_name == req.parsed_args.end()) {
+    // Index view. Every element that is opened here is also closed.
+    (*output) << "<h2>Thread Groups</h2>";
+    (*output) << "<h4>" << threads_running_metric_ << " thread(s) running</h4>";
+    (*output) << "<a href='/threadz?group=all'><h3>All Threads</h3></a>";
+
+    for (const ThreadCategoryMap::value_type& category : thread_categories_) {
+      // The name is URL-encoded for the link target and HTML-escaped for the
+      // visible text.
+      string category_arg;
+      UrlEncode(category.first, &category_arg);
+      (*output) << "<a href='/threadz?group=" << category_arg << "'><h3>"
+                << EscapeForHtmlToString(category.first) << " : "
+                << category.second.size() << "</h3></a>";
+    }
+    return;
+  }
+
+  // The request argument is untrusted: use the raw value for the lookup only,
+  // and the escaped value for everything written to the page. Looking up the
+  // escaped value would miss categories whose name contains HTML
+  // metacharacters.
+  const string& group = category_name->second;
+  const string escaped_group = EscapeForHtmlToString(group);
+
+  vector<const ThreadCategory*> categories_to_print;
+  (*output) << "<h2>Thread Group: " << escaped_group << "</h2>" << endl;
+  if (group != "all") {
+    ThreadCategoryMap::const_iterator category = thread_categories_.find(group);
+    if (category == thread_categories_.end()) {
+      (*output) << "Thread group '" << escaped_group << "' not found" << endl;
+      return;
+    }
+    categories_to_print.push_back(&category->second);
+    (*output) << "<h3>" << escaped_group << " : " << category->second.size()
+              << "</h3>";
+  } else {
+    for (const ThreadCategoryMap::value_type& category : thread_categories_) {
+      categories_to_print.push_back(&category.second);
+    }
+    (*output) << "<h3>All Threads : </h3>";
+  }
+
+  (*output) << "<table class='table table-hover table-border'>";
+  (*output) << "<thead><tr><th>Thread name</th><th>Cumulative User CPU(s)</th>"
+            << "<th>Cumulative Kernel CPU(s)</th>"
+            << "<th>Cumulative IO-wait(s)</th></tr></thead>";
+  (*output) << "<tbody>\n";
+
+  for (const ThreadCategory* category : categories_to_print) {
+    PrintThreadCategoryRows(*category, output);
+  }
+  (*output) << "</tbody></table>";
+}
+
+// Creates the ThreadMgr singleton. Invoked through GoogleOnceInit() with
+// 'once'.
+static void InitThreading() {
+  shared_ptr<ThreadMgr> manager(new ThreadMgr());
+  thread_manager.swap(manager);
+}
+
+// Makes sure the ThreadMgr singleton exists and then lets it register its
+// metrics on 'server_metrics' and its web page on 'web'.
+Status StartThreadInstrumentation(const scoped_refptr<MetricEntity>& server_metrics,
+                                  WebCallbackRegistry* web) {
+  GoogleOnceInit(&once, &InitThreading);
+  ThreadMgr* const manager = thread_manager.get();
+  return manager->StartInstrumentation(server_metrics, web);
+}
+
+// Creates a joiner for 'thr', which must not be null (enforced with
+// CHECK_NOTNULL), using the default warning and give-up intervals.
+ThreadJoiner::ThreadJoiner(Thread* thr)
+    : thread_(CHECK_NOTNULL(thr)),
+      warn_after_ms_(kDefaultWarnAfterMs),
+      warn_every_ms_(kDefaultWarnEveryMs),
+      give_up_after_ms_(kDefaultGiveUpAfterMs) {}
+
+// Sets how long Join() waits before it logs its first warning. Returns *this
+// so that calls can be chained.
+ThreadJoiner& ThreadJoiner::warn_after_ms(int ms) {
+  ThreadJoiner& self = *this;
+  self.warn_after_ms_ = ms;
+  return self;
+}
+
+// Sets the interval between warnings once Join() has started warning.
+// Returns *this so that calls can be chained.
+ThreadJoiner& ThreadJoiner::warn_every_ms(int ms) {
+  ThreadJoiner& self = *this;
+  self.warn_every_ms_ = ms;
+  return self;
+}
+
+// Sets how long Join() keeps waiting before it gives up; -1 means wait
+// forever. Returns *this so that calls can be chained.
+ThreadJoiner& ThreadJoiner::give_up_after_ms(int ms) {
+  ThreadJoiner& self = *this;
+  self.give_up_after_ms_ = ms;
+  return self;
+}
+
+// Waits for the thread to finish, logging a warning each time a warning
+// interval expires.
+//
+// Returns InvalidArgument when called from the thread that is being joined,
+// OK once the thread has been joined (or immediately if it is not joinable),
+// and Aborted when the give-up interval expires first.
+Status ThreadJoiner::Join() {
+  Thread* const caller = Thread::current_thread();
+  if (caller && caller->tid() == thread_->tid()) {
+    return Status::InvalidArgument("Can't join on own thread", thread_->name_);
+  }
+
+  // Early exit: double join is a no-op.
+  if (!thread_->joinable_) {
+    return Status::OK();
+  }
+
+  int waited_ms = 0;
+  for (bool final_attempt = false; !final_attempt; ) {
+    if (waited_ms >= warn_after_ms_) {
+      LOG(WARNING) << Substitute("Waited for $0ms trying to join with $1 (tid $2)",
+                                 waited_ms, thread_->name_, thread_->tid_);
+    }
+
+    // Time left until we give up; effectively unbounded when give-up is
+    // disabled (-1).
+    const int until_giveup = (give_up_after_ms_ == -1)
+        ? MathLimits<int>::kMax
+        : give_up_after_ms_ - waited_ms;
+
+    // Time left until the next warning is due: the rest of the initial grace
+    // period, or a full warning interval once warnings have started.
+    const int until_next_warn = (waited_ms < warn_after_ms_)
+        ? warn_after_ms_ - waited_ms
+        : warn_every_ms_;
+
+    // If the give-up deadline comes first, this wait is the last one.
+    final_attempt = until_giveup < until_next_warn;
+    const int wait_for = std::min(until_giveup, until_next_warn);
+
+    if (thread_->done_.WaitFor(MonoDelta::FromMilliseconds(wait_for))) {
+      // Unconditionally join before returning, to guarantee that any TLS
+      // has been destroyed (pthread_key_create() destructors only run
+      // after a pthread's user method has returned).
+      const int rc = pthread_join(thread_->thread_, NULL);
+      CHECK_EQ(rc, 0);
+      thread_->joinable_ = false;
+      return Status::OK();
+    }
+    waited_ms += wait_for;
+  }
+  return Status::Aborted(strings::Substitute("Timed out after $0ms joining on $1",
+                                             waited_ms, thread_->name_));
+}
+
+// Detaches the underlying pthread if it is still marked joinable, i.e. it was
+// started but never joined.
+Thread::~Thread() {
+  if (!joinable_) {
+    return;
+  }
+  const int rc = pthread_detach(thread_);
+  CHECK_EQ(rc, 0);
+}
+
+// Formats the thread's TID, name and category for log messages. Obtaining
+// the TID goes through tid(), which may wait for the thread to publish it.
+std::string Thread::ToString() const {
+  const int64_t id = tid();
+  return Substitute("Thread $0 (name: \"$1\", category: \"$2\")",
+                    id, name_, category_);
+}
+
+// Polls 'tid_', yielding between attempts, until it holds anything other than
+// PARENT_WAITING_TID and returns that value. A warning is logged if this
+// takes longer than 500 ms.
+int64_t Thread::WaitForTid() const {
+  const string log_prefix = Substitute("$0 ($1) ", name_, category_);
+  SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500 /* ms */, log_prefix,
+                                   "waiting for new thread to publish its TID");
+  for (int attempt = 0; ; ++attempt) {
+    const int64_t published = Acquire_Load(&tid_);
+    if (published != PARENT_WAITING_TID) {
+      return published;
+    }
+    boost::detail::yield(attempt);
+  }
+}
+
+
+// Creates a Thread named 'name' in 'category' and starts a pthread that runs
+// 'functor' under SuperviseThread().
+//
+//  - flags: bitmask of CreateFlags; NO_STACK_WATCHDOG disables the stack
+//    watch around pthread_create().
+//  - holder: optional; if non-null it receives a reference to the new Thread.
+//
+// Returns RuntimeError if pthread_create() fails, OK otherwise.
+Status Thread::StartThread(const std::string& category, const std::string& 
name,
+                           const ThreadFunctor& functor, uint64_t flags,
+                           scoped_refptr<Thread> *holder) {
+  TRACE_COUNTER_INCREMENT("threads_started", 1);
+  TRACE_COUNTER_SCOPE_LATENCY_US("thread_start_us");
+  // Make sure the ThreadMgr singleton exists before the new thread uses it.
+  GoogleOnceInit(&once, &InitThreading);
+
+  const string log_prefix = Substitute("$0 ($1) ", name, category);
+  SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500 /* ms */, log_prefix, 
"starting thread");
+
+  // Temporary reference for the duration of this function.
+  scoped_refptr<Thread> t(new Thread(category, name, functor));
+
+  // 'holder' is optional.
+  //
+  // We have to set this before we even start the thread because it's
+  // allowed for the thread functor to access 'holder'.
+  //
+  // NOTE(review): if pthread_create() fails below, '*holder' is not reset
+  // here, so it keeps referring to a Thread that never ran - confirm that
+  // callers only use 'holder' after checking the returned Status.
+  if (holder) {
+    *holder = t;
+  }
+
+  // Sentinel that makes tid() wait until the child publishes its real TID.
+  t->tid_ = PARENT_WAITING_TID;
+
+  // Add a reference count to the thread since SuperviseThread() needs to
+  // access the thread object, and we have no guarantee that our caller
+  // won't drop the reference as soon as we return. This is dereferenced
+  // in FinishThread().
+  t->AddRef();
+
+  auto cleanup = MakeScopedCleanup([&]() {
+      // If we failed to create the thread, we need to undo all of our prep
+      // work.
+      t->tid_ = INVALID_TID;
+      t->Release();
+    });
+
+  if (PREDICT_FALSE(FLAGS_thread_inject_start_latency_ms > 0)) {
+    LOG(INFO) << "Injecting " << FLAGS_thread_inject_start_latency_ms << "ms 
sleep on thread start";
+    
SleepFor(MonoDelta::FromMilliseconds(FLAGS_thread_inject_start_latency_ms));
+  }
+
+  {
+    SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500 /* ms */, log_prefix, 
"creating pthread");
+    SCOPED_WATCH_STACK((flags & NO_STACK_WATCHDOG) ? 0 : 250);
+    int ret = pthread_create(&t->thread_, NULL, &Thread::SuperviseThread, 
t.get());
+    if (ret) {
+      // Returning here leaves 'cleanup' armed, which undoes the prep work.
+      return Status::RuntimeError("Could not create thread", strerror(ret), 
ret);
+    }
+  }
+
+  // The thread has been created and is now joinable.
+  //
+  // Why set this in the parent and not the child? Because only the parent
+  // (or someone communicating with the parent) can join, so joinable must
+  // be set before the parent returns.
+  t->joinable_ = true;
+  cleanup.cancel();
+
+  VLOG(2) << "Started thread " << t->tid()<< " - " << category << ":" << name;
+  return Status::OK();
+}
+
+// pthread entry point used by StartThread(); 'arg' is the Thread* that was
+// handed to pthread_create(). Publishes the TID, names and registers the
+// thread, runs the user functor and then FinishThread(). Always returns NULL.
+void* Thread::SuperviseThread(void* arg) {
+  Thread* t = static_cast<Thread*>(arg);
+  int64_t system_tid = Thread::CurrentThreadId();
+  PCHECK(system_tid != -1);
+
+  // Take an additional reference to the thread manager, which we'll need
+  // below. Holding our own reference keeps the manager alive for as long as
+  // this function runs, independently of what happens to the global
+  // 'thread_manager' pointer.
+  ANNOTATE_IGNORE_SYNC_BEGIN();
+  shared_ptr<ThreadMgr> thread_mgr_ref = thread_manager;
+  ANNOTATE_IGNORE_SYNC_END();
+
+  // Set up the TLS.
+  //
+  // We could store a scoped_refptr in the TLS itself, but as its
+  // lifecycle is poorly defined, we'll use a bare pointer. We
+  // already incremented the reference count in StartThread.
+  Thread::tls_ = t;
+
+  // Publish our tid to 'tid_', which unblocks any callers waiting in
+  // WaitForTid().
+  Release_Store(&t->tid_, system_tid);
+
+  string name = strings::Substitute("$0-$1", t->name(), system_tid);
+  // SetThreadName() is static, so it needs no instance. AddThread() goes
+  // through the reference taken above rather than through the global, which
+  // is the reason that reference exists.
+  ThreadMgr::SetThreadName(name, t->tid_);
+  thread_mgr_ref->AddThread(pthread_self(), name, t->category(), t->tid_);
+
+  // FinishThread() is guaranteed to run (even if functor_ throws an
+  // exception) because pthread_cleanup_push() creates a scoped object
+  // whose destructor invokes the provided callback.
+  pthread_cleanup_push(&Thread::FinishThread, t);
+  t->functor_();
+  pthread_cleanup_pop(true);
+
+  return NULL;
+}
+
+// Cleanup handler installed by SuperviseThread(); 'arg' is the Thread that is
+// finishing. Unregisters it from the thread manager, signals joiners and
+// drops the reference that StartThread() added.
+void Thread::FinishThread(void* arg) {
+  Thread* const finished = static_cast<Thread*>(arg);
+
+  // We're here either because of the explicit pthread_cleanup_pop() in
+  // SuperviseThread() or through pthread_exit(). In either case,
+  // thread_manager is guaranteed to be live because thread_mgr_ref in
+  // SuperviseThread() is still live.
+  thread_manager->RemoveThread(pthread_self(), finished->category());
+
+  // Signal any Joiner that we're done.
+  finished->done_.CountDown();
+
+  VLOG(2) << "Ended thread " << finished->tid_ << " - " << finished->category()
+          << ":" << finished->name();
+
+  // NOTE: this 'Release' call could drop the last reference to the Thread,
+  // so the object may be destructed at this point. Do not add any code
+  // after it!
+  finished->Release();
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/thread.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/thread.h b/be/src/kudu/util/thread.h
new file mode 100644
index 0000000..dd035f8
--- /dev/null
+++ b/be/src/kudu/util/thread.h
@@ -0,0 +1,373 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Copied from Impala and adapted to Kudu.
+
+#ifndef KUDU_UTIL_THREAD_H
+#define KUDU_UTIL_THREAD_H
+
+#include <pthread.h>
+#if defined(__linux__)
+#include <syscall.h>
+#else
+#include <sys/syscall.h>
+#endif
+#include <unistd.h>
+
+#include <cstdint>
+#include <string>
+#include <utility>
+
+#include <boost/bind.hpp>     // IWYU pragma: keep
+#include <boost/function.hpp> // IWYU pragma: keep
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class MetricEntity;
+class Thread;
+class WebCallbackRegistry;
+
+// Utility to join on a thread, printing warning messages if it
+// takes too long. For example:
+//
+//   ThreadJoiner(&my_thread)
+//     .warn_after_ms(1000)
+//     .warn_every_ms(5000)
+//     .Join();
+//
+// TODO: would be nice to offer a way to use ptrace() or signals to
+// dump the stack trace of the thread we're trying to join on if it
+// gets stuck. But, after looking for 20 minutes or so, it seems
+// pretty complicated to get right.
+class ThreadJoiner {
+ public:
+  // 'thread' is the thread to join. The joiner stores the raw pointer and
+  // does not take a reference.
+  explicit ThreadJoiner(Thread* thread);
+
+  // Start emitting warnings after this many milliseconds.
+  //
+  // Default: 1000 ms.
+  ThreadJoiner& warn_after_ms(int ms);
+
+  // After the warnings have started, emit another warning at the
+  // given interval.
+  //
+  // Default: 1000 ms.
+  ThreadJoiner& warn_every_ms(int ms);
+
+  // If the thread has not stopped after this number of milliseconds, give up
+  // joining on it and return Status::Aborted.
+  //
+  // -1 (the default) means to wait forever trying to join.
+  ThreadJoiner& give_up_after_ms(int ms);
+
+  // Join the thread, subject to the above parameters. Returns
+  // InvalidArgument if called from the thread that is being joined, and
+  // Aborted if the join times out. Joining a thread that is not joinable
+  // (e.g. one that was already joined) returns OK immediately. A failing
+  // pthread_join() is treated as fatal (CHECK) rather than returned.
+  Status Join();
+
+ private:
+  // Default intervals, in milliseconds.
+  enum {
+    kDefaultWarnAfterMs = 1000,
+    kDefaultWarnEveryMs = 1000,
+    kDefaultGiveUpAfterMs = -1 // forever
+  };
+
+  // The thread to join; never null.
+  Thread* thread_;
+
+  int warn_after_ms_;
+  int warn_every_ms_;
+  int give_up_after_ms_;
+
+  DISALLOW_COPY_AND_ASSIGN(ThreadJoiner);
+};
+
+// Thin wrapper around pthread that can register itself with the singleton
+// ThreadMgr (a private class implemented in thread.cc entirely, which tracks
+// all live threads so that they may be monitored via the debug webpages).
+// This class has a limited subset of boost::thread's API. Construction is
+// almost the same, but clients must supply a category and a name for each
+// thread so that they can be identified in the debug web UI. Otherwise,
+// Join() is the only supported method from boost::thread.
+//
+// Each Thread object knows its operating system thread ID (TID), which can
+// be used to attach debuggers to specific threads, to retrieve
+// resource-usage statistics from the operating system, and to assign threads
+// to resource control groups.
+//
+// Threads are shared objects, but in a degenerate way. They may only have
+// up to two referents: the caller that created the thread (parent), and
+// the thread itself (child). Moreover, the only two methods to mutate state
+// (Join() and the destructor) are constrained: the child may not Join() on
+// itself, and the destructor is only run when there's one referent left.
+// These constraints allow us to access thread internals without any locks.
+class Thread : public RefCountedThreadSafe<Thread> {
+ public:
+
+  // Flags passed to Thread::CreateWithFlags().
+  enum CreateFlags {
+    NO_FLAGS = 0,
+
+    // Disable the use of KernelStackWatchdog to detect and log slow
+    // thread creations. This is necessary when starting the kernel stack
+    // watchdog thread itself to avoid reentrancy.
+    NO_STACK_WATCHDOG = 1 << 0
+  };
+
+  // This constructor pattern mimics that in boost::thread. There is
+  // one constructor for each number of arguments that the thread
+  // function accepts. To extend the set of acceptable signatures, add
+  // another constructor with <class F, class A1.... class An>.
+  //
+  // In general:
+  //  - category: string identifying the thread category to which this thread 
belongs,
+  //    used for organising threads together on the debug UI.
+  //  - name: name of this thread. Will be appended with "-<thread-id>" to 
ensure
+  //    uniqueness.
+  //  - F - a method type that supports operator(), and the instance passed to 
the
+  //    constructor is executed immediately in a separate thread.
+  //  - A1...An - argument types whose instances are passed to f(...)
+  //  - holder - optional shared pointer to hold a reference to the created 
thread.
+  template <class F>
+  static Status CreateWithFlags(const std::string& category, const 
std::string& name,
+                                const F& f, uint64_t flags,
+                                scoped_refptr<Thread>* holder) {
+    return StartThread(category, name, f, flags, holder);
+
+  }
+  template <class F>
+  static Status Create(const std::string& category, const std::string& name, 
const F& f,
+                       scoped_refptr<Thread>* holder) {
+    return StartThread(category, name, f, NO_FLAGS, holder);
+  }
+
+  // Creates and starts a thread that runs f(a1). The call is bound with
+  // boost::bind and handed to StartThread() with NO_FLAGS; the Status from
+  // StartThread() is returned unchanged.
+  template <class F, class A1>
+  static Status Create(const std::string& category, const std::string& name, const F& f,
+                       const A1& a1, scoped_refptr<Thread>* holder) {
+    const ThreadFunctor bound_call = boost::bind(f, a1);
+    return StartThread(category, name, bound_call, NO_FLAGS, holder);
+  }
+
+  // As above, for a callable taking two arguments: the thread runs f(a1, a2).
+  template <class F, class A1, class A2>
+  static Status Create(const std::string& category, const std::string& name, const F& f,
+                       const A1& a1, const A2& a2, scoped_refptr<Thread>* holder) {
+    const ThreadFunctor bound_call = boost::bind(f, a1, a2);
+    return StartThread(category, name, bound_call, NO_FLAGS, holder);
+  }
+
+  // As above, for a callable taking three arguments: the thread runs f(a1, a2, a3).
+  template <class F, class A1, class A2, class A3>
+  static Status Create(const std::string& category, const std::string& name, const F& f,
+                       const A1& a1, const A2& a2, const A3& a3,
+                       scoped_refptr<Thread>* holder) {
+    const ThreadFunctor bound_call = boost::bind(f, a1, a2, a3);
+    return StartThread(category, name, bound_call, NO_FLAGS, holder);
+  }
+
+  // As above, for a callable taking four arguments.
+  template <class F, class A1, class A2, class A3, class A4>
+  static Status Create(const std::string& category, const std::string& name, const F& f,
+                       const A1& a1, const A2& a2, const A3& a3, const A4& a4,
+                       scoped_refptr<Thread>* holder) {
+    const ThreadFunctor bound_call = boost::bind(f, a1, a2, a3, a4);
+    return StartThread(category, name, bound_call, NO_FLAGS, holder);
+  }
+
+  // As above, for a callable taking five arguments.
+  template <class F, class A1, class A2, class A3, class A4, class A5>
+  static Status Create(const std::string& category, const std::string& name, const F& f,
+                       const A1& a1, const A2& a2, const A3& a3, const A4& a4, const A5& a5,
+                       scoped_refptr<Thread>* holder) {
+    const ThreadFunctor bound_call = boost::bind(f, a1, a2, a3, a4, a5);
+    return StartThread(category, name, bound_call, NO_FLAGS, holder);
+  }
+
+  // As above, for a callable taking six arguments.
+  template <class F, class A1, class A2, class A3, class A4, class A5, class A6>
+  static Status Create(const std::string& category, const std::string& name, const F& f,
+                       const A1& a1, const A2& a2, const A3& a3, const A4& a4, const A5& a5,
+                       const A6& a6, scoped_refptr<Thread>* holder) {
+    const ThreadFunctor bound_call = boost::bind(f, a1, a2, a3, a4, a5, a6);
+    return StartThread(category, name, bound_call, NO_FLAGS, holder);
+  }
+
+  // Emulates boost::thread and detaches.
+  // NOTE(review): the definition is not in this header; presumably a thread that
+  // was never joined is detached rather than joined here -- confirm in thread.cc.
+  ~Thread();
+
+  // Blocks until this thread finishes execution. Once this method returns, the thread
+  // will be unregistered with the ThreadMgr and will not appear in the debug UI.
+  // The wait is delegated to a ThreadJoiner constructed for this thread.
+  void Join() {
+    ThreadJoiner joiner(this);
+    joiner.Join();
+  }
+
+  // The thread ID assigned to this thread by the operating system. If the thread
+  // has not yet started running, returns INVALID_TID.
+  //
+  // NOTE: this may block for a short amount of time if the thread has just been
+  // started.
+  int64_t tid() const {
+    // Read tid_ exactly once, with acquire semantics, and return that snapshot.
+    // Re-reading the member with a plain load is unsynchronised and could
+    // observe a different value than the one that was just checked.
+    const int64_t t = base::subtle::Acquire_Load(&tid_);
+    if (t != PARENT_WAITING_TID) {
+      return t;
+    }
+    // The thread has been started but has not published its TID yet.
+    return WaitForTid();
+  }
+
+  // Returns the thread's pthread ID.
+  pthread_t pthread_id() const { return thread_; }
+
+  // Accessors for the name and category this Thread object was constructed with.
+  const std::string& name() const { return name_; }
+  const std::string& category() const { return category_; }
+
+  // Return a string representation of the thread identifying information.
+  std::string ToString() const;
+
+  // The current thread of execution, or NULL if the current thread isn't a kudu::Thread.
+  // Simply reads the thread-local 'tls_' pointer. This call is signal-safe.
+  static Thread* current_thread() {
+    return tls_;
+  }
+
+  // Returns a unique, stable identifier for the calling thread. Because this is
+  // a static method it can be used on any thread, including the main thread of
+  // the process.
+  //
+  // In general, this should be used when a value is required that is unique to
+  // a thread and must work on any thread including the main process thread.
+  //
+  // NOTE: this is _not_ the TID, but rather a unique value assigned by the
+  // thread implementation. So, this value should not be presented to the user
+  // in log messages, etc.
+  static int64_t UniqueThreadId() {
+#if defined(__linux__)
+    // This cast is a little bit ugly, but it is significantly faster than
+    // calling syscall(SYS_gettid). In particular, this speeds up some code
+    // paths in the tracing implementation.
+    const pthread_t self = pthread_self();
+    return static_cast<int64_t>(self);
+#elif defined(__APPLE__)
+    uint64_t unique_id;
+    CHECK_EQ(0, pthread_threadid_np(NULL, &unique_id));
+    return unique_id;
+#else
+#error Unsupported platform
+#endif
+  }
+
+  // Returns the system thread ID (tid on Linux) for the current thread. Note
+  // that this is a static method and thus can be used from any thread,
+  // including the main thread of the process. This is in contrast to
+  // Thread::tid(), which only works on kudu::Threads.
+  //
+  // Thread::tid() will return the same value, but the value is cached in the
+  // Thread object, so will be faster to call.
+  //
+  // Thread::UniqueThreadId() (or Thread::tid()) should be preferred for
+  // performance sensitive code, however it is only guaranteed to return a
+  // unique and stable thread ID, not necessarily the system thread ID.
+  static int64_t CurrentThreadId() {
+#if defined(__linux__)
+    // Ask the kernel directly; this issues a system call on every invocation.
+    return syscall(SYS_gettid);
+#else
+    // On other platforms fall back to the identifier from UniqueThreadId().
+    return UniqueThreadId();
+#endif
+  }
+
+ private:
+  friend class ThreadJoiner;
+
+  // See 'tid_' docs.
+  enum {
+    INVALID_TID = -1,
+    PARENT_WAITING_TID = -2,
+  };
+
+  // Function object that wraps the user-supplied function to run in a 
separate thread.
+  typedef boost::function<void ()> ThreadFunctor;
+
+  // Initializes the bookkeeping for a thread that has not been started:
+  // thread_ is 0, tid_ is INVALID_TID and the object is not joinable.
+  // 'category', 'name' and 'functor' are taken by value and moved into the
+  // corresponding members.
+  Thread(std::string category, std::string name, ThreadFunctor functor)
+      : thread_(0),
+        category_(std::move(category)),
+        name_(std::move(name)),
+        tid_(INVALID_TID),
+        functor_(std::move(functor)),
+        done_(1),  // Latch constructed with a count of one.
+        joinable_(false) {}
+
+  // Library-specific thread ID.
+  pthread_t thread_;
+
+  // Name and category for this thread.
+  const std::string category_;
+  const std::string name_;
+
+  // OS-specific thread ID. Once StartThread() has finished, this is
+  // guaranteed to be set either to a non-negative integer, or to INVALID_TID.
+  //
+  // The tid_ member goes through the following states:
+  // 1. INVALID_TID: the thread has not been started, or has already exited.
+  // 2. PARENT_WAITING_TID: the parent has started the thread, but the
+  //    thread has not yet begun running. Therefore the TID is not yet known
+  //    but it will be set once the thread starts.
+  // 3. <positive value>: the thread is running.
+  int64_t tid_;
+
+  // User function to be executed by this thread.
+  const ThreadFunctor functor_;
+
+  // Joiners wait on this latch to be notified if the thread is done.
+  //
+  // Note that Joiners must additionally pthread_join(), otherwise certain
+  // resources that callers expect to be destroyed (like TLS) may still be
+  // alive when a Joiner finishes.
+  CountDownLatch done_;
+
+  bool joinable_;
+
+  // Thread local pointer to the current thread of execution. Will be NULL if 
the current
+  // thread is not a Thread.
+  static __thread Thread* tls_;
+
+  // Wait for the running thread to publish its tid.
+  int64_t WaitForTid() const;
+
+  // Starts the thread running SuperviseThread(), and returns once that thread 
has
+  // initialised and its TID has been read. Waits for notification from the 
started
+  // thread that initialisation is complete before returning. On success, 
stores a
+  // reference to the thread in holder.
+  static Status StartThread(const std::string& category, const std::string& 
name,
+                            const ThreadFunctor& functor, uint64_t flags,
+                            scoped_refptr<Thread>* holder);
+
+  // Wrapper for the user-supplied function. Invoked from the new thread,
+  // with the Thread as its only argument. Executes functor_, but before
+  // doing so registers with the global ThreadMgr and reads the thread's
+  // system ID. After functor_ terminates, unregisters with the ThreadMgr.
+  // Always returns NULL.
+  //
+  // SuperviseThread() notifies StartThread() when thread initialisation is
+  // completed via the tid_, which is set to the new thread's system ID.
+  // By that point in time SuperviseThread() has also taken a reference to
+  // the Thread object, allowing it to safely refer to it even after the
+  // caller drops its reference.
+  //
+  // Additionally, StartThread() notifies SuperviseThread() when the actual
+  // Thread object has been assigned (SuperviseThread() is spinning during
+  // this time). Without this, the new thread may reference the actual
+  // Thread object before it has been assigned by StartThread(). See
+  // KUDU-11 for more details.
+  static void* SuperviseThread(void* arg);
+
+  // Invoked when the user-supplied function finishes or in the case of an
+  // abrupt exit (i.e. pthread_exit()). Cleans up after SuperviseThread().
+  static void FinishThread(void* arg);
+};
+
+// Registers /threadz with the debug webserver, and creates thread-tracking 
metrics under
+// the given entity. If 'web' is NULL, does not register the path handler.
+Status StartThreadInstrumentation(const scoped_refptr<MetricEntity>& 
server_metrics,
+                                  WebCallbackRegistry* web);
+} // namespace kudu
+
+#endif /* KUDU_UTIL_THREAD_H */

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/thread_restrictions.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/thread_restrictions.cc 
b/be/src/kudu/util/thread_restrictions.cc
new file mode 100644
index 0000000..f956fd9
--- /dev/null
+++ b/be/src/kudu/util/thread_restrictions.cc
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <ostream>
+#include <string>
+
+#include <glog/logging.h>
+
+#include "kudu/util/debug/leakcheck_disabler.h"
+#include "kudu/util/thread.h"
+#include "kudu/util/threadlocal.h"
+#include "kudu/util/thread_restrictions.h"
+
+#ifdef ENABLE_THREAD_RESTRICTIONS
+
+namespace kudu {
+
+namespace {
+
+// Per-thread switches consulted by ThreadRestrictions. Every switch starts
+// out in the 'allowed' state.
+struct LocalThreadRestrictions {
+  bool io_allowed = true;
+  bool wait_allowed = true;
+  bool singleton_allowed = true;
+};
+
+// Returns the calling thread's LocalThreadRestrictions. The instance is a
+// block-scoped thread local, so it is created the first time a given thread
+// reaches the BLOCK_STATIC_THREAD_LOCAL statement below.
+LocalThreadRestrictions* LoadTLS() {
+  // Disable leak check. LSAN sometimes gets false positives on thread locals.
+  // See: https://github.com/google/sanitizers/issues/757
+  debug::ScopedLeakCheckDisabler d;
+  BLOCK_STATIC_THREAD_LOCAL(LocalThreadRestrictions, 
local_thread_restrictions);
+  return local_thread_restrictions;
+}
+
+} // anonymous namespace
+
+// Records whether the calling thread may perform IO and hands back the
+// setting that was in effect before this call.
+bool ThreadRestrictions::SetIOAllowed(bool allowed) {
+  LocalThreadRestrictions* const restrictions = LoadTLS();
+  const bool was_allowed = restrictions->io_allowed;
+  restrictions->io_allowed = allowed;
+  return was_allowed;
+}
+
+// CHECK-fails, naming the offending thread, if the calling thread has been
+// marked as not allowed to perform IO.
+void ThreadRestrictions::AssertIOAllowed() {
+  const Thread* const self = Thread::current_thread();
+  CHECK(LoadTLS()->io_allowed)
+    << "Function marked as IO-only was called from a thread that "
+    << "disallows IO!  If this thread really should be allowed to "
+    << "make IO calls, adjust the call to "
+    << "kudu::ThreadRestrictions::SetIOAllowed() in this thread's "
+    << "startup. "
+    << (self ? self->ToString() : "(not a kudu::Thread)");
+}
+
+// Records whether the calling thread may wait/block and hands back the
+// setting that was in effect before this call.
+bool ThreadRestrictions::SetWaitAllowed(bool allowed) {
+  LocalThreadRestrictions* const restrictions = LoadTLS();
+  const bool was_allowed = restrictions->wait_allowed;
+  restrictions->wait_allowed = allowed;
+  return was_allowed;
+}
+
+// CHECK-fails, naming the offending thread, if the calling thread has been
+// marked as not allowed to wait.
+void ThreadRestrictions::AssertWaitAllowed() {
+  const Thread* const self = Thread::current_thread();
+  CHECK(LoadTLS()->wait_allowed)
+    << "Waiting is not allowed to be used on this thread to prevent "
+    << "server-wide latency aberrations and deadlocks. "
+    << (self ? self->ToString() : "(not a kudu::Thread)");
+}
+
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/thread_restrictions.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/thread_restrictions.h 
b/be/src/kudu/util/thread_restrictions.h
new file mode 100644
index 0000000..23f0cd5
--- /dev/null
+++ b/be/src/kudu/util/thread_restrictions.h
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Some portions: Copyright (c) 2012, The Chromium Authors.
+#ifndef KUDU_UTIL_THREAD_RESTRICTIONS_H
+#define KUDU_UTIL_THREAD_RESTRICTIONS_H
+
+#include "kudu/gutil/macros.h"
+
+#ifndef NDEBUG
+#define ENABLE_THREAD_RESTRICTIONS 1
+#endif
+
+namespace kudu {
+
+// Certain behavior is disallowed on certain threads.  ThreadRestrictions helps
+// enforce these rules.  Examples of such rules:
+//
+// * Do not do blocking IO
+// * Do not wait on synchronization variables or sleep
+//
+// Here's more about how the protection works:
+//
+// 1) If a thread should not be allowed to make IO calls, mark it:
+//      ThreadRestrictions::SetIOAllowed(false);
+//    By default, threads *are* allowed to make IO calls.
+//    In particular, threads like RPC reactors should never do blocking IO
+//    because it may stall other unrelated requests.
+//
+// 2) If a function makes a call that will go out to disk, check whether the
+//    current thread is allowed:
+//      ThreadRestrictions::AssertIOAllowed();
+//
+//
+// Style tip: where should you put AssertIOAllowed checks?  It's best
+// if you put them as close to the disk access as possible, at the
+// lowest level.  This rule is simple to follow and helps catch all
+// callers.  For example, if your function GoDoSomeBlockingDiskCall()
+// only calls other functions in Kudu and doesn't access the underlying
+// disk, you should go add the AssertIOAllowed checks in the helper functions.
+class ThreadRestrictions {
+ public:
+  // While a ScopedAllowIO is alive, IO is allowed on the current thread; its
+  // destructor restores the setting that was in effect before.  Doing this is
+  // almost certainly always incorrect, but sometimes it makes more sense to
+  // allow an exception and file a bug in the backlog to improve it later.
+  class ScopedAllowIO {
+   public:
+    ScopedAllowIO() : previous_value_(SetIOAllowed(true)) {}
+    ~ScopedAllowIO() { SetIOAllowed(previous_value_); }
+   private:
+    // Value SetIOAllowed() returned when this object was constructed, i.e.
+    // whether IO was allowed at that point.
+    bool previous_value_;
+
+    DISALLOW_COPY_AND_ASSIGN(ScopedAllowIO);
+  };
+
+  // While a ScopedAllowWait is alive, waiting is allowed on the current
+  // thread; its destructor restores the setting that was in effect before.
+  // Doing this is almost always incorrect: consider carefully whether you
+  // should instead be deferring work to a different thread.
+  class ScopedAllowWait {
+   public:
+    ScopedAllowWait() : previous_value_(SetWaitAllowed(true)) {}
+    ~ScopedAllowWait() { SetWaitAllowed(previous_value_); }
+   private:
+    // Value SetWaitAllowed() returned when this object was constructed, i.e.
+    // whether waiting was allowed at that point.
+    bool previous_value_;
+
+    DISALLOW_COPY_AND_ASSIGN(ScopedAllowWait);
+  };
+
+
+#if ENABLE_THREAD_RESTRICTIONS
+  // Set whether the current thread is allowed to make IO calls.
+  // Threads start out in the *allowed* state.
+  // Returns the previous value.
+  static bool SetIOAllowed(bool allowed);
+
+  // Check whether the current thread is allowed to make IO calls,
+  // and FATALs if not.  See the block comment above the class for
+  // a discussion of where to add these checks.
+  static void AssertIOAllowed();
+
+  // Set whether the current thread may wait/block.  Returns the previous
+  // value.
+  static bool SetWaitAllowed(bool allowed);
+
+  // Check whether the current thread is allowed to wait/block.
+  // FATALs if not.
+  static void AssertWaitAllowed();
+#else
+  // With restrictions disabled these are empty inline stubs, so that they can
+  // be compiled out.  The setters always report 'true'.
+  static bool SetIOAllowed(bool allowed) { return true; }
+  static void AssertIOAllowed() {}
+  static bool SetWaitAllowed(bool allowed) { return true; }
+  static void AssertWaitAllowed() {}
+#endif
+
+ private:
+  DISALLOW_IMPLICIT_CONSTRUCTORS(ThreadRestrictions);
+};
+
+} // namespace kudu
+
+#endif /* KUDU_UTIL_THREAD_RESTRICTIONS_H */

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/threadlocal.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/threadlocal.cc b/be/src/kudu/util/threadlocal.cc
new file mode 100644
index 0000000..444ed15
--- /dev/null
+++ b/be/src/kudu/util/threadlocal.cc
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "kudu/util/threadlocal.h"
+
+#include <memory>
+#include <ostream>
+#include <string>
+
+#include <pthread.h>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/once.h"
+#include "kudu/util/errno.h"
+
+namespace kudu {
+namespace threadlocal {
+namespace internal {
+
+// One key used by the entire process to attach destructors on thread exit.
+static pthread_key_t destructors_key;
+
+// The above key must only be initialized once per process.
+static GoogleOnceType once = GOOGLE_ONCE_INIT;
+
+namespace {
+
+// List of destructors for all thread locals instantiated on a given thread.
+// Singly linked; each node records one pending destructor call.
+struct PerThreadDestructorList {
+  void (*destructor)(void*);  // Function to call when the list is run.
+  void* arg;  // Argument passed to 'destructor'.
+  PerThreadDestructorList* next;  // Following node, or null at the end.
+};
+
+} // anonymous namespace
+
+// Runs every destructor registered for THREAD_LOCAL instances on this thread
+// and frees the list nodes as it goes. 't' is the head of the thread's
+// PerThreadDestructorList.
+static void InvokeDestructors(void* t) {
+  for (auto* node = reinterpret_cast<PerThreadDestructorList*>(t); node != nullptr;) {
+    node->destructor(node->arg);
+    PerThreadDestructorList* const rest = node->next;
+    delete node;
+    node = rest;
+  }
+}
+
+// Creates the process-wide pthread key and registers InvokeDestructors as its
+// destructor. Must run only once.
+static void CreateKey() {
+  // Linux supports up to 1024 keys, we will use only one for all thread locals.
+  const int err = pthread_key_create(&destructors_key, &InvokeDestructors);
+  CHECK_EQ(0, err) << "pthread_key_create() failed, cannot add destructor to thread: "
+      << "error " << err << ": " << ErrnoToString(err);
+}
+
+// Prepends a (destructor, arg) entry to the calling thread's destructor list.
+void AddDestructor(void (*destructor)(void*), void* arg) {
+  // Make sure the shared key exists; CreateKey runs at most once per process.
+  GoogleOnceInit(&once, &CreateKey);
+
+  std::unique_ptr<PerThreadDestructorList> node(new PerThreadDestructorList());
+  node->destructor = destructor;
+  node->arg = arg;
+  // pthread_getspecific() returns NULL if nothing is set yet, in which case the
+  // new node becomes the only element of the list.
+  void* const current_head = pthread_getspecific(destructors_key);
+  node->next = reinterpret_cast<PerThreadDestructorList*>(current_head);
+  const int err = pthread_setspecific(destructors_key, node.release());
+  // The only time this check should fail is if we are out of memory, or if
+  // somehow key creation failed, which should be caught by the above CHECK.
+  CHECK_EQ(0, err) << "pthread_setspecific() failed, cannot update destructor list: "
+      << "error " << err << ": " << ErrnoToString(err);
+}
+
+} // namespace internal
+} // namespace threadlocal
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/threadlocal.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/threadlocal.h b/be/src/kudu/util/threadlocal.h
new file mode 100644
index 0000000..ebe1910
--- /dev/null
+++ b/be/src/kudu/util/threadlocal.h
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_THREADLOCAL_H_
+#define KUDU_UTIL_THREADLOCAL_H_
+
+// Block-scoped static thread local implementation.
+//
+// Usage is similar to a C++11 thread_local. The BLOCK_STATIC_THREAD_LOCAL macro
+// defines a thread-local pointer to the specified type, which is lazily
+// instantiated by any thread entering the block for the first time. The
+// constructor for the type T is invoked at macro execution time, as expected,
+// and its destructor is invoked when the corresponding thread's Runnable
+// returns, or when the thread exits.
+//
+// The helper functions are referenced by their fully qualified names so that
+// the macro resolves them regardless of the namespace it is expanded in.
+//
+// Inspired by Poco <http://pocoproject.org/docs/Poco.ThreadLocal.html>,
+// Andrew Tomazos <http://stackoverflow.com/questions/12049684/>, and
+// the C++11 thread_local API.
+//
+// Example usage:
+//
+// // Invokes a 3-arg constructor on SomeClass:
+// BLOCK_STATIC_THREAD_LOCAL(SomeClass, instance, arg1, arg2, arg3);
+// instance->DoSomething();
+//
+#define BLOCK_STATIC_THREAD_LOCAL(T, t, ...)                                    \
+static __thread T* t;                                                           \
+do {                                                                            \
+  if (PREDICT_FALSE(t == NULL)) {                                               \
+    t = new T(__VA_ARGS__);                                                     \
+    ::kudu::threadlocal::internal::AddDestructor(                               \
+        ::kudu::threadlocal::internal::Destroy<T>, t);                          \
+  }                                                                             \
+} while (false)
+
+// Class-scoped static thread local implementation.
+//
+// Very similar in implementation to the above block-scoped version, but
+// requires a bit more syntax and vigilance to use properly.
+//
+// DECLARE_STATIC_THREAD_LOCAL(Type, instance_var_) must be placed in the
+// class header, as usual for variable declarations.
+//
+// Because these variables are static, they must also be defined in the impl
+// file with DEFINE_STATIC_THREAD_LOCAL(Type, Classname, instance_var_),
+// which is very much like defining any static member, i.e. int Foo::member_.
+//
+// Finally, each thread must initialize the instance before using it by calling
+// INIT_STATIC_THREAD_LOCAL(Type, instance_var_, ...). This is a cheap
+// call, and may be invoked at the top of any method which may reference a
+// thread-local variable.
+//
+// Due to all of these requirements, you should probably declare TLS members
+// as private.
+//
+// Example usage:
+//
+// // foo.h
+// #include "kudu/utils/file.h"
+// class Foo {
+//  public:
+//   void WriteToFile(std::string s);
+//  private:
+//   DECLARE_STATIC_THREAD_LOCAL(utils::File, file_);
+// };
+//
+// // foo.cc
+// #include "kudu/foo.h"
+// DEFINE_STATIC_THREAD_LOCAL(utils::File, Foo, file_);
+// void Foo::WriteToFile(std::string s) {
+//   // Call constructor if necessary.
+//   INIT_STATIC_THREAD_LOCAL(utils::File, file_, "/tmp/file_location.txt");
+//   file_->Write(s);
+// }
+
+// Goes in the class declaration (usually in a header file).
+// Declares the static thread-local pointer 't'; each thread fills it in by
+// calling INIT_STATIC_THREAD_LOCAL before first use.
+#define DECLARE_STATIC_THREAD_LOCAL(T, t)                                      
               \
+static __thread T* t
+
+// You must also define the instance in the .cc file.
+#define DEFINE_STATIC_THREAD_LOCAL(T, Class, t)                                
               \
+__thread T* Class::t
+
+// Must be invoked at least once by each thread that will access t.
+// If t is still NULL on the calling thread, constructs a T from the given
+// arguments and registers its deletion with AddDestructor. The helper
+// functions are referenced by their fully qualified names so that the macro
+// resolves them regardless of the namespace it is expanded in.
+#define INIT_STATIC_THREAD_LOCAL(T, t, ...)                                     \
+do {                                                                            \
+  if (PREDICT_FALSE(t == NULL)) {                                               \
+    t = new T(__VA_ARGS__);                                                     \
+    ::kudu::threadlocal::internal::AddDestructor(                               \
+        ::kudu::threadlocal::internal::Destroy<T>, t);                          \
+  }                                                                             \
+} while (false)
+
+// Internal implementation below.
+
+namespace kudu {
+namespace threadlocal {
+namespace internal {
+
+// Add a destructor to the list.
+void AddDestructor(void (*destructor)(void*), void* arg);
+
+// Deletes the object 't' points to, treating it as a T.
+template<class T>
+static void Destroy(void* t) {
+  // With tcmalloc, this should be pretty cheap (same thread as new).
+  T* const typed = reinterpret_cast<T*>(t);
+  delete typed;
+}
+
+} // namespace internal
+} // namespace threadlocal
+} // namespace kudu
+
+#endif // KUDU_UTIL_THREADLOCAL_H_

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/threadlocal_cache.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/threadlocal_cache.h 
b/be/src/kudu/util/threadlocal_cache.h
new file mode 100644
index 0000000..e9ab3c2
--- /dev/null
+++ b/be/src/kudu/util/threadlocal_cache.h
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "kudu/util/threadlocal.h"
+
+#include <boost/optional/optional.hpp>
+#include <array>
+#include <memory>
+#include <utility>
+
+namespace kudu {
+
+// A small thread-local cache for arbitrary objects.
+//
+// This can be used as a contention-free "lookaside" type cache for frequently-accessed
+// objects to avoid having to go to a less-efficient centralized cache.
+//
+// 'Key' must be default-constructible (empty slots hold a default key), copyable,
+// and comparable using operator==().
+// 'T' has no particular requirements.
+template<class Key, class T>
+class ThreadLocalCache {
+ public:
+  // The number of entries in the cache.
+  // NOTE: this should always be a power of two for good performance, so that the
+  // compiler can optimize the modulo operations into bit-mask operations.
+  static constexpr int kItemCapacity = 4;
+
+  // Look up a key in the cache. Returns either the existing entry with this key,
+  // or nullptr if no entry matched.
+  T* Lookup(const Key& key) {
+    // Our cache is so small that a linear search is likely to be more efficient than
+    // any kind of actual hashing. We always start the search at wherever we most
+    // recently found a hit.
+    for (int i = 0; i < kItemCapacity; i++) {
+      int idx = (last_hit_ + i) % kItemCapacity;
+      auto& p = cache_[idx];
+      // Only slots that actually hold a value can match. An empty slot still
+      // carries a default-constructed key; without this check a lookup for a
+      // key equal to that default would stop at the empty slot, report a miss
+      // and hide a real entry stored in a later slot.
+      if (p.second && p.first == key) {
+        last_hit_ = idx;
+        return p.second.get_ptr();
+      }
+    }
+    return nullptr;
+  }
+
+  // Insert a new entry into the cache. If the cache is full (as it usually is in the
+  // steady state), this replaces one of the existing entries. The 'args' are forwarded
+  // to T's constructor.
+  //
+  // NOTE: entries returned by a previous call to Lookup() may possibly be invalidated
+  // by this function.
+  template<typename ... Args>
+  T* EmplaceNew(const Key& key, Args&&... args) {
+    // Slots are overwritten in round-robin order.
+    auto& p = cache_[next_slot_++ % kItemCapacity];
+    p.second.emplace(std::forward<Args>(args)...);
+    p.first = key;
+    return p.second.get_ptr();
+  }
+
+  // Get the cache instance for this thread, creating it if it has not yet been
+  // created.
+  //
+  // The instance is automatically deleted and any cached items destructed when the
+  // thread exits.
+  static ThreadLocalCache* GetInstance() {
+    INIT_STATIC_THREAD_LOCAL(ThreadLocalCache, tl_instance_);
+    return tl_instance_;
+  }
+
+ private:
+  // A slot: the key plus the (possibly absent) cached value.
+  using EntryPair = std::pair<Key, boost::optional<T>>;
+  std::array<EntryPair, kItemCapacity> cache_;
+
+  // The next slot that we will write into. We always modulo this by the capacity
+  // before use.
+  uint8_t next_slot_ = 0;
+  // The slot where we last got a cache hit, so we can start our search at the same
+  // spot, optimizing for the case of repeated lookups of the same hot element.
+  uint8_t last_hit_ = 0;
+
+  static_assert(kItemCapacity <= 1 << (sizeof(next_slot_) * 8),
+                "next_slot_ must be large enough for capacity");
+  static_assert(kItemCapacity <= 1 << (sizeof(last_hit_) * 8),
+                "last_hit_ must be large enough for capacity");
+
+  DECLARE_STATIC_THREAD_LOCAL(ThreadLocalCache, tl_instance_);
+};
+
+// Define the thread-local storage for the ThreadLocalCache template.
+// We can't use DEFINE_STATIC_THREAD_LOCAL here because the commas in the
+// template arguments confuse the C preprocessor.
+// Being a static member of a class template, each <K, T> instantiation gets
+// its own per-thread pointer.
+template<class K, class T>
+__thread ThreadLocalCache<K,T>* ThreadLocalCache<K,T>::tl_instance_;
+
+} // namespace kudu

Reply via email to