http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/thread.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/thread.cc b/be/src/kudu/util/thread.cc new file mode 100644 index 0000000..471b87d --- /dev/null +++ b/be/src/kudu/util/thread.cc @@ -0,0 +1,617 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// Copied from Impala and adapted to Kudu. + +#include "kudu/util/thread.h" + +#include <sys/resource.h> +#include <sys/syscall.h> +#include <sys/time.h> +#include <sys/types.h> +#include <unistd.h> + +#include <algorithm> +#include <map> +#include <memory> +#include <set> +#include <sstream> +#include <vector> + +#if defined(__linux__) +#include <sys/prctl.h> +#endif // defined(__linux__) + +#include "kudu/gutil/atomicops.h" +#include "kudu/gutil/dynamic_annotations.h" +#include "kudu/gutil/mathlimits.h" +#include "kudu/gutil/once.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/debug-util.h" +#include "kudu/util/errno.h" +#include "kudu/util/logging.h" +#include "kudu/util/kernel_stack_watchdog.h" +#include "kudu/util/metrics.h" +#include "kudu/util/mutex.h" +#include "kudu/util/os-util.h" +#include "kudu/util/stopwatch.h" +#include "kudu/util/url-coding.h" +#include "kudu/util/trace.h" +#include "kudu/util/web_callback_registry.h" + +using boost::bind; +using boost::mem_fn; +using std::endl; +using std::map; +using std::ostringstream; +using std::shared_ptr; +using strings::Substitute; + +METRIC_DEFINE_gauge_uint64(server, threads_started, + "Threads Started", + kudu::MetricUnit::kThreads, + "Total number of threads started on this server", + kudu::EXPOSE_AS_COUNTER); + +METRIC_DEFINE_gauge_uint64(server, threads_running, + "Threads Running", + kudu::MetricUnit::kThreads, + "Current number of running threads"); + +METRIC_DEFINE_gauge_uint64(server, cpu_utime, + "User CPU Time", + kudu::MetricUnit::kMilliseconds, + "Total user CPU time of the process", + kudu::EXPOSE_AS_COUNTER); + +METRIC_DEFINE_gauge_uint64(server, cpu_stime, + "System CPU Time", + kudu::MetricUnit::kMilliseconds, + "Total system CPU time of the process", + kudu::EXPOSE_AS_COUNTER); + +METRIC_DEFINE_gauge_uint64(server, voluntary_context_switches, + "Voluntary Context Switches", + kudu::MetricUnit::kContextSwitches, + "Total voluntary context switches", + kudu::EXPOSE_AS_COUNTER); + +METRIC_DEFINE_gauge_uint64(server, involuntary_context_switches, + "Involuntary Context Switches", + kudu::MetricUnit::kContextSwitches, + "Total involuntary context switches", + kudu::EXPOSE_AS_COUNTER); + +namespace kudu { + +static uint64_t GetCpuUTime() { + rusage ru; + CHECK_ERR(getrusage(RUSAGE_SELF, &ru)); + return ru.ru_utime.tv_sec * 1000UL + ru.ru_utime.tv_usec / 1000UL; 
+} + +static uint64_t GetCpuSTime() { + rusage ru; + CHECK_ERR(getrusage(RUSAGE_SELF, &ru)); + return ru.ru_stime.tv_sec * 1000UL + ru.ru_stime.tv_usec / 1000UL; +} + +static uint64_t GetVoluntaryContextSwitches() { + rusage ru; + CHECK_ERR(getrusage(RUSAGE_SELF, &ru)); + return ru.ru_nvcsw;; +} + +static uint64_t GetInVoluntaryContextSwitches() { + rusage ru; + CHECK_ERR(getrusage(RUSAGE_SELF, &ru)); + return ru.ru_nivcsw; +} + +class ThreadMgr; + +__thread Thread* Thread::tls_ = NULL; + +// Singleton instance of ThreadMgr. Only visible in this file, used only by Thread. +// The Thread class adds a reference to thread_manager while it is supervising a thread so +// that a race between the end of the process's main thread (and therefore the destruction +// of thread_manager) and the end of a thread that tries to remove itself from the +// manager after the destruction can be avoided. +static shared_ptr<ThreadMgr> thread_manager; + +// Controls the single (lazy) initialization of thread_manager. +static GoogleOnceType once = GOOGLE_ONCE_INIT; + +// A singleton class that tracks all live threads, and groups them together for easy +// auditing. Used only by Thread. +class ThreadMgr { + public: + ThreadMgr() + : metrics_enabled_(false), + threads_started_metric_(0), + threads_running_metric_(0) { + } + + ~ThreadMgr() { + MutexLock l(lock_); + thread_categories_.clear(); + } + + static void SetThreadName(const std::string& name, int64 tid); + + Status StartInstrumentation(const scoped_refptr<MetricEntity>& metrics, WebCallbackRegistry* web); + + // Registers a thread to the supplied category. The key is a pthread_t, + // not the system TID, since pthread_t is less prone to being recycled. + void AddThread(const pthread_t& pthread_id, const string& name, const string& category, + int64_t tid); + + // Removes a thread from the supplied category. If the thread has + // already been removed, this is a no-op. + void RemoveThread(const pthread_t& pthread_id, const string& category); + + private: + // Container class for any details we want to capture about a thread + // TODO: Add start-time. + // TODO: Track fragment ID. + class ThreadDescriptor { + public: + ThreadDescriptor() { } + ThreadDescriptor(string category, string name, int64_t thread_id) + : name_(std::move(name)), + category_(std::move(category)), + thread_id_(thread_id) {} + + const string& name() const { return name_; } + const string& category() const { return category_; } + int64_t thread_id() const { return thread_id_; } + + private: + string name_; + string category_; + int64_t thread_id_; + }; + + // A ThreadCategory is a set of threads that are logically related. + // TODO: unordered_map is incompatible with pthread_t, but would be more + // efficient here. + typedef map<const pthread_t, ThreadDescriptor> ThreadCategory; + + // All thread categorys, keyed on the category name. + typedef map<string, ThreadCategory> ThreadCategoryMap; + + // Protects thread_categories_ and metrics_enabled_ + Mutex lock_; + + // All thread categorys that ever contained a thread, even if empty + ThreadCategoryMap thread_categories_; + + // True after StartInstrumentation(..) returns + bool metrics_enabled_; + + // Counters to track all-time total number of threads, and the + // current number of running threads. + uint64_t threads_started_metric_; + uint64_t threads_running_metric_; + + // Metric callbacks. 
+ uint64_t ReadThreadsStarted(); + uint64_t ReadThreadsRunning(); + + // Webpage callback; prints all threads by category + void ThreadPathHandler(const WebCallbackRegistry::WebRequest& args, ostringstream* output); + void PrintThreadCategoryRows(const ThreadCategory& category, ostringstream* output); +}; + +void ThreadMgr::SetThreadName(const string& name, int64 tid) { + // On linux we can get the thread names to show up in the debugger by setting + // the process name for the LWP. We don't want to do this for the main + // thread because that would rename the process, causing tools like killall + // to stop working. + if (tid == getpid()) { + return; + } + +#if defined(__linux__) + // http://0pointer.de/blog/projects/name-your-threads.html + // Set the name for the LWP (which gets truncated to 15 characters). + // Note that glibc also has a 'pthread_setname_np' api, but it may not be + // available everywhere and it's only benefit over using prctl directly is + // that it can set the name of threads other than the current thread. + int err = prctl(PR_SET_NAME, name.c_str()); +#else + int err = pthread_setname_np(name.c_str()); +#endif // defined(__linux__) + // We expect EPERM failures in sandboxed processes, just ignore those. + if (err < 0 && errno != EPERM) { + PLOG(ERROR) << "SetThreadName"; + } +} + +Status ThreadMgr::StartInstrumentation(const scoped_refptr<MetricEntity>& metrics, + WebCallbackRegistry* web) { + MutexLock l(lock_); + metrics_enabled_ = true; + + // Use function gauges here so that we can register a unique copy of these metrics in + // multiple tservers, even though the ThreadMgr is itself a singleton. + metrics->NeverRetire( + METRIC_threads_started.InstantiateFunctionGauge(metrics, + Bind(&ThreadMgr::ReadThreadsStarted, Unretained(this)))); + metrics->NeverRetire( + METRIC_threads_running.InstantiateFunctionGauge(metrics, + Bind(&ThreadMgr::ReadThreadsRunning, Unretained(this)))); + metrics->NeverRetire( + METRIC_cpu_utime.InstantiateFunctionGauge(metrics, + Bind(&GetCpuUTime))); + metrics->NeverRetire( + METRIC_cpu_stime.InstantiateFunctionGauge(metrics, + Bind(&GetCpuSTime))); + metrics->NeverRetire( + METRIC_voluntary_context_switches.InstantiateFunctionGauge(metrics, + Bind(&GetVoluntaryContextSwitches))); + metrics->NeverRetire( + METRIC_involuntary_context_switches.InstantiateFunctionGauge(metrics, + Bind(&GetInVoluntaryContextSwitches))); + + if (web) { + WebCallbackRegistry::PathHandlerCallback thread_callback = + bind<void>(mem_fn(&ThreadMgr::ThreadPathHandler), this, _1, _2); + DCHECK_NOTNULL(web)->RegisterPathHandler("/threadz", "Threads", thread_callback); + } + return Status::OK(); +} + +uint64_t ThreadMgr::ReadThreadsStarted() { + MutexLock l(lock_); + return threads_started_metric_; +} + +uint64_t ThreadMgr::ReadThreadsRunning() { + MutexLock l(lock_); + return threads_running_metric_; +} + +void ThreadMgr::AddThread(const pthread_t& pthread_id, const string& name, + const string& category, int64_t tid) { + // These annotations cause TSAN to ignore the synchronization on lock_ + // without causing the subsequent mutations to be treated as data races + // in and of themselves (that's what IGNORE_READS_AND_WRITES does). + // + // Why do we need them here and in SuperviseThread()? TSAN operates by + // observing synchronization events and using them to establish "happens + // before" relationships between threads. Where these relationships are + // not built, shared state access constitutes a data race. 
The + // synchronization events here, in RemoveThread(), and in + // SuperviseThread() may cause TSAN to establish a "happens before" + // relationship between thread functors, ignoring potential data races. + // The annotations prevent this from happening. + ANNOTATE_IGNORE_SYNC_BEGIN(); + ANNOTATE_IGNORE_READS_AND_WRITES_BEGIN(); + { + MutexLock l(lock_); + thread_categories_[category][pthread_id] = ThreadDescriptor(category, name, tid); + if (metrics_enabled_) { + threads_running_metric_++; + threads_started_metric_++; + } + } + ANNOTATE_IGNORE_SYNC_END(); + ANNOTATE_IGNORE_READS_AND_WRITES_END(); +} + +void ThreadMgr::RemoveThread(const pthread_t& pthread_id, const string& category) { + ANNOTATE_IGNORE_SYNC_BEGIN(); + ANNOTATE_IGNORE_READS_AND_WRITES_BEGIN(); + { + MutexLock l(lock_); + auto category_it = thread_categories_.find(category); + DCHECK(category_it != thread_categories_.end()); + category_it->second.erase(pthread_id); + if (metrics_enabled_) { + threads_running_metric_--; + } + } + ANNOTATE_IGNORE_SYNC_END(); + ANNOTATE_IGNORE_READS_AND_WRITES_END(); +} + +void ThreadMgr::PrintThreadCategoryRows(const ThreadCategory& category, + ostringstream* output) { + for (const ThreadCategory::value_type& thread : category) { + ThreadStats stats; + Status status = GetThreadStats(thread.second.thread_id(), &stats); + if (!status.ok()) { + KLOG_EVERY_N(INFO, 100) << "Could not get per-thread statistics: " + << status.ToString(); + } + (*output) << "<tr><td>" << thread.second.name() << "</td><td>" + << (static_cast<double>(stats.user_ns) / 1e9) << "</td><td>" + << (static_cast<double>(stats.kernel_ns) / 1e9) << "</td><td>" + << (static_cast<double>(stats.iowait_ns) / 1e9) << "</td></tr>"; + } +} + +void ThreadMgr::ThreadPathHandler(const WebCallbackRegistry::WebRequest& req, + ostringstream* output) { + MutexLock l(lock_); + vector<const ThreadCategory*> categories_to_print; + auto category_name = req.parsed_args.find("group"); + if (category_name != req.parsed_args.end()) { + string group = EscapeForHtmlToString(category_name->second); + (*output) << "<h2>Thread Group: " << group << "</h2>" << endl; + if (group != "all") { + ThreadCategoryMap::const_iterator category = thread_categories_.find(group); + if (category == thread_categories_.end()) { + (*output) << "Thread group '" << group << "' not found" << endl; + return; + } + categories_to_print.push_back(&category->second); + (*output) << "<h3>" << category->first << " : " << category->second.size() + << "</h3>"; + } else { + for (const ThreadCategoryMap::value_type& category : thread_categories_) { + categories_to_print.push_back(&category.second); + } + (*output) << "<h3>All Threads : </h3>"; + } + + (*output) << "<table class='table table-hover table-border'>"; + (*output) << "<thead><tr><th>Thread name</th><th>Cumulative User CPU(s)</th>" + << "<th>Cumulative Kernel CPU(s)</th>" + << "<th>Cumulative IO-wait(s)</th></tr></thead>"; + (*output) << "<tbody>\n"; + + for (const ThreadCategory* category : categories_to_print) { + PrintThreadCategoryRows(*category, output); + } + (*output) << "</tbody></table>"; + } else { + (*output) << "<h2>Thread Groups</h2>"; + if (metrics_enabled_) { + (*output) << "<h4>" << threads_running_metric_ << " thread(s) running"; + } + (*output) << "<a href='/threadz?group=all'><h3>All Threads</h3>"; + + for (const ThreadCategoryMap::value_type& category : thread_categories_) { + string category_arg; + UrlEncode(category.first, &category_arg); + (*output) << "<a href='/threadz?group=" << category_arg << 
"'><h3>" + << category.first << " : " << category.second.size() << "</h3></a>"; + } + } +} + +static void InitThreading() { + // Warm up the stack trace library. This avoids a race in libunwind initialization + // by making sure we initialize it before we start any other threads. + ignore_result(GetStackTraceHex()); + thread_manager.reset(new ThreadMgr()); +} + +Status StartThreadInstrumentation(const scoped_refptr<MetricEntity>& server_metrics, + WebCallbackRegistry* web) { + GoogleOnceInit(&once, &InitThreading); + return thread_manager->StartInstrumentation(server_metrics, web); +} + +ThreadJoiner::ThreadJoiner(Thread* thr) + : thread_(CHECK_NOTNULL(thr)), + warn_after_ms_(kDefaultWarnAfterMs), + warn_every_ms_(kDefaultWarnEveryMs), + give_up_after_ms_(kDefaultGiveUpAfterMs) { +} + +ThreadJoiner& ThreadJoiner::warn_after_ms(int ms) { + warn_after_ms_ = ms; + return *this; +} + +ThreadJoiner& ThreadJoiner::warn_every_ms(int ms) { + warn_every_ms_ = ms; + return *this; +} + +ThreadJoiner& ThreadJoiner::give_up_after_ms(int ms) { + give_up_after_ms_ = ms; + return *this; +} + +Status ThreadJoiner::Join() { + if (Thread::current_thread() && + Thread::current_thread()->tid() == thread_->tid()) { + return Status::InvalidArgument("Can't join on own thread", thread_->name_); + } + + // Early exit: double join is a no-op. + if (!thread_->joinable_) { + return Status::OK(); + } + + int waited_ms = 0; + bool keep_trying = true; + while (keep_trying) { + if (waited_ms >= warn_after_ms_) { + LOG(WARNING) << Substitute("Waited for $0ms trying to join with $1 (tid $2)", + waited_ms, thread_->name_, thread_->tid_); + } + + int remaining_before_giveup = MathLimits<int>::kMax; + if (give_up_after_ms_ != -1) { + remaining_before_giveup = give_up_after_ms_ - waited_ms; + } + + int remaining_before_next_warn = warn_every_ms_; + if (waited_ms < warn_after_ms_) { + remaining_before_next_warn = warn_after_ms_ - waited_ms; + } + + if (remaining_before_giveup < remaining_before_next_warn) { + keep_trying = false; + } + + int wait_for = std::min(remaining_before_giveup, remaining_before_next_warn); + + if (thread_->done_.WaitFor(MonoDelta::FromMilliseconds(wait_for))) { + // Unconditionally join before returning, to guarantee that any TLS + // has been destroyed (pthread_key_create() destructors only run + // after a pthread's user method has returned). + int ret = pthread_join(thread_->thread_, NULL); + CHECK_EQ(ret, 0); + thread_->joinable_ = false; + return Status::OK(); + } + waited_ms += wait_for; + } + return Status::Aborted(strings::Substitute("Timed out after $0ms joining on $1", + waited_ms, thread_->name_)); +} + +Thread::~Thread() { + if (joinable_) { + int ret = pthread_detach(thread_); + CHECK_EQ(ret, 0); + } +} + +void Thread::CallAtExit(const Closure& cb) { + CHECK_EQ(Thread::current_thread(), this); + exit_callbacks_.push_back(cb); +} + +std::string Thread::ToString() const { + return Substitute("Thread $0 (name: \"$1\", category: \"$2\")", tid_, name_, category_); +} + +Status Thread::StartThread(const std::string& category, const std::string& name, + const ThreadFunctor& functor, uint64_t flags, + scoped_refptr<Thread> *holder) { + TRACE_COUNTER_INCREMENT("threads_started", 1); + TRACE_COUNTER_SCOPE_LATENCY_US("thread_start_us"); + const string log_prefix = Substitute("$0 ($1) ", name, category); + SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500 /* ms */, log_prefix, "starting thread"); + + // Temporary reference for the duration of this function. 
+ scoped_refptr<Thread> t(new Thread(category, name, functor)); + + { + SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500 /* ms */, log_prefix, "creating pthread"); + SCOPED_WATCH_STACK((flags & NO_STACK_WATCHDOG) ? 0 : 250); + int ret = pthread_create(&t->thread_, NULL, &Thread::SuperviseThread, t.get()); + if (ret) { + return Status::RuntimeError("Could not create thread", strerror(ret), ret); + } + } + + // The thread has been created and is now joinable. + // + // Why set this in the parent and not the child? Because only the parent + // (or someone communicating with the parent) can join, so joinable must + // be set before the parent returns. + t->joinable_ = true; + + // Optional, and only set if the thread was successfully created. + if (holder) { + *holder = t; + } + + // The tid_ member goes through the following states: + // 1 CHILD_WAITING_TID: the child has just been spawned and is waiting + // for the parent to finish writing to caller state (i.e. 'holder'). + // 2. PARENT_WAITING_TID: the parent has updated caller state and is now + // waiting for the child to write the tid. + // 3. <value>: both the parent and the child are free to continue. If the + // value is INVALID_TID, the child could not discover its tid. + Release_Store(&t->tid_, PARENT_WAITING_TID); + { + SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500 /* ms */, log_prefix, + "waiting for new thread to publish its TID"); + int loop_count = 0; + while (Acquire_Load(&t->tid_) == PARENT_WAITING_TID) { + boost::detail::yield(loop_count++); + } + } + + VLOG(2) << "Started thread " << t->tid()<< " - " << category << ":" << name; + return Status::OK(); +} + +void* Thread::SuperviseThread(void* arg) { + Thread* t = static_cast<Thread*>(arg); + int64_t system_tid = Thread::CurrentThreadId(); + if (system_tid == -1) { + string error_msg = ErrnoToString(errno); + KLOG_EVERY_N(INFO, 100) << "Could not determine thread ID: " << error_msg; + } + string name = strings::Substitute("$0-$1", t->name(), system_tid); + + // Take an additional reference to the thread manager, which we'll need below. + GoogleOnceInit(&once, &InitThreading); + ANNOTATE_IGNORE_SYNC_BEGIN(); + shared_ptr<ThreadMgr> thread_mgr_ref = thread_manager; + ANNOTATE_IGNORE_SYNC_END(); + + // Set up the TLS. + // + // We could store a scoped_refptr in the TLS itself, but as its + // lifecycle is poorly defined, we'll use a bare pointer and take an + // additional reference on t out of band, in thread_ref. + scoped_refptr<Thread> thread_ref = t; + t->tls_ = t; + + // Wait until the parent has updated all caller-visible state, then write + // the TID to 'tid_', thus completing the parent<-->child handshake. + int loop_count = 0; + while (Acquire_Load(&t->tid_) == CHILD_WAITING_TID) { + boost::detail::yield(loop_count++); + } + Release_Store(&t->tid_, system_tid); + + thread_manager->SetThreadName(name, t->tid()); + thread_manager->AddThread(pthread_self(), name, t->category(), t->tid()); + + // FinishThread() is guaranteed to run (even if functor_ throws an + // exception) because pthread_cleanup_push() creates a scoped object + // whose destructor invokes the provided callback. + pthread_cleanup_push(&Thread::FinishThread, t); + t->functor_(); + pthread_cleanup_pop(true); + + return NULL; +} + +void Thread::FinishThread(void* arg) { + Thread* t = static_cast<Thread*>(arg); + + for (Closure& c : t->exit_callbacks_) { + c.Run(); + } + + // We're here either because of the explicit pthread_cleanup_pop() in + // SuperviseThread() or through pthread_exit(). 
In either case, + // thread_manager is guaranteed to be live because thread_mgr_ref in + // SuperviseThread() is still live. + thread_manager->RemoveThread(pthread_self(), t->category()); + + // Signal any Joiner that we're done. + t->done_.CountDown(); + + VLOG(2) << "Ended thread " << t->tid() << " - " + << t->category() << ":" << t->name(); +} + +} // namespace kudu
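For illustration (not part of this commit): StartThread() and SuperviseThread() above hand the system TID from child to parent through tid_, using the CHILD_WAITING_TID and PARENT_WAITING_TID sentinels. Below is a minimal standalone sketch of that handshake, assuming std::atomic and sched_yield() in place of the gutil Acquire_Load/Release_Store and boost::detail::yield primitives; it is Linux-only because of SYS_gettid, and everything outside the two spin loops is hypothetical.

  #include <atomic>
  #include <cstdint>
  #include <cstdio>
  #include <pthread.h>
  #include <sched.h>
  #include <sys/syscall.h>
  #include <unistd.h>

  static constexpr int64_t CHILD_WAITING_TID = -2;   // same sentinels as in the diff
  static constexpr int64_t PARENT_WAITING_TID = -3;
  static std::atomic<int64_t> g_tid{CHILD_WAITING_TID};

  static void* Supervise(void*) {
    // Child: wait until the parent has published its caller-visible state ...
    while (g_tid.load(std::memory_order_acquire) == CHILD_WAITING_TID) sched_yield();
    // ... then publish the real system TID, completing the handshake.
    g_tid.store(syscall(SYS_gettid), std::memory_order_release);
    return nullptr;
  }

  int main() {
    pthread_t t;
    pthread_create(&t, nullptr, &Supervise, nullptr);
    // Parent: signal that caller state (e.g. 'holder') is ready ...
    g_tid.store(PARENT_WAITING_TID, std::memory_order_release);
    // ... and spin until the child writes its TID.
    while (g_tid.load(std::memory_order_acquire) == PARENT_WAITING_TID) sched_yield();
    std::printf("started thread with tid %lld\n", (long long)g_tid.load());
    pthread_join(t, nullptr);
    return 0;
  }

In the real code the parent additionally times its wait under SCOPED_LOG_SLOW_EXECUTION_PREFIX, and the child takes a reference to the ThreadMgr singleton before spinning so that it can unregister safely at exit.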
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/thread.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/thread.h b/be/src/kudu/util/thread.h new file mode 100644 index 0000000..46e2505 --- /dev/null +++ b/be/src/kudu/util/thread.h @@ -0,0 +1,362 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// Copied from Impala and adapted to Kudu. + +#ifndef KUDU_UTIL_THREAD_H +#define KUDU_UTIL_THREAD_H + +#include <pthread.h> +#include <sys/syscall.h> +#include <sys/types.h> + +#include <string> +#include <vector> + +#include <boost/bind.hpp> +#include <boost/function.hpp> + +#include "kudu/gutil/atomicops.h" +#include "kudu/gutil/ref_counted.h" +#include "kudu/util/async_util.h" +#include "kudu/util/status.h" + +namespace kudu { + +class MetricEntity; +class Thread; +class WebCallbackRegistry; + +// Utility to join on a thread, printing warning messages if it +// takes too long. For example: +// +// ThreadJoiner(&my_thread, "processing thread") +// .warn_after_ms(1000) +// .warn_every_ms(5000) +// .Join(); +// +// TODO: would be nice to offer a way to use ptrace() or signals to +// dump the stack trace of the thread we're trying to join on if it +// gets stuck. But, after looking for 20 minutes or so, it seems +// pretty complicated to get right. +class ThreadJoiner { + public: + explicit ThreadJoiner(Thread* thread); + + // Start emitting warnings after this many milliseconds. + // + // Default: 1000 ms. + ThreadJoiner& warn_after_ms(int ms); + + // After the warnings after started, emit another warning at the + // given interval. + // + // Default: 1000 ms. + ThreadJoiner& warn_every_ms(int ms); + + // If the thread has not stopped after this number of milliseconds, give up + // joining on it and return Status::Aborted. + // + // -1 (the default) means to wait forever trying to join. + ThreadJoiner& give_up_after_ms(int ms); + + // Join the thread, subject to the above parameters. If the thread joining + // fails for any reason, returns RuntimeError. If it times out, returns + // Aborted. + Status Join(); + + private: + enum { + kDefaultWarnAfterMs = 1000, + kDefaultWarnEveryMs = 1000, + kDefaultGiveUpAfterMs = -1 // forever + }; + + Thread* thread_; + + int warn_after_ms_; + int warn_every_ms_; + int give_up_after_ms_; + + DISALLOW_COPY_AND_ASSIGN(ThreadJoiner); +}; + +// Thin wrapper around pthread that can register itself with the singleton ThreadMgr +// (a private class implemented in thread.cc entirely, which tracks all live threads so +// that they may be monitored via the debug webpages). This class has a limited subset of +// boost::thread's API. 
Construction is almost the same, but clients must supply a +// category and a name for each thread so that they can be identified in the debug web +// UI. Otherwise, Join() is the only supported method from boost::thread. +// +// Each Thread object knows its operating system thread ID (TID), which can be used to +// attach debuggers to specific threads, to retrieve resource-usage statistics from the +// operating system, and to assign threads to resource control groups. +// +// Threads are shared objects, but in a degenerate way. They may only have +// up to two referents: the caller that created the thread (parent), and +// the thread itself (child). Moreover, the only two methods to mutate state +// (Join() and the destructor) are constrained: the child may not Join() on +// itself, and the destructor is only run when there's one referent left. +// These constraints allow us to access thread internals without any locks. +class Thread : public RefCountedThreadSafe<Thread> { + public: + + // Flags passed to Thread::CreateWithFlags(). + enum CreateFlags { + NO_FLAGS = 0, + + // Disable the use of KernelStackWatchdog to detect and log slow + // thread creations. This is necessary when starting the kernel stack + // watchdog thread itself to avoid reentrancy. + NO_STACK_WATCHDOG = 1 << 0 + }; + + // This constructor pattern mimics that in boost::thread. There is + // one constructor for each number of arguments that the thread + // function accepts. To extend the set of acceptable signatures, add + // another constructor with <class F, class A1.... class An>. + // + // In general: + // - category: string identifying the thread category to which this thread belongs, + // used for organising threads together on the debug UI. + // - name: name of this thread. Will be appended with "-<thread-id>" to ensure + // uniqueness. + // - F - a method type that supports operator(), and the instance passed to the + // constructor is executed immediately in a separate thread. + // - A1...An - argument types whose instances are passed to f(...) + // - holder - optional shared pointer to hold a reference to the created thread. 
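// For illustration only (not part of this commit), a typical call site for the
// Create() overloads declared below; the function and argument names are
// hypothetical:
//
//   void TabletScanner(int64_t tablet_id);   // user-supplied thread function
//
//   scoped_refptr<Thread> scanner;
//   RETURN_NOT_OK(Thread::Create("tablet", "scanner", &TabletScanner,
//                                tablet_id, &scanner));
//   ...
//   scanner->Join();   // or ThreadJoiner(scanner.get()).warn_after_ms(1000).Join();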
+ template <class F> + static Status CreateWithFlags(const std::string& category, const std::string& name, + const F& f, uint64_t flags, + scoped_refptr<Thread>* holder) { + return StartThread(category, name, f, flags, holder); + + } + template <class F> + static Status Create(const std::string& category, const std::string& name, const F& f, + scoped_refptr<Thread>* holder) { + return StartThread(category, name, f, NO_FLAGS, holder); + } + + template <class F, class A1> + static Status Create(const std::string& category, const std::string& name, const F& f, + const A1& a1, scoped_refptr<Thread>* holder) { + return StartThread(category, name, boost::bind(f, a1), NO_FLAGS, holder); + } + + template <class F, class A1, class A2> + static Status Create(const std::string& category, const std::string& name, const F& f, + const A1& a1, const A2& a2, scoped_refptr<Thread>* holder) { + return StartThread(category, name, boost::bind(f, a1, a2), NO_FLAGS, holder); + } + + template <class F, class A1, class A2, class A3> + static Status Create(const std::string& category, const std::string& name, const F& f, + const A1& a1, const A2& a2, const A3& a3, scoped_refptr<Thread>* holder) { + return StartThread(category, name, boost::bind(f, a1, a2, a3), NO_FLAGS, holder); + } + + template <class F, class A1, class A2, class A3, class A4> + static Status Create(const std::string& category, const std::string& name, const F& f, + const A1& a1, const A2& a2, const A3& a3, const A4& a4, + scoped_refptr<Thread>* holder) { + return StartThread(category, name, boost::bind(f, a1, a2, a3, a4), NO_FLAGS, holder); + } + + template <class F, class A1, class A2, class A3, class A4, class A5> + static Status Create(const std::string& category, const std::string& name, const F& f, + const A1& a1, const A2& a2, const A3& a3, const A4& a4, const A5& a5, + scoped_refptr<Thread>* holder) { + return StartThread(category, name, boost::bind(f, a1, a2, a3, a4, a5), NO_FLAGS, holder); + } + + template <class F, class A1, class A2, class A3, class A4, class A5, class A6> + static Status Create(const std::string& category, const std::string& name, const F& f, + const A1& a1, const A2& a2, const A3& a3, const A4& a4, const A5& a5, + const A6& a6, scoped_refptr<Thread>* holder) { + return StartThread(category, name, boost::bind(f, a1, a2, a3, a4, a5, a6), NO_FLAGS, holder); + } + + // Emulates boost::thread and detaches. + ~Thread(); + + // Blocks until this thread finishes execution. Once this method returns, the thread + // will be unregistered with the ThreadMgr and will not appear in the debug UI. + void Join() { ThreadJoiner(this).Join(); } + + // Call the given Closure on the thread before it exits. The closures are executed + // in the order they are added. + // + // NOTE: This must only be called on the currently executing thread, to avoid having + // to reason about complicated races (eg registering a callback on an already-dead + // thread). + // + // This callback is guaranteed to be called except in the case of a process crash. + void CallAtExit(const Closure& cb); + + // The thread ID assigned to this thread by the operating system. If the OS does not + // support retrieving the tid, returns Thread::INVALID_TID. + int64_t tid() const { return tid_; } + + // Returns the thread's pthread ID. 
+ pthread_t pthread_id() const { return thread_; } + + const std::string& name() const { return name_; } + const std::string& category() const { return category_; } + + // Return a string representation of the thread identifying information. + std::string ToString() const; + + // The current thread of execution, or NULL if the current thread isn't a kudu::Thread. + // This call is signal-safe. + static Thread* current_thread() { return tls_; } + + // Returns a unique, stable identifier for this thread. Note that this is a static + // method and thus can be used on any thread, including the main thread of the + // process. + // + // In general, this should be used when a value is required that is unique to + // a thread and must work on any thread including the main process thread. + // + // NOTE: this is _not_ the TID, but rather a unique value assigned by the + // thread implementation. So, this value should not be presented to the user + // in log messages, etc. + static int64_t UniqueThreadId() { +#if defined(__linux__) + // This cast is a little bit ugly, but it is significantly faster than + // calling syscall(SYS_gettid). In particular, this speeds up some code + // paths in the tracing implementation. + return static_cast<int64_t>(pthread_self()); +#elif defined(__APPLE__) + uint64_t tid; + CHECK_EQ(0, pthread_threadid_np(NULL, &tid)); + return tid; +#else +#error Unsupported platform +#endif + } + + // Returns the system thread ID (tid on Linux) for the current thread. Note + // that this is a static method and thus can be used from any thread, + // including the main thread of the process. This is in contrast to + // Thread::tid(), which only works on kudu::Threads. + // + // Thread::tid() will return the same value, but the value is cached in the + // Thread object, so will be faster to call. + // + // Thread::UniqueThreadId() (or Thread::tid()) should be preferred for + // performance sensistive code, however it is only guaranteed to return a + // unique and stable thread ID, not necessarily the system thread ID. + static int64_t CurrentThreadId() { +#if defined(__linux__) + return syscall(SYS_gettid); +#else + return UniqueThreadId(); +#endif + } + + private: + friend class ThreadJoiner; + + // The various special values for tid_ that describe the various steps + // in the parent<-->child handshake. + enum { + INVALID_TID = -1, + CHILD_WAITING_TID = -2, + PARENT_WAITING_TID = -3, + }; + + // Function object that wraps the user-supplied function to run in a separate thread. + typedef boost::function<void ()> ThreadFunctor; + + Thread(std::string category, std::string name, ThreadFunctor functor) + : thread_(0), + category_(std::move(category)), + name_(std::move(name)), + tid_(CHILD_WAITING_TID), + functor_(std::move(functor)), + done_(1), + joinable_(false) {} + + // Library-specific thread ID. + pthread_t thread_; + + // Name and category for this thread. + const std::string category_; + const std::string name_; + + // OS-specific thread ID. Once the constructor finishes StartThread(), + // guaranteed to be set either to a non-negative integer, or to INVALID_TID. + int64_t tid_; + + // User function to be executed by this thread. + const ThreadFunctor functor_; + + // Joiners wait on this latch to be notified if the thread is done. + // + // Note that Joiners must additionally pthread_join(), otherwise certain + // resources that callers expect to be destroyed (like TLS) may still be + // alive when a Joiner finishes. 
+ CountDownLatch done_; + + bool joinable_; + + // Thread local pointer to the current thread of execution. Will be NULL if the current + // thread is not a Thread. + static __thread Thread* tls_; + + std::vector<Closure> exit_callbacks_; + + // Starts the thread running SuperviseThread(), and returns once that thread has + // initialised and its TID has been read. Waits for notification from the started + // thread that initialisation is complete before returning. On success, stores a + // reference to the thread in holder. + static Status StartThread(const std::string& category, const std::string& name, + const ThreadFunctor& functor, uint64_t flags, + scoped_refptr<Thread>* holder); + + // Wrapper for the user-supplied function. Invoked from the new thread, + // with the Thread as its only argument. Executes functor_, but before + // doing so registers with the global ThreadMgr and reads the thread's + // system ID. After functor_ terminates, unregisters with the ThreadMgr. + // Always returns NULL. + // + // SuperviseThread() notifies StartThread() when thread initialisation is + // completed via the tid_, which is set to the new thread's system ID. + // By that point in time SuperviseThread() has also taken a reference to + // the Thread object, allowing it to safely refer to it even after the + // caller drops its reference. + // + // Additionally, StartThread() notifies SuperviseThread() when the actual + // Thread object has been assigned (SuperviseThread() is spinning during + // this time). Without this, the new thread may reference the actual + // Thread object before it has been assigned by StartThread(). See + // KUDU-11 for more details. + static void* SuperviseThread(void* arg); + + // Invoked when the user-supplied function finishes or in the case of an + // abrupt exit (i.e. pthread_exit()). Cleans up after SuperviseThread(). + static void FinishThread(void* arg); +}; + +// Registers /threadz with the debug webserver, and creates thread-tracking metrics under +// the given entity. If 'web' is NULL, does not register the path handler. +Status StartThreadInstrumentation(const scoped_refptr<MetricEntity>& server_metrics, + WebCallbackRegistry* web); +} // namespace kudu + +#endif /* KUDU_UTIL_THREAD_H */ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/thread_restrictions.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/thread_restrictions.cc b/be/src/kudu/util/thread_restrictions.cc new file mode 100644 index 0000000..40372c1 --- /dev/null +++ b/be/src/kudu/util/thread_restrictions.cc @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
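For illustration (not part of this commit), a sketch of how the ThreadRestrictions API implemented below is typically wired in; the three functions here are hypothetical, and in release (NDEBUG) builds the assertions compile to no-ops:

  #include <string>

  #include "kudu/util/thread_restrictions.h"

  // A hypothetical event-loop thread that must never block on disk IO.
  void ReactorThreadMain() {
    kudu::ThreadRestrictions::SetIOAllowed(false);
    // ... run the event loop ...
  }

  // A hypothetical low-level helper: assert as close to the disk access as possible.
  void AppendToLogFile(const std::string& data) {
    kudu::ThreadRestrictions::AssertIOAllowed();
    // ... write(fd, data.data(), data.size()) ...
  }

  // A rare escape hatch: temporarily re-allow IO within a narrow scope.
  void FlushDuringShutdown() {
    kudu::ThreadRestrictions::ScopedAllowIO allow_io;
    // ... blocking flush ...
  }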
+ +#include <glog/logging.h> +#include <gperftools/heap-checker.h> + +#include "kudu/util/debug/leakcheck_disabler.h" +#include "kudu/util/thread.h" +#include "kudu/util/threadlocal.h" +#include "kudu/util/thread_restrictions.h" + +#ifdef ENABLE_THREAD_RESTRICTIONS + +namespace kudu { + +namespace { + +struct LocalThreadRestrictions { + LocalThreadRestrictions() + : io_allowed(true), + wait_allowed(true), + singleton_allowed(true) { + } + + bool io_allowed; + bool wait_allowed; + bool singleton_allowed; +}; + +LocalThreadRestrictions* LoadTLS() { + // Disable leak check. LSAN sometimes gets false positives on thread locals. + // See: https://github.com/google/sanitizers/issues/757 + debug::ScopedLeakCheckDisabler d; + BLOCK_STATIC_THREAD_LOCAL(LocalThreadRestrictions, local_thread_restrictions); + return local_thread_restrictions; +} + +} // anonymous namespace + +bool ThreadRestrictions::SetIOAllowed(bool allowed) { + bool previous_allowed = LoadTLS()->io_allowed; + LoadTLS()->io_allowed = allowed; + return previous_allowed; +} + +void ThreadRestrictions::AssertIOAllowed() { + CHECK(LoadTLS()->io_allowed) + << "Function marked as IO-only was called from a thread that " + << "disallows IO! If this thread really should be allowed to " + << "make IO calls, adjust the call to " + << "kudu::ThreadRestrictions::SetIOAllowed() in this thread's " + << "startup. " + << (Thread::current_thread() ? Thread::current_thread()->ToString() : "(not a kudu::Thread)"); +} + +bool ThreadRestrictions::SetWaitAllowed(bool allowed) { + bool previous_allowed = LoadTLS()->wait_allowed; + LoadTLS()->wait_allowed = allowed; + return previous_allowed; +} + +void ThreadRestrictions::AssertWaitAllowed() { + CHECK(LoadTLS()->wait_allowed) + << "Waiting is not allowed to be used on this thread to prevent " + << "server-wide latency aberrations and deadlocks. " + << (Thread::current_thread() ? Thread::current_thread()->ToString() : "(not a kudu::Thread)"); +} + +} // namespace kudu + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/thread_restrictions.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/thread_restrictions.h b/be/src/kudu/util/thread_restrictions.h new file mode 100644 index 0000000..23f0cd5 --- /dev/null +++ b/be/src/kudu/util/thread_restrictions.h @@ -0,0 +1,121 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// Some portions: Copyright (c) 2012, The Chromium Authors. +#ifndef KUDU_UTIL_THREAD_RESTRICTIONS_H +#define KUDU_UTIL_THREAD_RESTRICTIONS_H + +#include "kudu/gutil/macros.h" + +#ifndef NDEBUG +#define ENABLE_THREAD_RESTRICTIONS 1 +#endif + +namespace kudu { + +// Certain behavior is disallowed on certain threads. 
ThreadRestrictions helps +// enforce these rules. Examples of such rules: +// +// * Do not do blocking IO +// * Do not wait on synchronization variables or sleep +// +// Here's more about how the protection works: +// +// 1) If a thread should not be allowed to make IO calls, mark it: +// ThreadRestrictions::SetIOAllowed(false); +// By default, threads *are* allowed to make IO calls. +// In particular, threads like RPC reactors should never do blocking IO +// because it may stall other unrelated requests. +// +// 2) If a function makes a call that will go out to disk, check whether the +// current thread is allowed: +// ThreadRestrictions::AssertIOAllowed(); +// +// +// Style tip: where should you put AssertIOAllowed checks? It's best +// if you put them as close to the disk access as possible, at the +// lowest level. This rule is simple to follow and helps catch all +// callers. For example, if your function GoDoSomeBlockingDiskCall() +// only calls other functions in Kudu and doesn't access the underlying +// disk, you should go add the AssertIOAllowed checks in the helper functions. +class ThreadRestrictions { + public: + // Constructing a ScopedAllowIO temporarily allows IO for the current + // thread. Doing this is almost certainly always incorrect, but sometimes + // it makes more sense to allow an exception and file a bug in the backlog + // to improve it later. + class ScopedAllowIO { + public: + ScopedAllowIO() { previous_value_ = SetIOAllowed(true); } + ~ScopedAllowIO() { SetIOAllowed(previous_value_); } + private: + // Whether IO is allowed when the ScopedAllowIO was constructed. + bool previous_value_; + + DISALLOW_COPY_AND_ASSIGN(ScopedAllowIO); + }; + + // Constructing a ScopedAllowWait temporarily allows waiting on the current + // thread. Doing this is almost always incorrect: consider carefully whether + // you should instead be deferring work to a different thread. + class ScopedAllowWait { + public: + ScopedAllowWait() { previous_value_ = SetWaitAllowed(true); } + ~ScopedAllowWait() { SetWaitAllowed(previous_value_); } + private: + // Whether singleton use is allowed when the ScopedAllowWait was + // constructed. + bool previous_value_; + + DISALLOW_COPY_AND_ASSIGN(ScopedAllowWait); + }; + + +#if ENABLE_THREAD_RESTRICTIONS + // Set whether the current thread to make IO calls. + // Threads start out in the *allowed* state. + // Returns the previous value. + static bool SetIOAllowed(bool allowed); + + // Check whether the current thread is allowed to make IO calls, + // and FATALs if not. See the block comment above the class for + // a discussion of where to add these checks. + static void AssertIOAllowed(); + + // Set whether the current thread may wait/block. Returns the previous + // value. + static bool SetWaitAllowed(bool allowed); + + // Check whether the current thread is allowed to wait/block. + // FATALs if not. + static void AssertWaitAllowed(); +#else + // Inline the empty definitions of these functions so that they can be + // compiled out. 
+ static bool SetIOAllowed(bool allowed) { return true; } + static void AssertIOAllowed() {} + static bool SetWaitAllowed(bool allowed) { return true; } + static void AssertWaitAllowed() {} +#endif + + private: + DISALLOW_IMPLICIT_CONSTRUCTORS(ThreadRestrictions); +}; + +} // namespace kudu + +#endif /* KUDU_UTIL_THREAD_RESTRICTIONS_H */ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/threadlocal.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/threadlocal.cc b/be/src/kudu/util/threadlocal.cc new file mode 100644 index 0000000..11e8e33 --- /dev/null +++ b/be/src/kudu/util/threadlocal.cc @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "kudu/util/threadlocal.h" + +#include <pthread.h> + +#include <glog/logging.h> + +#include "kudu/gutil/once.h" +#include "kudu/util/errno.h" + +namespace kudu { +namespace threadlocal { +namespace internal { + +// One key used by the entire process to attach destructors on thread exit. +static pthread_key_t destructors_key; + +// The above key must only be initialized once per process. +static GoogleOnceType once = GOOGLE_ONCE_INIT; + +// Call all the destructors associated with all THREAD_LOCAL instances in this +// thread. +static void InvokeDestructors(void* t) { + PerThreadDestructorList* d = reinterpret_cast<PerThreadDestructorList*>(t); + while (d != nullptr) { + d->destructor(d->arg); + PerThreadDestructorList* next = d->next; + delete d; + d = next; + } +} + +// This key must be initialized only once. +static void CreateKey() { + int ret = pthread_key_create(&destructors_key, &InvokeDestructors); + // Linux supports up to 1024 keys, we will use only one for all thread locals. + CHECK_EQ(0, ret) << "pthread_key_create() failed, cannot add destructor to thread: " + << "error " << ret << ": " << ErrnoToString(ret); +} + +// Adds a destructor to the list. +void AddDestructor(PerThreadDestructorList* p) { + GoogleOnceInit(&once, &CreateKey); + + // Returns NULL if nothing is set yet. + p->next = reinterpret_cast<PerThreadDestructorList*>(pthread_getspecific(destructors_key)); + int ret = pthread_setspecific(destructors_key, p); + // The only time this check should fail is if we are out of memory, or if + // somehow key creation failed, which should be caught by the above CHECK. 
+ CHECK_EQ(0, ret) << "pthread_setspecific() failed, cannot update destructor list: " + << "error " << ret << ": " << ErrnoToString(ret); +} + +} // namespace internal +} // namespace threadlocal +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/threadlocal.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/threadlocal.h b/be/src/kudu/util/threadlocal.h new file mode 100644 index 0000000..2380487 --- /dev/null +++ b/be/src/kudu/util/threadlocal.h @@ -0,0 +1,143 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_UTIL_THREADLOCAL_H_ +#define KUDU_UTIL_THREADLOCAL_H_ + +// Block-scoped static thread local implementation. +// +// Usage is similar to a C++11 thread_local. The BLOCK_STATIC_THREAD_LOCAL macro +// defines a thread-local pointer to the specified type, which is lazily +// instantiated by any thread entering the block for the first time. The +// constructor for the type T is invoked at macro execution time, as expected, +// and its destructor is invoked when the corresponding thread's Runnable +// returns, or when the thread exits. +// +// Inspired by Poco <http://pocoproject.org/docs/Poco.ThreadLocal.html>, +// Andrew Tomazos <http://stackoverflow.com/questions/12049684/>, and +// the C++11 thread_local API. +// +// Example usage: +// +// // Invokes a 3-arg constructor on SomeClass: +// BLOCK_STATIC_THREAD_LOCAL(SomeClass, instance, arg1, arg2, arg3); +// instance->DoSomething(); +// +#define BLOCK_STATIC_THREAD_LOCAL(T, t, ...) \ +static __thread T* t; \ +do { \ + if (PREDICT_FALSE(t == NULL)) { \ + t = new T(__VA_ARGS__); \ + threadlocal::internal::PerThreadDestructorList* dtor_list = \ + new threadlocal::internal::PerThreadDestructorList(); \ + dtor_list->destructor = threadlocal::internal::Destroy<T>; \ + dtor_list->arg = t; \ + threadlocal::internal::AddDestructor(dtor_list); \ + } \ +} while (false) + +// Class-scoped static thread local implementation. +// +// Very similar in implementation to the above block-scoped version, but +// requires a bit more syntax and vigilance to use properly. +// +// DECLARE_STATIC_THREAD_LOCAL(Type, instance_var_) must be placed in the +// class header, as usual for variable declarations. +// +// Because these variables are static, they must also be defined in the impl +// file with DEFINE_STATIC_THREAD_LOCAL(Type, Classname, instance_var_), +// which is very much like defining any static member, i.e. int Foo::member_. +// +// Finally, each thread must initialize the instance before using it by calling +// INIT_STATIC_THREAD_LOCAL(Type, instance_var_, ...). This is a cheap +// call, and may be invoked at the top of any method which may reference a +// thread-local variable. 
+// +// Due to all of these requirements, you should probably declare TLS members +// as private. +// +// Example usage: +// +// // foo.h +// #include "kudu/utils/file.h" +// class Foo { +// public: +// void DoSomething(std::string s); +// private: +// DECLARE_STATIC_THREAD_LOCAL(utils::File, file_); +// }; +// +// // foo.cc +// #include "kudu/foo.h" +// DEFINE_STATIC_THREAD_LOCAL(utils::File, Foo, file_); +// void Foo::WriteToFile(std::string s) { +// // Call constructor if necessary. +// INIT_STATIC_THREAD_LOCAL(utils::File, file_, "/tmp/file_location.txt"); +// file_->Write(s); +// } + +// Goes in the class declaration (usually in a header file). +// dtor must be destructed _after_ t, so it gets defined first. +// Uses a mangled variable name for dtor since it must also be a member of the +// class. +#define DECLARE_STATIC_THREAD_LOCAL(T, t) \ +static __thread T* t + +// You must also define the instance in the .cc file. +#define DEFINE_STATIC_THREAD_LOCAL(T, Class, t) \ +__thread T* Class::t + +// Must be invoked at least once by each thread that will access t. +#define INIT_STATIC_THREAD_LOCAL(T, t, ...) \ +do { \ + if (PREDICT_FALSE(t == NULL)) { \ + t = new T(__VA_ARGS__); \ + threadlocal::internal::PerThreadDestructorList* dtor_list = \ + new threadlocal::internal::PerThreadDestructorList(); \ + dtor_list->destructor = threadlocal::internal::Destroy<T>; \ + dtor_list->arg = t; \ + threadlocal::internal::AddDestructor(dtor_list); \ + } \ +} while (false) + +// Internal implementation below. + +namespace kudu { +namespace threadlocal { +namespace internal { + +// List of destructors for all thread locals instantiated on a given thread. +struct PerThreadDestructorList { + void (*destructor)(void*); + void* arg; + PerThreadDestructorList* next; +}; + +// Add a destructor to the list. +void AddDestructor(PerThreadDestructorList* p); + +// Destroy the passed object of type T. +template<class T> +static void Destroy(void* t) { + // With tcmalloc, this should be pretty cheap (same thread as new). + delete reinterpret_cast<T*>(t); +} + +} // namespace internal +} // namespace threadlocal +} // namespace kudu + +#endif // KUDU_UTIL_THREADLOCAL_H_ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/threadlocal_cache.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/threadlocal_cache.h b/be/src/kudu/util/threadlocal_cache.h new file mode 100644 index 0000000..e9ab3c2 --- /dev/null +++ b/be/src/kudu/util/threadlocal_cache.h @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
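For illustration (not part of this commit), a hedged usage sketch of the ThreadLocalCache template defined below; the CompiledPredicate type and GetCompiledPredicate() are hypothetical, while GetInstance(), Lookup(), and EmplaceNew() come from the class itself:

  #include <cstdint>
  #include <string>

  #include "kudu/util/threadlocal_cache.h"

  // A hypothetical value type that is expensive to construct.
  struct CompiledPredicate {
    explicit CompiledPredicate(const std::string& e) : expr(e) { /* ... compile ... */ }
    std::string expr;
  };

  CompiledPredicate* GetCompiledPredicate(int64_t pred_id, const std::string& expr) {
    using Cache = kudu::ThreadLocalCache<int64_t, CompiledPredicate>;
    Cache* cache = Cache::GetInstance();
    if (CompiledPredicate* hit = cache->Lookup(pred_id)) {
      return hit;  // contention-free hit in the small per-thread array
    }
    // Miss: construct in place; in the steady state this evicts an older slot.
    return cache->EmplaceNew(pred_id, expr);
  }

The tiny capacity (kItemCapacity = 4) is deliberate: the class is a contention-free lookaside buffer in front of a larger shared cache, not a replacement for it.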
+ +#pragma once + +#include "kudu/util/threadlocal.h" + +#include <boost/optional/optional.hpp> +#include <array> +#include <memory> +#include <utility> + +namespace kudu { + +// A small thread-local cache for arbitrary objects. +// +// This can be used as a contention-free "lookaside" type cache for frequently-accessed +// objects to avoid having to go to a less-efficient centralized cache. +// +// 'Key' must be copyable, and comparable using operator==(). +// 'T' has no particular requirements. +template<class Key, class T> +class ThreadLocalCache { + public: + // The number of entries in the cache. + // NOTE: this should always be a power of two for good performance, so that the + // compiler can optimize the modulo operations into bit-mask operations. + static constexpr int kItemCapacity = 4; + + // Look up a key in the cache. Returns either the existing entry with this key, + // or nullptr if no entry matched. + T* Lookup(const Key& key) { + // Our cache is so small that a linear search is likely to be more efficient than + // any kind of actual hashing. We always start the search at wherever we most + // recently found a hit. + for (int i = 0; i < kItemCapacity; i++) { + int idx = (last_hit_ + i) % kItemCapacity; + auto& p = cache_[idx]; + if (p.first == key) { + last_hit_ = idx; + return p.second.get_ptr(); + } + } + return nullptr; + } + + // Insert a new entry into the cache. If the cache is full (as it usually is in the + // steady state), this replaces one of the existing entries. The 'args' are forwarded + // to T's constructor. + // + // NOTE: entries returned by a previous call to Lookup() may possibly be invalidated + // by this function. + template<typename ... Args> + T* EmplaceNew(const Key& key, Args&&... args) { + auto& p = cache_[next_slot_++ % kItemCapacity]; + p.second.emplace(std::forward<Args>(args)...); + p.first = key; + return p.second.get_ptr(); + } + + // Get the the cache instance for this thread, creating it if it has not yet been + // created. + // + // The instance is automatically deleted and any cached items destructed when the + // thread exits. + static ThreadLocalCache* GetInstance() { + INIT_STATIC_THREAD_LOCAL(ThreadLocalCache, tl_instance_); + return tl_instance_; + } + + private: + using EntryPair = std::pair<Key, boost::optional<T>>; + std::array<EntryPair, kItemCapacity> cache_; + + // The next slot that we will write into. We always modulo this by the capacity + // before use. + uint8_t next_slot_ = 0; + // The slot where we last got a cache hit, so we can start our search at the same + // spot, optimizing for the case of repeated lookups of the same hot element. + uint8_t last_hit_ = 0; + + static_assert(kItemCapacity <= 1 << (sizeof(next_slot_) * 8), + "next_slot_ must be large enough for capacity"); + static_assert(kItemCapacity <= 1 << (sizeof(last_hit_) * 8), + "last_hit_ must be large enough for capacity"); + + DECLARE_STATIC_THREAD_LOCAL(ThreadLocalCache, tl_instance_); +}; + +// Define the thread-local storage for the ThreadLocalCache template. +// We can't use DEFINE_STATIC_THREAD_LOCAL here because the commas in the +// template arguments confuse the C preprocessor. 
+template<class K, class T> +__thread ThreadLocalCache<K,T>* ThreadLocalCache<K,T>::tl_instance_; + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/threadpool-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/threadpool-test.cc b/be/src/kudu/util/threadpool-test.cc new file mode 100644 index 0000000..6bd5826 --- /dev/null +++ b/be/src/kudu/util/threadpool-test.cc @@ -0,0 +1,367 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <memory> +#include <string> + +#include <boost/bind.hpp> +#include <glog/logging.h> +#include <gtest/gtest.h> + +#include "kudu/gutil/atomicops.h" +#include "kudu/gutil/bind.h" +#include "kudu/util/countdown_latch.h" +#include "kudu/util/metrics.h" +#include "kudu/util/promise.h" +#include "kudu/util/scoped_cleanup.h" +#include "kudu/util/threadpool.h" +#include "kudu/util/test_macros.h" +#include "kudu/util/test_util.h" +#include "kudu/util/trace.h" + +using std::shared_ptr; + +namespace kudu { + +static const char* kDefaultPoolName = "test"; + +class ThreadPoolTest : public KuduTest { + public: + + virtual void SetUp() override { + KuduTest::SetUp(); + ASSERT_OK(ThreadPoolBuilder(kDefaultPoolName).Build(&pool_)); + } + + Status RebuildPoolWithBuilder(const ThreadPoolBuilder& builder) { + return builder.Build(&pool_); + } + + Status RebuildPoolWithMinMax(int min_threads, int max_threads) { + return ThreadPoolBuilder(kDefaultPoolName) + .set_min_threads(min_threads) + .set_max_threads(max_threads) + .Build(&pool_); + } + + protected: + gscoped_ptr<ThreadPool> pool_; +}; + +TEST_F(ThreadPoolTest, TestNoTaskOpenClose) { + ASSERT_OK(RebuildPoolWithMinMax(4, 4)); + pool_->Shutdown(); +} + +static void SimpleTaskMethod(int n, Atomic32 *counter) { + while (n--) { + base::subtle::NoBarrier_AtomicIncrement(counter, 1); + boost::detail::yield(n); + } +} + +class SimpleTask : public Runnable { + public: + SimpleTask(int n, Atomic32 *counter) + : n_(n), counter_(counter) { + } + + void Run() OVERRIDE { + SimpleTaskMethod(n_, counter_); + } + + private: + int n_; + Atomic32 *counter_; +}; + +TEST_F(ThreadPoolTest, TestSimpleTasks) { + ASSERT_OK(RebuildPoolWithMinMax(4, 4)); + + Atomic32 counter(0); + std::shared_ptr<Runnable> task(new SimpleTask(15, &counter)); + + ASSERT_OK(pool_->SubmitFunc(boost::bind(&SimpleTaskMethod, 10, &counter))); + ASSERT_OK(pool_->Submit(task)); + ASSERT_OK(pool_->SubmitFunc(boost::bind(&SimpleTaskMethod, 20, &counter))); + ASSERT_OK(pool_->Submit(task)); + ASSERT_OK(pool_->SubmitClosure(Bind(&SimpleTaskMethod, 123, &counter))); + pool_->Wait(); + ASSERT_EQ(10 + 15 + 20 + 15 + 123, base::subtle::NoBarrier_Load(&counter)); + pool_->Shutdown(); +} + +static void 
IssueTraceStatement() { + TRACE("hello from task"); +} + +// Test that the thread-local trace is propagated to tasks +// submitted to the threadpool. +TEST_F(ThreadPoolTest, TestTracePropagation) { + ASSERT_OK(RebuildPoolWithMinMax(1, 1)); + + scoped_refptr<Trace> t(new Trace); + { + ADOPT_TRACE(t.get()); + ASSERT_OK(pool_->SubmitFunc(&IssueTraceStatement)); + } + pool_->Wait(); + ASSERT_STR_CONTAINS(t->DumpToString(), "hello from task"); +} + +TEST_F(ThreadPoolTest, TestSubmitAfterShutdown) { + ASSERT_OK(RebuildPoolWithMinMax(1, 1)); + pool_->Shutdown(); + Status s = pool_->SubmitFunc(&IssueTraceStatement); + ASSERT_EQ("Service unavailable: The pool has been shut down.", + s.ToString()); +} + +class SlowTask : public Runnable { + public: + explicit SlowTask(CountDownLatch* latch) + : latch_(latch) { + } + + void Run() OVERRIDE { + latch_->Wait(); + } + + private: + CountDownLatch* latch_; +}; + +TEST_F(ThreadPoolTest, TestThreadPoolWithNoMinimum) { + ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName) + .set_min_threads(0) + .set_max_threads(3) + .set_idle_timeout(MonoDelta::FromMilliseconds(1)))); + + // There are no threads to start with. + ASSERT_TRUE(pool_->num_threads_ == 0); + // We get up to 3 threads when submitting work. + CountDownLatch latch(1); + ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch)))); + ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch)))); + ASSERT_EQ(2, pool_->num_threads_); + ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch)))); + ASSERT_EQ(3, pool_->num_threads_); + // The 4th piece of work gets queued. + ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch)))); + ASSERT_EQ(3, pool_->num_threads_); + // Finish all work + latch.CountDown(); + pool_->Wait(); + ASSERT_EQ(0, pool_->active_threads_); + pool_->Shutdown(); + ASSERT_EQ(0, pool_->num_threads_); +} + +// Regression test for a bug where a task is submitted exactly +// as a thread is about to exit. Previously this could hang forever. +TEST_F(ThreadPoolTest, TestRace) { + alarm(60); + auto cleanup = MakeScopedCleanup([]() { + alarm(0); // Disable alarm on test exit. + }); + ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName) + .set_min_threads(0) + .set_max_threads(1) + .set_idle_timeout(MonoDelta::FromMicroseconds(1)))); + + for (int i = 0; i < 500; i++) { + CountDownLatch l(1); + ASSERT_OK(pool_->SubmitFunc(boost::bind(&CountDownLatch::CountDown, &l))); + l.Wait(); + // Sleeping a different amount in each iteration makes it more likely to hit + // the bug. + SleepFor(MonoDelta::FromMicroseconds(i)); + } +} + +TEST_F(ThreadPoolTest, TestVariableSizeThreadPool) { + ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName) + .set_min_threads(1) + .set_max_threads(4) + .set_idle_timeout(MonoDelta::FromMilliseconds(1)))); + + // There is 1 thread to start with. + ASSERT_EQ(1, pool_->num_threads_); + // We get up to 4 threads when submitting work. + CountDownLatch latch(1); + ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch)))); + ASSERT_EQ(1, pool_->num_threads_); + ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch)))); + ASSERT_EQ(2, pool_->num_threads_); + ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch)))); + ASSERT_EQ(3, pool_->num_threads_); + ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch)))); + ASSERT_EQ(4, pool_->num_threads_); + // The 5th piece of work gets queued. 
+ ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch)))); + ASSERT_EQ(4, pool_->num_threads_); + // Finish all work + latch.CountDown(); + pool_->Wait(); + ASSERT_EQ(0, pool_->active_threads_); + pool_->Shutdown(); + ASSERT_EQ(0, pool_->num_threads_); +} + +TEST_F(ThreadPoolTest, TestMaxQueueSize) { + ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName) + .set_min_threads(1) + .set_max_threads(1) + .set_max_queue_size(1))); + + CountDownLatch latch(1); + // We will be able to submit two tasks: one for max_threads == 1 and one for + // max_queue_size == 1. + ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch)))); + ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch)))); + Status s = pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch))); + CHECK(s.IsServiceUnavailable()) << "Expected failure due to queue blowout:" << s.ToString(); + latch.CountDown(); + pool_->Wait(); + pool_->Shutdown(); +} + +// Test that when we specify a zero-sized queue, the maximum number of threads +// running is used for enforcement. +TEST_F(ThreadPoolTest, TestZeroQueueSize) { + const int kMaxThreads = 4; + ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName) + .set_max_queue_size(0) + .set_max_threads(kMaxThreads))); + + CountDownLatch latch(1); + for (int i = 0; i < kMaxThreads; i++) { + ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch)))); + } + Status s = pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch))); + ASSERT_TRUE(s.IsServiceUnavailable()) << s.ToString(); + ASSERT_STR_CONTAINS(s.ToString(), "Thread pool is at capacity"); + latch.CountDown(); + pool_->Wait(); + pool_->Shutdown(); +} + +// Test that setting a promise from another thread yields +// a value on the current thread. +TEST_F(ThreadPoolTest, TestPromises) { + ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName) + .set_min_threads(1) + .set_max_threads(1) + .set_max_queue_size(1))); + + Promise<int> my_promise; + ASSERT_OK(pool_->SubmitClosure( + Bind(&Promise<int>::Set, Unretained(&my_promise), 5))); + ASSERT_EQ(5, my_promise.Get()); + pool_->Shutdown(); +} + +METRIC_DEFINE_entity(test_entity); +METRIC_DEFINE_histogram(test_entity, queue_length, "queue length", + MetricUnit::kTasks, "queue length", 1000, 1); + +METRIC_DEFINE_histogram(test_entity, queue_time, "queue time", + MetricUnit::kMicroseconds, "queue time", 1000000, 1); + +METRIC_DEFINE_histogram(test_entity, run_time, "run time", + MetricUnit::kMicroseconds, "run time", 1000, 1); + +TEST_F(ThreadPoolTest, TestMetrics) { + MetricRegistry registry; + scoped_refptr<MetricEntity> entity = METRIC_ENTITY_test_entity.Instantiate( + ®istry, "test entity"); + ASSERT_OK(RebuildPoolWithMinMax(1, 1)); + + // Enable metrics for the thread pool. + scoped_refptr<Histogram> queue_length = METRIC_queue_length.Instantiate(entity); + scoped_refptr<Histogram> queue_time = METRIC_queue_time.Instantiate(entity); + scoped_refptr<Histogram> run_time = METRIC_run_time.Instantiate(entity); + pool_->SetQueueLengthHistogram(queue_length); + pool_->SetQueueTimeMicrosHistogram(queue_time); + pool_->SetRunTimeMicrosHistogram(run_time); + + int kNumItems = 500; + for (int i = 0; i < kNumItems; i++) { + ASSERT_OK(pool_->SubmitFunc(boost::bind(&usleep, i))); + } + + pool_->Wait(); + + // Check that all histograms were incremented once per submitted item. 
+ ASSERT_EQ(kNumItems, queue_length->TotalCount());
+ ASSERT_EQ(kNumItems, queue_time->TotalCount());
+ ASSERT_EQ(kNumItems, run_time->TotalCount());
+}
+
+// Test that a thread pool will crash if asked to run its own blocking
+// functions in a pool thread.
+//
+// In a multi-threaded application, TSAN is unsafe to use following a fork().
+// After a fork(), TSAN will:
+// 1. Disable verification, expecting an exec() soon anyway, and
+// 2. Die on future thread creation.
+// For some reason, this test triggers behavior #2. We could disable it with
+// the TSAN option die_after_fork=0, but this can (supposedly) lead to
+// deadlocks, so we'll disable the entire test instead.
+#ifndef THREAD_SANITIZER
+TEST_F(ThreadPoolTest, TestDeadlocks) {
+ const char* death_msg = "called pool function that would result in deadlock";
+ ASSERT_DEATH({
+ ASSERT_OK(RebuildPoolWithMinMax(1, 1));
+ ASSERT_OK(pool_->SubmitClosure(
+ Bind(&ThreadPool::Shutdown, Unretained(pool_.get()))));
+ pool_->Wait();
+ }, death_msg);
+
+ ASSERT_DEATH({
+ ASSERT_OK(RebuildPoolWithMinMax(1, 1));
+ ASSERT_OK(pool_->SubmitClosure(
+ Bind(&ThreadPool::Wait, Unretained(pool_.get()))));
+ pool_->Wait();
+ }, death_msg);
+}
+#endif
+
+class SlowDestructorRunnable : public Runnable {
+ public:
+ void Run() override {}
+
+ virtual ~SlowDestructorRunnable() {
+ SleepFor(MonoDelta::FromMilliseconds(100));
+ }
+};
+
+// Test that if a task's destructor is slow, it doesn't cause serialization of the tasks
+// in the queue.
+TEST_F(ThreadPoolTest, TestSlowDestructor) {
+ ASSERT_OK(RebuildPoolWithMinMax(1, 20));
+ MonoTime start = MonoTime::Now();
+ for (int i = 0; i < 100; i++) {
+ shared_ptr<Runnable> task(new SlowDestructorRunnable());
+ ASSERT_OK(pool_->Submit(std::move(task)));
+ }
+ pool_->Wait();
+ ASSERT_LT((MonoTime::Now() - start).ToSeconds(), 5);
+}
+
+
+} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/threadpool.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/threadpool.cc b/be/src/kudu/util/threadpool.cc new file mode 100644 index 0000000..b3f4ddf --- /dev/null +++ b/be/src/kudu/util/threadpool.cc @@ -0,0 +1,410 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
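The tests above exercise the public surface of the pool: a ThreadPoolBuilder configures thread counts, queue size, and idle timeout, and the resulting ThreadPool accepts Runnables, boost::function objects, and Closures. A condensed usage sketch follows; the pool name, the DoWork function, and the chosen limits are illustrative, and the includes may need adjusting to the surrounding project.

#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
#include "kudu/util/threadpool.h"

namespace kudu {

static void DoWork() {
  // ... some short task ...
}

Status RunPoolExample() {
  gscoped_ptr<ThreadPool> pool;
  RETURN_NOT_OK(ThreadPoolBuilder("example")
                .set_min_threads(0)
                .set_max_threads(4)
                .set_max_queue_size(100)
                .set_idle_timeout(MonoDelta::FromMilliseconds(500))
                .Build(&pool));

  // Each submission either wakes an idle worker, starts a new one, or queues,
  // subject to the configured maximums.
  for (int i = 0; i < 10; i++) {
    RETURN_NOT_OK(pool->SubmitFunc(&DoWork));
  }

  pool->Wait();      // Block until the queue drains and all workers go idle.
  pool->Shutdown();  // Also invoked automatically by the destructor.
  return Status::OK();
}

} // namespace kudu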
+ +#include "kudu/util/threadpool.h" + +#include <boost/function.hpp> +#include <gflags/gflags.h> +#include <glog/logging.h> +#include <limits> +#include <string> + +#include "kudu/gutil/callback.h" +#include "kudu/gutil/stl_util.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/gutil/sysinfo.h" +#include "kudu/util/metrics.h" +#include "kudu/util/stopwatch.h" +#include "kudu/util/thread.h" +#include "kudu/util/trace.h" + +namespace kudu { + +using strings::Substitute; + +//////////////////////////////////////////////////////// +// FunctionRunnable +//////////////////////////////////////////////////////// + +class FunctionRunnable : public Runnable { + public: + explicit FunctionRunnable(boost::function<void()> func) : func_(std::move(func)) {} + + void Run() OVERRIDE { + func_(); + } + + private: + boost::function<void()> func_; +}; + +//////////////////////////////////////////////////////// +// ThreadPoolBuilder +//////////////////////////////////////////////////////// + +ThreadPoolBuilder::ThreadPoolBuilder(std::string name) + : name_(std::move(name)), + min_threads_(0), + max_threads_(base::NumCPUs()), + max_queue_size_(std::numeric_limits<int>::max()), + idle_timeout_(MonoDelta::FromMilliseconds(500)) {} + +ThreadPoolBuilder& ThreadPoolBuilder::set_trace_metric_prefix( + const std::string& prefix) { + trace_metric_prefix_ = prefix; + return *this; +} + +ThreadPoolBuilder& ThreadPoolBuilder::set_min_threads(int min_threads) { + CHECK_GE(min_threads, 0); + min_threads_ = min_threads; + return *this; +} + +ThreadPoolBuilder& ThreadPoolBuilder::set_max_threads(int max_threads) { + CHECK_GT(max_threads, 0); + max_threads_ = max_threads; + return *this; +} + +ThreadPoolBuilder& ThreadPoolBuilder::set_max_queue_size(int max_queue_size) { + max_queue_size_ = max_queue_size; + return *this; +} + +ThreadPoolBuilder& ThreadPoolBuilder::set_idle_timeout(const MonoDelta& idle_timeout) { + idle_timeout_ = idle_timeout; + return *this; +} + +Status ThreadPoolBuilder::Build(gscoped_ptr<ThreadPool>* pool) const { + pool->reset(new ThreadPool(*this)); + RETURN_NOT_OK((*pool)->Init()); + return Status::OK(); +} + +//////////////////////////////////////////////////////// +// ThreadPool +//////////////////////////////////////////////////////// + +ThreadPool::ThreadPool(const ThreadPoolBuilder& builder) + : name_(builder.name_), + min_threads_(builder.min_threads_), + max_threads_(builder.max_threads_), + max_queue_size_(builder.max_queue_size_), + idle_timeout_(builder.idle_timeout_), + pool_status_(Status::Uninitialized("The pool was not initialized.")), + idle_cond_(&lock_), + no_threads_cond_(&lock_), + not_empty_(&lock_), + num_threads_(0), + active_threads_(0), + queue_size_(0) { + + string prefix = !builder.trace_metric_prefix_.empty() ? 
+ builder.trace_metric_prefix_ : builder.name_; + + queue_time_trace_metric_name_ = TraceMetrics::InternName( + prefix + ".queue_time_us"); + run_wall_time_trace_metric_name_ = TraceMetrics::InternName( + prefix + ".run_wall_time_us"); + run_cpu_time_trace_metric_name_ = TraceMetrics::InternName( + prefix + ".run_cpu_time_us"); +} + +ThreadPool::~ThreadPool() { + Shutdown(); +} + +Status ThreadPool::Init() { + MutexLock unique_lock(lock_); + if (!pool_status_.IsUninitialized()) { + return Status::NotSupported("The thread pool is already initialized"); + } + pool_status_ = Status::OK(); + for (int i = 0; i < min_threads_; i++) { + Status status = CreateThreadUnlocked(); + if (!status.ok()) { + Shutdown(); + return status; + } + } + return Status::OK(); +} + +void ThreadPool::Shutdown() { + MutexLock unique_lock(lock_); + CheckNotPoolThreadUnlocked(); + + pool_status_ = Status::ServiceUnavailable("The pool has been shut down."); + + // Clear the queue_ member under the lock, but defer the releasing + // of the entries outside the lock, in case there are concurrent threads + // wanting to access the ThreadPool. The task's destructors may acquire + // locks, etc, so this also prevents lock inversions. + auto to_release = std::move(queue_); + queue_.clear(); + queue_size_ = 0; + not_empty_.Broadcast(); + + // The Runnable doesn't have Abort() so we must wait + // and hopefully the abort is done outside before calling Shutdown(). + while (num_threads_ > 0) { + no_threads_cond_.Wait(); + } + + // Finally release the tasks that were in the queue, outside the lock. + unique_lock.Unlock(); + for (QueueEntry& e : to_release) { + if (e.trace) { + e.trace->Release(); + } + } +} + +Status ThreadPool::SubmitClosure(const Closure& task) { + // TODO: once all uses of boost::bind-based tasks are dead, implement this + // in a more straight-forward fashion. + return SubmitFunc(boost::bind(&Closure::Run, task)); +} + +Status ThreadPool::SubmitFunc(boost::function<void()> func) { + return Submit(std::shared_ptr<Runnable>(new FunctionRunnable(std::move(func)))); +} + +Status ThreadPool::Submit(std::shared_ptr<Runnable> task) { + MonoTime submit_time = MonoTime::Now(); + + MutexLock guard(lock_); + if (PREDICT_FALSE(!pool_status_.ok())) { + return pool_status_; + } + + // Size limit check. + int64_t capacity_remaining = static_cast<int64_t>(max_threads_) - active_threads_ + + static_cast<int64_t>(max_queue_size_) - queue_size_; + if (capacity_remaining < 1) { + return Status::ServiceUnavailable( + Substitute("Thread pool is at capacity ($0/$1 tasks running, $2/$3 tasks queued)", + num_threads_, max_threads_, queue_size_, max_queue_size_)); + } + + // Should we create another thread? + // We assume that each current inactive thread will grab one item from the + // queue. If it seems like we'll need another thread, we create one. + // In theory, a currently active thread could finish immediately after this + // calculation. This would mean we created a thread we didn't really need. + // However, this race is unavoidable, since we don't do the work under a lock. + // It's also harmless. + // + // Of course, we never create more than max_threads_ threads no matter what. + int inactive_threads = num_threads_ - active_threads_; + int additional_threads = (queue_size_ + 1) - inactive_threads; + if (additional_threads > 0 && num_threads_ < max_threads_) { + Status status = CreateThreadUnlocked(); + if (!status.ok()) { + if (num_threads_ == 0) { + // If we have no threads, we can't do any work. 
+ return status; + } + // If we failed to create a thread, but there are still some other + // worker threads, log a warning message and continue. + LOG(ERROR) << "Thread pool failed to create thread: " + << status.ToString(); + } + } + + QueueEntry e; + e.runnable = std::move(task); + e.trace = Trace::CurrentTrace(); + // Need to AddRef, since the thread which submitted the task may go away, + // and we don't want the trace to be destructed while waiting in the queue. + if (e.trace) { + e.trace->AddRef(); + } + e.submit_time = submit_time; + + queue_.emplace_back(std::move(e)); + int length_at_submit = queue_size_++; + + guard.Unlock(); + not_empty_.Signal(); + + if (queue_length_histogram_) { + queue_length_histogram_->Increment(length_at_submit); + } + + return Status::OK(); +} + +void ThreadPool::Wait() { + MutexLock unique_lock(lock_); + CheckNotPoolThreadUnlocked(); + while ((!queue_.empty()) || (active_threads_ > 0)) { + idle_cond_.Wait(); + } +} + +bool ThreadPool::WaitUntil(const MonoTime& until) { + return WaitFor(until - MonoTime::Now()); +} + +bool ThreadPool::WaitFor(const MonoDelta& delta) { + MutexLock unique_lock(lock_); + CheckNotPoolThreadUnlocked(); + while ((!queue_.empty()) || (active_threads_ > 0)) { + if (!idle_cond_.TimedWait(delta)) { + return false; + } + } + return true; +} + + +void ThreadPool::SetQueueLengthHistogram(const scoped_refptr<Histogram>& hist) { + queue_length_histogram_ = hist; +} + +void ThreadPool::SetQueueTimeMicrosHistogram(const scoped_refptr<Histogram>& hist) { + queue_time_us_histogram_ = hist; +} + +void ThreadPool::SetRunTimeMicrosHistogram(const scoped_refptr<Histogram>& hist) { + run_time_us_histogram_ = hist; +} + +void ThreadPool::DispatchThread(bool permanent) { + MutexLock unique_lock(lock_); + while (true) { + // Note: Status::Aborted() is used to indicate normal shutdown. + if (!pool_status_.ok()) { + VLOG(2) << "DispatchThread exiting: " << pool_status_.ToString(); + break; + } + + if (queue_.empty()) { + if (permanent) { + not_empty_.Wait(); + } else { + if (!not_empty_.TimedWait(idle_timeout_)) { + // After much investigation, it appears that pthread condition variables have + // a weird behavior in which they can return ETIMEDOUT from timed_wait even if + // another thread did in fact signal. Apparently after a timeout there is some + // brief period during which another thread may actually grab the internal mutex + // protecting the state, signal, and release again before we get the mutex. So, + // we'll recheck the empty queue case regardless. + if (queue_.empty()) { + VLOG(3) << "Releasing worker thread from pool " << name_ << " after " + << idle_timeout_.ToMilliseconds() << "ms of idle time."; + break; + } + } + } + continue; + } + + // Fetch a pending task + QueueEntry entry = std::move(queue_.front()); + queue_.pop_front(); + queue_size_--; + ++active_threads_; + + unique_lock.Unlock(); + + // Release the reference which was held by the queued item. 
+ ADOPT_TRACE(entry.trace); + if (entry.trace) { + entry.trace->Release(); + } + + // Update metrics + MonoTime now(MonoTime::Now()); + int64_t queue_time_us = (now - entry.submit_time).ToMicroseconds(); + TRACE_COUNTER_INCREMENT(queue_time_trace_metric_name_, queue_time_us); + if (queue_time_us_histogram_) { + queue_time_us_histogram_->Increment(queue_time_us); + } + + // Execute the task + { + MicrosecondsInt64 start_wall_us = GetMonoTimeMicros(); + MicrosecondsInt64 start_cpu_us = GetThreadCpuTimeMicros(); + + entry.runnable->Run(); + + int64_t wall_us = GetMonoTimeMicros() - start_wall_us; + int64_t cpu_us = GetThreadCpuTimeMicros() - start_cpu_us; + + if (run_time_us_histogram_) { + run_time_us_histogram_->Increment(wall_us); + } + TRACE_COUNTER_INCREMENT(run_wall_time_trace_metric_name_, wall_us); + TRACE_COUNTER_INCREMENT(run_cpu_time_trace_metric_name_, cpu_us); + } + // Destruct the task while we do not hold the lock. + // + // The task's destructor may be expensive if it has a lot of bound + // objects, and we don't want to block submission of the threadpool. + // In the worst case, the destructor might even try to do something + // with this threadpool, and produce a deadlock. + entry.runnable.reset(); + unique_lock.Lock(); + + if (--active_threads_ == 0) { + idle_cond_.Broadcast(); + } + } + + // It's important that we hold the lock between exiting the loop and dropping + // num_threads_. Otherwise it's possible someone else could come along here + // and add a new task just as the last running thread is about to exit. + CHECK(unique_lock.OwnsLock()); + + CHECK_EQ(threads_.erase(Thread::current_thread()), 1); + if (--num_threads_ == 0) { + no_threads_cond_.Broadcast(); + + // Sanity check: if we're the last thread exiting, the queue ought to be + // empty. Otherwise it will never get processed. + CHECK(queue_.empty()); + DCHECK_EQ(0, queue_size_); + } +} + +Status ThreadPool::CreateThreadUnlocked() { + // The first few threads are permanent, and do not time out. + bool permanent = (num_threads_ < min_threads_); + scoped_refptr<Thread> t; + Status s = kudu::Thread::Create("thread pool", strings::Substitute("$0 [worker]", name_), + &ThreadPool::DispatchThread, this, permanent, &t); + if (s.ok()) { + InsertOrDie(&threads_, t.get()); + num_threads_++; + } + return s; +} + +void ThreadPool::CheckNotPoolThreadUnlocked() { + Thread* current = Thread::current_thread(); + if (ContainsKey(threads_, current)) { + LOG(FATAL) << Substitute("Thread belonging to thread pool '$0' with " + "name '$1' called pool function that would result in deadlock", + name_, current->name()); + } +} + +} // namespace kudu
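Submit() above makes two separate decisions: whether the pool can accept the task at all, and whether accepting it should spin up another worker. The small standalone sketch below restates that arithmetic with example numbers so the two checks are easy to trace; the PoolCounters struct and the sample values are illustrative only.

#include <cstdint>
#include <cstdio>

// Mirrors the admission and thread-creation arithmetic in ThreadPool::Submit().
struct PoolCounters {
  int max_threads;
  int max_queue_size;
  int num_threads;     // workers currently alive
  int active_threads;  // workers currently running a task
  int queue_size;      // tasks waiting in the queue
};

// Returns true if a new submission would be accepted.
static bool WouldAccept(const PoolCounters& c) {
  int64_t capacity_remaining =
      static_cast<int64_t>(c.max_threads) - c.active_threads +
      static_cast<int64_t>(c.max_queue_size) - c.queue_size;
  return capacity_remaining >= 1;
}

// Returns true if accepting one more task should create a new worker.
static bool WouldCreateThread(const PoolCounters& c) {
  int inactive_threads = c.num_threads - c.active_threads;
  int additional_threads = (c.queue_size + 1) - inactive_threads;
  return additional_threads > 0 && c.num_threads < c.max_threads;
}

int main() {
  // Example: max_threads=3, max_queue_size=1, two busy workers, empty queue.
  PoolCounters c{3, 1, 2, 2, 0};
  std::printf("accept=%d create=%d\n", WouldAccept(c), WouldCreateThread(c));
  // accept=1 (capacity (3-2)+(1-0)=2), create=1 (no idle worker, room for a 3rd).
  return 0;
}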

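Looking back at threadlocal_cache.h earlier in this patch, the intended pattern is a Lookup() that falls back to EmplaceNew() on a miss, all on the calling thread's private four-slot instance. A hedged sketch of that pattern follows; the ExpensiveDecoder type and DecodeWithCache function are illustrative placeholders, not part of the patch.

#include <string>

#include "kudu/util/threadlocal_cache.h"

namespace kudu {

// Hypothetical expensive-to-construct object that we want to reuse per thread.
struct ExpensiveDecoder {
  explicit ExpensiveDecoder(int schema_id) : schema_id_(schema_id) {}
  std::string Decode(const std::string& in) const { return in; }  // stub
  int schema_id_;
};

std::string DecodeWithCache(int schema_id, const std::string& input) {
  using Cache = ThreadLocalCache<int, ExpensiveDecoder>;
  Cache* cache = Cache::GetInstance();  // created lazily, freed at thread exit
  ExpensiveDecoder* dec = cache->Lookup(schema_id);
  if (dec == nullptr) {
    // Cache miss: construct in place; may evict one of the four slots.
    dec = cache->EmplaceNew(schema_id, schema_id);
  }
  return dec->Decode(input);
}

} // namespace kudu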