http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TEvhttpServer.cpp ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TEvhttpServer.cpp b/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TEvhttpServer.cpp new file mode 100644 index 0000000..57d0d61 --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TEvhttpServer.cpp @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <thrift/async/TEvhttpServer.h> +#include <thrift/async/TAsyncBufferProcessor.h> +#include <thrift/transport/TBufferTransports.h> +#include <evhttp.h> +#include <event2/buffer.h> +#include <event2/buffer_compat.h> + +#include <iostream> + +#ifndef HTTP_INTERNAL // libevent < 2 +#define HTTP_INTERNAL 500 +#endif + +using apache::thrift::transport::TMemoryBuffer; + +namespace apache { +namespace thrift { +namespace async { + +struct TEvhttpServer::RequestContext { + struct evhttp_request* req; + boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> ibuf; + boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> obuf; + + RequestContext(struct evhttp_request* req); +}; + +TEvhttpServer::TEvhttpServer(boost::shared_ptr<TAsyncBufferProcessor> processor) + : processor_(processor), eb_(NULL), eh_(NULL) { +} + +TEvhttpServer::TEvhttpServer(boost::shared_ptr<TAsyncBufferProcessor> processor, int port) + : processor_(processor), eb_(NULL), eh_(NULL) { + // Create event_base and evhttp. + eb_ = event_base_new(); + if (eb_ == NULL) { + throw TException("event_base_new failed"); + } + eh_ = evhttp_new(eb_); + if (eh_ == NULL) { + event_base_free(eb_); + throw TException("evhttp_new failed"); + } + + // Bind to port. + int ret = evhttp_bind_socket(eh_, NULL, port); + if (ret < 0) { + evhttp_free(eh_); + event_base_free(eb_); + throw TException("evhttp_bind_socket failed"); + } + + // Register a handler. If you use the other constructor, + // you will want to do this yourself. + // Don't forget to unregister before destorying this TEvhttpServer. + evhttp_set_cb(eh_, "/", request, (void*)this); +} + +TEvhttpServer::~TEvhttpServer() { + if (eh_ != NULL) { + evhttp_free(eh_); + } + if (eb_ != NULL) { + event_base_free(eb_); + } +} + +int TEvhttpServer::serve() { + if (eb_ == NULL) { + throw TException("Unexpected call to TEvhttpServer::serve"); + } + return event_base_dispatch(eb_); +} + +TEvhttpServer::RequestContext::RequestContext(struct evhttp_request* req) + : req(req), + ibuf(new TMemoryBuffer(EVBUFFER_DATA(req->input_buffer), + static_cast<uint32_t>(EVBUFFER_LENGTH(req->input_buffer)))), + obuf(new TMemoryBuffer()) { +} + +void TEvhttpServer::request(struct evhttp_request* req, void* self) { + try { + static_cast<TEvhttpServer*>(self)->process(req); + } catch (std::exception& e) { + evhttp_send_reply(req, HTTP_INTERNAL, e.what(), 0); + } +} + +void TEvhttpServer::process(struct evhttp_request* req) { + RequestContext* ctx = new RequestContext(req); + return processor_->process(apache::thrift::stdcxx::bind(&TEvhttpServer::complete, + this, + ctx, + apache::thrift::stdcxx::placeholders::_1), + ctx->ibuf, + ctx->obuf); +} + +void TEvhttpServer::complete(RequestContext* ctx, bool success) { + (void)success; + std::auto_ptr<RequestContext> ptr(ctx); + + int code = success ? 200 : 400; + const char* reason = success ? "OK" : "Bad Request"; + + int rv = evhttp_add_header(ctx->req->output_headers, "Content-Type", "application/x-thrift"); + if (rv != 0) { + // TODO: Log an error. + std::cerr << "evhttp_add_header failed " << __FILE__ << ":" << __LINE__ << std::endl; + } + + struct evbuffer* buf = evbuffer_new(); + if (buf == NULL) { + // TODO: Log an error. + std::cerr << "evbuffer_new failed " << __FILE__ << ":" << __LINE__ << std::endl; + } else { + uint8_t* obuf; + uint32_t sz; + ctx->obuf->getBuffer(&obuf, &sz); + int ret = evbuffer_add(buf, obuf, sz); + if (ret != 0) { + // TODO: Log an error. + std::cerr << "evhttp_add failed with " << ret << " " << __FILE__ << ":" << __LINE__ + << std::endl; + } + } + + evhttp_send_reply(ctx->req, code, reason, buf); + if (buf != NULL) { + evbuffer_free(buf); + } +} + +struct event_base* TEvhttpServer::getEventBase() { + return eb_; +} +} +} +} // apache::thrift::async
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TEvhttpServer.h ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TEvhttpServer.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TEvhttpServer.h new file mode 100644 index 0000000..89bf337 --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TEvhttpServer.h @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_TEVHTTP_SERVER_H_ +#define _THRIFT_TEVHTTP_SERVER_H_ 1 + +#include <boost/shared_ptr.hpp> + +struct event_base; +struct evhttp; +struct evhttp_request; + +namespace apache { +namespace thrift { +namespace async { + +class TAsyncBufferProcessor; + +class TEvhttpServer { +public: + /** + * Create a TEvhttpServer for use with an external evhttp instance. + * Must be manually installed with evhttp_set_cb, using + * TEvhttpServer::request as the callback and the + * address of the server as the extra arg. + * Do not call "serve" on this server. + */ + TEvhttpServer(boost::shared_ptr<TAsyncBufferProcessor> processor); + + /** + * Create a TEvhttpServer with an embedded event_base and evhttp, + * listening on port and responding on the endpoint "/". + * Call "serve" on this server to serve forever. + */ + TEvhttpServer(boost::shared_ptr<TAsyncBufferProcessor> processor, int port); + + ~TEvhttpServer(); + + static void request(struct evhttp_request* req, void* self); + int serve(); + + struct event_base* getEventBase(); + +private: + struct RequestContext; + + void process(struct evhttp_request* req); + void complete(RequestContext* ctx, bool success); + + boost::shared_ptr<TAsyncBufferProcessor> processor_; + struct event_base* eb_; + struct evhttp* eh_; +}; +} +} +} // apache::thrift::async + +#endif // #ifndef _THRIFT_TEVHTTP_SERVER_H_ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/BoostMonitor.cpp ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/BoostMonitor.cpp b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/BoostMonitor.cpp new file mode 100644 index 0000000..6c24d82 --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/BoostMonitor.cpp @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <thrift/thrift-config.h> + +#include <thrift/concurrency/Monitor.h> +#include <thrift/concurrency/Exception.h> +#include <thrift/concurrency/Util.h> +#include <thrift/transport/PlatformSocket.h> +#include <assert.h> + +#include <boost/scoped_ptr.hpp> +#include <boost/thread.hpp> +#include <boost/date_time/posix_time/posix_time.hpp> + +namespace apache { +namespace thrift { +namespace concurrency { + +/** + * Monitor implementation using the boost thread library + * + * @version $Id:$ + */ +class Monitor::Impl : public boost::condition_variable_any { + +public: + Impl() : ownedMutex_(new Mutex()), mutex_(NULL) { init(ownedMutex_.get()); } + + Impl(Mutex* mutex) : mutex_(NULL) { init(mutex); } + + Impl(Monitor* monitor) : mutex_(NULL) { init(&(monitor->mutex())); } + + Mutex& mutex() { return *mutex_; } + void lock() { mutex().lock(); } + void unlock() { mutex().unlock(); } + + /** + * Exception-throwing version of waitForTimeRelative(), called simply + * wait(int64) for historical reasons. Timeout is in milliseconds. + * + * If the condition occurs, this function returns cleanly; on timeout or + * error an exception is thrown. + */ + void wait(int64_t timeout_ms) { + int result = waitForTimeRelative(timeout_ms); + if (result == THRIFT_ETIMEDOUT) { + throw TimedOutException(); + } else if (result != 0) { + throw TException("Monitor::wait() failed"); + } + } + + /** + * Waits until the specified timeout in milliseconds for the condition to + * occur, or waits forever if timeout_ms == 0. + * + * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. + */ + int waitForTimeRelative(int64_t timeout_ms) { + if (timeout_ms == 0LL) { + return waitForever(); + } + + assert(mutex_); + boost::timed_mutex* mutexImpl + = reinterpret_cast<boost::timed_mutex*>(mutex_->getUnderlyingImpl()); + assert(mutexImpl); + + boost::timed_mutex::scoped_lock lock(*mutexImpl, boost::adopt_lock); + int res + = timed_wait(lock, boost::get_system_time() + boost::posix_time::milliseconds(timeout_ms)) + ? 0 + : THRIFT_ETIMEDOUT; + lock.release(); + return res; + } + + /** + * Waits until the absolute time specified using struct THRIFT_TIMESPEC. + * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. + */ + int waitForTime(const THRIFT_TIMESPEC* abstime) { + struct timeval temp; + temp.tv_sec = static_cast<long>(abstime->tv_sec); + temp.tv_usec = static_cast<long>(abstime->tv_nsec) / 1000; + return waitForTime(&temp); + } + + /** + * Waits until the absolute time specified using struct timeval. + * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. + */ + int waitForTime(const struct timeval* abstime) { + assert(mutex_); + boost::timed_mutex* mutexImpl = static_cast<boost::timed_mutex*>(mutex_->getUnderlyingImpl()); + assert(mutexImpl); + + struct timeval currenttime; + Util::toTimeval(currenttime, Util::currentTime()); + + long tv_sec = static_cast<long>(abstime->tv_sec - currenttime.tv_sec); + long tv_usec = static_cast<long>(abstime->tv_usec - currenttime.tv_usec); + if (tv_sec < 0) + tv_sec = 0; + if (tv_usec < 0) + tv_usec = 0; + + boost::timed_mutex::scoped_lock lock(*mutexImpl, boost::adopt_lock); + int res = timed_wait(lock, + boost::get_system_time() + boost::posix_time::seconds(tv_sec) + + boost::posix_time::microseconds(tv_usec)) + ? 0 + : THRIFT_ETIMEDOUT; + lock.release(); + return res; + } + + /** + * Waits forever until the condition occurs. + * Returns 0 if condition occurs, or an error code otherwise. + */ + int waitForever() { + assert(mutex_); + boost::timed_mutex* mutexImpl + = reinterpret_cast<boost::timed_mutex*>(mutex_->getUnderlyingImpl()); + assert(mutexImpl); + + boost::timed_mutex::scoped_lock lock(*mutexImpl, boost::adopt_lock); + ((boost::condition_variable_any*)this)->wait(lock); + lock.release(); + return 0; + } + + void notify() { notify_one(); } + + void notifyAll() { notify_all(); } + +private: + void init(Mutex* mutex) { mutex_ = mutex; } + + boost::scoped_ptr<Mutex> ownedMutex_; + Mutex* mutex_; +}; + +Monitor::Monitor() : impl_(new Monitor::Impl()) { +} +Monitor::Monitor(Mutex* mutex) : impl_(new Monitor::Impl(mutex)) { +} +Monitor::Monitor(Monitor* monitor) : impl_(new Monitor::Impl(monitor)) { +} + +Monitor::~Monitor() { + delete impl_; +} + +Mutex& Monitor::mutex() const { + return const_cast<Monitor::Impl*>(impl_)->mutex(); +} + +void Monitor::lock() const { + const_cast<Monitor::Impl*>(impl_)->lock(); +} + +void Monitor::unlock() const { + const_cast<Monitor::Impl*>(impl_)->unlock(); +} + +void Monitor::wait(int64_t timeout) const { + const_cast<Monitor::Impl*>(impl_)->wait(timeout); +} + +int Monitor::waitForTime(const THRIFT_TIMESPEC* abstime) const { + return const_cast<Monitor::Impl*>(impl_)->waitForTime(abstime); +} + +int Monitor::waitForTime(const timeval* abstime) const { + return const_cast<Monitor::Impl*>(impl_)->waitForTime(abstime); +} + +int Monitor::waitForTimeRelative(int64_t timeout_ms) const { + return const_cast<Monitor::Impl*>(impl_)->waitForTimeRelative(timeout_ms); +} + +int Monitor::waitForever() const { + return const_cast<Monitor::Impl*>(impl_)->waitForever(); +} + +void Monitor::notify() const { + const_cast<Monitor::Impl*>(impl_)->notify(); +} + +void Monitor::notifyAll() const { + const_cast<Monitor::Impl*>(impl_)->notifyAll(); +} +} +} +} // apache::thrift::concurrency http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/BoostMutex.cpp ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/BoostMutex.cpp b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/BoostMutex.cpp new file mode 100644 index 0000000..f7cadab --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/BoostMutex.cpp @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <thrift/thrift-config.h> + +#include <thrift/concurrency/Mutex.h> +#include <thrift/concurrency/Util.h> +#include <thrift/Thrift.h> + +#include <cassert> +#include <boost/thread.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/date_time/posix_time/posix_time.hpp> + +namespace apache { +namespace thrift { +namespace concurrency { + +/** + * Implementation of Mutex class using boost interprocess mutex + * + * @version $Id:$ + */ +class Mutex::impl : public boost::timed_mutex {}; + +Mutex::Mutex(Initializer init) : impl_(new Mutex::impl()) { + THRIFT_UNUSED_VARIABLE(init); +} + +void* Mutex::getUnderlyingImpl() const { + return impl_.get(); +} + +void Mutex::lock() const { + impl_->lock(); +} + +bool Mutex::trylock() const { + return impl_->try_lock(); +} + +bool Mutex::timedlock(int64_t ms) const { + return impl_->timed_lock(boost::get_system_time() + boost::posix_time::milliseconds(ms)); +} + +void Mutex::unlock() const { + impl_->unlock(); +} + +void Mutex::DEFAULT_INITIALIZER(void* arg) { + THRIFT_UNUSED_VARIABLE(arg); +} +} +} +} // apache::thrift::concurrency http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/BoostThreadFactory.cpp ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/BoostThreadFactory.cpp b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/BoostThreadFactory.cpp new file mode 100644 index 0000000..96cb6d6 --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/BoostThreadFactory.cpp @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <thrift/thrift-config.h> + +#if USE_BOOST_THREAD + +#include <thrift/concurrency/BoostThreadFactory.h> +#include <thrift/concurrency/Exception.h> + +#include <cassert> + +#include <boost/weak_ptr.hpp> +#include <boost/thread.hpp> + +namespace apache { +namespace thrift { +namespace concurrency { + +using boost::shared_ptr; +using boost::weak_ptr; + +/** + * The boost thread class. + * + * @version $Id:$ + */ +class BoostThread : public Thread { +public: + enum STATE { uninitialized, starting, started, stopping, stopped }; + + static void* threadMain(void* arg); + +private: + std::auto_ptr<boost::thread> thread_; + STATE state_; + weak_ptr<BoostThread> self_; + bool detached_; + +public: + BoostThread(bool detached, shared_ptr<Runnable> runnable) + : state_(uninitialized), detached_(detached) { + this->Thread::runnable(runnable); + } + + ~BoostThread() { + if (!detached_) { + try { + join(); + } catch (...) { + // We're really hosed. + } + } + } + + void start() { + if (state_ != uninitialized) { + return; + } + + // Create reference + shared_ptr<BoostThread>* selfRef = new shared_ptr<BoostThread>(); + *selfRef = self_.lock(); + + state_ = starting; + + thread_ + = std::auto_ptr<boost::thread>(new boost::thread(boost::bind(threadMain, (void*)selfRef))); + + if (detached_) + thread_->detach(); + } + + void join() { + if (!detached_ && state_ != uninitialized) { + thread_->join(); + } + } + + Thread::id_t getId() { return thread_.get() ? thread_->get_id() : boost::thread::id(); } + + shared_ptr<Runnable> runnable() const { return Thread::runnable(); } + + void runnable(shared_ptr<Runnable> value) { Thread::runnable(value); } + + void weakRef(shared_ptr<BoostThread> self) { + assert(self.get() == this); + self_ = weak_ptr<BoostThread>(self); + } +}; + +void* BoostThread::threadMain(void* arg) { + shared_ptr<BoostThread> thread = *(shared_ptr<BoostThread>*)arg; + delete reinterpret_cast<shared_ptr<BoostThread>*>(arg); + + if (!thread) { + return (void*)0; + } + + if (thread->state_ != starting) { + return (void*)0; + } + + thread->state_ = started; + thread->runnable()->run(); + + if (thread->state_ != stopping && thread->state_ != stopped) { + thread->state_ = stopping; + } + return (void*)0; +} + +/** + * POSIX Thread factory implementation + */ +class BoostThreadFactory::Impl { + +private: + bool detached_; + +public: + Impl(bool detached) : detached_(detached) {} + + /** + * Creates a new POSIX thread to run the runnable object + * + * @param runnable A runnable object + */ + shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const { + shared_ptr<BoostThread> result = shared_ptr<BoostThread>(new BoostThread(detached_, runnable)); + result->weakRef(result); + runnable->thread(result); + return result; + } + + bool isDetached() const { return detached_; } + + void setDetached(bool value) { detached_ = value; } + + Thread::id_t getCurrentThreadId() const { return boost::this_thread::get_id(); } +}; + +BoostThreadFactory::BoostThreadFactory(bool detached) + : impl_(new BoostThreadFactory::Impl(detached)) { +} + +shared_ptr<Thread> BoostThreadFactory::newThread(shared_ptr<Runnable> runnable) const { + return impl_->newThread(runnable); +} + +bool BoostThreadFactory::isDetached() const { + return impl_->isDetached(); +} + +void BoostThreadFactory::setDetached(bool value) { + impl_->setDetached(value); +} + +Thread::id_t BoostThreadFactory::getCurrentThreadId() const { + return impl_->getCurrentThreadId(); +} +} +} +} // apache::thrift::concurrency + +#endif // USE_BOOST_THREAD http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/BoostThreadFactory.h ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/BoostThreadFactory.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/BoostThreadFactory.h new file mode 100644 index 0000000..e6d1a56 --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/BoostThreadFactory.h @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_CONCURRENCY_BOOSTTHREADFACTORY_H_ +#define _THRIFT_CONCURRENCY_BOOSTTHREADFACTORY_H_ 1 + +#include <thrift/concurrency/Thread.h> + +#include <boost/shared_ptr.hpp> + +namespace apache { +namespace thrift { +namespace concurrency { + +/** + * A thread factory to create posix threads + * + * @version $Id:$ + */ +class BoostThreadFactory : public ThreadFactory { + +public: + /** + * Boost thread factory. All threads created by a factory are reference-counted + * via boost::shared_ptr and boost::weak_ptr. The factory guarantees that threads and + * the Runnable tasks they host will be properly cleaned up once the last strong reference + * to both is given up. + * + * Threads are created with the specified boost policy, priority, stack-size. A detachable thread + * is not joinable. + * + * By default threads are not joinable. + */ + + BoostThreadFactory(bool detached = true); + + // From ThreadFactory; + boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const; + + // From ThreadFactory; + Thread::id_t getCurrentThreadId() const; + + /** + * Sets detached mode of threads + */ + virtual void setDetached(bool detached); + + /** + * Gets current detached mode + */ + virtual bool isDetached() const; + +private: + class Impl; + boost::shared_ptr<Impl> impl_; +}; +} +} +} // apache::thrift::concurrency + +#endif // #ifndef _THRIFT_CONCURRENCY_BOOSTTHREADFACTORY_H_ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Exception.h ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Exception.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Exception.h new file mode 100644 index 0000000..6438fda --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Exception.h @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_CONCURRENCY_EXCEPTION_H_ +#define _THRIFT_CONCURRENCY_EXCEPTION_H_ 1 + +#include <exception> +#include <thrift/Thrift.h> + +namespace apache { +namespace thrift { +namespace concurrency { + +class NoSuchTaskException : public apache::thrift::TException {}; + +class UncancellableTaskException : public apache::thrift::TException {}; + +class InvalidArgumentException : public apache::thrift::TException {}; + +class IllegalStateException : public apache::thrift::TException { +public: + IllegalStateException() {} + IllegalStateException(const std::string& message) : TException(message) {} +}; + +class TimedOutException : public apache::thrift::TException { +public: + TimedOutException() : TException("TimedOutException"){}; + TimedOutException(const std::string& message) : TException(message) {} +}; + +class TooManyPendingTasksException : public apache::thrift::TException { +public: + TooManyPendingTasksException() : TException("TooManyPendingTasksException"){}; + TooManyPendingTasksException(const std::string& message) : TException(message) {} +}; + +class SystemResourceException : public apache::thrift::TException { +public: + SystemResourceException() {} + + SystemResourceException(const std::string& message) : TException(message) {} +}; +} +} +} // apache::thrift::concurrency + +#endif // #ifndef _THRIFT_CONCURRENCY_EXCEPTION_H_ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/FunctionRunner.h ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/FunctionRunner.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/FunctionRunner.h new file mode 100644 index 0000000..b776794 --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/FunctionRunner.h @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_CONCURRENCY_FUNCTION_RUNNER_H +#define _THRIFT_CONCURRENCY_FUNCTION_RUNNER_H 1 + +#include <thrift/cxxfunctional.h> +#include <thrift/concurrency/Thread.h> + +namespace apache { +namespace thrift { +namespace concurrency { + +/** + * Convenient implementation of Runnable that will execute arbitrary callbacks. + * Interfaces are provided to accept both a generic 'void(void)' callback, and + * a 'void* (void*)' pthread_create-style callback. + * + * Example use: + * void* my_thread_main(void* arg); + * shared_ptr<ThreadFactory> factory = ...; + * // To create a thread that executes my_thread_main once: + * shared_ptr<Thread> thread = factory->newThread( + * FunctionRunner::create(my_thread_main, some_argument)); + * thread->start(); + * + * bool A::foo(); + * A* a = new A(); + * // To create a thread that executes a.foo() every 100 milliseconds: + * factory->newThread(FunctionRunner::create( + * apache::thrift::stdcxx::bind(&A::foo, a), 100))->start(); + * + */ + +class FunctionRunner : public Runnable { +public: + // This is the type of callback 'pthread_create()' expects. + typedef void* (*PthreadFuncPtr)(void* arg); + // This a fully-generic void(void) callback for custom bindings. + typedef apache::thrift::stdcxx::function<void()> VoidFunc; + + typedef apache::thrift::stdcxx::function<bool()> BoolFunc; + + /** + * Syntactic sugar to make it easier to create new FunctionRunner + * objects wrapped in shared_ptr. + */ + static boost::shared_ptr<FunctionRunner> create(const VoidFunc& cob) { + return boost::shared_ptr<FunctionRunner>(new FunctionRunner(cob)); + } + + static boost::shared_ptr<FunctionRunner> create(PthreadFuncPtr func, void* arg) { + return boost::shared_ptr<FunctionRunner>(new FunctionRunner(func, arg)); + } + +private: + static void pthread_func_wrapper(PthreadFuncPtr func, void* arg) { + // discard return value + func(arg); + } + +public: + /** + * Given a 'pthread_create' style callback, this FunctionRunner will + * execute the given callback. Note that the 'void*' return value is ignored. + */ + FunctionRunner(PthreadFuncPtr func, void* arg) + : func_(apache::thrift::stdcxx::bind(pthread_func_wrapper, func, arg)) {} + + /** + * Given a generic callback, this FunctionRunner will execute it. + */ + FunctionRunner(const VoidFunc& cob) : func_(cob) {} + + /** + * Given a bool foo(...) type callback, FunctionRunner will execute + * the callback repeatedly with 'intervalMs' milliseconds between the calls, + * until it returns false. Note that the actual interval between calls will + * be intervalMs plus execution time of the callback. + */ + FunctionRunner(const BoolFunc& cob, int intervalMs) : repFunc_(cob), intervalMs_(intervalMs) {} + + void run() { + if (repFunc_) { + while (repFunc_()) { + THRIFT_SLEEP_USEC(intervalMs_ * 1000); + } + } else { + func_(); + } + } + +private: + VoidFunc func_; + BoolFunc repFunc_; + int intervalMs_; +}; +} +} +} // apache::thrift::concurrency + +#endif // #ifndef _THRIFT_CONCURRENCY_FUNCTION_RUNNER_H http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Monitor.cpp ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Monitor.cpp b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Monitor.cpp new file mode 100644 index 0000000..5e713c0 --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Monitor.cpp @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <thrift/concurrency/Monitor.h> +#include <thrift/concurrency/Exception.h> +#include <thrift/concurrency/Util.h> +#include <thrift/transport/PlatformSocket.h> + +#include <boost/scoped_ptr.hpp> + +#include <assert.h> + +#include <iostream> + +#include <pthread.h> + +namespace apache { +namespace thrift { +namespace concurrency { + +using boost::scoped_ptr; + +/** + * Monitor implementation using the POSIX pthread library + * + * @version $Id:$ + */ +class Monitor::Impl { + +public: + Impl() : ownedMutex_(new Mutex()), mutex_(NULL), condInitialized_(false) { + init(ownedMutex_.get()); + } + + Impl(Mutex* mutex) : mutex_(NULL), condInitialized_(false) { init(mutex); } + + Impl(Monitor* monitor) : mutex_(NULL), condInitialized_(false) { init(&(monitor->mutex())); } + + ~Impl() { cleanup(); } + + Mutex& mutex() { return *mutex_; } + void lock() { mutex().lock(); } + void unlock() { mutex().unlock(); } + + /** + * Exception-throwing version of waitForTimeRelative(), called simply + * wait(int64) for historical reasons. Timeout is in milliseconds. + * + * If the condition occurs, this function returns cleanly; on timeout or + * error an exception is thrown. + */ + void wait(int64_t timeout_ms) const { + int result = waitForTimeRelative(timeout_ms); + if (result == THRIFT_ETIMEDOUT) { + // pthread_cond_timedwait has been observed to return early on + // various platforms, so comment out this assert. + // assert(Util::currentTime() >= (now + timeout)); + throw TimedOutException(); + } else if (result != 0) { + throw TException("pthread_cond_wait() or pthread_cond_timedwait() failed"); + } + } + + /** + * Waits until the specified timeout in milliseconds for the condition to + * occur, or waits forever if timeout_ms == 0. + * + * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. + */ + int waitForTimeRelative(int64_t timeout_ms) const { + if (timeout_ms == 0LL) { + return waitForever(); + } + + struct THRIFT_TIMESPEC abstime; + Util::toTimespec(abstime, Util::currentTime() + timeout_ms); + return waitForTime(&abstime); + } + + /** + * Waits until the absolute time specified using struct THRIFT_TIMESPEC. + * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. + */ + int waitForTime(const THRIFT_TIMESPEC* abstime) const { + assert(mutex_); + pthread_mutex_t* mutexImpl = reinterpret_cast<pthread_mutex_t*>(mutex_->getUnderlyingImpl()); + assert(mutexImpl); + + // XXX Need to assert that caller owns mutex + return pthread_cond_timedwait(&pthread_cond_, mutexImpl, abstime); + } + + int waitForTime(const struct timeval* abstime) const { + struct THRIFT_TIMESPEC temp; + temp.tv_sec = abstime->tv_sec; + temp.tv_nsec = abstime->tv_usec * 1000; + return waitForTime(&temp); + } + /** + * Waits forever until the condition occurs. + * Returns 0 if condition occurs, or an error code otherwise. + */ + int waitForever() const { + assert(mutex_); + pthread_mutex_t* mutexImpl = reinterpret_cast<pthread_mutex_t*>(mutex_->getUnderlyingImpl()); + assert(mutexImpl); + return pthread_cond_wait(&pthread_cond_, mutexImpl); + } + + void notify() { + // XXX Need to assert that caller owns mutex + int iret = pthread_cond_signal(&pthread_cond_); + THRIFT_UNUSED_VARIABLE(iret); + assert(iret == 0); + } + + void notifyAll() { + // XXX Need to assert that caller owns mutex + int iret = pthread_cond_broadcast(&pthread_cond_); + THRIFT_UNUSED_VARIABLE(iret); + assert(iret == 0); + } + +private: + void init(Mutex* mutex) { + mutex_ = mutex; + + if (pthread_cond_init(&pthread_cond_, NULL) == 0) { + condInitialized_ = true; + } + + if (!condInitialized_) { + cleanup(); + throw SystemResourceException(); + } + } + + void cleanup() { + if (condInitialized_) { + condInitialized_ = false; + int iret = pthread_cond_destroy(&pthread_cond_); + THRIFT_UNUSED_VARIABLE(iret); + assert(iret == 0); + } + } + + scoped_ptr<Mutex> ownedMutex_; + Mutex* mutex_; + + mutable pthread_cond_t pthread_cond_; + mutable bool condInitialized_; +}; + +Monitor::Monitor() : impl_(new Monitor::Impl()) { +} +Monitor::Monitor(Mutex* mutex) : impl_(new Monitor::Impl(mutex)) { +} +Monitor::Monitor(Monitor* monitor) : impl_(new Monitor::Impl(monitor)) { +} + +Monitor::~Monitor() { + delete impl_; +} + +Mutex& Monitor::mutex() const { + return impl_->mutex(); +} + +void Monitor::lock() const { + impl_->lock(); +} + +void Monitor::unlock() const { + impl_->unlock(); +} + +void Monitor::wait(int64_t timeout) const { + impl_->wait(timeout); +} + +int Monitor::waitForTime(const THRIFT_TIMESPEC* abstime) const { + return impl_->waitForTime(abstime); +} + +int Monitor::waitForTime(const timeval* abstime) const { + return impl_->waitForTime(abstime); +} + +int Monitor::waitForTimeRelative(int64_t timeout_ms) const { + return impl_->waitForTimeRelative(timeout_ms); +} + +int Monitor::waitForever() const { + return impl_->waitForever(); +} + +void Monitor::notify() const { + impl_->notify(); +} + +void Monitor::notifyAll() const { + impl_->notifyAll(); +} +} +} +} // apache::thrift::concurrency http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Monitor.h ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Monitor.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Monitor.h new file mode 100644 index 0000000..5472f85 --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Monitor.h @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_CONCURRENCY_MONITOR_H_ +#define _THRIFT_CONCURRENCY_MONITOR_H_ 1 + +#include <thrift/concurrency/Exception.h> +#include <thrift/concurrency/Mutex.h> + +#include <boost/utility.hpp> + +namespace apache { +namespace thrift { +namespace concurrency { + +/** + * A monitor is a combination mutex and condition-event. Waiting and + * notifying condition events requires that the caller own the mutex. Mutex + * lock and unlock operations can be performed independently of condition + * events. This is more or less analogous to java.lang.Object multi-thread + * operations. + * + * Note the Monitor can create a new, internal mutex; alternatively, a + * separate Mutex can be passed in and the Monitor will re-use it without + * taking ownership. It's the user's responsibility to make sure that the + * Mutex is not deallocated before the Monitor. + * + * Note that all methods are const. Monitors implement logical constness, not + * bit constness. This allows const methods to call monitor methods without + * needing to cast away constness or change to non-const signatures. + * + * @version $Id:$ + */ +class Monitor : boost::noncopyable { +public: + /** Creates a new mutex, and takes ownership of it. */ + Monitor(); + + /** Uses the provided mutex without taking ownership. */ + explicit Monitor(Mutex* mutex); + + /** Uses the mutex inside the provided Monitor without taking ownership. */ + explicit Monitor(Monitor* monitor); + + /** Deallocates the mutex only if we own it. */ + virtual ~Monitor(); + + Mutex& mutex() const; + + virtual void lock() const; + + virtual void unlock() const; + + /** + * Waits a maximum of the specified timeout in milliseconds for the condition + * to occur, or waits forever if timeout_ms == 0. + * + * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. + */ + int waitForTimeRelative(int64_t timeout_ms) const; + + /** + * Waits until the absolute time specified using struct THRIFT_TIMESPEC. + * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. + */ + int waitForTime(const THRIFT_TIMESPEC* abstime) const; + + /** + * Waits until the absolute time specified using struct timeval. + * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. + */ + int waitForTime(const struct timeval* abstime) const; + + /** + * Waits forever until the condition occurs. + * Returns 0 if condition occurs, or an error code otherwise. + */ + int waitForever() const; + + /** + * Exception-throwing version of waitForTimeRelative(), called simply + * wait(int64) for historical reasons. Timeout is in milliseconds. + * + * If the condition occurs, this function returns cleanly; on timeout or + * error an exception is thrown. + */ + void wait(int64_t timeout_ms = 0LL) const; + + /** Wakes up one thread waiting on this monitor. */ + virtual void notify() const; + + /** Wakes up all waiting threads on this monitor. */ + virtual void notifyAll() const; + +private: + class Impl; + + Impl* impl_; +}; + +class Synchronized { +public: + Synchronized(const Monitor* monitor) : g(monitor->mutex()) {} + Synchronized(const Monitor& monitor) : g(monitor.mutex()) {} + +private: + Guard g; +}; +} +} +} // apache::thrift::concurrency + +#endif // #ifndef _THRIFT_CONCURRENCY_MONITOR_H_ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Mutex.cpp ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Mutex.cpp b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Mutex.cpp new file mode 100644 index 0000000..d9921aa --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Mutex.cpp @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <thrift/thrift-config.h> + +#include <thrift/Thrift.h> +#include <thrift/concurrency/Mutex.h> +#include <thrift/concurrency/Util.h> + +#include <assert.h> +#ifdef HAVE_PTHREAD_H +#include <pthread.h> +#endif +#include <signal.h> + +using boost::shared_ptr; + +namespace apache { +namespace thrift { +namespace concurrency { + +#ifndef THRIFT_NO_CONTENTION_PROFILING + +static sig_atomic_t mutexProfilingSampleRate = 0; +static MutexWaitCallback mutexProfilingCallback = 0; + +volatile static sig_atomic_t mutexProfilingCounter = 0; + +void enableMutexProfiling(int32_t profilingSampleRate, MutexWaitCallback callback) { + mutexProfilingSampleRate = profilingSampleRate; + mutexProfilingCallback = callback; +} + +#define PROFILE_MUTEX_START_LOCK() int64_t _lock_startTime = maybeGetProfilingStartTime(); + +#define PROFILE_MUTEX_NOT_LOCKED() \ + do { \ + if (_lock_startTime > 0) { \ + int64_t endTime = Util::currentTimeUsec(); \ + (*mutexProfilingCallback)(this, endTime - _lock_startTime); \ + } \ + } while (0) + +#define PROFILE_MUTEX_LOCKED() \ + do { \ + profileTime_ = _lock_startTime; \ + if (profileTime_ > 0) { \ + profileTime_ = Util::currentTimeUsec() - profileTime_; \ + } \ + } while (0) + +#define PROFILE_MUTEX_START_UNLOCK() \ + int64_t _temp_profileTime = profileTime_; \ + profileTime_ = 0; + +#define PROFILE_MUTEX_UNLOCKED() \ + do { \ + if (_temp_profileTime > 0) { \ + (*mutexProfilingCallback)(this, _temp_profileTime); \ + } \ + } while (0) + +static inline int64_t maybeGetProfilingStartTime() { + if (mutexProfilingSampleRate && mutexProfilingCallback) { + // This block is unsynchronized, but should produce a reasonable sampling + // rate on most architectures. The main race conditions are the gap + // between the decrement and the test, the non-atomicity of decrement, and + // potential caching of different values at different CPUs. + // + // - if two decrements race, the likeliest result is that the counter + // decrements slowly (perhaps much more slowly) than intended. + // + // - many threads could potentially decrement before resetting the counter + // to its large value, causing each additional incoming thread to + // profile every call. This situation is unlikely to persist for long + // as the critical gap is quite short, but profiling could be bursty. + sig_atomic_t localValue = --mutexProfilingCounter; + if (localValue <= 0) { + mutexProfilingCounter = mutexProfilingSampleRate; + return Util::currentTimeUsec(); + } + } + + return 0; +} + +#else +#define PROFILE_MUTEX_START_LOCK() +#define PROFILE_MUTEX_NOT_LOCKED() +#define PROFILE_MUTEX_LOCKED() +#define PROFILE_MUTEX_START_UNLOCK() +#define PROFILE_MUTEX_UNLOCKED() +#endif // THRIFT_NO_CONTENTION_PROFILING + +/** + * Implementation of Mutex class using POSIX mutex + * + * @version $Id:$ + */ +class Mutex::impl { +public: + impl(Initializer init) : initialized_(false) { +#ifndef THRIFT_NO_CONTENTION_PROFILING + profileTime_ = 0; +#endif + init(&pthread_mutex_); + initialized_ = true; + } + + ~impl() { + if (initialized_) { + initialized_ = false; + int ret = pthread_mutex_destroy(&pthread_mutex_); + THRIFT_UNUSED_VARIABLE(ret); + assert(ret == 0); + } + } + + void lock() const { + PROFILE_MUTEX_START_LOCK(); + pthread_mutex_lock(&pthread_mutex_); + PROFILE_MUTEX_LOCKED(); + } + + bool trylock() const { return (0 == pthread_mutex_trylock(&pthread_mutex_)); } + + bool timedlock(int64_t milliseconds) const { +#if defined(_POSIX_TIMEOUTS) && _POSIX_TIMEOUTS >= 200112L + PROFILE_MUTEX_START_LOCK(); + + struct THRIFT_TIMESPEC ts; + Util::toTimespec(ts, milliseconds + Util::currentTime()); + int ret = pthread_mutex_timedlock(&pthread_mutex_, &ts); + if (ret == 0) { + PROFILE_MUTEX_LOCKED(); + return true; + } + + PROFILE_MUTEX_NOT_LOCKED(); + return false; +#else + /* Otherwise follow solution used by Mono for Android */ + struct THRIFT_TIMESPEC sleepytime, now, to; + + /* This is just to avoid a completely busy wait */ + sleepytime.tv_sec = 0; + sleepytime.tv_nsec = 10000000L; /* 10ms */ + + Util::toTimespec(to, milliseconds + Util::currentTime()); + + while ((trylock()) == false) { + Util::toTimespec(now, Util::currentTime()); + if (now.tv_sec >= to.tv_sec && now.tv_nsec >= to.tv_nsec) { + return false; + } + nanosleep(&sleepytime, NULL); + } + + return true; +#endif + } + + void unlock() const { + PROFILE_MUTEX_START_UNLOCK(); + pthread_mutex_unlock(&pthread_mutex_); + PROFILE_MUTEX_UNLOCKED(); + } + + void* getUnderlyingImpl() const { return (void*)&pthread_mutex_; } + +private: + mutable pthread_mutex_t pthread_mutex_; + mutable bool initialized_; +#ifndef THRIFT_NO_CONTENTION_PROFILING + mutable int64_t profileTime_; +#endif +}; + +Mutex::Mutex(Initializer init) : impl_(new Mutex::impl(init)) { +} + +void* Mutex::getUnderlyingImpl() const { + return impl_->getUnderlyingImpl(); +} + +void Mutex::lock() const { + impl_->lock(); +} + +bool Mutex::trylock() const { + return impl_->trylock(); +} + +bool Mutex::timedlock(int64_t ms) const { + return impl_->timedlock(ms); +} + +void Mutex::unlock() const { + impl_->unlock(); +} + +void Mutex::DEFAULT_INITIALIZER(void* arg) { + pthread_mutex_t* pthread_mutex = (pthread_mutex_t*)arg; + int ret = pthread_mutex_init(pthread_mutex, NULL); + THRIFT_UNUSED_VARIABLE(ret); + assert(ret == 0); +} + +#if defined(PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) \ + || defined(PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP) +static void init_with_kind(pthread_mutex_t* mutex, int kind) { + pthread_mutexattr_t mutexattr; + int ret = pthread_mutexattr_init(&mutexattr); + assert(ret == 0); + + // Apparently, this can fail. Should we really be aborting? + ret = pthread_mutexattr_settype(&mutexattr, kind); + assert(ret == 0); + + ret = pthread_mutex_init(mutex, &mutexattr); + assert(ret == 0); + + ret = pthread_mutexattr_destroy(&mutexattr); + assert(ret == 0); + THRIFT_UNUSED_VARIABLE(ret); +} +#endif + +#ifdef PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP +void Mutex::ADAPTIVE_INITIALIZER(void* arg) { + // From mysql source: mysys/my_thr_init.c + // Set mutex type to "fast" a.k.a "adaptive" + // + // In this case the thread may steal the mutex from some other thread + // that is waiting for the same mutex. This will save us some + // context switches but may cause a thread to 'starve forever' while + // waiting for the mutex (not likely if the code within the mutex is + // short). + init_with_kind((pthread_mutex_t*)arg, PTHREAD_MUTEX_ADAPTIVE_NP); +} +#endif + +#ifdef PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP +void Mutex::RECURSIVE_INITIALIZER(void* arg) { + init_with_kind((pthread_mutex_t*)arg, PTHREAD_MUTEX_RECURSIVE_NP); +} +#endif + +/** + * Implementation of ReadWriteMutex class using POSIX rw lock + * + * @version $Id:$ + */ +class ReadWriteMutex::impl { +public: + impl() : initialized_(false) { +#ifndef THRIFT_NO_CONTENTION_PROFILING + profileTime_ = 0; +#endif + int ret = pthread_rwlock_init(&rw_lock_, NULL); + THRIFT_UNUSED_VARIABLE(ret); + assert(ret == 0); + initialized_ = true; + } + + ~impl() { + if (initialized_) { + initialized_ = false; + int ret = pthread_rwlock_destroy(&rw_lock_); + THRIFT_UNUSED_VARIABLE(ret); + assert(ret == 0); + } + } + + void acquireRead() const { + PROFILE_MUTEX_START_LOCK(); + pthread_rwlock_rdlock(&rw_lock_); + PROFILE_MUTEX_NOT_LOCKED(); // not exclusive, so use not-locked path + } + + void acquireWrite() const { + PROFILE_MUTEX_START_LOCK(); + pthread_rwlock_wrlock(&rw_lock_); + PROFILE_MUTEX_LOCKED(); + } + + bool attemptRead() const { return !pthread_rwlock_tryrdlock(&rw_lock_); } + + bool attemptWrite() const { return !pthread_rwlock_trywrlock(&rw_lock_); } + + void release() const { + PROFILE_MUTEX_START_UNLOCK(); + pthread_rwlock_unlock(&rw_lock_); + PROFILE_MUTEX_UNLOCKED(); + } + +private: + mutable pthread_rwlock_t rw_lock_; + mutable bool initialized_; +#ifndef THRIFT_NO_CONTENTION_PROFILING + mutable int64_t profileTime_; +#endif +}; + +ReadWriteMutex::ReadWriteMutex() : impl_(new ReadWriteMutex::impl()) { +} + +void ReadWriteMutex::acquireRead() const { + impl_->acquireRead(); +} + +void ReadWriteMutex::acquireWrite() const { + impl_->acquireWrite(); +} + +bool ReadWriteMutex::attemptRead() const { + return impl_->attemptRead(); +} + +bool ReadWriteMutex::attemptWrite() const { + return impl_->attemptWrite(); +} + +void ReadWriteMutex::release() const { + impl_->release(); +} + +NoStarveReadWriteMutex::NoStarveReadWriteMutex() : writerWaiting_(false) { +} + +void NoStarveReadWriteMutex::acquireRead() const { + if (writerWaiting_) { + // writer is waiting, block on the writer's mutex until he's done with it + mutex_.lock(); + mutex_.unlock(); + } + + ReadWriteMutex::acquireRead(); +} + +void NoStarveReadWriteMutex::acquireWrite() const { + // if we can acquire the rwlock the easy way, we're done + if (attemptWrite()) { + return; + } + + // failed to get the rwlock, do it the hard way: + // locking the mutex and setting writerWaiting will cause all new readers to + // block on the mutex rather than on the rwlock. + mutex_.lock(); + writerWaiting_ = true; + ReadWriteMutex::acquireWrite(); + writerWaiting_ = false; + mutex_.unlock(); +} +} +} +} // apache::thrift::concurrency http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Mutex.h ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Mutex.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Mutex.h new file mode 100644 index 0000000..6f892dc --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Mutex.h @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_CONCURRENCY_MUTEX_H_ +#define _THRIFT_CONCURRENCY_MUTEX_H_ 1 + +#include <boost/shared_ptr.hpp> +#include <boost/noncopyable.hpp> +#include <stdint.h> + +namespace apache { +namespace thrift { +namespace concurrency { + +#ifndef THRIFT_NO_CONTENTION_PROFILING + +/** + * Determines if the Thrift Mutex and ReadWriteMutex classes will attempt to + * profile their blocking acquire methods. If this value is set to non-zero, + * Thrift will attempt to invoke the callback once every profilingSampleRate + * times. However, as the sampling is not synchronized the rate is not + * guranateed, and could be subject to big bursts and swings. Please ensure + * your sampling callback is as performant as your application requires. + * + * The callback will get called with the wait time taken to lock the mutex in + * usec and a (void*) that uniquely identifies the Mutex (or ReadWriteMutex) + * being locked. + * + * The enableMutexProfiling() function is unsynchronized; calling this function + * while profiling is already enabled may result in race conditions. On + * architectures where a pointer assignment is atomic, this is safe but there + * is no guarantee threads will agree on a single callback within any + * particular time period. + */ +typedef void (*MutexWaitCallback)(const void* id, int64_t waitTimeMicros); +void enableMutexProfiling(int32_t profilingSampleRate, MutexWaitCallback callback); + +#endif + +/** + * A simple mutex class + * + * @version $Id:$ + */ +class Mutex { +public: + typedef void (*Initializer)(void*); + + Mutex(Initializer init = DEFAULT_INITIALIZER); + virtual ~Mutex() {} + virtual void lock() const; + virtual bool trylock() const; + virtual bool timedlock(int64_t milliseconds) const; + virtual void unlock() const; + + void* getUnderlyingImpl() const; + + static void DEFAULT_INITIALIZER(void*); + static void ADAPTIVE_INITIALIZER(void*); + static void RECURSIVE_INITIALIZER(void*); + +private: + class impl; + boost::shared_ptr<impl> impl_; +}; + +class ReadWriteMutex { +public: + ReadWriteMutex(); + virtual ~ReadWriteMutex() {} + + // these get the lock and block until it is done successfully + virtual void acquireRead() const; + virtual void acquireWrite() const; + + // these attempt to get the lock, returning false immediately if they fail + virtual bool attemptRead() const; + virtual bool attemptWrite() const; + + // this releases both read and write locks + virtual void release() const; + +private: + class impl; + boost::shared_ptr<impl> impl_; +}; + +/** + * A ReadWriteMutex that guarantees writers will not be starved by readers: + * When a writer attempts to acquire the mutex, all new readers will be + * blocked from acquiring the mutex until the writer has acquired and + * released it. In some operating systems, this may already be guaranteed + * by a regular ReadWriteMutex. + */ +class NoStarveReadWriteMutex : public ReadWriteMutex { +public: + NoStarveReadWriteMutex(); + + virtual void acquireRead() const; + virtual void acquireWrite() const; + +private: + Mutex mutex_; + mutable volatile bool writerWaiting_; +}; + +class Guard : boost::noncopyable { +public: + Guard(const Mutex& value, int64_t timeout = 0) : mutex_(&value) { + if (timeout == 0) { + value.lock(); + } else if (timeout < 0) { + if (!value.trylock()) { + mutex_ = NULL; + } + } else { + if (!value.timedlock(timeout)) { + mutex_ = NULL; + } + } + } + ~Guard() { + if (mutex_) { + mutex_->unlock(); + } + } + + operator bool() const { return (mutex_ != NULL); } + +private: + const Mutex* mutex_; +}; + +// Can be used as second argument to RWGuard to make code more readable +// as to whether we're doing acquireRead() or acquireWrite(). +enum RWGuardType { RW_READ = 0, RW_WRITE = 1 }; + +class RWGuard : boost::noncopyable { +public: + RWGuard(const ReadWriteMutex& value, bool write = false) : rw_mutex_(value) { + if (write) { + rw_mutex_.acquireWrite(); + } else { + rw_mutex_.acquireRead(); + } + } + + RWGuard(const ReadWriteMutex& value, RWGuardType type) : rw_mutex_(value) { + if (type == RW_WRITE) { + rw_mutex_.acquireWrite(); + } else { + rw_mutex_.acquireRead(); + } + } + ~RWGuard() { rw_mutex_.release(); } + +private: + const ReadWriteMutex& rw_mutex_; +}; +} +} +} // apache::thrift::concurrency + +#endif // #ifndef _THRIFT_CONCURRENCY_MUTEX_H_ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/PlatformThreadFactory.h ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/PlatformThreadFactory.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/PlatformThreadFactory.h new file mode 100644 index 0000000..545b572 --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/PlatformThreadFactory.h @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_CONCURRENCY_PLATFORMTHREADFACTORY_H_ +#define _THRIFT_CONCURRENCY_PLATFORMTHREADFACTORY_H_ 1 + +// clang-format off +#include <thrift/thrift-config.h> +#if USE_BOOST_THREAD +# include <thrift/concurrency/BoostThreadFactory.h> +#elif USE_STD_THREAD +# include <thrift/concurrency/StdThreadFactory.h> +#else +# include <thrift/concurrency/PosixThreadFactory.h> +#endif +// clang-format on + +namespace apache { +namespace thrift { +namespace concurrency { + +// clang-format off +#if USE_BOOST_THREAD + typedef BoostThreadFactory PlatformThreadFactory; +#elif USE_STD_THREAD + typedef StdThreadFactory PlatformThreadFactory; +#else + typedef PosixThreadFactory PlatformThreadFactory; +#endif +// clang-format on + +} +} +} // apache::thrift::concurrency + +#endif // #ifndef _THRIFT_CONCURRENCY_PLATFORMTHREADFACTORY_H_ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp new file mode 100644 index 0000000..47c5034 --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/PosixThreadFactory.cpp @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <thrift/thrift-config.h> + +#include <thrift/concurrency/PosixThreadFactory.h> +#include <thrift/concurrency/Exception.h> + +#if GOOGLE_PERFTOOLS_REGISTER_THREAD +#include <google/profiler.h> +#endif + +#include <assert.h> +#include <pthread.h> + +#include <iostream> + +#include <boost/weak_ptr.hpp> + +namespace apache { +namespace thrift { +namespace concurrency { + +using boost::shared_ptr; +using boost::weak_ptr; + +/** + * The POSIX thread class. + * + * @version $Id:$ + */ +class PthreadThread : public Thread { +public: + enum STATE { uninitialized, starting, started, stopping, stopped }; + + static const int MB = 1024 * 1024; + + static void* threadMain(void* arg); + +private: + pthread_t pthread_; + STATE state_; + int policy_; + int priority_; + int stackSize_; + weak_ptr<PthreadThread> self_; + bool detached_; + +public: + PthreadThread(int policy, + int priority, + int stackSize, + bool detached, + shared_ptr<Runnable> runnable) + : + +#ifndef _WIN32 + pthread_(0), +#endif // _WIN32 + + state_(uninitialized), + policy_(policy), + priority_(priority), + stackSize_(stackSize), + detached_(detached) { + + this->Thread::runnable(runnable); + } + + ~PthreadThread() { + /* Nothing references this thread, if is is not detached, do a join + now, otherwise the thread-id and, possibly, other resources will + be leaked. */ + if (!detached_) { + try { + join(); + } catch (...) { + // We're really hosed. + } + } + } + + void start() { + if (state_ != uninitialized) { + return; + } + + pthread_attr_t thread_attr; + if (pthread_attr_init(&thread_attr) != 0) { + throw SystemResourceException("pthread_attr_init failed"); + } + + if (pthread_attr_setdetachstate(&thread_attr, + detached_ ? PTHREAD_CREATE_DETACHED : PTHREAD_CREATE_JOINABLE) + != 0) { + throw SystemResourceException("pthread_attr_setdetachstate failed"); + } + + // Set thread stack size + if (pthread_attr_setstacksize(&thread_attr, MB * stackSize_) != 0) { + throw SystemResourceException("pthread_attr_setstacksize failed"); + } + +// Set thread policy +#ifdef _WIN32 + // WIN32 Pthread implementation doesn't seem to support sheduling policies other then + // PosixThreadFactory::OTHER - runtime error + policy_ = PosixThreadFactory::OTHER; +#endif + + if (pthread_attr_setschedpolicy(&thread_attr, policy_) != 0) { + throw SystemResourceException("pthread_attr_setschedpolicy failed"); + } + + struct sched_param sched_param; + sched_param.sched_priority = priority_; + + // Set thread priority + if (pthread_attr_setschedparam(&thread_attr, &sched_param) != 0) { + throw SystemResourceException("pthread_attr_setschedparam failed"); + } + + // Create reference + shared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>(); + *selfRef = self_.lock(); + + state_ = starting; + + if (pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef) != 0) { + throw SystemResourceException("pthread_create failed"); + } + } + + void join() { + if (!detached_ && state_ != uninitialized) { + void* ignore; + /* XXX + If join fails it is most likely due to the fact + that the last reference was the thread itself and cannot + join. This results in leaked threads and will eventually + cause the process to run out of thread resources. + We're beyond the point of throwing an exception. Not clear how + best to handle this. */ + int res = pthread_join(pthread_, &ignore); + detached_ = (res == 0); + if (res != 0) { + GlobalOutput.printf("PthreadThread::join(): fail with code %d", res); + } + } else { + GlobalOutput.printf("PthreadThread::join(): detached thread"); + } + } + + Thread::id_t getId() { + +#ifndef _WIN32 + return (Thread::id_t)pthread_; +#else + return (Thread::id_t)pthread_.p; +#endif // _WIN32 + } + + shared_ptr<Runnable> runnable() const { return Thread::runnable(); } + + void runnable(shared_ptr<Runnable> value) { Thread::runnable(value); } + + void weakRef(shared_ptr<PthreadThread> self) { + assert(self.get() == this); + self_ = weak_ptr<PthreadThread>(self); + } +}; + +void* PthreadThread::threadMain(void* arg) { + shared_ptr<PthreadThread> thread = *(shared_ptr<PthreadThread>*)arg; + delete reinterpret_cast<shared_ptr<PthreadThread>*>(arg); + + if (thread == NULL) { + return (void*)0; + } + + if (thread->state_ != starting) { + return (void*)0; + } + +#if GOOGLE_PERFTOOLS_REGISTER_THREAD + ProfilerRegisterThread(); +#endif + + thread->state_ = started; + thread->runnable()->run(); + if (thread->state_ != stopping && thread->state_ != stopped) { + thread->state_ = stopping; + } + + return (void*)0; +} + +/** + * POSIX Thread factory implementation + */ +class PosixThreadFactory::Impl { + +private: + POLICY policy_; + PRIORITY priority_; + int stackSize_; + bool detached_; + + /** + * Converts generic posix thread schedule policy enums into pthread + * API values. + */ + static int toPthreadPolicy(POLICY policy) { + switch (policy) { + case OTHER: + return SCHED_OTHER; + case FIFO: + return SCHED_FIFO; + case ROUND_ROBIN: + return SCHED_RR; + } + return SCHED_OTHER; + } + + /** + * Converts relative thread priorities to absolute value based on posix + * thread scheduler policy + * + * The idea is simply to divide up the priority range for the given policy + * into the correpsonding relative priority level (lowest..highest) and + * then pro-rate accordingly. + */ + static int toPthreadPriority(POLICY policy, PRIORITY priority) { + int pthread_policy = toPthreadPolicy(policy); + int min_priority = 0; + int max_priority = 0; +#ifdef HAVE_SCHED_GET_PRIORITY_MIN + min_priority = sched_get_priority_min(pthread_policy); +#endif +#ifdef HAVE_SCHED_GET_PRIORITY_MAX + max_priority = sched_get_priority_max(pthread_policy); +#endif + int quanta = (HIGHEST - LOWEST) + 1; + float stepsperquanta = (float)(max_priority - min_priority) / quanta; + + if (priority <= HIGHEST) { + return (int)(min_priority + stepsperquanta * priority); + } else { + // should never get here for priority increments. + assert(false); + return (int)(min_priority + stepsperquanta * NORMAL); + } + } + +public: + Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) + : policy_(policy), priority_(priority), stackSize_(stackSize), detached_(detached) {} + + /** + * Creates a new POSIX thread to run the runnable object + * + * @param runnable A runnable object + */ + shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const { + shared_ptr<PthreadThread> result + = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(policy_), + toPthreadPriority(policy_, priority_), + stackSize_, + detached_, + runnable)); + result->weakRef(result); + runnable->thread(result); + return result; + } + + int getStackSize() const { return stackSize_; } + + void setStackSize(int value) { stackSize_ = value; } + + PRIORITY getPriority() const { return priority_; } + + /** + * Sets priority. + * + * XXX + * Need to handle incremental priorities properly. + */ + void setPriority(PRIORITY value) { priority_ = value; } + + bool isDetached() const { return detached_; } + + void setDetached(bool value) { detached_ = value; } + + Thread::id_t getCurrentThreadId() const { + +#ifndef _WIN32 + return (Thread::id_t)pthread_self(); +#else + return (Thread::id_t)pthread_self().p; +#endif // _WIN32 + } +}; + +PosixThreadFactory::PosixThreadFactory(POLICY policy, + PRIORITY priority, + int stackSize, + bool detached) + : impl_(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) { +} + +shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const { + return impl_->newThread(runnable); +} + +int PosixThreadFactory::getStackSize() const { + return impl_->getStackSize(); +} + +void PosixThreadFactory::setStackSize(int value) { + impl_->setStackSize(value); +} + +PosixThreadFactory::PRIORITY PosixThreadFactory::getPriority() const { + return impl_->getPriority(); +} + +void PosixThreadFactory::setPriority(PosixThreadFactory::PRIORITY value) { + impl_->setPriority(value); +} + +bool PosixThreadFactory::isDetached() const { + return impl_->isDetached(); +} + +void PosixThreadFactory::setDetached(bool value) { + impl_->setDetached(value); +} + +Thread::id_t PosixThreadFactory::getCurrentThreadId() const { + return impl_->getCurrentThreadId(); +} +} +} +} // apache::thrift::concurrency http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/PosixThreadFactory.h ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/PosixThreadFactory.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/PosixThreadFactory.h new file mode 100644 index 0000000..b26d296 --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/PosixThreadFactory.h @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_CONCURRENCY_POSIXTHREADFACTORY_H_ +#define _THRIFT_CONCURRENCY_POSIXTHREADFACTORY_H_ 1 + +#include <thrift/concurrency/Thread.h> + +#include <boost/shared_ptr.hpp> + +namespace apache { +namespace thrift { +namespace concurrency { + +/** + * A thread factory to create posix threads + * + * @version $Id:$ + */ +class PosixThreadFactory : public ThreadFactory { + +public: + /** + * POSIX Thread scheduler policies + */ + enum POLICY { OTHER, FIFO, ROUND_ROBIN }; + + /** + * POSIX Thread scheduler relative priorities, + * + * Absolute priority is determined by scheduler policy and OS. This + * enumeration specifies relative priorities such that one can specify a + * priority within a giving scheduler policy without knowing the absolute + * value of the priority. + */ + enum PRIORITY { + LOWEST = 0, + LOWER = 1, + LOW = 2, + NORMAL = 3, + HIGH = 4, + HIGHER = 5, + HIGHEST = 6, + INCREMENT = 7, + DECREMENT = 8 + }; + + /** + * Posix thread (pthread) factory. All threads created by a factory are reference-counted + * via boost::shared_ptr and boost::weak_ptr. The factory guarantees that threads and + * the Runnable tasks they host will be properly cleaned up once the last strong reference + * to both is given up. + * + * Threads are created with the specified policy, priority, stack-size and detachable-mode + * detached means the thread is free-running and will release all system resources the + * when it completes. A detachable thread is not joinable. The join method + * of a detachable thread will return immediately with no error. + * + * By default threads are not joinable. + */ + + PosixThreadFactory(POLICY policy = ROUND_ROBIN, + PRIORITY priority = NORMAL, + int stackSize = 1, + bool detached = true); + + // From ThreadFactory; + boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const; + + // From ThreadFactory; + Thread::id_t getCurrentThreadId() const; + + /** + * Gets stack size for created threads + * + * @return int size in megabytes + */ + virtual int getStackSize() const; + + /** + * Sets stack size for created threads + * + * @param value size in megabytes + */ + virtual void setStackSize(int value); + + /** + * Gets priority relative to current policy + */ + virtual PRIORITY getPriority() const; + + /** + * Sets priority relative to current policy + */ + virtual void setPriority(PRIORITY priority); + + /** + * Sets detached mode of threads + */ + virtual void setDetached(bool detached); + + /** + * Gets current detached mode + */ + virtual bool isDetached() const; + +private: + class Impl; + boost::shared_ptr<Impl> impl_; +}; +} +} +} // apache::thrift::concurrency + +#endif // #ifndef _THRIFT_CONCURRENCY_POSIXTHREADFACTORY_H_ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/StdMonitor.cpp ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/StdMonitor.cpp b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/StdMonitor.cpp new file mode 100644 index 0000000..7b3b209 --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/StdMonitor.cpp @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <thrift/thrift-config.h> + +#include <thrift/concurrency/Monitor.h> +#include <thrift/concurrency/Exception.h> +#include <thrift/concurrency/Util.h> +#include <thrift/transport/PlatformSocket.h> +#include <assert.h> + +#include <condition_variable> +#include <chrono> +#include <thread> +#include <mutex> + +namespace apache { +namespace thrift { +namespace concurrency { + +/** + * Monitor implementation using the std thread library + * + * @version $Id:$ + */ +class Monitor::Impl { + +public: + Impl() : ownedMutex_(new Mutex()), conditionVariable_(), mutex_(NULL) { init(ownedMutex_.get()); } + + Impl(Mutex* mutex) : ownedMutex_(), conditionVariable_(), mutex_(NULL) { init(mutex); } + + Impl(Monitor* monitor) : ownedMutex_(), conditionVariable_(), mutex_(NULL) { + init(&(monitor->mutex())); + } + + Mutex& mutex() { return *mutex_; } + void lock() { mutex_->lock(); } + void unlock() { mutex_->unlock(); } + + /** + * Exception-throwing version of waitForTimeRelative(), called simply + * wait(int64) for historical reasons. Timeout is in milliseconds. + * + * If the condition occurs, this function returns cleanly; on timeout or + * error an exception is thrown. + */ + void wait(int64_t timeout_ms) { + int result = waitForTimeRelative(timeout_ms); + if (result == THRIFT_ETIMEDOUT) { + throw TimedOutException(); + } else if (result != 0) { + throw TException("Monitor::wait() failed"); + } + } + + /** + * Waits until the specified timeout in milliseconds for the condition to + * occur, or waits forever if timeout_ms == 0. + * + * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. + */ + int waitForTimeRelative(int64_t timeout_ms) { + if (timeout_ms == 0LL) { + return waitForever(); + } + + assert(mutex_); + std::timed_mutex* mutexImpl = static_cast<std::timed_mutex*>(mutex_->getUnderlyingImpl()); + assert(mutexImpl); + + std::unique_lock<std::timed_mutex> lock(*mutexImpl, std::adopt_lock); + bool timedout = (conditionVariable_.wait_for(lock, std::chrono::milliseconds(timeout_ms)) + == std::cv_status::timeout); + lock.release(); + return (timedout ? THRIFT_ETIMEDOUT : 0); + } + + /** + * Waits until the absolute time specified using struct THRIFT_TIMESPEC. + * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. + */ + int waitForTime(const THRIFT_TIMESPEC* abstime) { + struct timeval temp; + temp.tv_sec = static_cast<long>(abstime->tv_sec); + temp.tv_usec = static_cast<long>(abstime->tv_nsec) / 1000; + return waitForTime(&temp); + } + + /** + * Waits until the absolute time specified using struct timeval. + * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. + */ + int waitForTime(const struct timeval* abstime) { + assert(mutex_); + std::timed_mutex* mutexImpl = static_cast<std::timed_mutex*>(mutex_->getUnderlyingImpl()); + assert(mutexImpl); + + struct timeval currenttime; + Util::toTimeval(currenttime, Util::currentTime()); + + long tv_sec = static_cast<long>(abstime->tv_sec - currenttime.tv_sec); + long tv_usec = static_cast<long>(abstime->tv_usec - currenttime.tv_usec); + if (tv_sec < 0) + tv_sec = 0; + if (tv_usec < 0) + tv_usec = 0; + + std::unique_lock<std::timed_mutex> lock(*mutexImpl, std::adopt_lock); + bool timedout = (conditionVariable_.wait_for(lock, + std::chrono::seconds(tv_sec) + + std::chrono::microseconds(tv_usec)) + == std::cv_status::timeout); + lock.release(); + return (timedout ? THRIFT_ETIMEDOUT : 0); + } + + /** + * Waits forever until the condition occurs. + * Returns 0 if condition occurs, or an error code otherwise. + */ + int waitForever() { + assert(mutex_); + std::timed_mutex* mutexImpl = static_cast<std::timed_mutex*>(mutex_->getUnderlyingImpl()); + assert(mutexImpl); + + std::unique_lock<std::timed_mutex> lock(*mutexImpl, std::adopt_lock); + conditionVariable_.wait(lock); + lock.release(); + return 0; + } + + void notify() { conditionVariable_.notify_one(); } + + void notifyAll() { conditionVariable_.notify_all(); } + +private: + void init(Mutex* mutex) { mutex_ = mutex; } + + const std::unique_ptr<Mutex> ownedMutex_; + std::condition_variable_any conditionVariable_; + Mutex* mutex_; +}; + +Monitor::Monitor() : impl_(new Monitor::Impl()) { +} +Monitor::Monitor(Mutex* mutex) : impl_(new Monitor::Impl(mutex)) { +} +Monitor::Monitor(Monitor* monitor) : impl_(new Monitor::Impl(monitor)) { +} + +Monitor::~Monitor() { + delete impl_; +} + +Mutex& Monitor::mutex() const { + return const_cast<Monitor::Impl*>(impl_)->mutex(); +} + +void Monitor::lock() const { + const_cast<Monitor::Impl*>(impl_)->lock(); +} + +void Monitor::unlock() const { + const_cast<Monitor::Impl*>(impl_)->unlock(); +} + +void Monitor::wait(int64_t timeout) const { + const_cast<Monitor::Impl*>(impl_)->wait(timeout); +} + +int Monitor::waitForTime(const THRIFT_TIMESPEC* abstime) const { + return const_cast<Monitor::Impl*>(impl_)->waitForTime(abstime); +} + +int Monitor::waitForTime(const timeval* abstime) const { + return const_cast<Monitor::Impl*>(impl_)->waitForTime(abstime); +} + +int Monitor::waitForTimeRelative(int64_t timeout_ms) const { + return const_cast<Monitor::Impl*>(impl_)->waitForTimeRelative(timeout_ms); +} + +int Monitor::waitForever() const { + return const_cast<Monitor::Impl*>(impl_)->waitForever(); +} + +void Monitor::notify() const { + const_cast<Monitor::Impl*>(impl_)->notify(); +} + +void Monitor::notifyAll() const { + const_cast<Monitor::Impl*>(impl_)->notifyAll(); +} +} +} +} // apache::thrift::concurrency http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/StdMutex.cpp ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/StdMutex.cpp b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/StdMutex.cpp new file mode 100644 index 0000000..69678a2 --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/StdMutex.cpp @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <thrift/thrift-config.h> + +#include <thrift/concurrency/Mutex.h> +#include <thrift/concurrency/Util.h> + +#include <cassert> +#include <chrono> +#include <mutex> + +namespace apache { +namespace thrift { +namespace concurrency { + +/** + * Implementation of Mutex class using C++11 std::timed_mutex + * + * @version $Id:$ + */ +class Mutex::impl : public std::timed_mutex {}; + +Mutex::Mutex(Initializer init) : impl_(new Mutex::impl()) { +} + +void* Mutex::getUnderlyingImpl() const { + return impl_.get(); +} + +void Mutex::lock() const { + impl_->lock(); +} + +bool Mutex::trylock() const { + return impl_->try_lock(); +} + +bool Mutex::timedlock(int64_t ms) const { + return impl_->try_lock_for(std::chrono::milliseconds(ms)); +} + +void Mutex::unlock() const { + impl_->unlock(); +} + +void Mutex::DEFAULT_INITIALIZER(void* arg) { +} +} +} +} // apache::thrift::concurrency
