http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp new file mode 100644 index 0000000..d57e7ec --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/StdThreadFactory.cpp @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <thrift/thrift-config.h> + +#if USE_STD_THREAD + +#include <thrift/concurrency/StdThreadFactory.h> +#include <thrift/concurrency/Exception.h> + +#include <cassert> + +#include <boost/enable_shared_from_this.hpp> +#include <boost/weak_ptr.hpp> +#include <thread> + +namespace apache { +namespace thrift { +namespace concurrency { + +/** + * The C++11 thread class. + * + * Note that we use boost shared_ptr rather than std shared_ptrs here + * because the Thread/Runnable classes use those and we don't want to + * mix them. 
+ * + * @version $Id:$ + */ +class StdThread : public Thread, public boost::enable_shared_from_this<StdThread> { +public: + enum STATE { uninitialized, starting, started, stopping, stopped }; + + static void threadMain(boost::shared_ptr<StdThread> thread); + +private: + std::unique_ptr<std::thread> thread_; + STATE state_; + bool detached_; + +public: + StdThread(bool detached, boost::shared_ptr<Runnable> runnable) + : state_(uninitialized), detached_(detached) { + this->Thread::runnable(runnable); + } + + ~StdThread() { + if (!detached_) { + try { + join(); + } catch (...) { + // We're really hosed. + } + } + } + + void start() { + if (state_ != uninitialized) { + return; + } + + boost::shared_ptr<StdThread> selfRef = shared_from_this(); + state_ = starting; + + thread_ = std::unique_ptr<std::thread>(new std::thread(threadMain, selfRef)); + + if (detached_) + thread_->detach(); + } + + void join() { + if (!detached_ && state_ != uninitialized) { + thread_->join(); + } + } + + Thread::id_t getId() { return thread_.get() ? 
thread_->get_id() : std::thread::id(); } + + boost::shared_ptr<Runnable> runnable() const { return Thread::runnable(); } + + void runnable(boost::shared_ptr<Runnable> value) { Thread::runnable(value); } +}; + +void StdThread::threadMain(boost::shared_ptr<StdThread> thread) { + if (thread == NULL) { + return; + } + + if (thread->state_ != starting) { + return; + } + + thread->state_ = started; + thread->runnable()->run(); + + if (thread->state_ != stopping && thread->state_ != stopped) { + thread->state_ = stopping; + } + + return; +} + +/** + * std::thread factory implementation + */ +class StdThreadFactory::Impl { + +private: + bool detached_; + +public: + Impl(bool detached) : detached_(detached) {} + + /** + * Creates a new std::thread to run the runnable object + * + * @param runnable A runnable object + */ + boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const { + boost::shared_ptr<StdThread> result + = boost::shared_ptr<StdThread>(new StdThread(detached_, runnable)); + runnable->thread(result); + return result; + } + + bool isDetached() const { return detached_; } + + void setDetached(bool value) { detached_ = value; } + + Thread::id_t getCurrentThreadId() const { return std::this_thread::get_id(); } +}; + +StdThreadFactory::StdThreadFactory(bool detached) : impl_(new StdThreadFactory::Impl(detached)) { +} + +boost::shared_ptr<Thread> StdThreadFactory::newThread(boost::shared_ptr<Runnable> runnable) const { + return impl_->newThread(runnable); +} + +bool StdThreadFactory::isDetached() const { + return impl_->isDetached(); +} + +void StdThreadFactory::setDetached(bool value) { + impl_->setDetached(value); +} + +Thread::id_t StdThreadFactory::getCurrentThreadId() const { + return impl_->getCurrentThreadId(); +} +} +} +} // apache::thrift::concurrency + +#endif // USE_STD_THREAD
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/StdThreadFactory.h
----------------------------------------------------------------------
diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/StdThreadFactory.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/StdThreadFactory.h
new file mode 100644
index 0000000..fb86bbf
--- /dev/null
+++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/StdThreadFactory.h
@@ -0,0 +1,74 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#ifndef _THRIFT_CONCURRENCY_STDTHREADFACTORY_H_
#define _THRIFT_CONCURRENCY_STDTHREADFACTORY_H_ 1

#include <thrift/concurrency/Thread.h>

#include <boost/shared_ptr.hpp>

namespace apache {
namespace thrift {
namespace concurrency {

/**
 * A thread factory to create std::threads.
 *
 * The implementation is hidden behind a private Impl class held through a
 * boost::shared_ptr.
 *
 * @version $Id:$
 */
class StdThreadFactory : public ThreadFactory {

public:
  /**
   * Std thread factory.  All threads created by a factory are reference-counted
   * via boost::shared_ptr and boost::weak_ptr.  The factory guarantees that threads and
   * the Runnable tasks they host will be properly cleaned up once the last strong reference
   * to both is given up.
   *
   * By default threads are not joinable.
   *
   * @param detached initial detached mode for threads created by this
   *                 factory; defaults to true (detached, i.e. not joinable)
   */

  StdThreadFactory(bool detached = true);

  // From ThreadFactory;
  boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const;

  // From ThreadFactory;
  Thread::id_t getCurrentThreadId() const;

  /**
   * Sets detached mode of threads
   *
   * @param detached the new detached mode
   */
  virtual void setDetached(bool detached);

  /**
   * Gets current detached mode
   *
   * @return the current detached mode
   */
  virtual bool isDetached() const;

private:
  // Opaque implementation class; only declared here.
  class Impl;
  boost::shared_ptr<Impl> impl_;
};
}
}
} // apache::thrift::concurrency

#endif // #ifndef _THRIFT_CONCURRENCY_STDTHREADFACTORY_H_
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Thread.h
----------------------------------------------------------------------
diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Thread.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Thread.h
new file mode 100644
index 0000000..f5eb3a8
--- /dev/null
+++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Thread.h
@@ -0,0 +1,154 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.
See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#ifndef _THRIFT_CONCURRENCY_THREAD_H_
#define _THRIFT_CONCURRENCY_THREAD_H_ 1

#include <stdint.h>
#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>

#include <thrift/thrift-config.h>

// Pull in the header for whichever threading backend the build selected.
#if USE_BOOST_THREAD
#include <boost/thread.hpp>
#elif USE_STD_THREAD
#include <thread>
#else
#ifdef HAVE_PTHREAD_H
#include <pthread.h>
#endif
#endif

namespace apache {
namespace thrift {
namespace concurrency {

class Thread;

/**
 * Minimal runnable class.  More or less analogous to java.lang.Runnable.
 *
 * A Runnable holds only a weak reference to the Thread hosting it.
 *
 * @version $Id:$
 */
class Runnable {

public:
  virtual ~Runnable(){};

  /**
   * The work to perform; implemented by concrete runnables.
   */
  virtual void run() = 0;

  /**
   * Gets the thread object that is hosting this runnable object  - can return
   * an empty boost::shared pointer if no references remain on that thread object
   */
  virtual boost::shared_ptr<Thread> thread() { return thread_.lock(); }

  /**
   * Sets the thread that is executing this object.  This is only meant for
   * use by concrete implementations of Thread.
   */
  virtual void thread(boost::shared_ptr<Thread> value) { thread_ = value; }

private:
  boost::weak_ptr<Thread> thread_;
};

/**
 * Minimal thread class. Returned by thread factory bound to a Runnable object
 * and ready to start execution.  More or less analogous to java.lang.Thread
 * (minus all the thread group, priority, mode and other baggage, since that
 * is difficult to abstract across platforms and is left for platform-specific
 * ThreadFactory implementations to deal with).
 *
 * @see apache::thrift::concurrency::ThreadFactory
 */
class Thread {

public:
  // id_t, is_current() and get_current() map onto the selected backend:
  // boost::thread, std::thread, or pthreads.
#if USE_BOOST_THREAD
  typedef boost::thread::id id_t;

  static inline bool is_current(id_t t) { return t == boost::this_thread::get_id(); }
  static inline id_t get_current() { return boost::this_thread::get_id(); }
#elif USE_STD_THREAD
  typedef std::thread::id id_t;

  static inline bool is_current(id_t t) { return t == std::this_thread::get_id(); }
  static inline id_t get_current() { return std::this_thread::get_id(); }
#else
  typedef pthread_t id_t;

  static inline bool is_current(id_t t) { return pthread_equal(pthread_self(), t); }
  static inline id_t get_current() { return pthread_self(); }
#endif

  virtual ~Thread(){};

  /**
   * Starts the thread. Does platform specific thread creation and
   * configuration then invokes the run method of the Runnable object bound
   * to this thread.
   */
  virtual void start() = 0;

  /**
   * Join this thread. Current thread blocks until this target thread
   * completes.
   */
  virtual void join() = 0;

  /**
   * Gets the thread's platform-specific ID
   */
  virtual id_t getId() = 0;

