http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/TLogging.h ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/TLogging.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/TLogging.h new file mode 100644 index 0000000..07ff030 --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/TLogging.h @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_TLOGGING_H_ +#define _THRIFT_TLOGGING_H_ 1 + +#include <thrift/thrift-config.h> + +/** + * Contains utility macros for debugging and logging. + * + */ + +#include <time.h> + +#ifdef HAVE_STDINT_H +#include <stdint.h> +#endif + +/** + * T_GLOBAL_DEBUGGING_LEVEL = 0: all debugging turned off, debug macros undefined + * T_GLOBAL_DEBUGGING_LEVEL = 1: all debugging turned on + */ +#define T_GLOBAL_DEBUGGING_LEVEL 0 + +/** + * T_GLOBAL_LOGGING_LEVEL = 0: all logging turned off, logging macros undefined + * T_GLOBAL_LOGGING_LEVEL = 1: all logging turned on + */ +#define T_GLOBAL_LOGGING_LEVEL 1 + +/** + * Standard wrapper around fprintf what will prefix the file name and line + * number to the line. Uses T_GLOBAL_DEBUGGING_LEVEL to control whether it is + * turned on or off. + * + * @param format_string + */ +#if T_GLOBAL_DEBUGGING_LEVEL > 0 +#define T_DEBUG(format_string, ...) \ + if (T_GLOBAL_DEBUGGING_LEVEL > 0) { \ + fprintf(stderr, "[%s,%d] " format_string " \n", __FILE__, __LINE__, ##__VA_ARGS__); \ + } +#else +#define T_DEBUG(format_string, ...) +#endif + +/** + * analogous to T_DEBUG but also prints the time + * + * @param string format_string input: printf style format string + */ +#if T_GLOBAL_DEBUGGING_LEVEL > 0 +#define T_DEBUG_T(format_string, ...) \ + { \ + if (T_GLOBAL_DEBUGGING_LEVEL > 0) { \ + time_t now; \ + char dbgtime[26]; \ + time(&now); \ + THRIFT_CTIME_R(&now, dbgtime); \ + dbgtime[24] = '\0'; \ + fprintf(stderr, \ + "[%s,%d] [%s] " format_string " \n", \ + __FILE__, \ + __LINE__, \ + dbgtime, \ + ##__VA_ARGS__); \ + } \ + } +#else +#define T_DEBUG_T(format_string, ...) +#endif + +/** + * analogous to T_DEBUG but uses input level to determine whether or not the string + * should be logged. + * + * @param int level: specified debug level + * @param string format_string input: format string + */ +#define T_DEBUG_L(level, format_string, ...) \ + if ((level) > 0) { \ + fprintf(stderr, "[%s,%d] " format_string " \n", __FILE__, __LINE__, ##__VA_ARGS__); \ + } + +/** + * Explicit error logging. Prints time, file name and line number + * + * @param string format_string input: printf style format string + */ +#define T_ERROR(format_string, ...) \ + { \ + time_t now; \ + char dbgtime[26]; \ + time(&now); \ + THRIFT_CTIME_R(&now, dbgtime); \ + dbgtime[24] = '\0'; \ + fprintf(stderr, \ + "[%s,%d] [%s] ERROR: " format_string " \n", \ + __FILE__, \ + __LINE__, \ + dbgtime, \ + ##__VA_ARGS__); \ + } + +/** + * Analogous to T_ERROR, additionally aborting the process. + * WARNING: macro calls abort(), ending program execution + * + * @param string format_string input: printf style format string + */ +#define T_ERROR_ABORT(format_string, ...) \ + { \ + time_t now; \ + char dbgtime[26]; \ + time(&now); \ + THRIFT_CTIME_R(&now, dbgtime); \ + dbgtime[24] = '\0'; \ + fprintf(stderr, \ + "[%s,%d] [%s] ERROR: Going to abort " format_string " \n", \ + __FILE__, \ + __LINE__, \ + dbgtime, \ + ##__VA_ARGS__); \ + exit(1); \ + } + +/** + * Log input message + * + * @param string format_string input: printf style format string + */ +#if T_GLOBAL_LOGGING_LEVEL > 0 +#define T_LOG_OPER(format_string, ...) \ + { \ + if (T_GLOBAL_LOGGING_LEVEL > 0) { \ + time_t now; \ + char dbgtime[26]; \ + time(&now); \ + THRIFT_CTIME_R(&now, dbgtime); \ + dbgtime[24] = '\0'; \ + fprintf(stderr, "[%s] " format_string " \n", dbgtime, ##__VA_ARGS__); \ + } \ + } +#else +#define T_LOG_OPER(format_string, ...) +#endif + +/** + * T_GLOBAL_DEBUG_VIRTUAL = 0 or unset: normal operation, + * virtual call debug messages disabled + * T_GLOBAL_DEBUG_VIRTUAL = 1: log a debug messages whenever an + * avoidable virtual call is made + * T_GLOBAL_DEBUG_VIRTUAL = 2: record detailed info that can be + * printed by calling + * apache::thrift::profile_print_info() + */ +#if T_GLOBAL_DEBUG_VIRTUAL > 1 +#define T_VIRTUAL_CALL() ::apache::thrift::profile_virtual_call(typeid(*this)) +#define T_GENERIC_PROTOCOL(template_class, generic_prot, specific_prot) \ + do { \ + if (!(specific_prot)) { \ + ::apache::thrift::profile_generic_protocol(typeid(*template_class), typeid(*generic_prot)); \ + } \ + } while (0) +#elif T_GLOBAL_DEBUG_VIRTUAL == 1 +#define T_VIRTUAL_CALL() fprintf(stderr, "[%s,%d] virtual call\n", __FILE__, __LINE__) +#define T_GENERIC_PROTOCOL(template_class, generic_prot, specific_prot) \ + do { \ + if (!(specific_prot)) { \ + fprintf(stderr, "[%s,%d] failed to cast to specific protocol type\n", __FILE__, __LINE__); \ + } \ + } while (0) +#else +#define T_VIRTUAL_CALL() +#define T_GENERIC_PROTOCOL(template_class, generic_prot, specific_prot) +#endif + +#endif // #ifndef _THRIFT_TLOGGING_H_
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/TOutput.cpp ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/TOutput.cpp b/depends/thirdparty/thrift/lib/cpp/src/thrift/TOutput.cpp new file mode 100644 index 0000000..5739d0f --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/TOutput.cpp @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <thrift/Thrift.h> +#include <cstring> +#include <cstdlib> +#include <boost/lexical_cast.hpp> +#include <stdarg.h> +#include <stdio.h> + +namespace apache { +namespace thrift { + +TOutput GlobalOutput; + +void TOutput::printf(const char* message, ...) { +#ifndef THRIFT_SQUELCH_CONSOLE_OUTPUT + // Try to reduce heap usage, even if printf is called rarely. + static const int STACK_BUF_SIZE = 256; + char stack_buf[STACK_BUF_SIZE]; + va_list ap; + +#ifdef _MSC_VER + va_start(ap, message); + int need = _vscprintf(message, ap); + va_end(ap); + + if (need < STACK_BUF_SIZE) { + va_start(ap, message); + vsnprintf_s(stack_buf, STACK_BUF_SIZE, _TRUNCATE, message, ap); + va_end(ap); + f_(stack_buf); + return; + } +#else + va_start(ap, message); + int need = vsnprintf(stack_buf, STACK_BUF_SIZE, message, ap); + va_end(ap); + + if (need < STACK_BUF_SIZE) { + f_(stack_buf); + return; + } +#endif + + char* heap_buf = (char*)malloc((need + 1) * sizeof(char)); + if (heap_buf == NULL) { +#ifdef _MSC_VER + va_start(ap, message); + vsnprintf_s(stack_buf, STACK_BUF_SIZE, _TRUNCATE, message, ap); + va_end(ap); +#endif + // Malloc failed. We might as well print the stack buffer. + f_(stack_buf); + return; + } + + va_start(ap, message); + int rval = vsnprintf(heap_buf, need + 1, message, ap); + va_end(ap); + // TODO(shigin): inform user + if (rval != -1) { + f_(heap_buf); + } + free(heap_buf); +#endif +} + +void TOutput::errorTimeWrapper(const char* msg) { +#ifndef THRIFT_SQUELCH_CONSOLE_OUTPUT + time_t now; + char dbgtime[26]; + time(&now); + THRIFT_CTIME_R(&now, dbgtime); + dbgtime[24] = 0; + fprintf(stderr, "Thrift: %s %s\n", dbgtime, msg); +#endif +} + +void TOutput::perror(const char* message, int errno_copy) { + std::string out = message + strerror_s(errno_copy); + f_(out.c_str()); +} + +std::string TOutput::strerror_s(int errno_copy) { +#ifndef HAVE_STRERROR_R + return "errno = " + boost::lexical_cast<std::string>(errno_copy); +#else // HAVE_STRERROR_R + + char b_errbuf[1024] = {'\0'}; +#ifdef STRERROR_R_CHAR_P + char* b_error = strerror_r(errno_copy, b_errbuf, sizeof(b_errbuf)); +#else + char* b_error = b_errbuf; + int rv = strerror_r(errno_copy, b_errbuf, sizeof(b_errbuf)); + if (rv == -1) { + // strerror_r failed. omgwtfbbq. + return "XSI-compliant strerror_r() failed with errno = " + + boost::lexical_cast<std::string>(errno_copy); + } +#endif + // Can anyone prove that explicit cast is probably not necessary + // to ensure that the string object is constructed before + // b_error becomes invalid? + return std::string(b_error); + +#endif // HAVE_STRERROR_R +} +} +} // apache::thrift http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/TOutput.h ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/TOutput.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/TOutput.h new file mode 100644 index 0000000..1375f73 --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/TOutput.h @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_OUTPUT_H_ +#define _THRIFT_OUTPUT_H_ 1 + +namespace apache { +namespace thrift { + +class TOutput { +public: + TOutput() : f_(&errorTimeWrapper) {} + + inline void setOutputFunction(void (*function)(const char*)) { f_ = function; } + + inline void operator()(const char* message) { f_(message); } + + // It is important to have a const char* overload here instead of + // just the string version, otherwise errno could be corrupted + // if there is some problem allocating memory when constructing + // the string. + void perror(const char* message, int errno_copy); + inline void perror(const std::string& message, int errno_copy) { + perror(message.c_str(), errno_copy); + } + + void printf(const char* message, ...); + + static void errorTimeWrapper(const char* msg); + + /** Just like strerror_r but returns a C++ string object. */ + static std::string strerror_s(int errno_copy); + +private: + void (*f_)(const char*); +}; + +extern TOutput GlobalOutput; +} +} // namespace apache::thrift + +#endif //_THRIFT_OUTPUT_H_ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/TProcessor.h ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/TProcessor.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/TProcessor.h new file mode 100644 index 0000000..d8f86c4 --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/TProcessor.h @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_TPROCESSOR_H_ +#define _THRIFT_TPROCESSOR_H_ 1 + +#include <string> +#include <thrift/protocol/TProtocol.h> +#include <boost/shared_ptr.hpp> + +namespace apache { +namespace thrift { + +/** + * Virtual interface class that can handle events from the processor. To + * use this you should subclass it and implement the methods that you care + * about. Your subclass can also store local data that you may care about, + * such as additional "arguments" to these methods (stored in the object + * instance's state). + */ +class TProcessorEventHandler { +public: + virtual ~TProcessorEventHandler() {} + + /** + * Called before calling other callback methods. + * Expected to return some sort of context object. + * The return value is passed to all other callbacks + * for that function invocation. + */ + virtual void* getContext(const char* fn_name, void* serverContext) { + (void)fn_name; + (void)serverContext; + return NULL; + } + + /** + * Expected to free resources associated with a context. + */ + virtual void freeContext(void* ctx, const char* fn_name) { + (void)ctx; + (void)fn_name; + } + + /** + * Called before reading arguments. + */ + virtual void preRead(void* ctx, const char* fn_name) { + (void)ctx; + (void)fn_name; + } + + /** + * Called between reading arguments and calling the handler. + */ + virtual void postRead(void* ctx, const char* fn_name, uint32_t bytes) { + (void)ctx; + (void)fn_name; + (void)bytes; + } + + /** + * Called between calling the handler and writing the response. + */ + virtual void preWrite(void* ctx, const char* fn_name) { + (void)ctx; + (void)fn_name; + } + + /** + * Called after writing the response. + */ + virtual void postWrite(void* ctx, const char* fn_name, uint32_t bytes) { + (void)ctx; + (void)fn_name; + (void)bytes; + } + + /** + * Called when an async function call completes successfully. + */ + virtual void asyncComplete(void* ctx, const char* fn_name) { + (void)ctx; + (void)fn_name; + } + + /** + * Called if the handler throws an undeclared exception. + */ + virtual void handlerError(void* ctx, const char* fn_name) { + (void)ctx; + (void)fn_name; + } + +protected: + TProcessorEventHandler() {} +}; + +/** + * A helper class used by the generated code to free each context. + */ +class TProcessorContextFreer { +public: + TProcessorContextFreer(TProcessorEventHandler* handler, void* context, const char* method) + : handler_(handler), context_(context), method_(method) {} + ~TProcessorContextFreer() { + if (handler_ != NULL) + handler_->freeContext(context_, method_); + } + void unregister() { handler_ = NULL; } + +private: + apache::thrift::TProcessorEventHandler* handler_; + void* context_; + const char* method_; +}; + +/** + * A processor is a generic object that acts upon two streams of data, one + * an input and the other an output. The definition of this object is loose, + * though the typical case is for some sort of server that either generates + * responses to an input stream or forwards data from one pipe onto another. + * + */ +class TProcessor { +public: + virtual ~TProcessor() {} + + virtual bool process(boost::shared_ptr<protocol::TProtocol> in, + boost::shared_ptr<protocol::TProtocol> out, + void* connectionContext) = 0; + + bool process(boost::shared_ptr<apache::thrift::protocol::TProtocol> io, void* connectionContext) { + return process(io, io, connectionContext); + } + + boost::shared_ptr<TProcessorEventHandler> getEventHandler() { return eventHandler_; } + + void setEventHandler(boost::shared_ptr<TProcessorEventHandler> eventHandler) { + eventHandler_ = eventHandler; + } + +protected: + TProcessor() {} + + boost::shared_ptr<TProcessorEventHandler> eventHandler_; +}; + +/** + * This is a helper class to allow boost::shared_ptr to be used with handler + * pointers returned by the generated handler factories. + * + * The handler factory classes generated by the thrift compiler return raw + * pointers, and factory->releaseHandler() must be called when the handler is + * no longer needed. + * + * A ReleaseHandler object can be instantiated and passed as the second + * parameter to a shared_ptr, so that factory->releaseHandler() will be called + * when the object is no longer needed, instead of deleting the pointer. + */ +template <typename HandlerFactory_> +class ReleaseHandler { +public: + ReleaseHandler(const boost::shared_ptr<HandlerFactory_>& handlerFactory) + : handlerFactory_(handlerFactory) {} + + void operator()(typename HandlerFactory_::Handler* handler) { + if (handler) { + handlerFactory_->releaseHandler(handler); + } + } + +private: + boost::shared_ptr<HandlerFactory_> handlerFactory_; +}; + +struct TConnectionInfo { + // The input and output protocols + boost::shared_ptr<protocol::TProtocol> input; + boost::shared_ptr<protocol::TProtocol> output; + // The underlying transport used for the connection + // This is the transport that was returned by TServerTransport::accept(), + // and it may be different than the transport pointed to by the input and + // output protocols. + boost::shared_ptr<transport::TTransport> transport; +}; + +class TProcessorFactory { +public: + virtual ~TProcessorFactory() {} + + /** + * Get the TProcessor to use for a particular connection. + * + * This method is always invoked in the same thread that the connection was + * accepted on. This generally means that this call does not need to be + * thread safe, as it will always be invoked from a single thread. + */ + virtual boost::shared_ptr<TProcessor> getProcessor(const TConnectionInfo& connInfo) = 0; +}; + +class TSingletonProcessorFactory : public TProcessorFactory { +public: + TSingletonProcessorFactory(boost::shared_ptr<TProcessor> processor) : processor_(processor) {} + + boost::shared_ptr<TProcessor> getProcessor(const TConnectionInfo&) { return processor_; } + +private: + boost::shared_ptr<TProcessor> processor_; +}; +} +} // apache::thrift + +#endif // #ifndef _THRIFT_TPROCESSOR_H_ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/TToString.h ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/TToString.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/TToString.h new file mode 100644 index 0000000..5023869 --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/TToString.h @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_TOSTRING_H_ +#define _THRIFT_TOSTRING_H_ 1 + +#include <boost/lexical_cast.hpp> + +#include <vector> +#include <map> +#include <set> +#include <string> +#include <sstream> + +namespace apache { +namespace thrift { + +template <typename T> +std::string to_string(const T& t) { + return boost::lexical_cast<std::string>(t); +} + +template <typename K, typename V> +std::string to_string(const std::map<K, V>& m); + +template <typename T> +std::string to_string(const std::set<T>& s); + +template <typename T> +std::string to_string(const std::vector<T>& t); + +template <typename K, typename V> +std::string to_string(const typename std::pair<K, V>& v) { + std::ostringstream o; + o << to_string(v.first) << ": " << to_string(v.second); + return o.str(); +} + +template <typename T> +std::string to_string(const T& beg, const T& end) { + std::ostringstream o; + for (T it = beg; it != end; ++it) { + if (it != beg) + o << ", "; + o << to_string(*it); + } + return o.str(); +} + +template <typename T> +std::string to_string(const std::vector<T>& t) { + std::ostringstream o; + o << "[" << to_string(t.begin(), t.end()) << "]"; + return o.str(); +} + +template <typename K, typename V> +std::string to_string(const std::map<K, V>& m) { + std::ostringstream o; + o << "{" << to_string(m.begin(), m.end()) << "}"; + return o.str(); +} + +template <typename T> +std::string to_string(const std::set<T>& s) { + std::ostringstream o; + o << "{" << to_string(s.begin(), s.end()) << "}"; + return o.str(); +} +} +} // apache::thrift + +#endif // _THRIFT_TOSTRING_H_ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/Thrift.h ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/Thrift.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/Thrift.h new file mode 100644 index 0000000..e8e70eb --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/Thrift.h @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_THRIFT_H_ +#define _THRIFT_THRIFT_H_ 1 + +#include <thrift/transport/PlatformSocket.h> + +#include <thrift/thrift-config.h> + +#include <stdio.h> +#include <assert.h> + +#include <sys/types.h> +#ifdef HAVE_NETINET_IN_H +#include <netinet/in.h> +#endif +#ifdef HAVE_INTTYPES_H +#include <inttypes.h> +#endif +#include <string> +#include <map> +#include <list> +#include <set> +#include <vector> +#include <exception> +#include <typeinfo> + +#include <boost/utility/enable_if.hpp> +#include <boost/type_traits/is_convertible.hpp> + +#include <thrift/TLogging.h> +#include <thrift/TOutput.h> + +#define THRIFT_UNUSED_VARIABLE(x) ((void)(x)) + +namespace apache { +namespace thrift { + +class TEnumIterator + : public std::iterator<std::forward_iterator_tag, std::pair<int, const char*> > { +public: + TEnumIterator(int n, int* enums, const char** names) + : ii_(0), n_(n), enums_(enums), names_(names) {} + + int operator++() { return ++ii_; } + + bool operator!=(const TEnumIterator& end) { + THRIFT_UNUSED_VARIABLE(end); + assert(end.n_ == -1); + return (ii_ != n_); + } + + std::pair<int, const char*> operator*() const { return std::make_pair(enums_[ii_], names_[ii_]); } + +private: + int ii_; + const int n_; + int* enums_; + const char** names_; +}; + +class TException : public std::exception { +public: + TException() : message_() {} + + TException(const std::string& message) : message_(message) {} + + virtual ~TException() throw() {} + + virtual const char* what() const throw() { + if (message_.empty()) { + return "Default TException."; + } else { + return message_.c_str(); + } + } + +protected: + std::string message_; +}; + +class TDelayedException { +public: + template <class E> + static TDelayedException* delayException(const E& e); + virtual void throw_it() = 0; + virtual ~TDelayedException(){}; +}; + +template <class E> +class TExceptionWrapper : public TDelayedException { +public: + TExceptionWrapper(const E& e) : e_(e) {} + virtual void throw_it() { + E temp(e_); + delete this; + throw temp; + } + +private: + E e_; +}; + +template <class E> +TDelayedException* TDelayedException::delayException(const E& e) { + return new TExceptionWrapper<E>(e); +} + +#if T_GLOBAL_DEBUG_VIRTUAL > 1 +void profile_virtual_call(const std::type_info& info); +void profile_generic_protocol(const std::type_info& template_type, const std::type_info& prot_type); +void profile_print_info(FILE* f); +void profile_print_info(); +void profile_write_pprof(FILE* gen_calls_f, FILE* virtual_calls_f); +#endif +} +} // apache::thrift + +#endif // #ifndef _THRIFT_THRIFT_H_ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/VirtualProfiling.cpp ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/VirtualProfiling.cpp b/depends/thirdparty/thrift/lib/cpp/src/thrift/VirtualProfiling.cpp new file mode 100644 index 0000000..6ce346b --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/VirtualProfiling.cpp @@ -0,0 +1,425 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <thrift/Thrift.h> + +// Do nothing if virtual call profiling is not enabled +#if T_GLOBAL_DEBUG_VIRTUAL > 1 + +// TODO: This code only works with g++ (since we rely on the fact +// that all std::type_info instances referring to a particular type +// always return the exact same pointer value from name().) +#ifndef __GNUG__ +#error "Thrift virtual function profiling currently only works with gcc" +#endif // !__GNUG__ + +// TODO: We also require glibc for the backtrace() and backtrace_symbols() +// functions. +#ifndef __GLIBC__ +#error "Thrift virtual function profiling currently requires glibc" +#endif // !__GLIBC__ + +#include <thrift/concurrency/Mutex.h> + +#include <ext/hash_map> +#include <execinfo.h> +#include <stdio.h> + +namespace apache { +namespace thrift { + +using ::apache::thrift::concurrency::Mutex; +using ::apache::thrift::concurrency::Guard; + +static const unsigned int MAX_STACK_DEPTH = 15; + +/** + * A stack trace + */ +class Backtrace { +public: + Backtrace(int skip = 0); + Backtrace(Backtrace const& bt); + + void operator=(Backtrace const& bt) { + numCallers_ = bt.numCallers_; + if (numCallers_ >= 0) { + memcpy(callers_, bt.callers_, numCallers_ * sizeof(void*)); + } + } + + bool operator==(Backtrace const& bt) const { return (cmp(bt) == 0); } + + size_t hash() const { + intptr_t ret = 0; + for (int n = 0; n < numCallers_; ++n) { + ret ^= reinterpret_cast<intptr_t>(callers_[n]); + } + return static_cast<size_t>(ret); + } + + int cmp(Backtrace const& bt) const { + int depth_diff = (numCallers_ - bt.numCallers_); + if (depth_diff != 0) { + return depth_diff; + } + + for (int n = 0; n < numCallers_; ++n) { + int diff = reinterpret_cast<intptr_t>(callers_[n]) + - reinterpret_cast<intptr_t>(bt.callers_[n]); + if (diff != 0) { + return diff; + } + } + + return 0; + } + + void print(FILE* f, int indent = 0, int start = 0) const { + char** strings = backtrace_symbols(callers_, numCallers_); + if (strings) { + start += skip_; + if (start < 0) { + start = 0; + } + for (int n = start; n < numCallers_; ++n) { + fprintf(f, "%*s#%-2d %s\n", indent, "", n, strings[n]); + } + free(strings); + } else { + fprintf(f, "%*s<failed to determine symbols>\n", indent, ""); + } + } + + int getDepth() const { return numCallers_ - skip_; } + + void* getFrame(int index) const { + int adjusted_index = index + skip_; + if (adjusted_index < 0 || adjusted_index >= numCallers_) { + return NULL; + } + return callers_[adjusted_index]; + } + +private: + void* callers_[MAX_STACK_DEPTH]; + int numCallers_; + int skip_; +}; + +// Define the constructors non-inline, so they consistently add a single +// frame to the stack trace, regardless of whether optimization is enabled +Backtrace::Backtrace(int skip) + : skip_(skip + 1) // ignore the constructor itself +{ + numCallers_ = backtrace(callers_, MAX_STACK_DEPTH); + if (skip_ > numCallers_) { + skip_ = numCallers_; + } +} + +Backtrace::Backtrace(Backtrace const& bt) : numCallers_(bt.numCallers_), skip_(bt.skip_) { + if (numCallers_ >= 0) { + memcpy(callers_, bt.callers_, numCallers_ * sizeof(void*)); + } +} + +/** + * A backtrace, plus one or two type names + */ +class Key { +public: + class Hash { + public: + size_t operator()(Key const& k) const { return k.hash(); } + }; + + Key(const Backtrace* bt, const std::type_info& type_info) + : backtrace_(bt), typeName1_(type_info.name()), typeName2_(NULL) {} + + Key(const Backtrace* bt, const std::type_info& type_info1, const std::type_info& type_info2) + : backtrace_(bt), typeName1_(type_info1.name()), typeName2_(type_info2.name()) {} + + Key(const Key& k) + : backtrace_(k.backtrace_), typeName1_(k.typeName1_), typeName2_(k.typeName2_) {} + + void operator=(const Key& k) { + backtrace_ = k.backtrace_; + typeName1_ = k.typeName1_; + typeName2_ = k.typeName2_; + } + + const Backtrace* getBacktrace() const { return backtrace_; } + + const char* getTypeName() const { return typeName1_; } + + const char* getTypeName2() const { return typeName2_; } + + void makePersistent() { + // Copy the Backtrace object + backtrace_ = new Backtrace(*backtrace_); + + // NOTE: We don't copy the type name. + // The GNU libstdc++ implementation of type_info::name() returns a value + // that will be valid for the lifetime of the program. (Although the C++ + // standard doesn't guarantee this will be true on all implementations.) + } + + /** + * Clean up memory allocated by makePersistent() + * + * Should only be invoked if makePersistent() has previously been called. + * The Key should no longer be used after cleanup() is called. + */ + void cleanup() { + delete backtrace_; + backtrace_ = NULL; + } + + int cmp(const Key& k) const { + int ret = backtrace_->cmp(*k.backtrace_); + if (ret != 0) { + return ret; + } + + // NOTE: We compare just the name pointers. + // With GNU libstdc++, every type_info object for the same type points to + // exactly the same name string. (Although this isn't guaranteed by the + // C++ standard.) + ret = k.typeName1_ - typeName1_; + if (ret != 0) { + return ret; + } + return k.typeName2_ - typeName2_; + } + + bool operator==(const Key& k) const { return cmp(k) == 0; } + + size_t hash() const { + // NOTE: As above, we just use the name pointer value. + // Works with GNU libstdc++, but not guaranteed to be correct on all + // implementations. + return backtrace_->hash() ^ reinterpret_cast<size_t>(typeName1_) + ^ reinterpret_cast<size_t>(typeName2_); + } + +private: + const Backtrace* backtrace_; + const char* typeName1_; + const char* typeName2_; +}; + +/** + * A functor that determines which of two BacktraceMap entries + * has a higher count. + */ +class CountGreater { +public: + bool operator()(std::pair<Key, size_t> bt1, std::pair<Key, size_t> bt2) const { + return bt1.second > bt2.second; + } +}; + +typedef __gnu_cxx::hash_map<Key, size_t, Key::Hash> BacktraceMap; + +/** + * A map describing how many times T_VIRTUAL_CALL() has been invoked. + */ +BacktraceMap virtual_calls; +Mutex virtual_calls_mutex; + +/** + * A map describing how many times T_GENERIC_PROTOCOL() has been invoked. + */ +BacktraceMap generic_calls; +Mutex generic_calls_mutex; + +void _record_backtrace(BacktraceMap* map, const Mutex& mutex, Key* k) { + Guard guard(mutex); + + BacktraceMap::iterator it = map->find(*k); + if (it == map->end()) { + k->makePersistent(); + map->insert(std::make_pair(*k, 1)); + } else { + // increment the count + // NOTE: we could assert if it->second is 0 afterwards, since that would + // mean we've wrapped. + ++(it->second); + } +} + +/** + * Record an unnecessary virtual function call. + * + * This method is invoked by the T_VIRTUAL_CALL() macro. + */ +void profile_virtual_call(const std::type_info& type) { + int const skip = 1; // ignore this frame + Backtrace bt(skip); + Key k(&bt, type); + _record_backtrace(&virtual_calls, virtual_calls_mutex, &k); +} + +/** + * Record a call to a template processor with a protocol that is not the one + * specified in the template parameter. + * + * This method is invoked by the T_GENERIC_PROTOCOL() macro. + */ +void profile_generic_protocol(const std::type_info& template_type, + const std::type_info& prot_type) { + int const skip = 1; // ignore this frame + Backtrace bt(skip); + Key k(&bt, template_type, prot_type); + _record_backtrace(&generic_calls, generic_calls_mutex, &k); +} + +/** + * Print the recorded profiling information to the specified file. + */ +void profile_print_info(FILE* f) { + typedef std::vector<std::pair<Key, size_t> > BacktraceVector; + + CountGreater is_greater; + + // Grab both locks for the duration of the print operation, + // to ensure the output is a consistent snapshot of a single point in time + Guard generic_calls_guard(generic_calls_mutex); + Guard virtual_calls_guard(virtual_calls_mutex); + + // print the info from generic_calls, sorted by frequency + // + // We print the generic_calls info ahead of virtual_calls, since it is more + // useful in some cases. All T_GENERIC_PROTOCOL calls can be eliminated + // from most programs. Not all T_VIRTUAL_CALLs will be eliminated by + // converting to templates. + BacktraceVector gp_sorted(generic_calls.begin(), generic_calls.end()); + std::sort(gp_sorted.begin(), gp_sorted.end(), is_greater); + + for (BacktraceVector::const_iterator it = gp_sorted.begin(); it != gp_sorted.end(); ++it) { + Key const& key = it->first; + size_t const count = it->second; + fprintf(f, + "T_GENERIC_PROTOCOL: %zu calls to %s with a %s:\n", + count, + key.getTypeName(), + key.getTypeName2()); + key.getBacktrace()->print(f, 2); + fprintf(f, "\n"); + } + + // print the info from virtual_calls, sorted by frequency + BacktraceVector vc_sorted(virtual_calls.begin(), virtual_calls.end()); + std::sort(vc_sorted.begin(), vc_sorted.end(), is_greater); + + for (BacktraceVector::const_iterator it = vc_sorted.begin(); it != vc_sorted.end(); ++it) { + Key const& key = it->first; + size_t const count = it->second; + fprintf(f, "T_VIRTUAL_CALL: %zu calls on %s:\n", count, key.getTypeName()); + key.getBacktrace()->print(f, 2); + fprintf(f, "\n"); + } +} + +/** + * Print the recorded profiling information to stdout. + */ +void profile_print_info() { + profile_print_info(stdout); +} + +/** + * Write a BacktraceMap as Google CPU profiler binary data. + */ +static void profile_write_pprof_file(FILE* f, BacktraceMap const& map) { + // Write the header + uintptr_t header[5] = {0, 3, 0, 0, 0}; + fwrite(&header, sizeof(header), 1, f); + + // Write the profile records + for (BacktraceMap::const_iterator it = map.begin(); it != map.end(); ++it) { + uintptr_t count = it->second; + fwrite(&count, sizeof(count), 1, f); + + Backtrace const* bt = it->first.getBacktrace(); + uintptr_t num_pcs = bt->getDepth(); + fwrite(&num_pcs, sizeof(num_pcs), 1, f); + + for (uintptr_t n = 0; n < num_pcs; ++n) { + void* pc = bt->getFrame(n); + fwrite(&pc, sizeof(pc), 1, f); + } + } + + // Write the trailer + uintptr_t trailer[3] = {0, 1, 0}; + fwrite(&trailer, sizeof(trailer), 1, f); + + // Write /proc/self/maps + // TODO(simpkins): This only works on linux + FILE* proc_maps = fopen("/proc/self/maps", "r"); + if (proc_maps) { + uint8_t buf[4096]; + while (true) { + size_t bytes_read = fread(buf, 1, sizeof(buf), proc_maps); + if (bytes_read == 0) { + break; + } + fwrite(buf, 1, bytes_read, f); + } + fclose(proc_maps); + } +} + +/** + * Write the recorded profiling information as pprof files. + * + * This writes the information using the Google CPU profiler binary data + * format, so it can be analyzed with pprof. Note that information about the + * protocol/transport data types cannot be stored in this file format. + * + * See http://code.google.com/p/google-perftools/ for more details. + * + * @param gen_calls_f The information about calls to + * profile_generic_protocol() will be written to this + * file. + * @param virtual_calls_f The information about calls to + * profile_virtual_call() will be written to this file. + */ +void profile_write_pprof(FILE* gen_calls_f, FILE* virtual_calls_f) { + typedef std::vector<std::pair<Key, size_t> > BacktraceVector; + + CountGreater is_greater; + + // Grab both locks for the duration of the print operation, + // to ensure the output is a consistent snapshot of a single point in time + Guard generic_calls_guard(generic_calls_mutex); + Guard virtual_calls_guard(virtual_calls_mutex); + + // write the info from generic_calls + profile_write_pprof_file(gen_calls_f, generic_calls); + + // write the info from virtual_calls + profile_write_pprof_file(virtual_calls_f, virtual_calls); +} +} +} // apache::thrift + +#endif // T_GLOBAL_PROFILE_VIRTUAL > 0 http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncBufferProcessor.h ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncBufferProcessor.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncBufferProcessor.h new file mode 100644 index 0000000..3c957a6 --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncBufferProcessor.h @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_TASYNC_BUFFER_PROCESSOR_H_ +#define _THRIFT_TASYNC_BUFFER_PROCESSOR_H_ 1 + +#include <thrift/cxxfunctional.h> +#include <boost/shared_ptr.hpp> + +#include <thrift/transport/TBufferTransports.h> + +namespace apache { +namespace thrift { +namespace async { + +class TAsyncBufferProcessor { +public: + // Process data in "in", putting the result in "out". + // Call _return(true) when done, or _return(false) to + // forcefully close the connection (if applicable). + // "in" and "out" should be TMemoryBuffer or similar, + // not a wrapper around a socket. + virtual void process(apache::thrift::stdcxx::function<void(bool healthy)> _return, + boost::shared_ptr<apache::thrift::transport::TBufferBase> ibuf, + boost::shared_ptr<apache::thrift::transport::TBufferBase> obuf) = 0; + virtual ~TAsyncBufferProcessor() {} +}; +} +} +} // apache::thrift::async + +#endif // #ifndef _THRIFT_TASYNC_BUFFER_PROCESSOR_H_ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncChannel.cpp ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncChannel.cpp b/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncChannel.cpp new file mode 100644 index 0000000..4716af2 --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncChannel.cpp @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <thrift/async/TAsyncChannel.h> +#include <thrift/cxxfunctional.h> + +namespace apache { +namespace thrift { +namespace async { + +void TAsyncChannel::sendAndRecvMessage(const VoidCallback& cob, + TMemoryBuffer* sendBuf, + TMemoryBuffer* recvBuf) { + apache::thrift::stdcxx::function<void()> send_done + = apache::thrift::stdcxx::bind(&TAsyncChannel::recvMessage, this, cob, recvBuf); + + sendMessage(send_done, sendBuf); +} +} +} +} // apache::thrift::async http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncChannel.h ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncChannel.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncChannel.h new file mode 100644 index 0000000..eb3ce2a --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncChannel.h @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_ASYNC_TASYNCCHANNEL_H_ +#define _THRIFT_ASYNC_TASYNCCHANNEL_H_ 1 + +#include <thrift/cxxfunctional.h> +#include <thrift/Thrift.h> + +namespace apache { +namespace thrift { +namespace transport { +class TMemoryBuffer; +} +} +} + +namespace apache { +namespace thrift { +namespace async { +using apache::thrift::transport::TMemoryBuffer; + +class TAsyncChannel { +public: + typedef apache::thrift::stdcxx::function<void()> VoidCallback; + + virtual ~TAsyncChannel() {} + + // is the channel in a good state? + virtual bool good() const = 0; + virtual bool error() const = 0; + virtual bool timedOut() const = 0; + + /** + * Send a message over the channel. + */ + virtual void sendMessage(const VoidCallback& cob, + apache::thrift::transport::TMemoryBuffer* message) = 0; + + /** + * Receive a message from the channel. + */ + virtual void recvMessage(const VoidCallback& cob, + apache::thrift::transport::TMemoryBuffer* message) = 0; + + /** + * Send a message over the channel and receive a response. + */ + virtual void sendAndRecvMessage(const VoidCallback& cob, + apache::thrift::transport::TMemoryBuffer* sendBuf, + apache::thrift::transport::TMemoryBuffer* recvBuf); +}; +} +} +} // apache::thrift::async + +#endif // #ifndef _THRIFT_ASYNC_TASYNCCHANNEL_H_ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncDispatchProcessor.h ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncDispatchProcessor.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncDispatchProcessor.h new file mode 100644 index 0000000..e79c57d --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncDispatchProcessor.h @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef _THRIFT_ASYNC_TASYNCDISPATCHPROCESSOR_H_ +#define _THRIFT_ASYNC_TASYNCDISPATCHPROCESSOR_H_ 1 + +#include <thrift/async/TAsyncProcessor.h> + +namespace apache { +namespace thrift { +namespace async { + +/** + * TAsyncDispatchProcessor is a helper class to parse the message header then + * call another function to dispatch based on the function name. + * + * Subclasses must implement dispatchCall() to dispatch on the function name. + */ +template <class Protocol_> +class TAsyncDispatchProcessorT : public TAsyncProcessor { +public: + virtual void process(apache::thrift::stdcxx::function<void(bool success)> _return, + boost::shared_ptr<protocol::TProtocol> in, + boost::shared_ptr<protocol::TProtocol> out) { + protocol::TProtocol* inRaw = in.get(); + protocol::TProtocol* outRaw = out.get(); + + // Try to dynamic cast to the template protocol type + Protocol_* specificIn = dynamic_cast<Protocol_*>(inRaw); + Protocol_* specificOut = dynamic_cast<Protocol_*>(outRaw); + if (specificIn && specificOut) { + return processFast(_return, specificIn, specificOut); + } + + // Log the fact that we have to use the slow path + T_GENERIC_PROTOCOL(this, inRaw, specificIn); + T_GENERIC_PROTOCOL(this, outRaw, specificOut); + + std::string fname; + protocol::TMessageType mtype; + int32_t seqid; + inRaw->readMessageBegin(fname, mtype, seqid); + + // If this doesn't look like a valid call, log an error and return false so + // that the server will close the connection. + // + // (The old generated processor code used to try to skip a T_STRUCT and + // continue. However, that seems unsafe.) + if (mtype != protocol::T_CALL && mtype != protocol::T_ONEWAY) { + GlobalOutput.printf("received invalid message type %d from client", mtype); + _return(false); + return; + } + + return this->dispatchCall(_return, inRaw, outRaw, fname, seqid); + } + + void processFast(apache::thrift::stdcxx::function<void(bool success)> _return, + Protocol_* in, + Protocol_* out) { + std::string fname; + protocol::TMessageType mtype; + int32_t seqid; + in->readMessageBegin(fname, mtype, seqid); + + if (mtype != protocol::T_CALL && mtype != protocol::T_ONEWAY) { + GlobalOutput.printf("received invalid message type %d from client", mtype); + _return(false); + return; + } + + return this->dispatchCallTemplated(_return, in, out, fname, seqid); + } + + virtual void dispatchCall(apache::thrift::stdcxx::function<void(bool ok)> _return, + apache::thrift::protocol::TProtocol* in, + apache::thrift::protocol::TProtocol* out, + const std::string& fname, + int32_t seqid) = 0; + + virtual void dispatchCallTemplated(apache::thrift::stdcxx::function<void(bool ok)> _return, + Protocol_* in, + Protocol_* out, + const std::string& fname, + int32_t seqid) = 0; +}; + +/** + * Non-templatized version of TAsyncDispatchProcessor, + * that doesn't bother trying to perform a dynamic_cast. + */ +class TAsyncDispatchProcessor : public TAsyncProcessor { +public: + virtual void process(apache::thrift::stdcxx::function<void(bool success)> _return, + boost::shared_ptr<protocol::TProtocol> in, + boost::shared_ptr<protocol::TProtocol> out) { + protocol::TProtocol* inRaw = in.get(); + protocol::TProtocol* outRaw = out.get(); + + std::string fname; + protocol::TMessageType mtype; + int32_t seqid; + inRaw->readMessageBegin(fname, mtype, seqid); + + // If this doesn't look like a valid call, log an error and return false so + // that the server will close the connection. + // + // (The old generated processor code used to try to skip a T_STRUCT and + // continue. However, that seems unsafe.) + if (mtype != protocol::T_CALL && mtype != protocol::T_ONEWAY) { + GlobalOutput.printf("received invalid message type %d from client", mtype); + _return(false); + return; + } + + return dispatchCall(_return, inRaw, outRaw, fname, seqid); + } + + virtual void dispatchCall(apache::thrift::stdcxx::function<void(bool ok)> _return, + apache::thrift::protocol::TProtocol* in, + apache::thrift::protocol::TProtocol* out, + const std::string& fname, + int32_t seqid) = 0; +}; + +// Specialize TAsyncDispatchProcessorT for TProtocol and TDummyProtocol just to +// use the generic TDispatchProcessor. +template <> +class TAsyncDispatchProcessorT<protocol::TDummyProtocol> : public TAsyncDispatchProcessor {}; +template <> +class TAsyncDispatchProcessorT<protocol::TProtocol> : public TAsyncDispatchProcessor {}; +} +} +} // apache::thrift::async + +#endif // _THRIFT_ASYNC_TASYNCDISPATCHPROCESSOR_H_ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncProcessor.h ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncProcessor.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncProcessor.h new file mode 100644 index 0000000..033f7d9 --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncProcessor.h @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_TASYNCPROCESSOR_H_ +#define _THRIFT_TASYNCPROCESSOR_H_ 1 + +#include <thrift/cxxfunctional.h> +#include <boost/shared_ptr.hpp> +#include <thrift/protocol/TProtocol.h> +#include <thrift/TProcessor.h> + +namespace apache { +namespace thrift { +namespace async { + +/** + * Async version of a TProcessor. It is not expected to complete by the time + * the call to process returns. Instead, it calls a cob to signal completion. + */ + +class TEventServer; // forward declaration + +class TAsyncProcessor { +public: + virtual ~TAsyncProcessor() {} + + virtual void process(apache::thrift::stdcxx::function<void(bool success)> _return, + boost::shared_ptr<protocol::TProtocol> in, + boost::shared_ptr<protocol::TProtocol> out) = 0; + + void process(apache::thrift::stdcxx::function<void(bool success)> _return, + boost::shared_ptr<apache::thrift::protocol::TProtocol> io) { + return process(_return, io, io); + } + + boost::shared_ptr<TProcessorEventHandler> getEventHandler() { return eventHandler_; } + + void setEventHandler(boost::shared_ptr<TProcessorEventHandler> eventHandler) { + eventHandler_ = eventHandler; + } + + const TEventServer* getAsyncServer() { return asyncServer_; } + +protected: + TAsyncProcessor() {} + + boost::shared_ptr<TProcessorEventHandler> eventHandler_; + const TEventServer* asyncServer_; + +private: + friend class TEventServer; + void setAsyncServer(const TEventServer* server) { asyncServer_ = server; } +}; + +class TAsyncProcessorFactory { +public: + virtual ~TAsyncProcessorFactory() {} + + /** + * Get the TAsyncProcessor to use for a particular connection. + * + * This method is always invoked in the same thread that the connection was + * accepted on. This generally means that this call does not need to be + * thread safe, as it will always be invoked from a single thread. + */ + virtual boost::shared_ptr<TAsyncProcessor> getProcessor(const TConnectionInfo& connInfo) = 0; +}; +} +} +} // apache::thrift::async + +// XXX I'm lazy for now +namespace apache { +namespace thrift { +using apache::thrift::async::TAsyncProcessor; +} +} + +#endif // #ifndef _THRIFT_TASYNCPROCESSOR_H_ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncProtocolProcessor.cpp ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncProtocolProcessor.cpp b/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncProtocolProcessor.cpp new file mode 100644 index 0000000..5a4f347 --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncProtocolProcessor.cpp @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <thrift/async/TAsyncProtocolProcessor.h> + +using apache::thrift::transport::TBufferBase; +using apache::thrift::protocol::TProtocol; + +namespace apache { +namespace thrift { +namespace async { + +void TAsyncProtocolProcessor::process(apache::thrift::stdcxx::function<void(bool healthy)> _return, + boost::shared_ptr<TBufferBase> ibuf, + boost::shared_ptr<TBufferBase> obuf) { + boost::shared_ptr<TProtocol> iprot(pfact_->getProtocol(ibuf)); + boost::shared_ptr<TProtocol> oprot(pfact_->getProtocol(obuf)); + return underlying_ + ->process(apache::thrift::stdcxx::bind(&TAsyncProtocolProcessor::finish, + _return, + oprot, + apache::thrift::stdcxx::placeholders::_1), + iprot, + oprot); +} + +/* static */ void TAsyncProtocolProcessor::finish( + apache::thrift::stdcxx::function<void(bool healthy)> _return, + boost::shared_ptr<TProtocol> oprot, + bool healthy) { + (void)oprot; + // This is a stub function to hold a reference to oprot. + return _return(healthy); +} +} +} +} // apache::thrift::async http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncProtocolProcessor.h ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncProtocolProcessor.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncProtocolProcessor.h new file mode 100644 index 0000000..3f2b394 --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncProtocolProcessor.h @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_TNAME_ME_H_ +#define _THRIFT_TNAME_ME_H_ 1 + +#include <thrift/async/TAsyncProcessor.h> +#include <thrift/async/TAsyncBufferProcessor.h> +#include <thrift/protocol/TProtocol.h> + +namespace apache { +namespace thrift { +namespace async { + +class TAsyncProtocolProcessor : public TAsyncBufferProcessor { +public: + TAsyncProtocolProcessor(boost::shared_ptr<TAsyncProcessor> underlying, + boost::shared_ptr<apache::thrift::protocol::TProtocolFactory> pfact) + : underlying_(underlying), pfact_(pfact) {} + + virtual void process(apache::thrift::stdcxx::function<void(bool healthy)> _return, + boost::shared_ptr<apache::thrift::transport::TBufferBase> ibuf, + boost::shared_ptr<apache::thrift::transport::TBufferBase> obuf); + + virtual ~TAsyncProtocolProcessor() {} + +private: + static void finish(apache::thrift::stdcxx::function<void(bool healthy)> _return, + boost::shared_ptr<apache::thrift::protocol::TProtocol> oprot, + bool healthy); + + boost::shared_ptr<TAsyncProcessor> underlying_; + boost::shared_ptr<apache::thrift::protocol::TProtocolFactory> pfact_; +}; +} +} +} // apache::thrift::async + +#endif // #ifndef _THRIFT_TNAME_ME_H_ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TConcurrentClientSyncInfo.cpp ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TConcurrentClientSyncInfo.cpp b/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TConcurrentClientSyncInfo.cpp new file mode 100644 index 0000000..c7e27c0 --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TConcurrentClientSyncInfo.cpp @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <thrift/async/TConcurrentClientSyncInfo.h> +#include <thrift/TApplicationException.h> +#include <thrift/transport/TTransportException.h> +#include <limits> + +namespace apache { namespace thrift { namespace async { + +using namespace ::apache::thrift::concurrency; + +TConcurrentClientSyncInfo::TConcurrentClientSyncInfo() : + stop_(false), + seqidMutex_(), + // test rollover all the time + nextseqid_((std::numeric_limits<int32_t>::max)()-10), + seqidToMonitorMap_(), + freeMonitors_(), + writeMutex_(), + readMutex_(), + recvPending_(false), + wakeupSomeone_(false), + seqidPending_(0), + fnamePending_(), + mtypePending_(::apache::thrift::protocol::T_CALL) +{ + freeMonitors_.reserve(MONITOR_CACHE_SIZE); +} + +bool TConcurrentClientSyncInfo::getPending( + std::string &fname, + ::apache::thrift::protocol::TMessageType &mtype, + int32_t &rseqid) +{ + if(stop_) + throwDeadConnection_(); + wakeupSomeone_ = false; + if(recvPending_) + { + recvPending_ = false; + rseqid = seqidPending_; + fname = fnamePending_; + mtype = mtypePending_; + return true; + } + return false; +} + +void TConcurrentClientSyncInfo::updatePending( + const std::string &fname, + ::apache::thrift::protocol::TMessageType mtype, + int32_t rseqid) +{ + recvPending_ = true; + seqidPending_ = rseqid; + fnamePending_ = fname; + mtypePending_ = mtype; + MonitorPtr monitor; + { + Guard seqidGuard(seqidMutex_); + MonitorMap::iterator i = seqidToMonitorMap_.find(rseqid); + if(i == seqidToMonitorMap_.end()) + throwBadSeqId_(); + monitor = i->second; + } + monitor->notify(); +} + +void TConcurrentClientSyncInfo::waitForWork(int32_t seqid) +{ + MonitorPtr m; + { + Guard seqidGuard(seqidMutex_); + m = seqidToMonitorMap_[seqid]; + } + while(true) + { + // be very careful about setting state in this loop that affects waking up. You may exit + // this function, attempt to grab some work, and someone else could have beaten you (or not + // left) the read mutex, and that will put you right back in this loop, with the mangled + // state you left behind. + if(stop_) + throwDeadConnection_(); + if(wakeupSomeone_) + return; + if(recvPending_ && seqidPending_ == seqid) + return; + m->waitForever(); + } +} + +void TConcurrentClientSyncInfo::throwBadSeqId_() +{ + throw apache::thrift::TApplicationException( + TApplicationException::BAD_SEQUENCE_ID, + "server sent a bad seqid"); +} + +void TConcurrentClientSyncInfo::throwDeadConnection_() +{ + throw apache::thrift::transport::TTransportException( + apache::thrift::transport::TTransportException::NOT_OPEN, + "this client died on another thread, and is now in an unusable state"); +} + +void TConcurrentClientSyncInfo::wakeupAnyone_(const Guard &) +{ + wakeupSomeone_ = true; + if(!seqidToMonitorMap_.empty()) + { + // The monitor map maps integers to monitors. Larger integers are more recent + // messages. Since this is ordered, it means that the last element is the most recent. + // We are trying to guess which thread will have its message complete next, so we are picking + // the most recent. The oldest message is likely to be some polling, long lived message. + // If we guess right, the thread we wake up will handle the message that comes in. + // If we guess wrong, the thread we wake up will hand off the work to the correct thread, + // costing us an extra context switch. + seqidToMonitorMap_.rbegin()->second->notify(); + } +} + +void TConcurrentClientSyncInfo::markBad_(const Guard &) +{ + wakeupSomeone_ = true; + stop_ = true; + for(MonitorMap::iterator i = seqidToMonitorMap_.begin(); i != seqidToMonitorMap_.end(); ++i) + i->second->notify(); +} + +TConcurrentClientSyncInfo::MonitorPtr +TConcurrentClientSyncInfo::newMonitor_(const Guard &) +{ + if(freeMonitors_.empty()) + return MonitorPtr(new Monitor(&readMutex_)); + MonitorPtr retval; + //swapping to avoid an atomic operation + retval.swap(freeMonitors_.back()); + freeMonitors_.pop_back(); + return retval; +} + +void TConcurrentClientSyncInfo::deleteMonitor_( + const Guard &, + TConcurrentClientSyncInfo::MonitorPtr &m) /*noexcept*/ +{ + if(freeMonitors_.size() > MONITOR_CACHE_SIZE) + { + m.reset(); + return; + } + //freeMonitors_ was reserved up to MONITOR_CACHE_SIZE in the ctor, + //so this shouldn't throw + freeMonitors_.push_back(TConcurrentClientSyncInfo::MonitorPtr()); + //swapping to avoid an atomic operation + m.swap(freeMonitors_.back()); +} + +int32_t TConcurrentClientSyncInfo::generateSeqId() +{ + Guard seqidGuard(seqidMutex_); + if(stop_) + throwDeadConnection_(); + + if(!seqidToMonitorMap_.empty()) + if(nextseqid_ == seqidToMonitorMap_.begin()->first) + throw apache::thrift::TApplicationException( + TApplicationException::BAD_SEQUENCE_ID, + "about to repeat a seqid"); + int32_t newSeqId = nextseqid_++; + seqidToMonitorMap_[newSeqId] = newMonitor_(seqidGuard); + return newSeqId; +} + +TConcurrentRecvSentry::TConcurrentRecvSentry(TConcurrentClientSyncInfo *sync, int32_t seqid) : + sync_(*sync), + seqid_(seqid), + committed_(false) +{ + sync_.getReadMutex().lock(); +} + +TConcurrentRecvSentry::~TConcurrentRecvSentry() +{ + { + Guard seqidGuard(sync_.seqidMutex_); + sync_.deleteMonitor_(seqidGuard, sync_.seqidToMonitorMap_[seqid_]); + + sync_.seqidToMonitorMap_.erase(seqid_); + if(committed_) + sync_.wakeupAnyone_(seqidGuard); + else + sync_.markBad_(seqidGuard); + } + sync_.getReadMutex().unlock(); +} + +void TConcurrentRecvSentry::commit() +{ + committed_ = true; +} + +TConcurrentSendSentry::TConcurrentSendSentry(TConcurrentClientSyncInfo *sync) : + sync_(*sync), + committed_(false) +{ + sync_.getWriteMutex().lock(); +} + +TConcurrentSendSentry::~TConcurrentSendSentry() +{ + if(!committed_) + { + Guard seqidGuard(sync_.seqidMutex_); + sync_.markBad_(seqidGuard); + } + sync_.getWriteMutex().unlock(); +} + +void TConcurrentSendSentry::commit() +{ + committed_ = true; +} + + +}}} // apache::thrift::async http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TConcurrentClientSyncInfo.h ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TConcurrentClientSyncInfo.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TConcurrentClientSyncInfo.h new file mode 100644 index 0000000..8997a23 --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TConcurrentClientSyncInfo.h @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef _THRIFT_TCONCURRENTCLIENTSYNCINFO_H_ +#define _THRIFT_TCONCURRENTCLIENTSYNCINFO_H_ 1 + +#include <thrift/protocol/TProtocol.h> +#include <thrift/concurrency/Mutex.h> +#include <thrift/concurrency/Monitor.h> +#include <boost/shared_ptr.hpp> +#include <vector> +#include <string> +#include <map> + +namespace apache { namespace thrift { namespace async { + +class TConcurrentClientSyncInfo; + +class TConcurrentSendSentry +{ +public: + explicit TConcurrentSendSentry(TConcurrentClientSyncInfo *sync); + ~TConcurrentSendSentry(); + + void commit(); +private: + TConcurrentClientSyncInfo &sync_; + bool committed_; +}; + +class TConcurrentRecvSentry +{ +public: + TConcurrentRecvSentry(TConcurrentClientSyncInfo *sync, int32_t seqid); + ~TConcurrentRecvSentry(); + + void commit(); +private: + TConcurrentClientSyncInfo &sync_; + int32_t seqid_; + bool committed_; +}; + +class TConcurrentClientSyncInfo +{ +private: //typedefs + typedef boost::shared_ptr< ::apache::thrift::concurrency::Monitor> MonitorPtr; + typedef std::map<int32_t, MonitorPtr> MonitorMap; +public: + TConcurrentClientSyncInfo(); + + int32_t generateSeqId(); + + bool getPending( + std::string &fname, + ::apache::thrift::protocol::TMessageType &mtype, + int32_t &rseqid); /* requires readMutex_ */ + + void updatePending( + const std::string &fname, + ::apache::thrift::protocol::TMessageType mtype, + int32_t rseqid); /* requires readMutex_ */ + + void waitForWork(int32_t seqid); /* requires readMutex_ */ + + ::apache::thrift::concurrency::Mutex &getReadMutex() {return readMutex_;} + ::apache::thrift::concurrency::Mutex &getWriteMutex() {return writeMutex_;} + +private: //constants + enum {MONITOR_CACHE_SIZE = 10}; +private: //functions + MonitorPtr newMonitor_( + const ::apache::thrift::concurrency::Guard &seqidGuard); /* requires seqidMutex_ */ + void deleteMonitor_( + const ::apache::thrift::concurrency::Guard &seqidGuard, + MonitorPtr &m); /*noexcept*/ /* requires seqidMutex_ */ + void wakeupAnyone_( + const ::apache::thrift::concurrency::Guard &seqidGuard); /* requires seqidMutex_ */ + void markBad_( + const ::apache::thrift::concurrency::Guard &seqidGuard); /* requires seqidMutex_ */ + void throwBadSeqId_(); + void throwDeadConnection_(); +private: //data members + + volatile bool stop_; + + ::apache::thrift::concurrency::Mutex seqidMutex_; + // begin seqidMutex_ protected members + int32_t nextseqid_; + MonitorMap seqidToMonitorMap_; + std::vector<MonitorPtr> freeMonitors_; + // end seqidMutex_ protected members + + ::apache::thrift::concurrency::Mutex writeMutex_; + + ::apache::thrift::concurrency::Mutex readMutex_; + // begin readMutex_ protected members + bool recvPending_; + bool wakeupSomeone_; + int32_t seqidPending_; + std::string fnamePending_; + ::apache::thrift::protocol::TMessageType mtypePending_; + // end readMutex_ protected members + + + friend class TConcurrentSendSentry; + friend class TConcurrentRecvSentry; +}; + +}}} // apache::thrift::async + +#endif // _THRIFT_TCONCURRENTCLIENTSYNCINFO_H_ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TEvhttpClientChannel.cpp ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TEvhttpClientChannel.cpp b/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TEvhttpClientChannel.cpp new file mode 100644 index 0000000..1279bc6 --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TEvhttpClientChannel.cpp @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <thrift/async/TEvhttpClientChannel.h> +#include <evhttp.h> +#include <event2/buffer.h> +#include <event2/buffer_compat.h> +#include <thrift/transport/TBufferTransports.h> +#include <thrift/protocol/TProtocolException.h> + +#include <iostream> +#include <sstream> + +using namespace apache::thrift::protocol; +using apache::thrift::transport::TTransportException; + +namespace apache { +namespace thrift { +namespace async { + +TEvhttpClientChannel::TEvhttpClientChannel(const std::string& host, + const std::string& path, + const char* address, + int port, + struct event_base* eb) + : host_(host), path_(path), recvBuf_(NULL), conn_(NULL) { + conn_ = evhttp_connection_new(address, port); + if (conn_ == NULL) { + throw TException("evhttp_connection_new failed"); + } + evhttp_connection_set_base(conn_, eb); +} + +TEvhttpClientChannel::~TEvhttpClientChannel() { + if (conn_ != NULL) { + evhttp_connection_free(conn_); + } +} + +void TEvhttpClientChannel::sendAndRecvMessage(const VoidCallback& cob, + apache::thrift::transport::TMemoryBuffer* sendBuf, + apache::thrift::transport::TMemoryBuffer* recvBuf) { + cob_ = cob; + recvBuf_ = recvBuf; + + struct evhttp_request* req = evhttp_request_new(response, this); + if (req == NULL) { + throw TException("evhttp_request_new failed"); + } + + int rv; + + rv = evhttp_add_header(req->output_headers, "Host", host_.c_str()); + if (rv != 0) { + throw TException("evhttp_add_header failed"); + } + + rv = evhttp_add_header(req->output_headers, "Content-Type", "application/x-thrift"); + if (rv != 0) { + throw TException("evhttp_add_header failed"); + } + + uint8_t* obuf; + uint32_t sz; + sendBuf->getBuffer(&obuf, &sz); + rv = evbuffer_add(req->output_buffer, obuf, sz); + if (rv != 0) { + throw TException("evbuffer_add failed"); + } + + rv = evhttp_make_request(conn_, req, EVHTTP_REQ_POST, path_.c_str()); + if (rv != 0) { + throw TException("evhttp_make_request failed"); + } +} + +void TEvhttpClientChannel::sendMessage(const VoidCallback& cob, + apache::thrift::transport::TMemoryBuffer* message) { + (void)cob; + (void)message; + throw TProtocolException(TProtocolException::NOT_IMPLEMENTED, + "Unexpected call to TEvhttpClientChannel::sendMessage"); +} + +void TEvhttpClientChannel::recvMessage(const VoidCallback& cob, + apache::thrift::transport::TMemoryBuffer* message) { + (void)cob; + (void)message; + throw TProtocolException(TProtocolException::NOT_IMPLEMENTED, + "Unexpected call to TEvhttpClientChannel::recvMessage"); +} + +void TEvhttpClientChannel::finish(struct evhttp_request* req) { + if (req == NULL) { + try { + cob_(); + } catch (const TTransportException& e) { + if (e.getType() == TTransportException::END_OF_FILE) + throw TException("connect failed"); + else + throw; + } + return; + } else if (req->response_code != 200) { + try { + cob_(); + } catch (const TTransportException& e) { + std::stringstream ss; + ss << "server returned code " << req->response_code; + if (req->response_code_line) + ss << ": " << req->response_code_line; + if (e.getType() == TTransportException::END_OF_FILE) + throw TException(ss.str()); + else + throw; + } + return; + } + recvBuf_->resetBuffer(EVBUFFER_DATA(req->input_buffer), + static_cast<uint32_t>(EVBUFFER_LENGTH(req->input_buffer))); + cob_(); + return; +} + +/* static */ void TEvhttpClientChannel::response(struct evhttp_request* req, void* arg) { + TEvhttpClientChannel* self = (TEvhttpClientChannel*)arg; + try { + self->finish(req); + } catch (std::exception& e) { + // don't propagate a C++ exception in C code (e.g. libevent) + std::cerr << "TEvhttpClientChannel::response exception thrown (ignored): " << e.what() + << std::endl; + } +} +} +} +} // apache::thrift::async http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TEvhttpClientChannel.h ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TEvhttpClientChannel.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TEvhttpClientChannel.h new file mode 100644 index 0000000..72ed40f --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TEvhttpClientChannel.h @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_TEVHTTP_CLIENT_CHANNEL_H_ +#define _THRIFT_TEVHTTP_CLIENT_CHANNEL_H_ 1 + +#include <string> +#include <boost/shared_ptr.hpp> +#include <thrift/async/TAsyncChannel.h> + +struct event_base; +struct evhttp_connection; +struct evhttp_request; + +namespace apache { +namespace thrift { +namespace transport { +class TMemoryBuffer; +} +} +} + +namespace apache { +namespace thrift { +namespace async { + +class TEvhttpClientChannel : public TAsyncChannel { +public: + using TAsyncChannel::VoidCallback; + + TEvhttpClientChannel(const std::string& host, + const std::string& path, + const char* address, + int port, + struct event_base* eb); + ~TEvhttpClientChannel(); + + virtual void sendAndRecvMessage(const VoidCallback& cob, + apache::thrift::transport::TMemoryBuffer* sendBuf, + apache::thrift::transport::TMemoryBuffer* recvBuf); + + virtual void sendMessage(const VoidCallback& cob, + apache::thrift::transport::TMemoryBuffer* message); + virtual void recvMessage(const VoidCallback& cob, + apache::thrift::transport::TMemoryBuffer* message); + + void finish(struct evhttp_request* req); + + // XXX + virtual bool good() const { return true; } + virtual bool error() const { return false; } + virtual bool timedOut() const { return false; } + +private: + static void response(struct evhttp_request* req, void* arg); + + std::string host_; + std::string path_; + VoidCallback cob_; + apache::thrift::transport::TMemoryBuffer* recvBuf_; + struct evhttp_connection* conn_; +}; +} +} +} // apache::thrift::async + +#endif // #ifndef _THRIFT_TEVHTTP_CLIENT_CHANNEL_H_
