http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/thread.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/thread.cc b/be/src/kudu/util/thread.cc new file mode 100644 index 0000000..4abc7c1 --- /dev/null +++ b/be/src/kudu/util/thread.cc @@ -0,0 +1,628 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// Copied from Impala and adapted to Kudu. 
+ +#include "kudu/util/thread.h" + +#if defined(__linux__) +#include <sys/prctl.h> +#endif // defined(__linux__) +#include <sys/resource.h> +#include <unistd.h> + +#include <algorithm> +#include <cerrno> +#include <cstring> +#include <map> +#include <memory> +#include <sstream> +#include <unordered_map> +#include <utility> +#include <vector> + +#include <boost/bind.hpp> +#include <boost/smart_ptr/shared_ptr.hpp> +#include <gflags/gflags.h> +#include <glog/logging.h> + +#include "kudu/gutil/atomicops.h" +#include "kudu/gutil/bind.h" +#include "kudu/gutil/bind_helpers.h" +#include "kudu/gutil/dynamic_annotations.h" +#include "kudu/gutil/mathlimits.h" +#include "kudu/gutil/once.h" +#include "kudu/gutil/port.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/flag_tags.h" +#include "kudu/util/kernel_stack_watchdog.h" +#include "kudu/util/logging.h" +#include "kudu/util/metrics.h" +#include "kudu/util/monotime.h" +#include "kudu/util/mutex.h" +#include "kudu/util/os-util.h" +#include "kudu/util/scoped_cleanup.h" +#include "kudu/util/stopwatch.h" +#include "kudu/util/trace.h" +#include "kudu/util/url-coding.h" +#include "kudu/util/web_callback_registry.h" + +using boost::bind; +using boost::mem_fn; +using std::endl; +using std::map; +using std::ostringstream; +using std::shared_ptr; +using std::string; +using std::vector; +using strings::Substitute; + +METRIC_DEFINE_gauge_uint64(server, threads_started, + "Threads Started", + kudu::MetricUnit::kThreads, + "Total number of threads started on this server", + kudu::EXPOSE_AS_COUNTER); + +METRIC_DEFINE_gauge_uint64(server, threads_running, + "Threads Running", + kudu::MetricUnit::kThreads, + "Current number of running threads"); + +METRIC_DEFINE_gauge_uint64(server, cpu_utime, + "User CPU Time", + kudu::MetricUnit::kMilliseconds, + "Total user CPU time of the process", + kudu::EXPOSE_AS_COUNTER); + +METRIC_DEFINE_gauge_uint64(server, cpu_stime, + "System CPU Time", + kudu::MetricUnit::kMilliseconds, + "Total 
system CPU time of the process", + kudu::EXPOSE_AS_COUNTER); + +METRIC_DEFINE_gauge_uint64(server, voluntary_context_switches, + "Voluntary Context Switches", + kudu::MetricUnit::kContextSwitches, + "Total voluntary context switches", + kudu::EXPOSE_AS_COUNTER); + +METRIC_DEFINE_gauge_uint64(server, involuntary_context_switches, + "Involuntary Context Switches", + kudu::MetricUnit::kContextSwitches, + "Total involuntary context switches", + kudu::EXPOSE_AS_COUNTER); + +DEFINE_int32(thread_inject_start_latency_ms, 0, + "Number of ms to sleep when starting a new thread. (For tests)."); +TAG_FLAG(thread_inject_start_latency_ms, hidden); +TAG_FLAG(thread_inject_start_latency_ms, unsafe); + +namespace kudu { + +static uint64_t GetCpuUTime() { + rusage ru; + CHECK_ERR(getrusage(RUSAGE_SELF, &ru)); + return ru.ru_utime.tv_sec * 1000UL + ru.ru_utime.tv_usec / 1000UL; +} + +static uint64_t GetCpuSTime() { + rusage ru; + CHECK_ERR(getrusage(RUSAGE_SELF, &ru)); + return ru.ru_stime.tv_sec * 1000UL + ru.ru_stime.tv_usec / 1000UL; +} + +static uint64_t GetVoluntaryContextSwitches() { + rusage ru; + CHECK_ERR(getrusage(RUSAGE_SELF, &ru)); + return ru.ru_nvcsw;; +} + +static uint64_t GetInVoluntaryContextSwitches() { + rusage ru; + CHECK_ERR(getrusage(RUSAGE_SELF, &ru)); + return ru.ru_nivcsw; +} + +class ThreadMgr; + +__thread Thread* Thread::tls_ = NULL; + +// Singleton instance of ThreadMgr. Only visible in this file, used only by Thread. +// The Thread class adds a reference to thread_manager while it is supervising a thread so +// that a race between the end of the process's main thread (and therefore the destruction +// of thread_manager) and the end of a thread that tries to remove itself from the +// manager after the destruction can be avoided. +static shared_ptr<ThreadMgr> thread_manager; + +// Controls the single (lazy) initialization of thread_manager. 
+static GoogleOnceType once = GOOGLE_ONCE_INIT; + +// A singleton class that tracks all live threads, and groups them together for easy +// auditing. Used only by Thread. +class ThreadMgr { + public: + ThreadMgr() + : threads_started_metric_(0), + threads_running_metric_(0) { + } + + ~ThreadMgr() { + MutexLock l(lock_); + thread_categories_.clear(); + } + + static void SetThreadName(const std::string& name, int64_t tid); + + Status StartInstrumentation(const scoped_refptr<MetricEntity>& metrics, WebCallbackRegistry* web); + + // Registers a thread to the supplied category. The key is a pthread_t, + // not the system TID, since pthread_t is less prone to being recycled. + void AddThread(const pthread_t& pthread_id, const string& name, const string& category, + int64_t tid); + + // Removes a thread from the supplied category. If the thread has + // already been removed, this is a no-op. + void RemoveThread(const pthread_t& pthread_id, const string& category); + + private: + // Container class for any details we want to capture about a thread + // TODO: Add start-time. + // TODO: Track fragment ID. + class ThreadDescriptor { + public: + ThreadDescriptor() { } + ThreadDescriptor(string category, string name, int64_t thread_id) + : name_(std::move(name)), + category_(std::move(category)), + thread_id_(thread_id) {} + + const string& name() const { return name_; } + const string& category() const { return category_; } + int64_t thread_id() const { return thread_id_; } + + private: + string name_; + string category_; + int64_t thread_id_; + }; + + // A ThreadCategory is a set of threads that are logically related. + // TODO: unordered_map is incompatible with pthread_t, but would be more + // efficient here. + typedef map<const pthread_t, ThreadDescriptor> ThreadCategory; + + // All thread categorys, keyed on the category name. + typedef map<string, ThreadCategory> ThreadCategoryMap; + + // Protects thread_categories_ and thread metrics. 
+ Mutex lock_; + + // All thread categorys that ever contained a thread, even if empty + ThreadCategoryMap thread_categories_; + + // Counters to track all-time total number of threads, and the + // current number of running threads. + uint64_t threads_started_metric_; + uint64_t threads_running_metric_; + + // Metric callbacks. + uint64_t ReadThreadsStarted(); + uint64_t ReadThreadsRunning(); + + // Webpage callback; prints all threads by category. + void ThreadPathHandler(const WebCallbackRegistry::WebRequest& req, + WebCallbackRegistry::PrerenderedWebResponse* resp); + void PrintThreadCategoryRows(const ThreadCategory& category, ostringstream* output); +}; + +void ThreadMgr::SetThreadName(const string& name, int64_t tid) { + // On linux we can get the thread names to show up in the debugger by setting + // the process name for the LWP. We don't want to do this for the main + // thread because that would rename the process, causing tools like killall + // to stop working. + if (tid == getpid()) { + return; + } + +#if defined(__linux__) + // http://0pointer.de/blog/projects/name-your-threads.html + // Set the name for the LWP (which gets truncated to 15 characters). + // Note that glibc also has a 'pthread_setname_np' api, but it may not be + // available everywhere and it's only benefit over using prctl directly is + // that it can set the name of threads other than the current thread. + int err = prctl(PR_SET_NAME, name.c_str()); +#else + int err = pthread_setname_np(name.c_str()); +#endif // defined(__linux__) + // We expect EPERM failures in sandboxed processes, just ignore those. + if (err < 0 && errno != EPERM) { + PLOG(ERROR) << "SetThreadName"; + } +} + +Status ThreadMgr::StartInstrumentation(const scoped_refptr<MetricEntity>& metrics, + WebCallbackRegistry* web) { + MutexLock l(lock_); + + // Use function gauges here so that we can register a unique copy of these metrics in + // multiple tservers, even though the ThreadMgr is itself a singleton. 
+ metrics->NeverRetire( + METRIC_threads_started.InstantiateFunctionGauge(metrics, + Bind(&ThreadMgr::ReadThreadsStarted, Unretained(this)))); + metrics->NeverRetire( + METRIC_threads_running.InstantiateFunctionGauge(metrics, + Bind(&ThreadMgr::ReadThreadsRunning, Unretained(this)))); + metrics->NeverRetire( + METRIC_cpu_utime.InstantiateFunctionGauge(metrics, + Bind(&GetCpuUTime))); + metrics->NeverRetire( + METRIC_cpu_stime.InstantiateFunctionGauge(metrics, + Bind(&GetCpuSTime))); + metrics->NeverRetire( + METRIC_voluntary_context_switches.InstantiateFunctionGauge(metrics, + Bind(&GetVoluntaryContextSwitches))); + metrics->NeverRetire( + METRIC_involuntary_context_switches.InstantiateFunctionGauge(metrics, + Bind(&GetInVoluntaryContextSwitches))); + + if (web) { + WebCallbackRegistry::PrerenderedPathHandlerCallback thread_callback = + bind<void>(mem_fn(&ThreadMgr::ThreadPathHandler), this, _1, _2); + DCHECK_NOTNULL(web)->RegisterPrerenderedPathHandler("/threadz", "Threads", thread_callback, + true /* is_styled*/, + true /* is_on_nav_bar */); + } + return Status::OK(); +} + +uint64_t ThreadMgr::ReadThreadsStarted() { + MutexLock l(lock_); + return threads_started_metric_; +} + +uint64_t ThreadMgr::ReadThreadsRunning() { + MutexLock l(lock_); + return threads_running_metric_; +} + +void ThreadMgr::AddThread(const pthread_t& pthread_id, const string& name, + const string& category, int64_t tid) { + // These annotations cause TSAN to ignore the synchronization on lock_ + // without causing the subsequent mutations to be treated as data races + // in and of themselves (that's what IGNORE_READS_AND_WRITES does). + // + // Why do we need them here and in SuperviseThread()? TSAN operates by + // observing synchronization events and using them to establish "happens + // before" relationships between threads. Where these relationships are + // not built, shared state access constitutes a data race. 
The + // synchronization events here, in RemoveThread(), and in + // SuperviseThread() may cause TSAN to establish a "happens before" + // relationship between thread functors, ignoring potential data races. + // The annotations prevent this from happening. + ANNOTATE_IGNORE_SYNC_BEGIN(); + ANNOTATE_IGNORE_READS_AND_WRITES_BEGIN(); + { + MutexLock l(lock_); + thread_categories_[category][pthread_id] = ThreadDescriptor(category, name, tid); + threads_running_metric_++; + threads_started_metric_++; + } + ANNOTATE_IGNORE_SYNC_END(); + ANNOTATE_IGNORE_READS_AND_WRITES_END(); +} + +void ThreadMgr::RemoveThread(const pthread_t& pthread_id, const string& category) { + ANNOTATE_IGNORE_SYNC_BEGIN(); + ANNOTATE_IGNORE_READS_AND_WRITES_BEGIN(); + { + MutexLock l(lock_); + auto category_it = thread_categories_.find(category); + DCHECK(category_it != thread_categories_.end()); + category_it->second.erase(pthread_id); + threads_running_metric_--; + } + ANNOTATE_IGNORE_SYNC_END(); + ANNOTATE_IGNORE_READS_AND_WRITES_END(); +} + +void ThreadMgr::PrintThreadCategoryRows(const ThreadCategory& category, + ostringstream* output) { + for (const ThreadCategory::value_type& thread : category) { + ThreadStats stats; + Status status = GetThreadStats(thread.second.thread_id(), &stats); + if (!status.ok()) { + KLOG_EVERY_N(INFO, 100) << "Could not get per-thread statistics: " + << status.ToString(); + } + (*output) << "<tr><td>" << thread.second.name() << "</td><td>" + << (static_cast<double>(stats.user_ns) / 1e9) << "</td><td>" + << (static_cast<double>(stats.kernel_ns) / 1e9) << "</td><td>" + << (static_cast<double>(stats.iowait_ns) / 1e9) << "</td></tr>"; + } +} + +void ThreadMgr::ThreadPathHandler(const WebCallbackRegistry::WebRequest& req, + WebCallbackRegistry::PrerenderedWebResponse* resp) { + ostringstream* output = resp->output; + MutexLock l(lock_); + vector<const ThreadCategory*> categories_to_print; + auto category_name = req.parsed_args.find("group"); + if (category_name != 
req.parsed_args.end()) { + string group = EscapeForHtmlToString(category_name->second); + (*output) << "<h2>Thread Group: " << group << "</h2>" << endl; + if (group != "all") { + ThreadCategoryMap::const_iterator category = thread_categories_.find(group); + if (category == thread_categories_.end()) { + (*output) << "Thread group '" << group << "' not found" << endl; + return; + } + categories_to_print.push_back(&category->second); + (*output) << "<h3>" << category->first << " : " << category->second.size() + << "</h3>"; + } else { + for (const ThreadCategoryMap::value_type& category : thread_categories_) { + categories_to_print.push_back(&category.second); + } + (*output) << "<h3>All Threads : </h3>"; + } + + (*output) << "<table class='table table-hover table-border'>"; + (*output) << "<thead><tr><th>Thread name</th><th>Cumulative User CPU(s)</th>" + << "<th>Cumulative Kernel CPU(s)</th>" + << "<th>Cumulative IO-wait(s)</th></tr></thead>"; + (*output) << "<tbody>\n"; + + for (const ThreadCategory* category : categories_to_print) { + PrintThreadCategoryRows(*category, output); + } + (*output) << "</tbody></table>"; + } else { + (*output) << "<h2>Thread Groups</h2>"; + (*output) << "<h4>" << threads_running_metric_ << " thread(s) running"; + (*output) << "<a href='/threadz?group=all'><h3>All Threads</h3>"; + + for (const ThreadCategoryMap::value_type& category : thread_categories_) { + string category_arg; + UrlEncode(category.first, &category_arg); + (*output) << "<a href='/threadz?group=" << category_arg << "'><h3>" + << category.first << " : " << category.second.size() << "</h3></a>"; + } + } +} + +static void InitThreading() { + thread_manager.reset(new ThreadMgr()); +} + +Status StartThreadInstrumentation(const scoped_refptr<MetricEntity>& server_metrics, + WebCallbackRegistry* web) { + GoogleOnceInit(&once, &InitThreading); + return thread_manager->StartInstrumentation(server_metrics, web); +} + +ThreadJoiner::ThreadJoiner(Thread* thr) + : 
thread_(CHECK_NOTNULL(thr)), + warn_after_ms_(kDefaultWarnAfterMs), + warn_every_ms_(kDefaultWarnEveryMs), + give_up_after_ms_(kDefaultGiveUpAfterMs) { +} + +ThreadJoiner& ThreadJoiner::warn_after_ms(int ms) { + warn_after_ms_ = ms; + return *this; +} + +ThreadJoiner& ThreadJoiner::warn_every_ms(int ms) { + warn_every_ms_ = ms; + return *this; +} + +ThreadJoiner& ThreadJoiner::give_up_after_ms(int ms) { + give_up_after_ms_ = ms; + return *this; +} + +Status ThreadJoiner::Join() { + if (Thread::current_thread() && + Thread::current_thread()->tid() == thread_->tid()) { + return Status::InvalidArgument("Can't join on own thread", thread_->name_); + } + + // Early exit: double join is a no-op. + if (!thread_->joinable_) { + return Status::OK(); + } + + int waited_ms = 0; + bool keep_trying = true; + while (keep_trying) { + if (waited_ms >= warn_after_ms_) { + LOG(WARNING) << Substitute("Waited for $0ms trying to join with $1 (tid $2)", + waited_ms, thread_->name_, thread_->tid_); + } + + int remaining_before_giveup = MathLimits<int>::kMax; + if (give_up_after_ms_ != -1) { + remaining_before_giveup = give_up_after_ms_ - waited_ms; + } + + int remaining_before_next_warn = warn_every_ms_; + if (waited_ms < warn_after_ms_) { + remaining_before_next_warn = warn_after_ms_ - waited_ms; + } + + if (remaining_before_giveup < remaining_before_next_warn) { + keep_trying = false; + } + + int wait_for = std::min(remaining_before_giveup, remaining_before_next_warn); + + if (thread_->done_.WaitFor(MonoDelta::FromMilliseconds(wait_for))) { + // Unconditionally join before returning, to guarantee that any TLS + // has been destroyed (pthread_key_create() destructors only run + // after a pthread's user method has returned). 
+ int ret = pthread_join(thread_->thread_, NULL); + CHECK_EQ(ret, 0); + thread_->joinable_ = false; + return Status::OK(); + } + waited_ms += wait_for; + } + return Status::Aborted(strings::Substitute("Timed out after $0ms joining on $1", + waited_ms, thread_->name_)); +} + +Thread::~Thread() { + if (joinable_) { + int ret = pthread_detach(thread_); + CHECK_EQ(ret, 0); + } +} + +std::string Thread::ToString() const { + return Substitute("Thread $0 (name: \"$1\", category: \"$2\")", tid(), name_, category_); +} + +int64_t Thread::WaitForTid() const { + const string log_prefix = Substitute("$0 ($1) ", name_, category_); + SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500 /* ms */, log_prefix, + "waiting for new thread to publish its TID"); + int loop_count = 0; + while (true) { + int64_t t = Acquire_Load(&tid_); + if (t != PARENT_WAITING_TID) return t; + boost::detail::yield(loop_count++); + } +} + + +Status Thread::StartThread(const std::string& category, const std::string& name, + const ThreadFunctor& functor, uint64_t flags, + scoped_refptr<Thread> *holder) { + TRACE_COUNTER_INCREMENT("threads_started", 1); + TRACE_COUNTER_SCOPE_LATENCY_US("thread_start_us"); + GoogleOnceInit(&once, &InitThreading); + + const string log_prefix = Substitute("$0 ($1) ", name, category); + SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500 /* ms */, log_prefix, "starting thread"); + + // Temporary reference for the duration of this function. + scoped_refptr<Thread> t(new Thread(category, name, functor)); + + // Optional, and only set if the thread was successfully created. + // + // We have to set this before we even start the thread because it's + // allowed for the thread functor to access 'holder'. + if (holder) { + *holder = t; + } + + t->tid_ = PARENT_WAITING_TID; + + // Add a reference count to the thread since SuperviseThread() needs to + // access the thread object, and we have no guarantee that our caller + // won't drop the reference as soon as we return. 
This is dereferenced + // in FinishThread(). + t->AddRef(); + + auto cleanup = MakeScopedCleanup([&]() { + // If we failed to create the thread, we need to undo all of our prep work. + t->tid_ = INVALID_TID; + t->Release(); + }); + + if (PREDICT_FALSE(FLAGS_thread_inject_start_latency_ms > 0)) { + LOG(INFO) << "Injecting " << FLAGS_thread_inject_start_latency_ms << "ms sleep on thread start"; + SleepFor(MonoDelta::FromMilliseconds(FLAGS_thread_inject_start_latency_ms)); + } + + { + SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500 /* ms */, log_prefix, "creating pthread"); + SCOPED_WATCH_STACK((flags & NO_STACK_WATCHDOG) ? 0 : 250); + int ret = pthread_create(&t->thread_, NULL, &Thread::SuperviseThread, t.get()); + if (ret) { + return Status::RuntimeError("Could not create thread", strerror(ret), ret); + } + } + + // The thread has been created and is now joinable. + // + // Why set this in the parent and not the child? Because only the parent + // (or someone communicating with the parent) can join, so joinable must + // be set before the parent returns. + t->joinable_ = true; + cleanup.cancel(); + + VLOG(2) << "Started thread " << t->tid()<< " - " << category << ":" << name; + return Status::OK(); +} + +void* Thread::SuperviseThread(void* arg) { + Thread* t = static_cast<Thread*>(arg); + int64_t system_tid = Thread::CurrentThreadId(); + PCHECK(system_tid != -1); + + // Take an additional reference to the thread manager, which we'll need below. + ANNOTATE_IGNORE_SYNC_BEGIN(); + shared_ptr<ThreadMgr> thread_mgr_ref = thread_manager; + ANNOTATE_IGNORE_SYNC_END(); + + // Set up the TLS. + // + // We could store a scoped_refptr in the TLS itself, but as its + // lifecycle is poorly defined, we'll use a bare pointer. We + // already incremented the reference count in StartThread. + Thread::tls_ = t; + + // Publish our tid to 'tid_', which unblocks any callers waiting in + // WaitForTid(). 
+ Release_Store(&t->tid_, system_tid); + + string name = strings::Substitute("$0-$1", t->name(), system_tid); + thread_manager->SetThreadName(name, t->tid_); + thread_manager->AddThread(pthread_self(), name, t->category(), t->tid_); + + // FinishThread() is guaranteed to run (even if functor_ throws an + // exception) because pthread_cleanup_push() creates a scoped object + // whose destructor invokes the provided callback. + pthread_cleanup_push(&Thread::FinishThread, t); + t->functor_(); + pthread_cleanup_pop(true); + + return NULL; +} + +void Thread::FinishThread(void* arg) { + Thread* t = static_cast<Thread*>(arg); + + // We're here either because of the explicit pthread_cleanup_pop() in + // SuperviseThread() or through pthread_exit(). In either case, + // thread_manager is guaranteed to be live because thread_mgr_ref in + // SuperviseThread() is still live. + thread_manager->RemoveThread(pthread_self(), t->category()); + + // Signal any Joiner that we're done. + t->done_.CountDown(); + + VLOG(2) << "Ended thread " << t->tid_ << " - " << t->category() << ":" << t->name(); + t->Release(); + // NOTE: the above 'Release' call could be the last reference to 'this', + // so 'this' could be destructed at this point. Do not add any code + // following here! +} + +} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/thread.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/thread.h b/be/src/kudu/util/thread.h new file mode 100644 index 0000000..dd035f8 --- /dev/null +++ b/be/src/kudu/util/thread.h @@ -0,0 +1,373 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// Copied from Impala and adapted to Kudu. + +#ifndef KUDU_UTIL_THREAD_H +#define KUDU_UTIL_THREAD_H + +#include <pthread.h> +#if defined(__linux__) +#include <syscall.h> +#else +#include <sys/syscall.h> +#endif +#include <unistd.h> + +#include <cstdint> +#include <string> +#include <utility> + +#include <boost/bind.hpp> // IWYU pragma: keep +#include <boost/function.hpp> // IWYU pragma: keep + +#include "kudu/gutil/atomicops.h" +#include "kudu/gutil/macros.h" +#include "kudu/gutil/ref_counted.h" +#include "kudu/util/countdown_latch.h" +#include "kudu/util/status.h" + +namespace kudu { + +class MetricEntity; +class Thread; +class WebCallbackRegistry; + +// Utility to join on a thread, printing warning messages if it +// takes too long. 
For example: +// +// ThreadJoiner(&my_thread, "processing thread") +// .warn_after_ms(1000) +// .warn_every_ms(5000) +// .Join(); +// +// TODO: would be nice to offer a way to use ptrace() or signals to +// dump the stack trace of the thread we're trying to join on if it +// gets stuck. But, after looking for 20 minutes or so, it seems +// pretty complicated to get right. +class ThreadJoiner { + public: + explicit ThreadJoiner(Thread* thread); + + // Start emitting warnings after this many milliseconds. + // + // Default: 1000 ms. + ThreadJoiner& warn_after_ms(int ms); + + // After the warnings after started, emit another warning at the + // given interval. + // + // Default: 1000 ms. + ThreadJoiner& warn_every_ms(int ms); + + // If the thread has not stopped after this number of milliseconds, give up + // joining on it and return Status::Aborted. + // + // -1 (the default) means to wait forever trying to join. + ThreadJoiner& give_up_after_ms(int ms); + + // Join the thread, subject to the above parameters. If the thread joining + // fails for any reason, returns RuntimeError. If it times out, returns + // Aborted. + Status Join(); + + private: + enum { + kDefaultWarnAfterMs = 1000, + kDefaultWarnEveryMs = 1000, + kDefaultGiveUpAfterMs = -1 // forever + }; + + Thread* thread_; + + int warn_after_ms_; + int warn_every_ms_; + int give_up_after_ms_; + + DISALLOW_COPY_AND_ASSIGN(ThreadJoiner); +}; + +// Thin wrapper around pthread that can register itself with the singleton ThreadMgr +// (a private class implemented in thread.cc entirely, which tracks all live threads so +// that they may be monitored via the debug webpages). This class has a limited subset of +// boost::thread's API. Construction is almost the same, but clients must supply a +// category and a name for each thread so that they can be identified in the debug web +// UI. Otherwise, Join() is the only supported method from boost::thread. 
+// +// Each Thread object knows its operating system thread ID (TID), which can be used to +// attach debuggers to specific threads, to retrieve resource-usage statistics from the +// operating system, and to assign threads to resource control groups. +// +// Threads are shared objects, but in a degenerate way. They may only have +// up to two referents: the caller that created the thread (parent), and +// the thread itself (child). Moreover, the only two methods to mutate state +// (Join() and the destructor) are constrained: the child may not Join() on +// itself, and the destructor is only run when there's one referent left. +// These constraints allow us to access thread internals without any locks. +class Thread : public RefCountedThreadSafe<Thread> { + public: + + // Flags passed to Thread::CreateWithFlags(). + enum CreateFlags { + NO_FLAGS = 0, + + // Disable the use of KernelStackWatchdog to detect and log slow + // thread creations. This is necessary when starting the kernel stack + // watchdog thread itself to avoid reentrancy. + NO_STACK_WATCHDOG = 1 << 0 + }; + + // This constructor pattern mimics that in boost::thread. There is + // one constructor for each number of arguments that the thread + // function accepts. To extend the set of acceptable signatures, add + // another constructor with <class F, class A1.... class An>. + // + // In general: + // - category: string identifying the thread category to which this thread belongs, + // used for organising threads together on the debug UI. + // - name: name of this thread. Will be appended with "-<thread-id>" to ensure + // uniqueness. + // - F - a method type that supports operator(), and the instance passed to the + // constructor is executed immediately in a separate thread. + // - A1...An - argument types whose instances are passed to f(...) + // - holder - optional shared pointer to hold a reference to the created thread. 
+ template <class F> + static Status CreateWithFlags(const std::string& category, const std::string& name, + const F& f, uint64_t flags, + scoped_refptr<Thread>* holder) { + return StartThread(category, name, f, flags, holder); + + } + template <class F> + static Status Create(const std::string& category, const std::string& name, const F& f, + scoped_refptr<Thread>* holder) { + return StartThread(category, name, f, NO_FLAGS, holder); + } + + template <class F, class A1> + static Status Create(const std::string& category, const std::string& name, const F& f, + const A1& a1, scoped_refptr<Thread>* holder) { + return StartThread(category, name, boost::bind(f, a1), NO_FLAGS, holder); + } + + template <class F, class A1, class A2> + static Status Create(const std::string& category, const std::string& name, const F& f, + const A1& a1, const A2& a2, scoped_refptr<Thread>* holder) { + return StartThread(category, name, boost::bind(f, a1, a2), NO_FLAGS, holder); + } + + template <class F, class A1, class A2, class A3> + static Status Create(const std::string& category, const std::string& name, const F& f, + const A1& a1, const A2& a2, const A3& a3, scoped_refptr<Thread>* holder) { + return StartThread(category, name, boost::bind(f, a1, a2, a3), NO_FLAGS, holder); + } + + template <class F, class A1, class A2, class A3, class A4> + static Status Create(const std::string& category, const std::string& name, const F& f, + const A1& a1, const A2& a2, const A3& a3, const A4& a4, + scoped_refptr<Thread>* holder) { + return StartThread(category, name, boost::bind(f, a1, a2, a3, a4), NO_FLAGS, holder); + } + + template <class F, class A1, class A2, class A3, class A4, class A5> + static Status Create(const std::string& category, const std::string& name, const F& f, + const A1& a1, const A2& a2, const A3& a3, const A4& a4, const A5& a5, + scoped_refptr<Thread>* holder) { + return StartThread(category, name, boost::bind(f, a1, a2, a3, a4, a5), NO_FLAGS, holder); + } + + template 
<class F, class A1, class A2, class A3, class A4, class A5, class A6>
+  static Status Create(const std::string& category, const std::string& name, const F& f,
+                       const A1& a1, const A2& a2, const A3& a3, const A4& a4, const A5& a5,
+                       const A6& a6, scoped_refptr<Thread>* holder) {
+    return StartThread(category, name, boost::bind(f, a1, a2, a3, a4, a5, a6), NO_FLAGS, holder);
+  }
+
+  // Emulates boost::thread and detaches.
+  ~Thread();
+
+  // Blocks until this thread finishes execution. Once this method returns, the thread
+  // will be unregistered with the ThreadMgr and will not appear in the debug UI.
+  void Join() { ThreadJoiner(this).Join(); }
+
+  // The thread ID assigned to this thread by the operating system. If the thread
+  // has not yet started running, returns INVALID_TID.
+  //
+  // NOTE: this may block for a short amount of time if the thread has just been
+  // started.
+  int64_t tid() const {
+    int64_t t = base::subtle::Acquire_Load(&tid_);
+    if (t != PARENT_WAITING_TID) {
+      return t;  // Reuse the acquire-loaded value; re-reading tid_ would race.
+    }
+    return WaitForTid();
+  }
+
+  // Returns the thread's pthread ID.
+  pthread_t pthread_id() const { return thread_; }
+
+  const std::string& name() const { return name_; }
+  const std::string& category() const { return category_; }
+
+  // Return a string representation of the thread identifying information.
+  std::string ToString() const;
+
+  // The current thread of execution, or NULL if the current thread isn't a kudu::Thread.
+  // This call is signal-safe.
+  static Thread* current_thread() { return tls_; }
+
+  // Returns a unique, stable identifier for this thread. Note that this is a static
+  // method and thus can be used on any thread, including the main thread of the
+  // process.
+  //
+  // In general, this should be used when a value is required that is unique to
+  // a thread and must work on any thread including the main process thread.
+  //
+  // NOTE: this is _not_ the TID, but rather a unique value assigned by the
+  // thread implementation. So, this value should not be presented to the user
+  // in log messages, etc.
+  static int64_t UniqueThreadId() {
+#if defined(__linux__)
+    // This cast is a little bit ugly, but it is significantly faster than
+    // calling syscall(SYS_gettid). In particular, this speeds up some code
+    // paths in the tracing implementation.
+    return static_cast<int64_t>(pthread_self());
+#elif defined(__APPLE__)
+    uint64_t tid;
+    CHECK_EQ(0, pthread_threadid_np(NULL, &tid));
+    return tid;
+#else
+#error Unsupported platform
+#endif
+  }
+
+  // Returns the system thread ID (tid on Linux) for the current thread. Note
+  // that this is a static method and thus can be used from any thread,
+  // including the main thread of the process. This is in contrast to
+  // Thread::tid(), which only works on kudu::Threads.
+  //
+  // Thread::tid() will return the same value, but the value is cached in the
+  // Thread object, so will be faster to call.
+  //
+  // Thread::UniqueThreadId() (or Thread::tid()) should be preferred for
+  // performance sensitive code, however it is only guaranteed to return a
+  // unique and stable thread ID, not necessarily the system thread ID.
+  static int64_t CurrentThreadId() {
+#if defined(__linux__)
+    return syscall(SYS_gettid);
+#else
+    return UniqueThreadId();
+#endif
+  }
+
+ private:
+  friend class ThreadJoiner;
+
+  // See 'tid_' docs.
+  enum {
+    INVALID_TID = -1,
+    PARENT_WAITING_TID = -2,
+  };
+
+  // Function object that wraps the user-supplied function to run in a separate thread.
+  typedef boost::function<void ()> ThreadFunctor;
+
+  Thread(std::string category, std::string name, ThreadFunctor functor)
+      : thread_(0),
+        category_(std::move(category)),
+        name_(std::move(name)),
+        tid_(INVALID_TID),
+        functor_(std::move(functor)),
+        done_(1),
+        joinable_(false) {}
+
+  // Library-specific thread ID.
+  pthread_t thread_;
+
+  // Name and category for this thread.
+  const std::string category_;
+  const std::string name_;
+
+  // OS-specific thread ID. Once the constructor finishes StartThread(),
+  // guaranteed to be set either to a non-negative integer, or to INVALID_TID.
+  //
+  // The tid_ member goes through the following states:
+  // 1. INVALID_TID: the thread has not been started, or has already exited.
+  // 2. PARENT_WAITING_TID: the parent has started the thread, but the
+  //    thread has not yet begun running. Therefore the TID is not yet known
+  //    but it will be set once the thread starts.
+  // 3. <positive value>: the thread is running.
+  int64_t tid_;
+
+  // User function to be executed by this thread.
+  const ThreadFunctor functor_;
+
+  // Joiners wait on this latch to be notified if the thread is done.
+  //
+  // Note that Joiners must additionally pthread_join(), otherwise certain
+  // resources that callers expect to be destroyed (like TLS) may still be
+  // alive when a Joiner finishes.
+  CountDownLatch done_;
+
+  bool joinable_;
+
+  // Thread local pointer to the current thread of execution. Will be NULL if the current
+  // thread is not a Thread.
+  static __thread Thread* tls_;
+
+  // Wait for the running thread to publish its tid.
+  int64_t WaitForTid() const;
+
+  // Starts the thread running SuperviseThread(), and returns once that thread has
+  // initialised and its TID has been read. Waits for notification from the started
+  // thread that initialisation is complete before returning. On success, stores a
+  // reference to the thread in holder.
+  static Status StartThread(const std::string& category, const std::string& name,
+                            const ThreadFunctor& functor, uint64_t flags,
+                            scoped_refptr<Thread>* holder);
+
+  // Wrapper for the user-supplied function. Invoked from the new thread,
+  // with the Thread as its only argument. Executes functor_, but before
+  // doing so registers with the global ThreadMgr and reads the thread's
+  // system ID. After functor_ terminates, unregisters with the ThreadMgr.
+  // Always returns NULL.
+  //
+  // SuperviseThread() notifies StartThread() when thread initialisation is
+  // completed via the tid_, which is set to the new thread's system ID.
+  // By that point in time SuperviseThread() has also taken a reference to
+  // the Thread object, allowing it to safely refer to it even after the
+  // caller drops its reference.
+  //
+  // Additionally, StartThread() notifies SuperviseThread() when the actual
+  // Thread object has been assigned (SuperviseThread() is spinning during
+  // this time). Without this, the new thread may reference the actual
+  // Thread object before it has been assigned by StartThread(). See
+  // KUDU-11 for more details.
+  static void* SuperviseThread(void* arg);
+
+  // Invoked when the user-supplied function finishes or in the case of an
+  // abrupt exit (i.e. pthread_exit()). Cleans up after SuperviseThread().
+  static void FinishThread(void* arg);
+};
+
+// Registers /threadz with the debug webserver, and creates thread-tracking metrics under
+// the given entity. If 'web' is NULL, does not register the path handler.
+Status StartThreadInstrumentation(const scoped_refptr<MetricEntity>& server_metrics,
+                                  WebCallbackRegistry* web);
+} // namespace kudu
+
+#endif /* KUDU_UTIL_THREAD_H */

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/thread_restrictions.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/thread_restrictions.cc b/be/src/kudu/util/thread_restrictions.cc
new file mode 100644
index 0000000..f956fd9
--- /dev/null
+++ b/be/src/kudu/util/thread_restrictions.cc
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.
The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <ostream>
+#include <string>
+
+#include <glog/logging.h>
+
+#include "kudu/util/debug/leakcheck_disabler.h"
+#include "kudu/util/thread.h"
+#include "kudu/util/threadlocal.h"
+#include "kudu/util/thread_restrictions.h"
+
+#ifdef ENABLE_THREAD_RESTRICTIONS
+
+namespace kudu {
+
+namespace {
+
+struct LocalThreadRestrictions {
+  LocalThreadRestrictions()
+    : io_allowed(true),
+      wait_allowed(true) {
+  }
+
+  bool io_allowed;
+  bool wait_allowed;
+};
+
+LocalThreadRestrictions* LoadTLS() {
+  // Disable leak check. LSAN sometimes gets false positives on thread locals.
+  // See: https://github.com/google/sanitizers/issues/757
+  debug::ScopedLeakCheckDisabler d;
+  BLOCK_STATIC_THREAD_LOCAL(LocalThreadRestrictions, local_thread_restrictions);
+  return local_thread_restrictions;
+}
+
+} // anonymous namespace
+
+bool ThreadRestrictions::SetIOAllowed(bool allowed) {
+  LocalThreadRestrictions* tls = LoadTLS();  // Look up the TLS once, not twice.
+  bool previous_allowed = tls->io_allowed;
+  tls->io_allowed = allowed;
+  return previous_allowed;
+}
+
+void ThreadRestrictions::AssertIOAllowed() {
+  CHECK(LoadTLS()->io_allowed)
+      << "Function marked as IO-only was called from a thread that "
+      << "disallows IO! If this thread really should be allowed to "
+      << "make IO calls, adjust the call to "
+      << "kudu::ThreadRestrictions::SetIOAllowed() in this thread's "
+      << "startup. "
+      << (Thread::current_thread() ? Thread::current_thread()->ToString() : "(not a kudu::Thread)");
+}
+
+bool ThreadRestrictions::SetWaitAllowed(bool allowed) {
+  LocalThreadRestrictions* tls = LoadTLS();  // Look up the TLS once, not twice.
+  bool previous_allowed = tls->wait_allowed;
+  tls->wait_allowed = allowed;
+  return previous_allowed;
+}
+
+void ThreadRestrictions::AssertWaitAllowed() {
+  CHECK(LoadTLS()->wait_allowed)
+      << "Waiting is not allowed to be used on this thread to prevent "
+      << "server-wide latency aberrations and deadlocks. "
+      << (Thread::current_thread() ? Thread::current_thread()->ToString() : "(not a kudu::Thread)");
+}
+
+} // namespace kudu
+
+#endif  // ENABLE_THREAD_RESTRICTIONS

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/thread_restrictions.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/thread_restrictions.h b/be/src/kudu/util/thread_restrictions.h
new file mode 100644
index 0000000..23f0cd5
--- /dev/null
+++ b/be/src/kudu/util/thread_restrictions.h
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Some portions: Copyright (c) 2012, The Chromium Authors.
+#ifndef KUDU_UTIL_THREAD_RESTRICTIONS_H
+#define KUDU_UTIL_THREAD_RESTRICTIONS_H
+
+#include "kudu/gutil/macros.h"
+
+#ifndef NDEBUG
+#define ENABLE_THREAD_RESTRICTIONS 1
+#endif
+
+namespace kudu {
+
+// Certain behavior is disallowed on certain threads. ThreadRestrictions helps
+// enforce these rules. Examples of such rules:
+//
+// * Do not do blocking IO
+// * Do not wait on synchronization variables or sleep
+//
+// Here's more about how the protection works:
+//
+// 1) If a thread should not be allowed to make IO calls, mark it:
+//      ThreadRestrictions::SetIOAllowed(false);
+//    By default, threads *are* allowed to make IO calls.
+//    In particular, threads like RPC reactors should never do blocking IO
+//    because it may stall other unrelated requests.
+//
+// 2) If a function makes a call that will go out to disk, check whether the
+//    current thread is allowed:
+//      ThreadRestrictions::AssertIOAllowed();
+//
+//
+// Style tip: where should you put AssertIOAllowed checks? It's best
+// if you put them as close to the disk access as possible, at the
+// lowest level. This rule is simple to follow and helps catch all
+// callers. For example, if your function GoDoSomeBlockingDiskCall()
+// only calls other functions in Kudu and doesn't access the underlying
+// disk, you should go add the AssertIOAllowed checks in the helper functions.
+class ThreadRestrictions {
+ public:
+  // Constructing a ScopedAllowIO temporarily allows IO for the current
+  // thread. Doing this is almost certainly always incorrect, but sometimes
+  // it makes more sense to allow an exception and file a bug in the backlog
+  // to improve it later.
+  class ScopedAllowIO {
+   public:
+    ScopedAllowIO() { previous_value_ = SetIOAllowed(true); }
+    ~ScopedAllowIO() { SetIOAllowed(previous_value_); }
+   private:
+    // Whether IO was allowed when the ScopedAllowIO was constructed.
+    bool previous_value_;
+
+    DISALLOW_COPY_AND_ASSIGN(ScopedAllowIO);
+  };
+
+  // Constructing a ScopedAllowWait temporarily allows waiting on the current
+  // thread. Doing this is almost always incorrect: consider carefully whether
+  // you should instead be deferring work to a different thread.
+  class ScopedAllowWait {
+   public:
+    ScopedAllowWait() { previous_value_ = SetWaitAllowed(true); }
+    ~ScopedAllowWait() { SetWaitAllowed(previous_value_); }
+   private:
+    // Whether waiting was allowed when the ScopedAllowWait was
+    // constructed.
+    bool previous_value_;
+
+    DISALLOW_COPY_AND_ASSIGN(ScopedAllowWait);
+  };
+
+
+#ifdef ENABLE_THREAD_RESTRICTIONS
+  // Set whether the current thread is allowed to make IO calls.
+  // Threads start out in the *allowed* state.
+  // Returns the previous value.
+  static bool SetIOAllowed(bool allowed);
+
+  // Check whether the current thread is allowed to make IO calls,
+  // and FATALs if not. See the block comment above the class for
+  // a discussion of where to add these checks.
+  static void AssertIOAllowed();
+
+  // Set whether the current thread may wait/block. Returns the previous
+  // value.
+  static bool SetWaitAllowed(bool allowed);
+
+  // Check whether the current thread is allowed to wait/block.
+  // FATALs if not.
+  static void AssertWaitAllowed();
+#else
+  // Inline the empty definitions of these functions so that they can be
+  // compiled out.
+  static bool SetIOAllowed(bool /* allowed */) { return true; }
+  static void AssertIOAllowed() {}
+  static bool SetWaitAllowed(bool /* allowed */) { return true; }
+  static void AssertWaitAllowed() {}
+#endif  // ENABLE_THREAD_RESTRICTIONS
+
+ private:
+  DISALLOW_IMPLICIT_CONSTRUCTORS(ThreadRestrictions);
+};
+
+} // namespace kudu
+
+#endif /* KUDU_UTIL_THREAD_RESTRICTIONS_H */

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/threadlocal.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/threadlocal.cc b/be/src/kudu/util/threadlocal.cc
new file mode 100644
index 0000000..444ed15
--- /dev/null
+++ b/be/src/kudu/util/threadlocal.cc
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "kudu/util/threadlocal.h"
+
+#include <memory>
+#include <ostream>
+#include <string>
+
+#include <pthread.h>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/once.h"
+#include "kudu/util/errno.h"
+
+namespace kudu {
+namespace threadlocal {
+namespace internal {
+
+// One key used by the entire process to attach destructors on thread exit.
+static pthread_key_t destructors_key;
+
+// The above key must only be initialized once per process.
+static GoogleOnceType once = GOOGLE_ONCE_INIT;
+
+namespace {
+
+// List of destructors for all thread locals instantiated on a given thread.
+struct PerThreadDestructorList {
+  void (*destructor)(void*);
+  void* arg;
+  PerThreadDestructorList* next;
+};
+
+} // anonymous namespace
+
+// Call all the destructors registered on this thread, most recently added
+// first (AddDestructor() pushes at the head), freeing each list node.
+static void InvokeDestructors(void* t) {
+  PerThreadDestructorList* d = static_cast<PerThreadDestructorList*>(t);
+  while (d != nullptr) {
+    d->destructor(d->arg);
+    PerThreadDestructorList* next = d->next;
+    delete d;
+    d = next;
+  }
+}
+
+// This key must be initialized only once.
+static void CreateKey() {
+  int ret = pthread_key_create(&destructors_key, &InvokeDestructors);
+  // Linux supports up to 1024 keys, we will use only one for all thread locals.
+  CHECK_EQ(0, ret) << "pthread_key_create() failed, cannot add destructor to thread: "
+      << "error " << ret << ": " << ErrnoToString(ret);
+}
+
+// Adds a destructor to the list.
+void AddDestructor(void (*destructor)(void*), void* arg) {
+  GoogleOnceInit(&once, &CreateKey);
+
+  std::unique_ptr<PerThreadDestructorList> p(new PerThreadDestructorList());
+  p->destructor = destructor;
+  p->arg = arg;
+  // pthread_getspecific() returns NULL (an empty list) if nothing is set yet.
+  p->next = static_cast<PerThreadDestructorList*>(pthread_getspecific(destructors_key));
+  int ret = pthread_setspecific(destructors_key, p.release());
+  // The only time this check should fail is if we are out of memory, or if
+  // somehow key creation failed, which should be caught by the above CHECK.
+  CHECK_EQ(0, ret) << "pthread_setspecific() failed, cannot update destructor list: "
+      << "error " << ret << ": " << ErrnoToString(ret);
+}
+
+} // namespace internal
+} // namespace threadlocal
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/threadlocal.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/threadlocal.h b/be/src/kudu/util/threadlocal.h
new file mode 100644
index 0000000..ebe1910
--- /dev/null
+++ b/be/src/kudu/util/threadlocal.h
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_THREADLOCAL_H_
+#define KUDU_UTIL_THREADLOCAL_H_
+
+// Block-scoped static thread local implementation.
+//
+// Usage is similar to a C++11 thread_local. The BLOCK_STATIC_THREAD_LOCAL macro
+// defines a thread-local pointer to the specified type, which is lazily
+// instantiated by any thread entering the block for the first time. The
+// constructor for the type T is invoked at macro execution time, as expected,
+// and its destructor is invoked when the corresponding thread's Runnable
+// returns, or when the thread exits.
+//
+// Inspired by Poco <http://pocoproject.org/docs/Poco.ThreadLocal.html>,
+// Andrew Tomazos <http://stackoverflow.com/questions/12049684/>, and
+// the C++11 thread_local API.
+//
+// Example usage:
+//
+// // Invokes a 3-arg constructor on SomeClass:
+// BLOCK_STATIC_THREAD_LOCAL(SomeClass, instance, arg1, arg2, arg3);
+// instance->DoSomething();
+//
+#define BLOCK_STATIC_THREAD_LOCAL(T, t, ...) \
+static __thread T* t; \
+do { \
+  if (PREDICT_FALSE(t == nullptr)) { \
+    t = new T(__VA_ARGS__); \
+    ::kudu::threadlocal::internal::AddDestructor(::kudu::threadlocal::internal::Destroy<T>, t); \
+  } \
+} while (false)
+
+// Class-scoped static thread local implementation.
+//
+// Very similar in implementation to the above block-scoped version, but
+// requires a bit more syntax and vigilance to use properly.
+//
+// DECLARE_STATIC_THREAD_LOCAL(Type, instance_var_) must be placed in the
+// class header, as usual for variable declarations.
+//
+// Because these variables are static, they must also be defined in the impl
+// file with DEFINE_STATIC_THREAD_LOCAL(Type, Classname, instance_var_),
+// which is very much like defining any static member, i.e. int Foo::member_.
+//
+// Finally, each thread must initialize the instance before using it by calling
+// INIT_STATIC_THREAD_LOCAL(Type, instance_var_, ...). This is a cheap
+// call, and may be invoked at the top of any method which may reference a
+// thread-local variable.
+//
+// Due to all of these requirements, you should probably declare TLS members
+// as private.
+//
+// Example usage:
+//
+// // foo.h
+// #include "kudu/utils/file.h"
+// class Foo {
+//  public:
+//   void WriteToFile(std::string s);
+//  private:
+//   DECLARE_STATIC_THREAD_LOCAL(utils::File, file_);
+// };
+//
+// // foo.cc
+// #include "kudu/foo.h"
+// DEFINE_STATIC_THREAD_LOCAL(utils::File, Foo, file_);
+// void Foo::WriteToFile(std::string s) {
+//   // Call constructor if necessary.
+//   INIT_STATIC_THREAD_LOCAL(utils::File, file_, "/tmp/file_location.txt");
+//   file_->Write(s);
+// }
+
+// Goes in the class declaration (usually in a header file).
+// Declares only a pointer: INIT_STATIC_THREAD_LOCAL allocates the instance per
+// thread and registers its deletion via AddDestructor(), so no separate
+// destructor member is needed.
+#define DECLARE_STATIC_THREAD_LOCAL(T, t) \
+static __thread T* t
+
+// You must also define the instance in the .cc file.
+#define DEFINE_STATIC_THREAD_LOCAL(T, Class, t) \
+__thread T* Class::t
+
+// Must be invoked at least once by each thread that will access t.
+#define INIT_STATIC_THREAD_LOCAL(T, t, ...) \
+do { \
+  if (PREDICT_FALSE(t == nullptr)) { \
+    t = new T(__VA_ARGS__); \
+    ::kudu::threadlocal::internal::AddDestructor(::kudu::threadlocal::internal::Destroy<T>, t); \
+  } \
+} while (false)
+
+// Internal implementation below.
+
+namespace kudu {
+namespace threadlocal {
+namespace internal {
+
+// Add a destructor to the list.
+void AddDestructor(void (*destructor)(void*), void* arg);
+
+// Destroy the passed object of type T.
+template<class T>
+static void Destroy(void* t) {
+  // With tcmalloc, this should be pretty cheap (same thread as new).
+  delete static_cast<T*>(t);
+}
+
+} // namespace internal
+} // namespace threadlocal
+} // namespace kudu
+
+#endif // KUDU_UTIL_THREADLOCAL_H_

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/threadlocal_cache.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/threadlocal_cache.h b/be/src/kudu/util/threadlocal_cache.h
new file mode 100644
index 0000000..e9ab3c2
--- /dev/null
+++ b/be/src/kudu/util/threadlocal_cache.h
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.
The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "kudu/util/threadlocal.h"
+
+#include <boost/optional/optional.hpp>
+#include <array>
+#include <memory>
+#include <utility>
+
+namespace kudu {
+
+// A small thread-local cache for arbitrary objects.
+//
+// This can be used as a contention-free "lookaside" type cache for frequently-accessed
+// objects to avoid having to go to a less-efficient centralized cache.
+//
+// 'Key' must be copyable, and comparable using operator==().
+// 'T' has no particular requirements.
+template<class Key, class T>
+class ThreadLocalCache {
+ public:
+  // The number of entries in the cache.
+  // NOTE: this should always be a power of two for good performance, so that the
+  // compiler can optimize the modulo operations into bit-mask operations.
+  static constexpr int kItemCapacity = 4;
+
+  // Look up a key in the cache. Returns either the existing entry with this key,
+  // or nullptr if no entry matched.
+  T* Lookup(const Key& key) {
+    // Our cache is so small that a linear search is likely to be more efficient than
+    // any kind of actual hashing. We start at the most recent hit and skip empty
+    // slots, whose default-constructed keys could otherwise shadow a real entry.
+    for (int i = 0; i < kItemCapacity; i++) {
+      int idx = (last_hit_ + i) % kItemCapacity;
+      auto& p = cache_[idx];
+      if (p.second && p.first == key) {
+        last_hit_ = idx;
+        return p.second.get_ptr();
+      }
+    }
+    return nullptr;
+  }
+
+  // Insert a new entry into the cache. If the cache is full (as it usually is in the
+  // steady state), this replaces one of the existing entries. The 'args' are forwarded
+  // to T's constructor.
+  //
+  // NOTE: entries returned by a previous call to Lookup() may possibly be invalidated
+  // by this function.
+  template<typename ... Args>
+  T* EmplaceNew(const Key& key, Args&&... args) {
+    auto& p = cache_[next_slot_++ % kItemCapacity];
+    p.second.emplace(std::forward<Args>(args)...);
+    p.first = key;
+    return p.second.get_ptr();
+  }
+
+  // Get the cache instance for this thread, creating it if it has not yet been
+  // created.
+  //
+  // The instance is automatically deleted and any cached items destructed when the
+  // thread exits.
+  static ThreadLocalCache* GetInstance() {
+    INIT_STATIC_THREAD_LOCAL(ThreadLocalCache, tl_instance_);
+    return tl_instance_;
+  }
+
+ private:
+  using EntryPair = std::pair<Key, boost::optional<T>>;
+  std::array<EntryPair, kItemCapacity> cache_;
+
+  // The next slot that we will write into. We always modulo this by the capacity
+  // before use.
+  uint8_t next_slot_ = 0;
+  // The slot where we last got a cache hit, so we can start our search at the same
+  // spot, optimizing for the case of repeated lookups of the same hot element.
+  uint8_t last_hit_ = 0;
+
+  static_assert(kItemCapacity <= 1 << (sizeof(next_slot_) * 8),
+                "next_slot_ must be large enough for capacity");
+  static_assert(kItemCapacity <= 1 << (sizeof(last_hit_) * 8),
+                "last_hit_ must be large enough for capacity");
+
+  DECLARE_STATIC_THREAD_LOCAL(ThreadLocalCache, tl_instance_);
+};
+
+// Define the thread-local storage for the ThreadLocalCache template.
+// We can't use DEFINE_STATIC_THREAD_LOCAL here because the commas in the
+// template arguments confuse the C preprocessor.
+template<class K, class T>
+__thread ThreadLocalCache<K,T>* ThreadLocalCache<K,T>::tl_instance_;
+
+} // namespace kudu