  /**
   * Gets the runnable object this thread is hosting
   */
  virtual boost::shared_ptr<Runnable> runnable() const { return _runnable; }

protected:
  /**
   * Sets the runnable object this thread is hosting; protected, so only
   * available to derived thread implementations.
   */
  virtual void runnable(boost::shared_ptr<Runnable> value) { _runnable = value; }

private:
  // Strong reference to the hosted runnable.
  boost::shared_ptr<Runnable> _runnable;
};

/**
 * Factory to create platform-specific thread object and bind them to Runnable
 * object for execution
 */
class ThreadFactory {

public:
  virtual ~ThreadFactory() {}

  /**
   * Creates a thread bound to the given runnable.
   */
  virtual boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const = 0;

  /**
   * Id value standing for "not a thrift thread"; only declared here.
   */

  static const Thread::id_t unknown_thread_id;

  /** Gets the current thread id or unknown_thread_id if the current thread is not a thrift thread
   */
  virtual Thread::id_t getCurrentThreadId() const = 0;
};
}
}
} // apache::thrift::concurrency

#endif // #ifndef _THRIFT_CONCURRENCY_THREAD_H_
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
----------------------------------------------------------------------
diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/ThreadManager.cpp b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
new file mode 100644
index 0000000..a2b44d4
--- /dev/null
+++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
@@ -0,0 +1,561 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <thrift/thrift-config.h> + +#include <thrift/concurrency/ThreadManager.h> +#include <thrift/concurrency/Exception.h> +#include <thrift/concurrency/Monitor.h> +#include <thrift/concurrency/Util.h> + +#include <boost/shared_ptr.hpp> + +#include <assert.h> +#include <queue> +#include <set> + +#if defined(DEBUG) +#include <iostream> +#endif // defined(DEBUG) + +namespace apache { +namespace thrift { +namespace concurrency { + +using boost::shared_ptr; +using boost::dynamic_pointer_cast; + +/** + * ThreadManager class + * + * This class manages a pool of threads. It uses a ThreadFactory to create + * threads. It never actually creates or destroys worker threads, rather + * it maintains statistics on number of idle threads, number of active threads, + * task backlog, and average wait and service times. 
+ * + * @version $Id:$ + */ +class ThreadManager::Impl : public ThreadManager { + +public: + Impl() + : workerCount_(0), + workerMaxCount_(0), + idleCount_(0), + pendingTaskCountMax_(0), + expiredCount_(0), + state_(ThreadManager::UNINITIALIZED), + monitor_(&mutex_), + maxMonitor_(&mutex_) {} + + ~Impl() { stop(); } + + void start(); + + void stop() { stopImpl(false); } + + void join() { stopImpl(true); } + + ThreadManager::STATE state() const { return state_; } + + shared_ptr<ThreadFactory> threadFactory() const { + Synchronized s(monitor_); + return threadFactory_; + } + + void threadFactory(shared_ptr<ThreadFactory> value) { + Synchronized s(monitor_); + threadFactory_ = value; + } + + void addWorker(size_t value); + + void removeWorker(size_t value); + + size_t idleWorkerCount() const { return idleCount_; } + + size_t workerCount() const { + Synchronized s(monitor_); + return workerCount_; + } + + size_t pendingTaskCount() const { + Synchronized s(monitor_); + return tasks_.size(); + } + + size_t totalTaskCount() const { + Synchronized s(monitor_); + return tasks_.size() + workerCount_ - idleCount_; + } + + size_t pendingTaskCountMax() const { + Synchronized s(monitor_); + return pendingTaskCountMax_; + } + + size_t expiredTaskCount() { + Synchronized s(monitor_); + size_t result = expiredCount_; + expiredCount_ = 0; + return result; + } + + void pendingTaskCountMax(const size_t value) { + Synchronized s(monitor_); + pendingTaskCountMax_ = value; + } + + bool canSleep(); + + void add(shared_ptr<Runnable> value, int64_t timeout, int64_t expiration); + + void remove(shared_ptr<Runnable> task); + + shared_ptr<Runnable> removeNextPending(); + + void removeExpiredTasks(); + + void setExpireCallback(ExpireCallback expireCallback); + +private: + void stopImpl(bool join); + + size_t workerCount_; + size_t workerMaxCount_; + size_t idleCount_; + size_t pendingTaskCountMax_; + size_t expiredCount_; + ExpireCallback expireCallback_; + + ThreadManager::STATE state_; + 
shared_ptr<ThreadFactory> threadFactory_; + + friend class ThreadManager::Task; + std::queue<shared_ptr<Task> > tasks_; + Mutex mutex_; + Monitor monitor_; + Monitor maxMonitor_; + Monitor workerMonitor_; + + friend class ThreadManager::Worker; + std::set<shared_ptr<Thread> > workers_; + std::set<shared_ptr<Thread> > deadWorkers_; + std::map<const Thread::id_t, shared_ptr<Thread> > idMap_; +}; + +class ThreadManager::Task : public Runnable { + +public: + enum STATE { WAITING, EXECUTING, CANCELLED, COMPLETE }; + + Task(shared_ptr<Runnable> runnable, int64_t expiration = 0LL) + : runnable_(runnable), + state_(WAITING), + expireTime_(expiration != 0LL ? Util::currentTime() + expiration : 0LL) {} + + ~Task() {} + + void run() { + if (state_ == EXECUTING) { + runnable_->run(); + state_ = COMPLETE; + } + } + + shared_ptr<Runnable> getRunnable() { return runnable_; } + + int64_t getExpireTime() const { return expireTime_; } + +private: + shared_ptr<Runnable> runnable_; + friend class ThreadManager::Worker; + STATE state_; + int64_t expireTime_; +}; + +class ThreadManager::Worker : public Runnable { + enum STATE { UNINITIALIZED, STARTING, STARTED, STOPPING, STOPPED }; + +public: + Worker(ThreadManager::Impl* manager) : manager_(manager), state_(UNINITIALIZED), idle_(false) {} + + ~Worker() {} + +private: + bool isActive() const { + return (manager_->workerCount_ <= manager_->workerMaxCount_) + || (manager_->state_ == JOINING && !manager_->tasks_.empty()); + } + +public: + /** + * Worker entry point + * + * As long as worker thread is running, pull tasks off the task queue and + * execute. 
+ */ + void run() { + bool active = false; + bool notifyManager = false; + + /** + * Increment worker semaphore and notify manager if worker count reached + * desired max + * + * Note: We have to release the monitor and acquire the workerMonitor + * since that is what the manager blocks on for worker add/remove + */ + { + Synchronized s(manager_->monitor_); + active = manager_->workerCount_ < manager_->workerMaxCount_; + if (active) { + manager_->workerCount_++; + notifyManager = manager_->workerCount_ == manager_->workerMaxCount_; + } + } + + if (notifyManager) { + Synchronized s(manager_->workerMonitor_); + manager_->workerMonitor_.notify(); + notifyManager = false; + } + + while (active) { + shared_ptr<ThreadManager::Task> task; + + /** + * While holding manager monitor block for non-empty task queue (Also + * check that the thread hasn't been requested to stop). Once the queue + * is non-empty, dequeue a task, release monitor, and execute. If the + * worker max count has been decremented such that we exceed it, mark + * ourself inactive, decrement the worker count and notify the manager + * (technically we're notifying the next blocked thread but eventually + * the manager will see it. + */ + { + Guard g(manager_->mutex_); + active = isActive(); + + while (active && manager_->tasks_.empty()) { + manager_->idleCount_++; + idle_ = true; + manager_->monitor_.wait(); + active = isActive(); + idle_ = false; + manager_->idleCount_--; + } + + if (active) { + manager_->removeExpiredTasks(); + + if (!manager_->tasks_.empty()) { + task = manager_->tasks_.front(); + manager_->tasks_.pop(); + if (task->state_ == ThreadManager::Task::WAITING) { + task->state_ = ThreadManager::Task::EXECUTING; + } + } + + /* If we have a pending task max and we just dropped below it, wakeup any + thread that might be blocked on add. 
*/ + if (manager_->pendingTaskCountMax_ != 0 + && manager_->tasks_.size() <= manager_->pendingTaskCountMax_ - 1) { + manager_->maxMonitor_.notify(); + } + } else { + idle_ = true; + manager_->workerCount_--; + notifyManager = (manager_->workerCount_ == manager_->workerMaxCount_); + } + } + + if (task) { + if (task->state_ == ThreadManager::Task::EXECUTING) { + try { + task->run(); + } catch (const std::exception& e) { + GlobalOutput.printf("[ERROR] task->run() raised an exception: %s", e.what()); + } catch (...) { + GlobalOutput.printf("[ERROR] task->run() raised an unknown exception"); + } + } + } + } + + { + Synchronized s(manager_->workerMonitor_); + manager_->deadWorkers_.insert(this->thread()); + if (notifyManager) { + manager_->workerMonitor_.notify(); + } + } + + return; + } + +private: + ThreadManager::Impl* manager_; + friend class ThreadManager::Impl; + STATE state_; + bool idle_; +}; + +void ThreadManager::Impl::addWorker(size_t value) { + std::set<shared_ptr<Thread> > newThreads; + for (size_t ix = 0; ix < value; ix++) { + shared_ptr<ThreadManager::Worker> worker + = shared_ptr<ThreadManager::Worker>(new ThreadManager::Worker(this)); + newThreads.insert(threadFactory_->newThread(worker)); + } + + { + Synchronized s(monitor_); + workerMaxCount_ += value; + workers_.insert(newThreads.begin(), newThreads.end()); + } + + for (std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end(); + ++ix) { + shared_ptr<ThreadManager::Worker> worker + = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable()); + worker->state_ = ThreadManager::Worker::STARTING; + (*ix)->start(); + idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >((*ix)->getId(), *ix)); + } + + { + Synchronized s(workerMonitor_); + while (workerCount_ != workerMaxCount_) { + workerMonitor_.wait(); + } + } +} + +void ThreadManager::Impl::start() { + + if (state_ == ThreadManager::STOPPED) { + return; + } + + { + Synchronized s(monitor_); + if 
(state_ == ThreadManager::UNINITIALIZED) { + if (!threadFactory_) { + throw InvalidArgumentException(); + } + state_ = ThreadManager::STARTED; + monitor_.notifyAll(); + } + + while (state_ == STARTING) { + monitor_.wait(); + } + } +} + +void ThreadManager::Impl::stopImpl(bool join) { + bool doStop = false; + if (state_ == ThreadManager::STOPPED) { + return; + } + + { + Synchronized s(monitor_); + if (state_ != ThreadManager::STOPPING && state_ != ThreadManager::JOINING + && state_ != ThreadManager::STOPPED) { + doStop = true; + state_ = join ? ThreadManager::JOINING : ThreadManager::STOPPING; + } + } + + if (doStop) { + removeWorker(workerCount_); + } + + // XXX + // should be able to block here for transition to STOPPED since we're no + // using shared_ptrs + + { + Synchronized s(monitor_); + state_ = ThreadManager::STOPPED; + } +} + +void ThreadManager::Impl::removeWorker(size_t value) { + std::set<shared_ptr<Thread> > removedThreads; + { + Synchronized s(monitor_); + if (value > workerMaxCount_) { + throw InvalidArgumentException(); + } + + workerMaxCount_ -= value; + + if (idleCount_ < value) { + for (size_t ix = 0; ix < idleCount_; ix++) { + monitor_.notify(); + } + } else { + monitor_.notifyAll(); + } + } + + { + Synchronized s(workerMonitor_); + + while (workerCount_ != workerMaxCount_) { + workerMonitor_.wait(); + } + + for (std::set<shared_ptr<Thread> >::iterator ix = deadWorkers_.begin(); + ix != deadWorkers_.end(); + ++ix) { + idMap_.erase((*ix)->getId()); + workers_.erase(*ix); + } + + deadWorkers_.clear(); + } +} + +bool ThreadManager::Impl::canSleep() { + const Thread::id_t id = threadFactory_->getCurrentThreadId(); + return idMap_.find(id) == idMap_.end(); +} + +void ThreadManager::Impl::add(shared_ptr<Runnable> value, int64_t timeout, int64_t expiration) { + Guard g(mutex_, timeout); + + if (!g) { + throw TimedOutException(); + } + + if (state_ != ThreadManager::STARTED) { + throw IllegalStateException( + "ThreadManager::Impl::add ThreadManager " + 
"not started"); + } + + removeExpiredTasks(); + if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) { + if (canSleep() && timeout >= 0) { + while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) { + // This is thread safe because the mutex is shared between monitors. + maxMonitor_.wait(timeout); + } + } else { + throw TooManyPendingTasksException(); + } + } + + tasks_.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value, expiration))); + + // If idle thread is available notify it, otherwise all worker threads are + // running and will get around to this task in time. + if (idleCount_ > 0) { + monitor_.notify(); + } +} + +void ThreadManager::Impl::remove(shared_ptr<Runnable> task) { + (void)task; + Synchronized s(monitor_); + if (state_ != ThreadManager::STARTED) { + throw IllegalStateException( + "ThreadManager::Impl::remove ThreadManager not " + "started"); + } +} + +boost::shared_ptr<Runnable> ThreadManager::Impl::removeNextPending() { + Guard g(mutex_); + if (state_ != ThreadManager::STARTED) { + throw IllegalStateException( + "ThreadManager::Impl::removeNextPending " + "ThreadManager not started"); + } + + if (tasks_.empty()) { + return boost::shared_ptr<Runnable>(); + } + + shared_ptr<ThreadManager::Task> task = tasks_.front(); + tasks_.pop(); + + return task->getRunnable(); +} + +void ThreadManager::Impl::removeExpiredTasks() { + int64_t now = 0LL; // we won't ask for the time untile we need it + + // note that this loop breaks at the first non-expiring task + while (!tasks_.empty()) { + shared_ptr<ThreadManager::Task> task = tasks_.front(); + if (task->getExpireTime() == 0LL) { + break; + } + if (now == 0LL) { + now = Util::currentTime(); + } + if (task->getExpireTime() > now) { + break; + } + if (expireCallback_) { + expireCallback_(task->getRunnable()); + } + tasks_.pop(); + expiredCount_++; + } +} + +void ThreadManager::Impl::setExpireCallback(ExpireCallback expireCallback) { + expireCallback_ = 
expireCallback; +} + +class SimpleThreadManager : public ThreadManager::Impl { + +public: + SimpleThreadManager(size_t workerCount = 4, size_t pendingTaskCountMax = 0) + : workerCount_(workerCount), pendingTaskCountMax_(pendingTaskCountMax) {} + + void start() { + ThreadManager::Impl::pendingTaskCountMax(pendingTaskCountMax_); + ThreadManager::Impl::start(); + addWorker(workerCount_); + } + +private: + const size_t workerCount_; + const size_t pendingTaskCountMax_; + Monitor monitor_; +}; + +shared_ptr<ThreadManager> ThreadManager::newThreadManager() { + return shared_ptr<ThreadManager>(new ThreadManager::Impl()); +} + +shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count, + size_t pendingTaskCountMax) { + return shared_ptr<ThreadManager>(new SimpleThreadManager(count, pendingTaskCountMax)); +} +} +} +} // apache::thrift::concurrency http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/ThreadManager.h ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/ThreadManager.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/ThreadManager.h new file mode 100644 index 0000000..2112845 --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/ThreadManager.h @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#ifndef _THRIFT_CONCURRENCY_THREADMANAGER_H_
#define _THRIFT_CONCURRENCY_THREADMANAGER_H_ 1

