http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/util/thread.h ---------------------------------------------------------------------- diff --git a/be/src/util/thread.h b/be/src/util/thread.h index 18f3a75..6898517 100644 --- a/be/src/util/thread.h +++ b/be/src/util/thread.h @@ -48,10 +48,10 @@ class Webserver; /// TODO: Consider allowing fragment IDs as category parameters. class Thread { public: - /// This constructor pattern mimics that in boost::thread. There is - /// one constructor for each number of arguments that the thread + /// This static Create method pattern mimics that in the boost::thread constructors. + /// There is one static Create method for each number of arguments that the thread /// function accepts. To extend the set of acceptable signatures, add - /// another constructor with <class F, class A1.... class An>. + /// another static Create method with <class F, class A1.... class An>. // /// In general: /// - category: string identifying the thread category to which this thread belongs, @@ -61,44 +61,56 @@ class Thread { /// - F - a method type that supports operator(), and the instance passed to the /// constructor is executed immediately in a separate thread. /// - A1...An - argument types whose instances are passed to f(...) + /// - thread - unique_ptr<Thread>* to reset with the created Thread. + /// - fault_injection_eligible - If set to true, allow fault injection at this + /// callsite (see thread_creation_fault_injection). If set to false, fault + /// injection is diabled at this callsite. Thread creation sites that crash + /// Impala or abort startup must have this set to false. template <class F> - Thread(const std::string& category, const std::string& name, const F& f) - : category_(category), name_(name), tid_(UNINITIALISED_THREAD_ID) { - StartThread(f); + static Status Create(const std::string& category, const std::string& name, + const F& f, std::unique_ptr<Thread>* thread, + bool fault_injection_eligible = false) { + return StartThread(category, name, f, thread, fault_injection_eligible); } template <class F, class A1> - Thread(const std::string& category, const std::string& name, const F& f, const A1& a1) - : category_(category), name_(name), tid_(UNINITIALISED_THREAD_ID) { - StartThread(boost::bind(f, a1)); + static Status Create(const std::string& category, const std::string& name, + const F& f, const A1& a1, std::unique_ptr<Thread>* thread, + bool fault_injection_eligible = false) { + return StartThread(category, name, boost::bind(f, a1), thread, + fault_injection_eligible); } template <class F, class A1, class A2> - Thread(const std::string& category, const std::string& name, const F& f, - const A1& a1, const A2& a2) - : category_(category), name_(name), tid_(UNINITIALISED_THREAD_ID) { - StartThread(boost::bind(f, a1, a2)); + static Status Create(const std::string& category, const std::string& name, + const F& f, const A1& a1, const A2& a2, std::unique_ptr<Thread>* thread, + bool fault_injection_eligible = false) { + return StartThread(category, name, boost::bind(f, a1, a2), thread, + fault_injection_eligible); } template <class F, class A1, class A2, class A3> - Thread(const std::string& category, const std::string& name, const F& f, - const A1& a1, const A2& a2, const A3& a3) - : category_(category), name_(name), tid_(UNINITIALISED_THREAD_ID) { - StartThread(boost::bind(f, a1, a2, a3)); + static Status Create(const std::string& category, const std::string& name, + const F& f, const A1& a1, const A2& a2, const A3& a3, + std::unique_ptr<Thread>* thread, bool fault_injection_eligible = false) { + return StartThread(category, name, boost::bind(f, a1, a2, a3), thread, + fault_injection_eligible); } template <class F, class A1, class A2, class A3, class A4> - Thread(const std::string& category, const std::string& name, const F& f, - const A1& a1, const A2& a2, const A3& a3, const A4& a4) - : category_(category), name_(name), tid_(UNINITIALISED_THREAD_ID) { - StartThread(boost::bind(f, a1, a2, a3, a4)); + static Status Create(const std::string& category, const std::string& name, + const F& f, const A1& a1, const A2& a2, const A3& a3, const A4& a4, + std::unique_ptr<Thread>* thread, bool fault_injection_eligible = false) { + return StartThread(category, name, boost::bind(f, a1, a2, a3, a4), thread, + fault_injection_eligible); } template <class F, class A1, class A2, class A3, class A4, class A5> - Thread(const std::string& category, const std::string& name, const F& f, - const A1& a1, const A2& a2, const A3& a3, const A4& a4, const A5& a5) - : category_(category), name_(name), tid_(UNINITIALISED_THREAD_ID) { - StartThread(boost::bind(f, a1, a2, a3, a4, a5)); + static Status Create(const std::string& category, const std::string& name, + const F& f, const A1& a1, const A2& a2, const A3& a3, const A4& a4, const A5& a5, + std::unique_ptr<Thread>* thread, bool fault_injection_eligible = false) { + return StartThread(category, name, boost::bind(f, a1, a2, a3, a4, a5), thread, + fault_injection_eligible); } /// Blocks until this thread finishes execution. Once this method returns, the thread @@ -117,6 +129,9 @@ class Thread { static const int64_t INVALID_THREAD_ID = -1; private: + Thread(const std::string& category, const std::string& name) + : category_(category), name_(name), tid_(UNINITIALISED_THREAD_ID) {} + /// To distinguish between a thread ID that can't be determined, and one that hasn't /// been assigned. Since tid_ is set in the constructor, this value will never be seen /// by clients of this class. @@ -137,10 +152,13 @@ class Thread { /// non-negative integer, or INVALID_THREAD_ID. int64_t tid_; - /// Starts the thread running SuperviseThread(), and returns once that thread has - /// initialised and its TID read. Waits for notification from the started thread that - /// initialisation is complete before returning. - void StartThread(const ThreadFunctor& functor); + /// Creates a new thread and starts the thread running SuperviseThread(). It waits + /// for notification from the started thread that initialisation is complete and + /// the TID read before returning. This will return an error if thread create fails. + /// In the event of success, 'thread' will be set to the created Thread. + static Status StartThread(const std::string& category, const std::string& name, + const ThreadFunctor& functor, std::unique_ptr<Thread>* thread, + bool fault_injection_eligible) WARN_UNUSED_RESULT; /// Wrapper for the user-supplied function. Always invoked from thread_. Executes the /// method in functor_, but before doing so registers with the global ThreadMgr and @@ -175,7 +193,7 @@ class ThreadGroup { /// will destroy it when the ThreadGroup is destroyed. Threads will linger until that /// point (even if terminated), however, so callers should be mindful of the cost of /// placing very many threads in this set. - void AddThread(std::unique_ptr<Thread> thread); + void AddThread(std::unique_ptr<Thread>&& thread); /// Waits for all threads to finish. DO NOT call this from a thread inside this set; /// deadlock will predictably ensue. @@ -196,7 +214,7 @@ void InitThreading(); /// the "thread-manager." If 'include_jvm_threads' is true, shows information about /// live JVM threads in the web UI. Status StartThreadInstrumentation(MetricGroup* metrics, Webserver* webserver, - bool include_jvm_threads); + bool include_jvm_threads) WARN_UNUSED_RESULT; } #endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/common/thrift/generate_error_codes.py ---------------------------------------------------------------------- diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py index 393daba..c08268e 100755 --- a/common/thrift/generate_error_codes.py +++ b/common/thrift/generate_error_codes.py @@ -331,6 +331,8 @@ error_codes = ( ("ADMISSION_TIMED_OUT", 108, "Admission for query exceeded timeout $0ms in pool $1. " "Queued reason: $2"), + + ("THREAD_CREATION_FAILED", 109, "Failed to create thread $0 in category $1: $2"), ) import sys