#include <boost/shared_ptr.hpp>
#include <thrift/cxxfunctional.h>
#include <sys/types.h>
#include <thrift/concurrency/Thread.h>

namespace apache {
namespace thrift {
namespace concurrency {

/**
 * Thread Pool Manager and related classes
 *
 * @version $Id:$
 */
class ThreadManager;

/**
 * ThreadManager class
 *
 * This class manages a pool of threads. It uses a ThreadFactory to create
 * threads. It never actually creates or destroys worker threads, rather
 * it maintains statistics on number of idle threads, number of active threads,
 * task backlog, and average wait and service times and informs the PoolPolicy
 * object bound to instances of this manager of interesting transitions. It is
 * then up to the PoolPolicy object to decide if the thread pool size needs to be
 * adjusted and call this object's addWorker and removeWorker methods to make
 * changes.
 *
 * This design allows different policy implementations to use this code to
 * handle basic worker thread management and worker task execution and focus on
 * policy issues. The simplest policy, StaticPolicy, does nothing other than
 * create a fixed number of threads.
 */
class ThreadManager {

protected:
  ThreadManager() {}

public:
  /** Callback type invoked with the runnable of a task that expired unrun. */
  typedef apache::thrift::stdcxx::function<void(boost::shared_ptr<Runnable>)> ExpireCallback;

  virtual ~ThreadManager() {}

  /**
   * Starts the thread manager. Verifies all attributes have been properly
   * initialized, then allocates necessary resources to begin operation
   */
  virtual void start() = 0;

  /**
   * Stops the thread manager. Aborts all remaining unprocessed tasks, shuts
   * down all created worker threads, and releases all allocated resources.
   * This method blocks for all worker threads to complete, thus it can
   * potentially block forever if a worker thread is running a task that
   * won't terminate.
   */
  virtual void stop() = 0;

  /**
   * Joins the thread manager. This is the same as stop, except that it will
   * block until all the workers have finished their work. At that point
   * the ThreadManager will transition into the STOPPED state.
   */
  virtual void join() = 0;

  enum STATE { UNINITIALIZED, STARTING, STARTED, JOINING, STOPPING, STOPPED };

  /**
   * Gets the current lifecycle state of the manager
   */
  virtual STATE state() const = 0;

  /**
   * Gets the factory used to create worker threads
   */
  virtual boost::shared_ptr<ThreadFactory> threadFactory() const = 0;

  /**
   * Sets the factory used to create worker threads
   */
  virtual void threadFactory(boost::shared_ptr<ThreadFactory> value) = 0;

  /**
   * Adds worker threads to the pool
   *
   * @param value number of workers to add (default 1)
   */
  virtual void addWorker(size_t value = 1) = 0;

  /**
   * Removes worker threads from the pool
   *
   * @param value number of workers to remove (default 1)
   */
  virtual void removeWorker(size_t value = 1) = 0;

  /**
   * Gets the current number of idle worker threads
   */
  virtual size_t idleWorkerCount() const = 0;

  /**
   * Gets the current number of total worker threads
   */
  virtual size_t workerCount() const = 0;

  /**
   * Gets the current number of pending tasks
   */
  virtual size_t pendingTaskCount() const = 0;

  /**
   * Gets the current number of pending and executing tasks
   */
  virtual size_t totalTaskCount() const = 0;

  /**
   * Gets the maximum pending task count.  0 indicates no maximum
   */
  virtual size_t pendingTaskCountMax() const = 0;

  /**
   * Gets the number of tasks which have been expired without being run.
   */
  virtual size_t expiredTaskCount() = 0;

  /**
   * Adds a task to be executed at some time in the future by a worker thread.
   *
   * This method will block if pendingTaskCountMax() is not zero and pendingTaskCount()
   * is greater than or equal to pendingTaskCountMax().  If this method is called in the
   * context of a ThreadManager worker thread it will throw a
   * TooManyPendingTasksException
   *
   * @param task  The task to queue for execution
   *
   * @param timeout Time to wait in milliseconds to add a task when a pending-task-count
   * is specified. Specific cases:
   * timeout = 0  : Wait forever to queue task.
   * timeout = -1 : Return immediately if pending task count exceeds specified max
   * @param expiration when nonzero, the number of milliseconds the task is valid
   * to be run; if exceeded, the task will be dropped off the queue and not run.
   *
   * @throws TooManyPendingTasksException Pending task count exceeds max pending task count
   */
  virtual void add(boost::shared_ptr<Runnable> task,
                   int64_t timeout = 0LL,
                   int64_t expiration = 0LL) = 0;

  /**
   * Removes a pending task
   */
  virtual void remove(boost::shared_ptr<Runnable> task) = 0;

  /**
   * Remove the next pending task which would be run.
   *
   * @return the task removed.
   */
  virtual boost::shared_ptr<Runnable> removeNextPending() = 0;

  /**
   * Remove tasks from front of task queue that have expired.
   */
  virtual void removeExpiredTasks() = 0;

  /**
   * Set a callback to be called when a task is expired and not run.
   *
   * @param expireCallback a function called with the shared_ptr<Runnable> for
   * the expired task.
   */
  virtual void setExpireCallback(ExpireCallback expireCallback) = 0;

  /**
   * Creates a plain thread manager with no workers configured.
   */
  static boost::shared_ptr<ThreadManager> newThreadManager();

  /**
   * Creates a simple thread manager that uses count number of worker threads and has
   * a pendingTaskCountMax maximum pending tasks. The default, 0, specifies no limit
   * on pending tasks
   */
  static boost::shared_ptr<ThreadManager> newSimpleThreadManager(size_t count = 4,
                                                                 size_t pendingTaskCountMax = 0);

  // Implementation classes; only declared here.
  class Task;

  class Worker;

  class Impl;
};
}
}
} // apache::thrift::concurrency

#endif // #ifndef _THRIFT_CONCURRENCY_THREADMANAGER_H_
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/TimerManager.cpp
----------------------------------------------------------------------
diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/TimerManager.cpp b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/TimerManager.cpp
new file mode 100644
index 0000000..122d26e
--- /dev/null
+++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/TimerManager.cpp
@@ -0,0 +1,306 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
+ */ + +#include <thrift/concurrency/TimerManager.h> +#include <thrift/concurrency/Exception.h> +#include <thrift/concurrency/Util.h> + +#include <assert.h> +#include <iostream> +#include <set> + +namespace apache { +namespace thrift { +namespace concurrency { + +using boost::shared_ptr; + +/** + * TimerManager class + * + * @version $Id:$ + */ +class TimerManager::Task : public Runnable { + +public: + enum STATE { WAITING, EXECUTING, CANCELLED, COMPLETE }; + + Task(shared_ptr<Runnable> runnable) : runnable_(runnable), state_(WAITING) {} + + ~Task() {} + + void run() { + if (state_ == EXECUTING) { + runnable_->run(); + state_ = COMPLETE; + } + } + +private: + shared_ptr<Runnable> runnable_; + friend class TimerManager::Dispatcher; + STATE state_; +}; + +class TimerManager::Dispatcher : public Runnable { + +public: + Dispatcher(TimerManager* manager) : manager_(manager) {} + + ~Dispatcher() {} + + /** + * Dispatcher entry point + * + * As long as dispatcher thread is running, pull tasks off the task taskMap_ + * and execute. 
+ */ + void run() { + { + Synchronized s(manager_->monitor_); + if (manager_->state_ == TimerManager::STARTING) { + manager_->state_ = TimerManager::STARTED; + manager_->monitor_.notifyAll(); + } + } + + do { + std::set<shared_ptr<TimerManager::Task> > expiredTasks; + { + Synchronized s(manager_->monitor_); + task_iterator expiredTaskEnd; + int64_t now = Util::currentTime(); + while (manager_->state_ == TimerManager::STARTED + && (expiredTaskEnd = manager_->taskMap_.upper_bound(now)) + == manager_->taskMap_.begin()) { + int64_t timeout = 0LL; + if (!manager_->taskMap_.empty()) { + timeout = manager_->taskMap_.begin()->first - now; + } + assert((timeout != 0 && manager_->taskCount_ > 0) + || (timeout == 0 && manager_->taskCount_ == 0)); + try { + manager_->monitor_.wait(timeout); + } catch (TimedOutException&) { + } + now = Util::currentTime(); + } + + if (manager_->state_ == TimerManager::STARTED) { + for (task_iterator ix = manager_->taskMap_.begin(); ix != expiredTaskEnd; ix++) { + shared_ptr<TimerManager::Task> task = ix->second; + expiredTasks.insert(task); + if (task->state_ == TimerManager::Task::WAITING) { + task->state_ = TimerManager::Task::EXECUTING; + } + manager_->taskCount_--; + } + manager_->taskMap_.erase(manager_->taskMap_.begin(), expiredTaskEnd); + } + } + + for (std::set<shared_ptr<Task> >::iterator ix = expiredTasks.begin(); + ix != expiredTasks.end(); + ++ix) { + (*ix)->run(); + } + + } while (manager_->state_ == TimerManager::STARTED); + + { + Synchronized s(manager_->monitor_); + if (manager_->state_ == TimerManager::STOPPING) { + manager_->state_ = TimerManager::STOPPED; + manager_->monitor_.notify(); + } + } + return; + } + +private: + TimerManager* manager_; + friend class TimerManager; +}; + +#if defined(_MSC_VER) +#pragma warning(push) +#pragma warning(disable : 4355) // 'this' used in base member initializer list +#endif + +TimerManager::TimerManager() + : taskCount_(0), + state_(TimerManager::UNINITIALIZED), + 
dispatcher_(shared_ptr<Dispatcher>(new Dispatcher(this))) { +} + +#if defined(_MSC_VER) +#pragma warning(pop) +#endif + +TimerManager::~TimerManager() { + + // If we haven't been explicitly stopped, do so now. We don't need to grab + // the monitor here, since stop already takes care of reentrancy. + + if (state_ != STOPPED) { + try { + stop(); + } catch (...) { + throw; + // uhoh + } + } +} + +void TimerManager::start() { + bool doStart = false; + { + Synchronized s(monitor_); + if (!threadFactory_) { + throw InvalidArgumentException(); + } + if (state_ == TimerManager::UNINITIALIZED) { + state_ = TimerManager::STARTING; + doStart = true; + } + } + + if (doStart) { + dispatcherThread_ = threadFactory_->newThread(dispatcher_); + dispatcherThread_->start(); + } + + { + Synchronized s(monitor_); + while (state_ == TimerManager::STARTING) { + monitor_.wait(); + } + assert(state_ != TimerManager::STARTING); + } +} + +void TimerManager::stop() { + bool doStop = false; + { + Synchronized s(monitor_); + if (state_ == TimerManager::UNINITIALIZED) { + state_ = TimerManager::STOPPED; + } else if (state_ != STOPPING && state_ != STOPPED) { + doStop = true; + state_ = STOPPING; + monitor_.notifyAll(); + } + while (state_ != STOPPED) { + monitor_.wait(); + } + } + + if (doStop) { + // Clean up any outstanding tasks + taskMap_.clear(); + + // Remove dispatcher's reference to us. 
+ dispatcher_->manager_ = NULL; + } +} + +shared_ptr<const ThreadFactory> TimerManager::threadFactory() const { + Synchronized s(monitor_); + return threadFactory_; +} + +void TimerManager::threadFactory(shared_ptr<const ThreadFactory> value) { + Synchronized s(monitor_); + threadFactory_ = value; +} + +size_t TimerManager::taskCount() const { + return taskCount_; +} + +void TimerManager::add(shared_ptr<Runnable> task, int64_t timeout) { + int64_t now = Util::currentTime(); + timeout += now; + + { + Synchronized s(monitor_); + if (state_ != TimerManager::STARTED) { + throw IllegalStateException(); + } + + // If the task map is empty, we will kick the dispatcher for sure. Otherwise, we kick him + // if the expiration time is shorter than the current value. Need to test before we insert, + // because the new task might insert at the front. + bool notifyRequired = (taskCount_ == 0) ? true : timeout < taskMap_.begin()->first; + + taskCount_++; + taskMap_.insert( + std::pair<int64_t, shared_ptr<Task> >(timeout, shared_ptr<Task>(new Task(task)))); + + // If the task map was empty, or if we have an expiration that is earlier + // than any previously seen, kick the dispatcher so it can update its + // timeout + if (notifyRequired) { + monitor_.notify(); + } + } +} + +void TimerManager::add(shared_ptr<Runnable> task, const struct THRIFT_TIMESPEC& value) { + + int64_t expiration; + Util::toMilliseconds(expiration, value); + + int64_t now = Util::currentTime(); + + if (expiration < now) { + throw InvalidArgumentException(); + } + + add(task, expiration - now); +} + +void TimerManager::add(shared_ptr<Runnable> task, const struct timeval& value) { + + int64_t expiration; + Util::toMilliseconds(expiration, value); + + int64_t now = Util::currentTime(); + + if (expiration < now) { + throw InvalidArgumentException(); + } + + add(task, expiration - now); +} + +void TimerManager::remove(shared_ptr<Runnable> task) { + (void)task; + Synchronized s(monitor_); + if (state_ != 
TimerManager::STARTED) { + throw IllegalStateException(); + } +} + +TimerManager::STATE TimerManager::state() const { + return state_; +} +} +} +} // apache::thrift::concurrency http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/TimerManager.h ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/TimerManager.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/TimerManager.h new file mode 100644 index 0000000..3946827 --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/TimerManager.h @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_CONCURRENCY_TIMERMANAGER_H_ +#define _THRIFT_CONCURRENCY_TIMERMANAGER_H_ 1 + +#include <thrift/concurrency/Exception.h> +#include <thrift/concurrency/Monitor.h> +#include <thrift/concurrency/Thread.h> + +#include <boost/shared_ptr.hpp> +#include <map> +#include <time.h> + +namespace apache { +namespace thrift { +namespace concurrency { + +/** + * Timer Manager + * + * This class dispatches timer tasks when they fall due. 
+ * + * @version $Id:$ + */ +class TimerManager { + +public: + TimerManager(); + + virtual ~TimerManager(); + + virtual boost::shared_ptr<const ThreadFactory> threadFactory() const; + + virtual void threadFactory(boost::shared_ptr<const ThreadFactory> value); + + /** + * Starts the timer manager service + * + * @throws InvalidArgumentException Missing thread factory attribute + */ + virtual void start(); + + /** + * Stops the timer manager service + */ + virtual void stop(); + + virtual size_t taskCount() const; + + /** + * Adds a task to be executed at some time in the future by a worker thread. + * + * @param task The task to execute + * @param timeout Time in milliseconds to delay before executing task + */ + virtual void add(boost::shared_ptr<Runnable> task, int64_t timeout); + + /** + * Adds a task to be executed at some time in the future by a worker thread. + * + * @param task The task to execute + * @param timeout Absolute time in the future to execute task. + */ + virtual void add(boost::shared_ptr<Runnable> task, const struct THRIFT_TIMESPEC& timeout); + + /** + * Adds a task to be executed at some time in the future by a worker thread. + * + * @param task The task to execute + * @param timeout Absolute time in the future to execute task. + */ + virtual void add(boost::shared_ptr<Runnable> task, const struct timeval& timeout); + + /** + * Removes a pending task + * + * @throws NoSuchTaskException Specified task doesn't exist. It was either + * processed already or this call was made for a + * task that was never added to this timer + * + * @throws UncancellableTaskException Specified task is already being + * executed or has completed execution. 
+ */ + virtual void remove(boost::shared_ptr<Runnable> task); + + enum STATE { UNINITIALIZED, STARTING, STARTED, STOPPING, STOPPED }; + + virtual STATE state() const; + +private: + boost::shared_ptr<const ThreadFactory> threadFactory_; + class Task; + friend class Task; + std::multimap<int64_t, boost::shared_ptr<Task> > taskMap_; + size_t taskCount_; + Monitor monitor_; + STATE state_; + class Dispatcher; + friend class Dispatcher; + boost::shared_ptr<Dispatcher> dispatcher_; + boost::shared_ptr<Thread> dispatcherThread_; + typedef std::multimap<int64_t, boost::shared_ptr<TimerManager::Task> >::iterator task_iterator; + typedef std::pair<task_iterator, task_iterator> task_range; +}; +} +} +} // apache::thrift::concurrency + +#endif // #ifndef _THRIFT_CONCURRENCY_TIMERMANAGER_H_ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Util.cpp ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Util.cpp b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Util.cpp new file mode 100644 index 0000000..dd6d19f --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Util.cpp @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <thrift/thrift-config.h> + +#include <thrift/Thrift.h> +#include <thrift/concurrency/Util.h> + +#if defined(HAVE_SYS_TIME_H) +#include <sys/time.h> +#endif + +namespace apache { +namespace thrift { +namespace concurrency { + +int64_t Util::currentTimeTicks(int64_t ticksPerSec) { + int64_t result; + struct timeval now; + int ret = THRIFT_GETTIMEOFDAY(&now, NULL); + assert(ret == 0); + THRIFT_UNUSED_VARIABLE(ret); // squelching "unused variable" warning + toTicks(result, now, ticksPerSec); + return result; +} +} +} +} // apache::thrift::concurrency http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Util.h ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Util.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Util.h new file mode 100644 index 0000000..ba070b6 --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Util.h @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#ifndef _THRIFT_CONCURRENCY_UTIL_H_ +#define _THRIFT_CONCURRENCY_UTIL_H_ 1 + +#include <assert.h> +#include <stddef.h> +#include <stdint.h> +#include <time.h> + +#ifdef HAVE_SYS_TIME_H +#include <sys/time.h> +#endif + +#include <thrift/transport/PlatformSocket.h> + +namespace apache { +namespace thrift { +namespace concurrency { + +/** + * Utility methods + * + * This class contains basic utility methods for converting time formats, + * and other common platform-dependent concurrency operations. + * It should not be included in API headers for other concurrency library + * headers, since it will, by definition, pull in all sorts of horrid + * platform dependent stuff. Rather it should be included directly in + * concurrency library implementation source. + * + * @version $Id:$ + */ +class Util { + + static const int64_t NS_PER_S = 1000000000LL; + static const int64_t US_PER_S = 1000000LL; + static const int64_t MS_PER_S = 1000LL; + + static const int64_t NS_PER_MS = NS_PER_S / MS_PER_S; + static const int64_t NS_PER_US = NS_PER_S / US_PER_S; + static const int64_t US_PER_MS = US_PER_S / MS_PER_S; + +public: + /** + * Converts millisecond timestamp into a THRIFT_TIMESPEC struct + * + * @param struct THRIFT_TIMESPEC& result + * @param time or duration in milliseconds + */ + static void toTimespec(struct THRIFT_TIMESPEC& result, int64_t value) { + result.tv_sec = value / MS_PER_S; // ms to s + result.tv_nsec = (value % MS_PER_S) * NS_PER_MS; // ms to ns + } + + static void toTimeval(struct timeval& result, int64_t value) { + result.tv_sec = static_cast<uint32_t>(value / MS_PER_S); // ms to s + result.tv_usec = static_cast<uint32_t>((value % MS_PER_S) * US_PER_MS); // ms to us + } + + static void toTicks(int64_t& result, + int64_t secs, + int64_t oldTicks, + int64_t oldTicksPerSec, + int64_t newTicksPerSec) { + result = secs * newTicksPerSec; + result += oldTicks * newTicksPerSec / oldTicksPerSec; + + int64_t oldPerNew = oldTicksPerSec / newTicksPerSec; + if 
(oldPerNew && ((oldTicks % oldPerNew) >= (oldPerNew / 2))) { + ++result; + } + } + /** + * Converts struct THRIFT_TIMESPEC to arbitrary-sized ticks since epoch + */ + static void toTicks(int64_t& result, const struct THRIFT_TIMESPEC& value, int64_t ticksPerSec) { + return toTicks(result, value.tv_sec, value.tv_nsec, NS_PER_S, ticksPerSec); + } + + /** + * Converts struct timeval to arbitrary-sized ticks since epoch + */ + static void toTicks(int64_t& result, const struct timeval& value, int64_t ticksPerSec) { + return toTicks(result, value.tv_sec, value.tv_usec, US_PER_S, ticksPerSec); + } + + /** + * Converts struct THRIFT_TIMESPEC to milliseconds + */ + static void toMilliseconds(int64_t& result, const struct THRIFT_TIMESPEC& value) { + return toTicks(result, value, MS_PER_S); + } + + /** + * Converts struct timeval to milliseconds + */ + static void toMilliseconds(int64_t& result, const struct timeval& value) { + return toTicks(result, value, MS_PER_S); + } + + /** + * Converts struct THRIFT_TIMESPEC to microseconds + */ + static void toUsec(int64_t& result, const struct THRIFT_TIMESPEC& value) { + return toTicks(result, value, US_PER_S); + } + + /** + * Converts struct timeval to microseconds + */ + static void toUsec(int64_t& result, const struct timeval& value) { + return toTicks(result, value, US_PER_S); + } + + /** + * Get current time as a number of arbitrary-size ticks from epoch + */ + static int64_t currentTimeTicks(int64_t ticksPerSec); + + /** + * Get current time as milliseconds from epoch + */ + static int64_t currentTime() { return currentTimeTicks(MS_PER_S); } + + /** + * Get current time as micros from epoch + */ + static int64_t currentTimeUsec() { return currentTimeTicks(US_PER_S); } +}; +} +} +} // apache::thrift::concurrency + +#endif // #ifndef _THRIFT_CONCURRENCY_UTIL_H_ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/cxxfunctional.h 
---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/cxxfunctional.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/cxxfunctional.h new file mode 100644 index 0000000..dadaac3 --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/cxxfunctional.h @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_CXXFUNCTIONAL_H_ +#define _THRIFT_CXXFUNCTIONAL_H_ 1 + +// clang-format off + +/** + * Loads <functional> from the 'right' location, depending + * on compiler and whether or not it's using C++03 with TR1 + * or C++11. + */ + +/* + * MSVC 10 and 11 have the <functional> stuff at <functional>. + * In MSVC 10 all of the implementations live in std::tr1. + * In MSVC 11 all of the implementations live in std, with aliases + * in std::tr1 to point to the ones in std. + */ +#ifdef _WIN32 + #define _THRIFT_USING_MICROSOFT_STDLIB 1 +#endif + +#ifdef __clang__ + /* Clang has two options, depending on standard library: + * - no -stdlib or -stdlib=libstdc++ set; uses GNU libstdc++. + * <tr1/functional> + * - -stdlib=libc++; uses LLVM libc++. + * <functional>, no 'std::tr1'. 
+ * + * The compiler itself doesn't define anything differently + * depending on the value of -stdlib, but the library headers + * will set different preprocessor options. In order to check, + * though, we have to pull in some library header. + */ + #include <utility> + + /* With LLVM libc++, utility pulls in __config, which sets + _LIBCPP_VERSION. */ + #if defined(_LIBCPP_VERSION) + #define _THRIFT_USING_CLANG_LIBCXX 1 + + /* With GNU libstdc++, utility pulls in bits/c++config.h, + which sets __GLIBCXX__. */ + #elif defined(__GLIBCXX__) + #define _THRIFT_USING_GNU_LIBSTDCXX 1 + + /* No idea. */ + #else + #error Unable to detect which C++ standard library is in use. + #endif +#elif __GNUC__ + #define _THRIFT_USING_GNU_LIBSTDCXX 1 +#endif + +#if _THRIFT_USING_MICROSOFT_STDLIB + #include <functional> + + namespace apache { namespace thrift { namespace stdcxx { + using ::std::tr1::function; + using ::std::tr1::bind; + + namespace placeholders { + using ::std::tr1::placeholders::_1; + using ::std::tr1::placeholders::_2; + using ::std::tr1::placeholders::_3; + using ::std::tr1::placeholders::_4; + using ::std::tr1::placeholders::_5; + using ::std::tr1::placeholders::_6; + } // apache::thrift::stdcxx::placeholders + }}} // apache::thrift::stdcxx + +#elif _THRIFT_USING_CLANG_LIBCXX + #include <functional> + + namespace apache { namespace thrift { namespace stdcxx { + using ::std::function; + using ::std::bind; + + namespace placeholders { + using ::std::placeholders::_1; + using ::std::placeholders::_2; + using ::std::placeholders::_3; + using ::std::placeholders::_4; + using ::std::placeholders::_5; + using ::std::placeholders::_6; + } // apache::thrift::stdcxx::placeholders + }}} // apache::thrift::stdcxx + +#elif _THRIFT_USING_GNU_LIBSTDCXX + #include <tr1/functional> + + namespace apache { namespace thrift { namespace stdcxx { + using ::std::tr1::function; + using ::std::tr1::bind; + + namespace placeholders { + using ::std::tr1::placeholders::_1; + using 
::std::tr1::placeholders::_2; + using ::std::tr1::placeholders::_3; + using ::std::tr1::placeholders::_4; + using ::std::tr1::placeholders::_5; + using ::std::tr1::placeholders::_6; + } // apache::thrift::stdcxx::placeholders + }}} // apache::thrift::stdcxx +#endif + + // Alias for thrift c++ compatibility namespace + namespace tcxx = apache::thrift::stdcxx; + +#endif // #ifndef _THRIFT_CXXFUNCTIONAL_H_ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/processor/PeekProcessor.cpp ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/processor/PeekProcessor.cpp b/depends/thirdparty/thrift/lib/cpp/src/thrift/processor/PeekProcessor.cpp new file mode 100644 index 0000000..8c9a463 --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/processor/PeekProcessor.cpp @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include <thrift/processor/PeekProcessor.h> + +using namespace apache::thrift::transport; +using namespace apache::thrift::protocol; +using namespace apache::thrift; + +namespace apache { +namespace thrift { +namespace processor { + +PeekProcessor::PeekProcessor() { + memoryBuffer_.reset(new TMemoryBuffer()); + targetTransport_ = memoryBuffer_; +} +PeekProcessor::~PeekProcessor() { +} + +void PeekProcessor::initialize(boost::shared_ptr<TProcessor> actualProcessor, + boost::shared_ptr<TProtocolFactory> protocolFactory, + boost::shared_ptr<TPipedTransportFactory> transportFactory) { + actualProcessor_ = actualProcessor; + pipedProtocol_ = protocolFactory->getProtocol(targetTransport_); + transportFactory_ = transportFactory; + transportFactory_->initializeTargetTransport(targetTransport_); +} + +boost::shared_ptr<TTransport> PeekProcessor::getPipedTransport(boost::shared_ptr<TTransport> in) { + return transportFactory_->getTransport(in); +} + +void PeekProcessor::setTargetTransport(boost::shared_ptr<TTransport> targetTransport) { + targetTransport_ = targetTransport; + if (boost::dynamic_pointer_cast<TMemoryBuffer>(targetTransport_)) { + memoryBuffer_ = boost::dynamic_pointer_cast<TMemoryBuffer>(targetTransport); + } else if (boost::dynamic_pointer_cast<TPipedTransport>(targetTransport_)) { + memoryBuffer_ = boost::dynamic_pointer_cast<TMemoryBuffer>( + boost::dynamic_pointer_cast<TPipedTransport>(targetTransport_)->getTargetTransport()); + } + + if (!memoryBuffer_) { + throw TException( + "Target transport must be a TMemoryBuffer or a TPipedTransport with TMemoryBuffer"); + } +} + +bool PeekProcessor::process(boost::shared_ptr<TProtocol> in, + boost::shared_ptr<TProtocol> out, + void* connectionContext) { + + std::string fname; + TMessageType mtype; + int32_t seqid; + in->readMessageBegin(fname, mtype, seqid); + + if (mtype != T_CALL && mtype != T_ONEWAY) { + throw TException("Unexpected message type"); + } + + // Peek at the name + peekName(fname); + + 
TType ftype; + int16_t fid; + while (true) { + in->readFieldBegin(fname, ftype, fid); + if (ftype == T_STOP) { + break; + } + + // Peek at the variable + peek(in, ftype, fid); + in->readFieldEnd(); + } + in->readMessageEnd(); + in->getTransport()->readEnd(); + + // + // All the data is now in memoryBuffer_ and ready to be processed + // + + // Let's first take a peek at the full data in memory + uint8_t* buffer; + uint32_t size; + memoryBuffer_->getBuffer(&buffer, &size); + peekBuffer(buffer, size); + + // Done peeking at variables + peekEnd(); + + bool ret = actualProcessor_->process(pipedProtocol_, out, connectionContext); + memoryBuffer_->resetBuffer(); + return ret; +} + +void PeekProcessor::peekName(const std::string& fname) { + (void)fname; +} + +void PeekProcessor::peekBuffer(uint8_t* buffer, uint32_t size) { + (void)buffer; + (void)size; +} + +void PeekProcessor::peek(boost::shared_ptr<TProtocol> in, TType ftype, int16_t fid) { + (void)fid; + in->skip(ftype); +} + +void PeekProcessor::peekEnd() { +} +} +} +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/processor/PeekProcessor.h ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/processor/PeekProcessor.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/processor/PeekProcessor.h new file mode 100644 index 0000000..21c5999 --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/processor/PeekProcessor.h @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef PEEKPROCESSOR_H +#define PEEKPROCESSOR_H + +#include <string> +#include <thrift/TProcessor.h> +#include <thrift/transport/TTransport.h> +#include <thrift/transport/TTransportUtils.h> +#include <thrift/transport/TBufferTransports.h> +#include <boost/shared_ptr.hpp> + +namespace apache { +namespace thrift { +namespace processor { + +/* + * Class for peeking at the raw data that is being processed by another processor + * and gives the derived class a chance to change behavior accordingly + * + */ +class PeekProcessor : public apache::thrift::TProcessor { + +public: + PeekProcessor(); + virtual ~PeekProcessor(); + + // Input here: actualProcessor - the underlying processor + // protocolFactory - the protocol factory used to wrap the memory buffer + // transportFactory - this TPipedTransportFactory is used to wrap the source transport + // via a call to getPipedTransport + void initialize( + boost::shared_ptr<apache::thrift::TProcessor> actualProcessor, + boost::shared_ptr<apache::thrift::protocol::TProtocolFactory> protocolFactory, + boost::shared_ptr<apache::thrift::transport::TPipedTransportFactory> transportFactory); + + boost::shared_ptr<apache::thrift::transport::TTransport> getPipedTransport( + boost::shared_ptr<apache::thrift::transport::TTransport> in); + + void setTargetTransport(boost::shared_ptr<apache::thrift::transport::TTransport> targetTransport); + + virtual bool process(boost::shared_ptr<apache::thrift::protocol::TProtocol> in, + boost::shared_ptr<apache::thrift::protocol::TProtocol> out, + void* connectionContext); + 
+ // The following three functions can be overloaded by child classes to + // achieve desired peeking behavior + virtual void peekName(const std::string& fname); + virtual void peekBuffer(uint8_t* buffer, uint32_t size); + virtual void peek(boost::shared_ptr<apache::thrift::protocol::TProtocol> in, + apache::thrift::protocol::TType ftype, + int16_t fid); + virtual void peekEnd(); + +private: + boost::shared_ptr<apache::thrift::TProcessor> actualProcessor_; + boost::shared_ptr<apache::thrift::protocol::TProtocol> pipedProtocol_; + boost::shared_ptr<apache::thrift::transport::TPipedTransportFactory> transportFactory_; + boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> memoryBuffer_; + boost::shared_ptr<apache::thrift::transport::TTransport> targetTransport_; +}; +} +} +} // apache::thrift::processor + +#endif http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/processor/StatsProcessor.h ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/processor/StatsProcessor.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/processor/StatsProcessor.h new file mode 100644 index 0000000..e8ca067 --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/processor/StatsProcessor.h @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef STATSPROCESSOR_H +#define STATSPROCESSOR_H + +#include <boost/shared_ptr.hpp> +#include <thrift/transport/TTransport.h> +#include <thrift/protocol/TProtocol.h> +#include <TProcessor.h> + +namespace apache { +namespace thrift { +namespace processor { + +/* + * Class for keeping track of function call statistics and printing them if desired + * + */ +class StatsProcessor : public apache::thrift::TProcessor { +public: + StatsProcessor(bool print, bool frequency) : print_(print), frequency_(frequency) {} + virtual ~StatsProcessor(){}; + + virtual bool process(boost::shared_ptr<apache::thrift::protocol::TProtocol> piprot, + boost::shared_ptr<apache::thrift::protocol::TProtocol> poprot, + void* serverContext) { + + piprot_ = piprot; + + std::string fname; + apache::thrift::protocol::TMessageType mtype; + int32_t seqid; + + piprot_->readMessageBegin(fname, mtype, seqid); + if (mtype != apache::thrift::protocol::T_CALL && mtype != apache::thrift::protocol::T_ONEWAY) { + if (print_) { + printf("Unknown message type\n"); + } + throw apache::thrift::TException("Unexpected message type"); + } + if (print_) { + printf("%s (", fname.c_str()); + } + if (frequency_) { + if (frequency_map_.find(fname) != frequency_map_.end()) { + frequency_map_[fname]++; + } else { + frequency_map_[fname] = 1; + } + } + + apache::thrift::protocol::TType ftype; + int16_t fid; + + while (true) { + piprot_->readFieldBegin(fname, ftype, fid); + if (ftype == apache::thrift::protocol::T_STOP) { + break; + } + + printAndPassToBuffer(ftype); + if (print_) { + printf(", "); 
+ } + } + + if (print_) { + printf("\b\b)\n"); + } + return true; + } + + const std::map<std::string, int64_t>& get_frequency_map() { return frequency_map_; } + +protected: + void printAndPassToBuffer(apache::thrift::protocol::TType ftype) { + switch (ftype) { + case apache::thrift::protocol::T_BOOL: { + bool boolv; + piprot_->readBool(boolv); + if (print_) { + printf("%d", boolv); + } + } break; + case apache::thrift::protocol::T_BYTE: { + int8_t bytev; + piprot_->readByte(bytev); + if (print_) { + printf("%d", bytev); + } + } break; + case apache::thrift::protocol::T_I16: { + int16_t i16; + piprot_->readI16(i16); + if (print_) { + printf("%d", i16); + } + } break; + case apache::thrift::protocol::T_I32: { + int32_t i32; + piprot_->readI32(i32); + if (print_) { + printf("%d", i32); + } + } break; + case apache::thrift::protocol::T_I64: { + int64_t i64; + piprot_->readI64(i64); + if (print_) { + printf("%ld", i64); + } + } break; + case apache::thrift::protocol::T_DOUBLE: { + double dub; + piprot_->readDouble(dub); + if (print_) { + printf("%f", dub); + } + } break; + case apache::thrift::protocol::T_STRING: { + std::string str; + piprot_->readString(str); + if (print_) { + printf("%s", str.c_str()); + } + } break; + case apache::thrift::protocol::T_STRUCT: { + std::string name; + int16_t fid; + apache::thrift::protocol::TType ftype; + piprot_->readStructBegin(name); + if (print_) { + printf("<"); + } + while (true) { + piprot_->readFieldBegin(name, ftype, fid); + if (ftype == apache::thrift::protocol::T_STOP) { + break; + } + printAndPassToBuffer(ftype); + if (print_) { + printf(","); + } + piprot_->readFieldEnd(); + } + piprot_->readStructEnd(); + if (print_) { + printf("\b>"); + } + } break; + case apache::thrift::protocol::T_MAP: { + apache::thrift::protocol::TType keyType; + apache::thrift::protocol::TType valType; + uint32_t i, size; + piprot_->readMapBegin(keyType, valType, size); + if (print_) { + printf("{"); + } + for (i = 0; i < size; i++) { + 
printAndPassToBuffer(keyType); + if (print_) { + printf("=>"); + } + printAndPassToBuffer(valType); + if (print_) { + printf(","); + } + } + piprot_->readMapEnd(); + if (print_) { + printf("\b}"); + } + } break; + case apache::thrift::protocol::T_SET: { + apache::thrift::protocol::TType elemType; + uint32_t i, size; + piprot_->readSetBegin(elemType, size); + if (print_) { + printf("{"); + } + for (i = 0; i < size; i++) { + printAndPassToBuffer(elemType); + if (print_) { + printf(","); + } + } + piprot_->readSetEnd(); + if (print_) { + printf("\b}"); + } + } break; + case apache::thrift::protocol::T_LIST: { + apache::thrift::protocol::TType elemType; + uint32_t i, size; + piprot_->readListBegin(elemType, size); + if (print_) { + printf("["); + } + for (i = 0; i < size; i++) { + printAndPassToBuffer(elemType); + if (print_) { + printf(","); + } + } + piprot_->readListEnd(); + if (print_) { + printf("\b]"); + } + } break; + default: + break; + } + } + + boost::shared_ptr<apache::thrift::protocol::TProtocol> piprot_; + std::map<std::string, int64_t> frequency_map_; + + bool print_; + bool frequency_; +}; +} +} +} // apache::thrift::processor + +#endif http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/processor/TMultiplexedProcessor.h ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/processor/TMultiplexedProcessor.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/processor/TMultiplexedProcessor.h new file mode 100644 index 0000000..0ef7261 --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/processor/TMultiplexedProcessor.h @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef THRIFT_TMULTIPLEXEDPROCESSOR_H_ +#define THRIFT_TMULTIPLEXEDPROCESSOR_H_ 1 + +#include <thrift/protocol/TProtocolDecorator.h> +#include <thrift/TApplicationException.h> +#include <thrift/TProcessor.h> +#include <boost/tokenizer.hpp> + +namespace apache { +namespace thrift { +using boost::shared_ptr; + +namespace protocol { + +/** + * To be able to work with any protocol, we needed + * to allow them to call readMessageBegin() and get a TMessage in exactly + * the standard format, without the service name prepended to TMessage.name. + */ +class StoredMessageProtocol : public TProtocolDecorator { +public: + StoredMessageProtocol(shared_ptr<protocol::TProtocol> _protocol, + const std::string& _name, + const TMessageType _type, + const int32_t _seqid) + : TProtocolDecorator(_protocol), name(_name), type(_type), seqid(_seqid) {} + + uint32_t readMessageBegin_virt(std::string& _name, TMessageType& _type, int32_t& _seqid) { + + _name = name; + _type = type; + _seqid = seqid; + + return 0; // (Normal TProtocol read functions return number of bytes read) + } + + std::string name; + TMessageType type; + int32_t seqid; +}; +} // namespace protocol + +/** + * <code>TMultiplexedProcessor</code> is a <code>TProcessor</code> allowing + * a single <code>TServer</code> to provide multiple services. 
+ * + * <p>To do so, you instantiate the processor and then register additional + * processors with it, as shown in the following example:</p> + * + * <blockquote><code> + * shared_ptr<TMultiplexedProcessor> processor(new TMultiplexedProcessor()); + * + * processor->registerProcessor( + * "Calculator", + * shared_ptr<TProcessor>( new CalculatorProcessor( + * shared_ptr<CalculatorHandler>( new CalculatorHandler())))); + * + * processor->registerProcessor( + * "WeatherReport", + * shared_ptr<TProcessor>( new WeatherReportProcessor( + * shared_ptr<WeatherReportHandler>( new WeatherReportHandler())))); + * + * shared_ptr<TServerTransport> transport(new TServerSocket(9090)); + * TSimpleServer server(processor, transport); + * + * server.serve(); + * </code></blockquote> + */ +class TMultiplexedProcessor : public TProcessor { +public: + typedef std::map<std::string, shared_ptr<TProcessor> > services_t; + + /** + * 'Register' a service with this <code>TMultiplexedProcessor</code>. This + * allows us to broker requests to individual services by using the service + * name to select them at request time. + * + * \param [in] serviceName Name of a service, has to be identical to the name + * declared in the Thrift IDL, e.g. "WeatherReport". + * \param [in] processor Implementation of a service, usually referred to + * as "handlers", e.g. WeatherReportHandler, + * implementing WeatherReportIf interface. 
+ */ + void registerProcessor(const std::string& serviceName, shared_ptr<TProcessor> processor) { + services[serviceName] = processor; + } + + /** + * This implementation of <code>process</code> performs the following steps: + * + * <ol> + * <li>Read the beginning of the message.</li> + * <li>Extract the service name from the message.</li> + * <li>Using the service name to locate the appropriate processor.</li> + * <li>Dispatch to the processor, with a decorated instance of TProtocol + * that allows readMessageBegin() to return the original TMessage.</li> + * </ol> + * + * \throws TException If the message type is not T_CALL or T_ONEWAY, if + * the service name was not found in the message, or if the service + * name was not found in the service map. + */ + bool process(shared_ptr<protocol::TProtocol> in, + shared_ptr<protocol::TProtocol> out, + void* connectionContext) { + std::string name; + protocol::TMessageType type; + int32_t seqid; + + // Use the actual underlying protocol (e.g. TBinaryProtocol) to read the + // message header. This pulls the message "off the wire", which we'll + // deal with at the end of this method. + in->readMessageBegin(name, type, seqid); + + if (type != protocol::T_CALL && type != protocol::T_ONEWAY) { + // Unexpected message type. 
+ in->skip(::apache::thrift::protocol::T_STRUCT); + in->readMessageEnd(); + in->getTransport()->readEnd(); + const std::string msg("TMultiplexedProcessor: Unexpected message type"); + ::apache::thrift::TApplicationException + x(::apache::thrift::TApplicationException::PROTOCOL_ERROR, msg); + out->writeMessageBegin(name, ::apache::thrift::protocol::T_EXCEPTION, seqid); + x.write(out.get()); + out->writeMessageEnd(); + out->getTransport()->writeEnd(); + out->getTransport()->flush(); + throw TException(msg); + } + + // Extract the service name + + boost::tokenizer<boost::char_separator<char> > tok(name, boost::char_separator<char>(":")); + + std::vector<std::string> tokens; + std::copy(tok.begin(), tok.end(), std::back_inserter(tokens)); + + // A valid message should consist of two tokens: the service + // name and the name of the method to call. + if (tokens.size() == 2) { + // Search for a processor associated with this service name. + services_t::iterator it = services.find(tokens[0]); + + if (it != services.end()) { + shared_ptr<TProcessor> processor = it->second; + // Let the processor registered for this service name + // process the message. + return processor + ->process(shared_ptr<protocol::TProtocol>( + new protocol::StoredMessageProtocol(in, tokens[1], type, seqid)), + out, + connectionContext); + } else { + // Unknown service. + in->skip(::apache::thrift::protocol::T_STRUCT); + in->readMessageEnd(); + in->getTransport()->readEnd(); + + std::string msg("TMultiplexedProcessor: Unknown service: "); + msg += tokens[0]; + ::apache::thrift::TApplicationException + x(::apache::thrift::TApplicationException::PROTOCOL_ERROR, msg); + out->writeMessageBegin(name, ::apache::thrift::protocol::T_EXCEPTION, seqid); + x.write(out.get()); + out->writeMessageEnd(); + out->getTransport()->writeEnd(); + out->getTransport()->flush(); + msg += ". 
Did you forget to call registerProcessor()?"; + throw TException(msg); + } + } + return false; + } + +private: + /** Map of service processor objects, indexed by service names. */ + services_t services; +}; +} +} + +#endif // THRIFT_TMULTIPLEXEDPROCESSOR_H_ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/d709f67d/depends/thirdparty/thrift/lib/cpp/src/thrift/protocol/TBase64Utils.cpp ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/protocol/TBase64Utils.cpp b/depends/thirdparty/thrift/lib/cpp/src/thrift/protocol/TBase64Utils.cpp new file mode 100644 index 0000000..beb76eb --- /dev/null +++ b/depends/thirdparty/thrift/lib/cpp/src/thrift/protocol/TBase64Utils.cpp @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include <thrift/protocol/TBase64Utils.h> + +#include <boost/static_assert.hpp> + +using std::string; + +namespace apache { +namespace thrift { +namespace protocol { + +static const uint8_t* kBase64EncodeTable + = (const uint8_t*)"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + +void base64_encode(const uint8_t* in, uint32_t len, uint8_t* buf) { + buf[0] = kBase64EncodeTable[(in[0] >> 2) & 0x3f]; + if (len == 3) { + buf[1] = kBase64EncodeTable[((in[0] << 4) & 0x30) | ((in[1] >> 4) & 0x0f)]; + buf[2] = kBase64EncodeTable[((in[1] << 2) & 0x3c) | ((in[2] >> 6) & 0x03)]; + buf[3] = kBase64EncodeTable[in[2] & 0x3f]; + } else if (len == 2) { + buf[1] = kBase64EncodeTable[((in[0] << 4) & 0x30) | ((in[1] >> 4) & 0x0f)]; + buf[2] = kBase64EncodeTable[(in[1] << 2) & 0x3c]; + } else { // len == 1 + buf[1] = kBase64EncodeTable[(in[0] << 4) & 0x30]; + } +} + +static const uint8_t kBase64DecodeTable[256] = { + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0x3e, + 0xff, + 0xff, + 0xff, + 0x3f, + 0x34, + 0x35, + 0x36, + 0x37, + 0x38, + 0x39, + 0x3a, + 0x3b, + 0x3c, + 0x3d, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0x00, + 0x01, + 0x02, + 0x03, + 0x04, + 0x05, + 0x06, + 0x07, + 0x08, + 0x09, + 0x0a, + 0x0b, + 0x0c, + 0x0d, + 0x0e, + 0x0f, + 0x10, + 0x11, + 0x12, + 0x13, + 0x14, + 0x15, + 0x16, + 0x17, + 0x18, + 0x19, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0x1a, + 0x1b, + 0x1c, + 0x1d, + 0x1e, + 0x1f, + 0x20, + 0x21, + 0x22, + 0x23, + 0x24, + 0x25, + 0x26, + 0x27, + 0x28, + 0x29, + 0x2a, + 0x2b, + 0x2c, + 0x2d, + 0x2e, + 0x2f, + 0x30, + 0x31, + 0x32, + 0x33, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 
0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, + 0xff, +}; + +void base64_decode(uint8_t* buf, uint32_t len) { + buf[0] = (kBase64DecodeTable[buf[0]] << 2) | (kBase64DecodeTable[buf[1]] >> 4); + if (len > 2) { + buf[1] = ((kBase64DecodeTable[buf[1]] << 4) & 0xf0) | (kBase64DecodeTable[buf[2]] >> 2); + if (len > 3) { + buf[2] = ((kBase64DecodeTable[buf[2]] << 6) & 0xc0) | (kBase64DecodeTable[buf[3]]); + } + } +} +} +} +} // apache::thrift::protocol
