http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/72ea8afd/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncDispatchProcessor.h ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncDispatchProcessor.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncDispatchProcessor.h deleted file mode 100644 index e79c57d..0000000 --- a/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncDispatchProcessor.h +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -#ifndef _THRIFT_ASYNC_TASYNCDISPATCHPROCESSOR_H_ -#define _THRIFT_ASYNC_TASYNCDISPATCHPROCESSOR_H_ 1 - -#include <thrift/async/TAsyncProcessor.h> - -namespace apache { -namespace thrift { -namespace async { - -/** - * TAsyncDispatchProcessor is a helper class to parse the message header then - * call another function to dispatch based on the function name. - * - * Subclasses must implement dispatchCall() to dispatch on the function name. - */ -template <class Protocol_> -class TAsyncDispatchProcessorT : public TAsyncProcessor { -public: - virtual void process(apache::thrift::stdcxx::function<void(bool success)> _return, - boost::shared_ptr<protocol::TProtocol> in, - boost::shared_ptr<protocol::TProtocol> out) { - protocol::TProtocol* inRaw = in.get(); - protocol::TProtocol* outRaw = out.get(); - - // Try to dynamic cast to the template protocol type - Protocol_* specificIn = dynamic_cast<Protocol_*>(inRaw); - Protocol_* specificOut = dynamic_cast<Protocol_*>(outRaw); - if (specificIn && specificOut) { - return processFast(_return, specificIn, specificOut); - } - - // Log the fact that we have to use the slow path - T_GENERIC_PROTOCOL(this, inRaw, specificIn); - T_GENERIC_PROTOCOL(this, outRaw, specificOut); - - std::string fname; - protocol::TMessageType mtype; - int32_t seqid; - inRaw->readMessageBegin(fname, mtype, seqid); - - // If this doesn't look like a valid call, log an error and return false so - // that the server will close the connection. - // - // (The old generated processor code used to try to skip a T_STRUCT and - // continue. However, that seems unsafe.) - if (mtype != protocol::T_CALL && mtype != protocol::T_ONEWAY) { - GlobalOutput.printf("received invalid message type %d from client", mtype); - _return(false); - return; - } - - return this->dispatchCall(_return, inRaw, outRaw, fname, seqid); - } - - void processFast(apache::thrift::stdcxx::function<void(bool success)> _return, - Protocol_* in, - Protocol_* out) { - std::string fname; - protocol::TMessageType mtype; - int32_t seqid; - in->readMessageBegin(fname, mtype, seqid); - - if (mtype != protocol::T_CALL && mtype != protocol::T_ONEWAY) { - GlobalOutput.printf("received invalid message type %d from client", mtype); - _return(false); - return; - } - - return this->dispatchCallTemplated(_return, in, out, fname, seqid); - } - - virtual void dispatchCall(apache::thrift::stdcxx::function<void(bool ok)> _return, - apache::thrift::protocol::TProtocol* in, - apache::thrift::protocol::TProtocol* out, - const std::string& fname, - int32_t seqid) = 0; - - virtual void dispatchCallTemplated(apache::thrift::stdcxx::function<void(bool ok)> _return, - Protocol_* in, - Protocol_* out, - const std::string& fname, - int32_t seqid) = 0; -}; - -/** - * Non-templatized version of TAsyncDispatchProcessor, - * that doesn't bother trying to perform a dynamic_cast. - */ -class TAsyncDispatchProcessor : public TAsyncProcessor { -public: - virtual void process(apache::thrift::stdcxx::function<void(bool success)> _return, - boost::shared_ptr<protocol::TProtocol> in, - boost::shared_ptr<protocol::TProtocol> out) { - protocol::TProtocol* inRaw = in.get(); - protocol::TProtocol* outRaw = out.get(); - - std::string fname; - protocol::TMessageType mtype; - int32_t seqid; - inRaw->readMessageBegin(fname, mtype, seqid); - - // If this doesn't look like a valid call, log an error and return false so - // that the server will close the connection. - // - // (The old generated processor code used to try to skip a T_STRUCT and - // continue. However, that seems unsafe.) - if (mtype != protocol::T_CALL && mtype != protocol::T_ONEWAY) { - GlobalOutput.printf("received invalid message type %d from client", mtype); - _return(false); - return; - } - - return dispatchCall(_return, inRaw, outRaw, fname, seqid); - } - - virtual void dispatchCall(apache::thrift::stdcxx::function<void(bool ok)> _return, - apache::thrift::protocol::TProtocol* in, - apache::thrift::protocol::TProtocol* out, - const std::string& fname, - int32_t seqid) = 0; -}; - -// Specialize TAsyncDispatchProcessorT for TProtocol and TDummyProtocol just to -// use the generic TDispatchProcessor. -template <> -class TAsyncDispatchProcessorT<protocol::TDummyProtocol> : public TAsyncDispatchProcessor {}; -template <> -class TAsyncDispatchProcessorT<protocol::TProtocol> : public TAsyncDispatchProcessor {}; -} -} -} // apache::thrift::async - -#endif // _THRIFT_ASYNC_TASYNCDISPATCHPROCESSOR_H_
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/72ea8afd/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncProcessor.h ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncProcessor.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncProcessor.h deleted file mode 100644 index 033f7d9..0000000 --- a/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncProcessor.h +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#ifndef _THRIFT_TASYNCPROCESSOR_H_ -#define _THRIFT_TASYNCPROCESSOR_H_ 1 - -#include <thrift/cxxfunctional.h> -#include <boost/shared_ptr.hpp> -#include <thrift/protocol/TProtocol.h> -#include <thrift/TProcessor.h> - -namespace apache { -namespace thrift { -namespace async { - -/** - * Async version of a TProcessor. It is not expected to complete by the time - * the call to process returns. Instead, it calls a cob to signal completion. - */ - -class TEventServer; // forward declaration - -class TAsyncProcessor { -public: - virtual ~TAsyncProcessor() {} - - virtual void process(apache::thrift::stdcxx::function<void(bool success)> _return, - boost::shared_ptr<protocol::TProtocol> in, - boost::shared_ptr<protocol::TProtocol> out) = 0; - - void process(apache::thrift::stdcxx::function<void(bool success)> _return, - boost::shared_ptr<apache::thrift::protocol::TProtocol> io) { - return process(_return, io, io); - } - - boost::shared_ptr<TProcessorEventHandler> getEventHandler() { return eventHandler_; } - - void setEventHandler(boost::shared_ptr<TProcessorEventHandler> eventHandler) { - eventHandler_ = eventHandler; - } - - const TEventServer* getAsyncServer() { return asyncServer_; } - -protected: - TAsyncProcessor() {} - - boost::shared_ptr<TProcessorEventHandler> eventHandler_; - const TEventServer* asyncServer_; - -private: - friend class TEventServer; - void setAsyncServer(const TEventServer* server) { asyncServer_ = server; } -}; - -class TAsyncProcessorFactory { -public: - virtual ~TAsyncProcessorFactory() {} - - /** - * Get the TAsyncProcessor to use for a particular connection. - * - * This method is always invoked in the same thread that the connection was - * accepted on. This generally means that this call does not need to be - * thread safe, as it will always be invoked from a single thread. - */ - virtual boost::shared_ptr<TAsyncProcessor> getProcessor(const TConnectionInfo& connInfo) = 0; -}; -} -} -} // apache::thrift::async - -// XXX I'm lazy for now -namespace apache { -namespace thrift { -using apache::thrift::async::TAsyncProcessor; -} -} - -#endif // #ifndef _THRIFT_TASYNCPROCESSOR_H_ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/72ea8afd/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncProtocolProcessor.cpp ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncProtocolProcessor.cpp b/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncProtocolProcessor.cpp deleted file mode 100644 index 5a4f347..0000000 --- a/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncProtocolProcessor.cpp +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <thrift/async/TAsyncProtocolProcessor.h> - -using apache::thrift::transport::TBufferBase; -using apache::thrift::protocol::TProtocol; - -namespace apache { -namespace thrift { -namespace async { - -void TAsyncProtocolProcessor::process(apache::thrift::stdcxx::function<void(bool healthy)> _return, - boost::shared_ptr<TBufferBase> ibuf, - boost::shared_ptr<TBufferBase> obuf) { - boost::shared_ptr<TProtocol> iprot(pfact_->getProtocol(ibuf)); - boost::shared_ptr<TProtocol> oprot(pfact_->getProtocol(obuf)); - return underlying_ - ->process(apache::thrift::stdcxx::bind(&TAsyncProtocolProcessor::finish, - _return, - oprot, - apache::thrift::stdcxx::placeholders::_1), - iprot, - oprot); -} - -/* static */ void TAsyncProtocolProcessor::finish( - apache::thrift::stdcxx::function<void(bool healthy)> _return, - boost::shared_ptr<TProtocol> oprot, - bool healthy) { - (void)oprot; - // This is a stub function to hold a reference to oprot. - return _return(healthy); -} -} -} -} // apache::thrift::async http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/72ea8afd/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncProtocolProcessor.h ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncProtocolProcessor.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncProtocolProcessor.h deleted file mode 100644 index 3f2b394..0000000 --- a/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TAsyncProtocolProcessor.h +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#ifndef _THRIFT_TNAME_ME_H_ -#define _THRIFT_TNAME_ME_H_ 1 - -#include <thrift/async/TAsyncProcessor.h> -#include <thrift/async/TAsyncBufferProcessor.h> -#include <thrift/protocol/TProtocol.h> - -namespace apache { -namespace thrift { -namespace async { - -class TAsyncProtocolProcessor : public TAsyncBufferProcessor { -public: - TAsyncProtocolProcessor(boost::shared_ptr<TAsyncProcessor> underlying, - boost::shared_ptr<apache::thrift::protocol::TProtocolFactory> pfact) - : underlying_(underlying), pfact_(pfact) {} - - virtual void process(apache::thrift::stdcxx::function<void(bool healthy)> _return, - boost::shared_ptr<apache::thrift::transport::TBufferBase> ibuf, - boost::shared_ptr<apache::thrift::transport::TBufferBase> obuf); - - virtual ~TAsyncProtocolProcessor() {} - -private: - static void finish(apache::thrift::stdcxx::function<void(bool healthy)> _return, - boost::shared_ptr<apache::thrift::protocol::TProtocol> oprot, - bool healthy); - - boost::shared_ptr<TAsyncProcessor> underlying_; - boost::shared_ptr<apache::thrift::protocol::TProtocolFactory> pfact_; -}; -} -} -} // apache::thrift::async - -#endif // #ifndef _THRIFT_TNAME_ME_H_ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/72ea8afd/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TConcurrentClientSyncInfo.cpp ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TConcurrentClientSyncInfo.cpp b/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TConcurrentClientSyncInfo.cpp deleted file mode 100644 index c7e27c0..0000000 --- a/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TConcurrentClientSyncInfo.cpp +++ /dev/null @@ -1,242 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <thrift/async/TConcurrentClientSyncInfo.h> -#include <thrift/TApplicationException.h> -#include <thrift/transport/TTransportException.h> -#include <limits> - -namespace apache { namespace thrift { namespace async { - -using namespace ::apache::thrift::concurrency; - -TConcurrentClientSyncInfo::TConcurrentClientSyncInfo() : - stop_(false), - seqidMutex_(), - // test rollover all the time - nextseqid_((std::numeric_limits<int32_t>::max)()-10), - seqidToMonitorMap_(), - freeMonitors_(), - writeMutex_(), - readMutex_(), - recvPending_(false), - wakeupSomeone_(false), - seqidPending_(0), - fnamePending_(), - mtypePending_(::apache::thrift::protocol::T_CALL) -{ - freeMonitors_.reserve(MONITOR_CACHE_SIZE); -} - -bool TConcurrentClientSyncInfo::getPending( - std::string &fname, - ::apache::thrift::protocol::TMessageType &mtype, - int32_t &rseqid) -{ - if(stop_) - throwDeadConnection_(); - wakeupSomeone_ = false; - if(recvPending_) - { - recvPending_ = false; - rseqid = seqidPending_; - fname = fnamePending_; - mtype = mtypePending_; - return true; - } - return false; -} - -void TConcurrentClientSyncInfo::updatePending( - const std::string &fname, - ::apache::thrift::protocol::TMessageType mtype, - int32_t rseqid) -{ - recvPending_ = true; - seqidPending_ = rseqid; - fnamePending_ = fname; - mtypePending_ = mtype; - MonitorPtr monitor; - { - Guard seqidGuard(seqidMutex_); - MonitorMap::iterator i = seqidToMonitorMap_.find(rseqid); - if(i == seqidToMonitorMap_.end()) - throwBadSeqId_(); - monitor = i->second; - } - monitor->notify(); -} - -void TConcurrentClientSyncInfo::waitForWork(int32_t seqid) -{ - MonitorPtr m; - { - Guard seqidGuard(seqidMutex_); - m = seqidToMonitorMap_[seqid]; - } - while(true) - { - // be very careful about setting state in this loop that affects waking up. You may exit - // this function, attempt to grab some work, and someone else could have beaten you (or not - // left) the read mutex, and that will put you right back in this loop, with the mangled - // state you left behind. - if(stop_) - throwDeadConnection_(); - if(wakeupSomeone_) - return; - if(recvPending_ && seqidPending_ == seqid) - return; - m->waitForever(); - } -} - -void TConcurrentClientSyncInfo::throwBadSeqId_() -{ - throw apache::thrift::TApplicationException( - TApplicationException::BAD_SEQUENCE_ID, - "server sent a bad seqid"); -} - -void TConcurrentClientSyncInfo::throwDeadConnection_() -{ - throw apache::thrift::transport::TTransportException( - apache::thrift::transport::TTransportException::NOT_OPEN, - "this client died on another thread, and is now in an unusable state"); -} - -void TConcurrentClientSyncInfo::wakeupAnyone_(const Guard &) -{ - wakeupSomeone_ = true; - if(!seqidToMonitorMap_.empty()) - { - // The monitor map maps integers to monitors. Larger integers are more recent - // messages. Since this is ordered, it means that the last element is the most recent. - // We are trying to guess which thread will have its message complete next, so we are picking - // the most recent. The oldest message is likely to be some polling, long lived message. - // If we guess right, the thread we wake up will handle the message that comes in. - // If we guess wrong, the thread we wake up will hand off the work to the correct thread, - // costing us an extra context switch. - seqidToMonitorMap_.rbegin()->second->notify(); - } -} - -void TConcurrentClientSyncInfo::markBad_(const Guard &) -{ - wakeupSomeone_ = true; - stop_ = true; - for(MonitorMap::iterator i = seqidToMonitorMap_.begin(); i != seqidToMonitorMap_.end(); ++i) - i->second->notify(); -} - -TConcurrentClientSyncInfo::MonitorPtr -TConcurrentClientSyncInfo::newMonitor_(const Guard &) -{ - if(freeMonitors_.empty()) - return MonitorPtr(new Monitor(&readMutex_)); - MonitorPtr retval; - //swapping to avoid an atomic operation - retval.swap(freeMonitors_.back()); - freeMonitors_.pop_back(); - return retval; -} - -void TConcurrentClientSyncInfo::deleteMonitor_( - const Guard &, - TConcurrentClientSyncInfo::MonitorPtr &m) /*noexcept*/ -{ - if(freeMonitors_.size() > MONITOR_CACHE_SIZE) - { - m.reset(); - return; - } - //freeMonitors_ was reserved up to MONITOR_CACHE_SIZE in the ctor, - //so this shouldn't throw - freeMonitors_.push_back(TConcurrentClientSyncInfo::MonitorPtr()); - //swapping to avoid an atomic operation - m.swap(freeMonitors_.back()); -} - -int32_t TConcurrentClientSyncInfo::generateSeqId() -{ - Guard seqidGuard(seqidMutex_); - if(stop_) - throwDeadConnection_(); - - if(!seqidToMonitorMap_.empty()) - if(nextseqid_ == seqidToMonitorMap_.begin()->first) - throw apache::thrift::TApplicationException( - TApplicationException::BAD_SEQUENCE_ID, - "about to repeat a seqid"); - int32_t newSeqId = nextseqid_++; - seqidToMonitorMap_[newSeqId] = newMonitor_(seqidGuard); - return newSeqId; -} - -TConcurrentRecvSentry::TConcurrentRecvSentry(TConcurrentClientSyncInfo *sync, int32_t seqid) : - sync_(*sync), - seqid_(seqid), - committed_(false) -{ - sync_.getReadMutex().lock(); -} - -TConcurrentRecvSentry::~TConcurrentRecvSentry() -{ - { - Guard seqidGuard(sync_.seqidMutex_); - sync_.deleteMonitor_(seqidGuard, sync_.seqidToMonitorMap_[seqid_]); - - sync_.seqidToMonitorMap_.erase(seqid_); - if(committed_) - sync_.wakeupAnyone_(seqidGuard); - else - sync_.markBad_(seqidGuard); - } - sync_.getReadMutex().unlock(); -} - -void TConcurrentRecvSentry::commit() -{ - committed_ = true; -} - -TConcurrentSendSentry::TConcurrentSendSentry(TConcurrentClientSyncInfo *sync) : - sync_(*sync), - committed_(false) -{ - sync_.getWriteMutex().lock(); -} - -TConcurrentSendSentry::~TConcurrentSendSentry() -{ - if(!committed_) - { - Guard seqidGuard(sync_.seqidMutex_); - sync_.markBad_(seqidGuard); - } - sync_.getWriteMutex().unlock(); -} - -void TConcurrentSendSentry::commit() -{ - committed_ = true; -} - - -}}} // apache::thrift::async http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/72ea8afd/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TConcurrentClientSyncInfo.h ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TConcurrentClientSyncInfo.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TConcurrentClientSyncInfo.h deleted file mode 100644 index 8997a23..0000000 --- a/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TConcurrentClientSyncInfo.h +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -#ifndef _THRIFT_TCONCURRENTCLIENTSYNCINFO_H_ -#define _THRIFT_TCONCURRENTCLIENTSYNCINFO_H_ 1 - -#include <thrift/protocol/TProtocol.h> -#include <thrift/concurrency/Mutex.h> -#include <thrift/concurrency/Monitor.h> -#include <boost/shared_ptr.hpp> -#include <vector> -#include <string> -#include <map> - -namespace apache { namespace thrift { namespace async { - -class TConcurrentClientSyncInfo; - -class TConcurrentSendSentry -{ -public: - explicit TConcurrentSendSentry(TConcurrentClientSyncInfo *sync); - ~TConcurrentSendSentry(); - - void commit(); -private: - TConcurrentClientSyncInfo &sync_; - bool committed_; -}; - -class TConcurrentRecvSentry -{ -public: - TConcurrentRecvSentry(TConcurrentClientSyncInfo *sync, int32_t seqid); - ~TConcurrentRecvSentry(); - - void commit(); -private: - TConcurrentClientSyncInfo &sync_; - int32_t seqid_; - bool committed_; -}; - -class TConcurrentClientSyncInfo -{ -private: //typedefs - typedef boost::shared_ptr< ::apache::thrift::concurrency::Monitor> MonitorPtr; - typedef std::map<int32_t, MonitorPtr> MonitorMap; -public: - TConcurrentClientSyncInfo(); - - int32_t generateSeqId(); - - bool getPending( - std::string &fname, - ::apache::thrift::protocol::TMessageType &mtype, - int32_t &rseqid); /* requires readMutex_ */ - - void updatePending( - const std::string &fname, - ::apache::thrift::protocol::TMessageType mtype, - int32_t rseqid); /* requires readMutex_ */ - - void waitForWork(int32_t seqid); /* requires readMutex_ */ - - ::apache::thrift::concurrency::Mutex &getReadMutex() {return readMutex_;} - ::apache::thrift::concurrency::Mutex &getWriteMutex() {return writeMutex_;} - -private: //constants - enum {MONITOR_CACHE_SIZE = 10}; -private: //functions - MonitorPtr newMonitor_( - const ::apache::thrift::concurrency::Guard &seqidGuard); /* requires seqidMutex_ */ - void deleteMonitor_( - const ::apache::thrift::concurrency::Guard &seqidGuard, - MonitorPtr &m); /*noexcept*/ /* requires seqidMutex_ */ - void wakeupAnyone_( - const ::apache::thrift::concurrency::Guard &seqidGuard); /* requires seqidMutex_ */ - void markBad_( - const ::apache::thrift::concurrency::Guard &seqidGuard); /* requires seqidMutex_ */ - void throwBadSeqId_(); - void throwDeadConnection_(); -private: //data members - - volatile bool stop_; - - ::apache::thrift::concurrency::Mutex seqidMutex_; - // begin seqidMutex_ protected members - int32_t nextseqid_; - MonitorMap seqidToMonitorMap_; - std::vector<MonitorPtr> freeMonitors_; - // end seqidMutex_ protected members - - ::apache::thrift::concurrency::Mutex writeMutex_; - - ::apache::thrift::concurrency::Mutex readMutex_; - // begin readMutex_ protected members - bool recvPending_; - bool wakeupSomeone_; - int32_t seqidPending_; - std::string fnamePending_; - ::apache::thrift::protocol::TMessageType mtypePending_; - // end readMutex_ protected members - - - friend class TConcurrentSendSentry; - friend class TConcurrentRecvSentry; -}; - -}}} // apache::thrift::async - -#endif // _THRIFT_TCONCURRENTCLIENTSYNCINFO_H_ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/72ea8afd/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TEvhttpClientChannel.cpp ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TEvhttpClientChannel.cpp b/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TEvhttpClientChannel.cpp deleted file mode 100644 index 1279bc6..0000000 --- a/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TEvhttpClientChannel.cpp +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <thrift/async/TEvhttpClientChannel.h> -#include <evhttp.h> -#include <event2/buffer.h> -#include <event2/buffer_compat.h> -#include <thrift/transport/TBufferTransports.h> -#include <thrift/protocol/TProtocolException.h> - -#include <iostream> -#include <sstream> - -using namespace apache::thrift::protocol; -using apache::thrift::transport::TTransportException; - -namespace apache { -namespace thrift { -namespace async { - -TEvhttpClientChannel::TEvhttpClientChannel(const std::string& host, - const std::string& path, - const char* address, - int port, - struct event_base* eb) - : host_(host), path_(path), recvBuf_(NULL), conn_(NULL) { - conn_ = evhttp_connection_new(address, port); - if (conn_ == NULL) { - throw TException("evhttp_connection_new failed"); - } - evhttp_connection_set_base(conn_, eb); -} - -TEvhttpClientChannel::~TEvhttpClientChannel() { - if (conn_ != NULL) { - evhttp_connection_free(conn_); - } -} - -void TEvhttpClientChannel::sendAndRecvMessage(const VoidCallback& cob, - apache::thrift::transport::TMemoryBuffer* sendBuf, - apache::thrift::transport::TMemoryBuffer* recvBuf) { - cob_ = cob; - recvBuf_ = recvBuf; - - struct evhttp_request* req = evhttp_request_new(response, this); - if (req == NULL) { - throw TException("evhttp_request_new failed"); - } - - int rv; - - rv = evhttp_add_header(req->output_headers, "Host", host_.c_str()); - if (rv != 0) { - throw TException("evhttp_add_header failed"); - } - - rv = evhttp_add_header(req->output_headers, "Content-Type", "application/x-thrift"); - if (rv != 0) { - throw TException("evhttp_add_header failed"); - } - - uint8_t* obuf; - uint32_t sz; - sendBuf->getBuffer(&obuf, &sz); - rv = evbuffer_add(req->output_buffer, obuf, sz); - if (rv != 0) { - throw TException("evbuffer_add failed"); - } - - rv = evhttp_make_request(conn_, req, EVHTTP_REQ_POST, path_.c_str()); - if (rv != 0) { - throw TException("evhttp_make_request failed"); - } -} - -void TEvhttpClientChannel::sendMessage(const VoidCallback& cob, - apache::thrift::transport::TMemoryBuffer* message) { - (void)cob; - (void)message; - throw TProtocolException(TProtocolException::NOT_IMPLEMENTED, - "Unexpected call to TEvhttpClientChannel::sendMessage"); -} - -void TEvhttpClientChannel::recvMessage(const VoidCallback& cob, - apache::thrift::transport::TMemoryBuffer* message) { - (void)cob; - (void)message; - throw TProtocolException(TProtocolException::NOT_IMPLEMENTED, - "Unexpected call to TEvhttpClientChannel::recvMessage"); -} - -void TEvhttpClientChannel::finish(struct evhttp_request* req) { - if (req == NULL) { - try { - cob_(); - } catch (const TTransportException& e) { - if (e.getType() == TTransportException::END_OF_FILE) - throw TException("connect failed"); - else - throw; - } - return; - } else if (req->response_code != 200) { - try { - cob_(); - } catch (const TTransportException& e) { - std::stringstream ss; - ss << "server returned code " << req->response_code; - if (req->response_code_line) - ss << ": " << req->response_code_line; - if (e.getType() == TTransportException::END_OF_FILE) - throw TException(ss.str()); - else - throw; - } - return; - } - recvBuf_->resetBuffer(EVBUFFER_DATA(req->input_buffer), - static_cast<uint32_t>(EVBUFFER_LENGTH(req->input_buffer))); - cob_(); - return; -} - -/* static */ void TEvhttpClientChannel::response(struct evhttp_request* req, void* arg) { - TEvhttpClientChannel* self = (TEvhttpClientChannel*)arg; - try { - self->finish(req); - } catch (std::exception& e) { - // don't propagate a C++ exception in C code (e.g. libevent) - std::cerr << "TEvhttpClientChannel::response exception thrown (ignored): " << e.what() - << std::endl; - } -} -} -} -} // apache::thrift::async http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/72ea8afd/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TEvhttpClientChannel.h ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TEvhttpClientChannel.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TEvhttpClientChannel.h deleted file mode 100644 index 72ed40f..0000000 --- a/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TEvhttpClientChannel.h +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#ifndef _THRIFT_TEVHTTP_CLIENT_CHANNEL_H_ -#define _THRIFT_TEVHTTP_CLIENT_CHANNEL_H_ 1 - -#include <string> -#include <boost/shared_ptr.hpp> -#include <thrift/async/TAsyncChannel.h> - -struct event_base; -struct evhttp_connection; -struct evhttp_request; - -namespace apache { -namespace thrift { -namespace transport { -class TMemoryBuffer; -} -} -} - -namespace apache { -namespace thrift { -namespace async { - -class TEvhttpClientChannel : public TAsyncChannel { -public: - using TAsyncChannel::VoidCallback; - - TEvhttpClientChannel(const std::string& host, - const std::string& path, - const char* address, - int port, - struct event_base* eb); - ~TEvhttpClientChannel(); - - virtual void sendAndRecvMessage(const VoidCallback& cob, - apache::thrift::transport::TMemoryBuffer* sendBuf, - apache::thrift::transport::TMemoryBuffer* recvBuf); - - virtual void sendMessage(const VoidCallback& cob, - apache::thrift::transport::TMemoryBuffer* message); - virtual void recvMessage(const VoidCallback& cob, - apache::thrift::transport::TMemoryBuffer* message); - - void finish(struct evhttp_request* req); - - // XXX - virtual bool good() const { return true; } - virtual bool error() const { return false; } - virtual bool timedOut() const { return false; } - -private: - static void response(struct evhttp_request* req, void* arg); - - std::string host_; - std::string path_; - VoidCallback cob_; - apache::thrift::transport::TMemoryBuffer* recvBuf_; - struct evhttp_connection* conn_; -}; -} -} -} // apache::thrift::async - -#endif // #ifndef _THRIFT_TEVHTTP_CLIENT_CHANNEL_H_ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/72ea8afd/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TEvhttpServer.cpp ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TEvhttpServer.cpp b/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TEvhttpServer.cpp deleted file mode 100644 index 57d0d61..0000000 --- a/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TEvhttpServer.cpp +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <thrift/async/TEvhttpServer.h> -#include <thrift/async/TAsyncBufferProcessor.h> -#include <thrift/transport/TBufferTransports.h> -#include <evhttp.h> -#include <event2/buffer.h> -#include <event2/buffer_compat.h> - -#include <iostream> - -#ifndef HTTP_INTERNAL // libevent < 2 -#define HTTP_INTERNAL 500 -#endif - -using apache::thrift::transport::TMemoryBuffer; - -namespace apache { -namespace thrift { -namespace async { - -struct TEvhttpServer::RequestContext { - struct evhttp_request* req; - boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> ibuf; - boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> obuf; - - RequestContext(struct evhttp_request* req); -}; - -TEvhttpServer::TEvhttpServer(boost::shared_ptr<TAsyncBufferProcessor> processor) - : processor_(processor), eb_(NULL), eh_(NULL) { -} - -TEvhttpServer::TEvhttpServer(boost::shared_ptr<TAsyncBufferProcessor> processor, int port) - : processor_(processor), eb_(NULL), eh_(NULL) { - // Create event_base and evhttp. - eb_ = event_base_new(); - if (eb_ == NULL) { - throw TException("event_base_new failed"); - } - eh_ = evhttp_new(eb_); - if (eh_ == NULL) { - event_base_free(eb_); - throw TException("evhttp_new failed"); - } - - // Bind to port. - int ret = evhttp_bind_socket(eh_, NULL, port); - if (ret < 0) { - evhttp_free(eh_); - event_base_free(eb_); - throw TException("evhttp_bind_socket failed"); - } - - // Register a handler. If you use the other constructor, - // you will want to do this yourself. - // Don't forget to unregister before destorying this TEvhttpServer. - evhttp_set_cb(eh_, "/", request, (void*)this); -} - -TEvhttpServer::~TEvhttpServer() { - if (eh_ != NULL) { - evhttp_free(eh_); - } - if (eb_ != NULL) { - event_base_free(eb_); - } -} - -int TEvhttpServer::serve() { - if (eb_ == NULL) { - throw TException("Unexpected call to TEvhttpServer::serve"); - } - return event_base_dispatch(eb_); -} - -TEvhttpServer::RequestContext::RequestContext(struct evhttp_request* req) - : req(req), - ibuf(new TMemoryBuffer(EVBUFFER_DATA(req->input_buffer), - static_cast<uint32_t>(EVBUFFER_LENGTH(req->input_buffer)))), - obuf(new TMemoryBuffer()) { -} - -void TEvhttpServer::request(struct evhttp_request* req, void* self) { - try { - static_cast<TEvhttpServer*>(self)->process(req); - } catch (std::exception& e) { - evhttp_send_reply(req, HTTP_INTERNAL, e.what(), 0); - } -} - -void TEvhttpServer::process(struct evhttp_request* req) { - RequestContext* ctx = new RequestContext(req); - return processor_->process(apache::thrift::stdcxx::bind(&TEvhttpServer::complete, - this, - ctx, - apache::thrift::stdcxx::placeholders::_1), - ctx->ibuf, - ctx->obuf); -} - -void TEvhttpServer::complete(RequestContext* ctx, bool success) { - (void)success; - std::auto_ptr<RequestContext> ptr(ctx); - - int code = success ? 200 : 400; - const char* reason = success ? "OK" : "Bad Request"; - - int rv = evhttp_add_header(ctx->req->output_headers, "Content-Type", "application/x-thrift"); - if (rv != 0) { - // TODO: Log an error. - std::cerr << "evhttp_add_header failed " << __FILE__ << ":" << __LINE__ << std::endl; - } - - struct evbuffer* buf = evbuffer_new(); - if (buf == NULL) { - // TODO: Log an error. - std::cerr << "evbuffer_new failed " << __FILE__ << ":" << __LINE__ << std::endl; - } else { - uint8_t* obuf; - uint32_t sz; - ctx->obuf->getBuffer(&obuf, &sz); - int ret = evbuffer_add(buf, obuf, sz); - if (ret != 0) { - // TODO: Log an error. - std::cerr << "evhttp_add failed with " << ret << " " << __FILE__ << ":" << __LINE__ - << std::endl; - } - } - - evhttp_send_reply(ctx->req, code, reason, buf); - if (buf != NULL) { - evbuffer_free(buf); - } -} - -struct event_base* TEvhttpServer::getEventBase() { - return eb_; -} -} -} -} // apache::thrift::async http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/72ea8afd/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TEvhttpServer.h ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TEvhttpServer.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TEvhttpServer.h deleted file mode 100644 index 89bf337..0000000 --- a/depends/thirdparty/thrift/lib/cpp/src/thrift/async/TEvhttpServer.h +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#ifndef _THRIFT_TEVHTTP_SERVER_H_ -#define _THRIFT_TEVHTTP_SERVER_H_ 1 - -#include <boost/shared_ptr.hpp> - -struct event_base; -struct evhttp; -struct evhttp_request; - -namespace apache { -namespace thrift { -namespace async { - -class TAsyncBufferProcessor; - -class TEvhttpServer { -public: - /** - * Create a TEvhttpServer for use with an external evhttp instance. - * Must be manually installed with evhttp_set_cb, using - * TEvhttpServer::request as the callback and the - * address of the server as the extra arg. - * Do not call "serve" on this server. - */ - TEvhttpServer(boost::shared_ptr<TAsyncBufferProcessor> processor); - - /** - * Create a TEvhttpServer with an embedded event_base and evhttp, - * listening on port and responding on the endpoint "/". - * Call "serve" on this server to serve forever. - */ - TEvhttpServer(boost::shared_ptr<TAsyncBufferProcessor> processor, int port); - - ~TEvhttpServer(); - - static void request(struct evhttp_request* req, void* self); - int serve(); - - struct event_base* getEventBase(); - -private: - struct RequestContext; - - void process(struct evhttp_request* req); - void complete(RequestContext* ctx, bool success); - - boost::shared_ptr<TAsyncBufferProcessor> processor_; - struct event_base* eb_; - struct evhttp* eh_; -}; -} -} -} // apache::thrift::async - -#endif // #ifndef _THRIFT_TEVHTTP_SERVER_H_ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/72ea8afd/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/BoostMonitor.cpp ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/BoostMonitor.cpp b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/BoostMonitor.cpp deleted file mode 100644 index 6c24d82..0000000 --- a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/BoostMonitor.cpp +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <thrift/thrift-config.h> - -#include <thrift/concurrency/Monitor.h> -#include <thrift/concurrency/Exception.h> -#include <thrift/concurrency/Util.h> -#include <thrift/transport/PlatformSocket.h> -#include <assert.h> - -#include <boost/scoped_ptr.hpp> -#include <boost/thread.hpp> -#include <boost/date_time/posix_time/posix_time.hpp> - -namespace apache { -namespace thrift { -namespace concurrency { - -/** - * Monitor implementation using the boost thread library - * - * @version $Id:$ - */ -class Monitor::Impl : public boost::condition_variable_any { - -public: - Impl() : ownedMutex_(new Mutex()), mutex_(NULL) { init(ownedMutex_.get()); } - - Impl(Mutex* mutex) : mutex_(NULL) { init(mutex); } - - Impl(Monitor* monitor) : mutex_(NULL) { init(&(monitor->mutex())); } - - Mutex& mutex() { return *mutex_; } - void lock() { mutex().lock(); } - void unlock() { mutex().unlock(); } - - /** - * Exception-throwing version of waitForTimeRelative(), called simply - * wait(int64) for historical reasons. Timeout is in milliseconds. - * - * If the condition occurs, this function returns cleanly; on timeout or - * error an exception is thrown. - */ - void wait(int64_t timeout_ms) { - int result = waitForTimeRelative(timeout_ms); - if (result == THRIFT_ETIMEDOUT) { - throw TimedOutException(); - } else if (result != 0) { - throw TException("Monitor::wait() failed"); - } - } - - /** - * Waits until the specified timeout in milliseconds for the condition to - * occur, or waits forever if timeout_ms == 0. - * - * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. - */ - int waitForTimeRelative(int64_t timeout_ms) { - if (timeout_ms == 0LL) { - return waitForever(); - } - - assert(mutex_); - boost::timed_mutex* mutexImpl - = reinterpret_cast<boost::timed_mutex*>(mutex_->getUnderlyingImpl()); - assert(mutexImpl); - - boost::timed_mutex::scoped_lock lock(*mutexImpl, boost::adopt_lock); - int res - = timed_wait(lock, boost::get_system_time() + boost::posix_time::milliseconds(timeout_ms)) - ? 0 - : THRIFT_ETIMEDOUT; - lock.release(); - return res; - } - - /** - * Waits until the absolute time specified using struct THRIFT_TIMESPEC. - * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. - */ - int waitForTime(const THRIFT_TIMESPEC* abstime) { - struct timeval temp; - temp.tv_sec = static_cast<long>(abstime->tv_sec); - temp.tv_usec = static_cast<long>(abstime->tv_nsec) / 1000; - return waitForTime(&temp); - } - - /** - * Waits until the absolute time specified using struct timeval. - * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. - */ - int waitForTime(const struct timeval* abstime) { - assert(mutex_); - boost::timed_mutex* mutexImpl = static_cast<boost::timed_mutex*>(mutex_->getUnderlyingImpl()); - assert(mutexImpl); - - struct timeval currenttime; - Util::toTimeval(currenttime, Util::currentTime()); - - long tv_sec = static_cast<long>(abstime->tv_sec - currenttime.tv_sec); - long tv_usec = static_cast<long>(abstime->tv_usec - currenttime.tv_usec); - if (tv_sec < 0) - tv_sec = 0; - if (tv_usec < 0) - tv_usec = 0; - - boost::timed_mutex::scoped_lock lock(*mutexImpl, boost::adopt_lock); - int res = timed_wait(lock, - boost::get_system_time() + boost::posix_time::seconds(tv_sec) - + boost::posix_time::microseconds(tv_usec)) - ? 0 - : THRIFT_ETIMEDOUT; - lock.release(); - return res; - } - - /** - * Waits forever until the condition occurs. - * Returns 0 if condition occurs, or an error code otherwise. - */ - int waitForever() { - assert(mutex_); - boost::timed_mutex* mutexImpl - = reinterpret_cast<boost::timed_mutex*>(mutex_->getUnderlyingImpl()); - assert(mutexImpl); - - boost::timed_mutex::scoped_lock lock(*mutexImpl, boost::adopt_lock); - ((boost::condition_variable_any*)this)->wait(lock); - lock.release(); - return 0; - } - - void notify() { notify_one(); } - - void notifyAll() { notify_all(); } - -private: - void init(Mutex* mutex) { mutex_ = mutex; } - - boost::scoped_ptr<Mutex> ownedMutex_; - Mutex* mutex_; -}; - -Monitor::Monitor() : impl_(new Monitor::Impl()) { -} -Monitor::Monitor(Mutex* mutex) : impl_(new Monitor::Impl(mutex)) { -} -Monitor::Monitor(Monitor* monitor) : impl_(new Monitor::Impl(monitor)) { -} - -Monitor::~Monitor() { - delete impl_; -} - -Mutex& Monitor::mutex() const { - return const_cast<Monitor::Impl*>(impl_)->mutex(); -} - -void Monitor::lock() const { - const_cast<Monitor::Impl*>(impl_)->lock(); -} - -void Monitor::unlock() const { - const_cast<Monitor::Impl*>(impl_)->unlock(); -} - -void Monitor::wait(int64_t timeout) const { - const_cast<Monitor::Impl*>(impl_)->wait(timeout); -} - -int Monitor::waitForTime(const THRIFT_TIMESPEC* abstime) const { - return const_cast<Monitor::Impl*>(impl_)->waitForTime(abstime); -} - -int Monitor::waitForTime(const timeval* abstime) const { - return const_cast<Monitor::Impl*>(impl_)->waitForTime(abstime); -} - -int Monitor::waitForTimeRelative(int64_t timeout_ms) const { - return const_cast<Monitor::Impl*>(impl_)->waitForTimeRelative(timeout_ms); -} - -int Monitor::waitForever() const { - return const_cast<Monitor::Impl*>(impl_)->waitForever(); -} - -void Monitor::notify() const { - const_cast<Monitor::Impl*>(impl_)->notify(); -} - -void Monitor::notifyAll() const { - const_cast<Monitor::Impl*>(impl_)->notifyAll(); -} -} -} -} // apache::thrift::concurrency http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/72ea8afd/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/BoostMutex.cpp ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/BoostMutex.cpp b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/BoostMutex.cpp deleted file mode 100644 index f7cadab..0000000 --- a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/BoostMutex.cpp +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <thrift/thrift-config.h> - -#include <thrift/concurrency/Mutex.h> -#include <thrift/concurrency/Util.h> -#include <thrift/Thrift.h> - -#include <cassert> -#include <boost/thread.hpp> -#include <boost/thread/mutex.hpp> -#include <boost/date_time/posix_time/posix_time.hpp> - -namespace apache { -namespace thrift { -namespace concurrency { - -/** - * Implementation of Mutex class using boost interprocess mutex - * - * @version $Id:$ - */ -class Mutex::impl : public boost::timed_mutex {}; - -Mutex::Mutex(Initializer init) : impl_(new Mutex::impl()) { - THRIFT_UNUSED_VARIABLE(init); -} - -void* Mutex::getUnderlyingImpl() const { - return impl_.get(); -} - -void Mutex::lock() const { - impl_->lock(); -} - -bool Mutex::trylock() const { - return impl_->try_lock(); -} - -bool Mutex::timedlock(int64_t ms) const { - return impl_->timed_lock(boost::get_system_time() + boost::posix_time::milliseconds(ms)); -} - -void Mutex::unlock() const { - impl_->unlock(); -} - -void Mutex::DEFAULT_INITIALIZER(void* arg) { - THRIFT_UNUSED_VARIABLE(arg); -} -} -} -} // apache::thrift::concurrency http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/72ea8afd/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/BoostThreadFactory.cpp ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/BoostThreadFactory.cpp b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/BoostThreadFactory.cpp deleted file mode 100644 index 96cb6d6..0000000 --- a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/BoostThreadFactory.cpp +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <thrift/thrift-config.h> - -#if USE_BOOST_THREAD - -#include <thrift/concurrency/BoostThreadFactory.h> -#include <thrift/concurrency/Exception.h> - -#include <cassert> - -#include <boost/weak_ptr.hpp> -#include <boost/thread.hpp> - -namespace apache { -namespace thrift { -namespace concurrency { - -using boost::shared_ptr; -using boost::weak_ptr; - -/** - * The boost thread class. - * - * @version $Id:$ - */ -class BoostThread : public Thread { -public: - enum STATE { uninitialized, starting, started, stopping, stopped }; - - static void* threadMain(void* arg); - -private: - std::auto_ptr<boost::thread> thread_; - STATE state_; - weak_ptr<BoostThread> self_; - bool detached_; - -public: - BoostThread(bool detached, shared_ptr<Runnable> runnable) - : state_(uninitialized), detached_(detached) { - this->Thread::runnable(runnable); - } - - ~BoostThread() { - if (!detached_) { - try { - join(); - } catch (...) { - // We're really hosed. - } - } - } - - void start() { - if (state_ != uninitialized) { - return; - } - - // Create reference - shared_ptr<BoostThread>* selfRef = new shared_ptr<BoostThread>(); - *selfRef = self_.lock(); - - state_ = starting; - - thread_ - = std::auto_ptr<boost::thread>(new boost::thread(boost::bind(threadMain, (void*)selfRef))); - - if (detached_) - thread_->detach(); - } - - void join() { - if (!detached_ && state_ != uninitialized) { - thread_->join(); - } - } - - Thread::id_t getId() { return thread_.get() ? thread_->get_id() : boost::thread::id(); } - - shared_ptr<Runnable> runnable() const { return Thread::runnable(); } - - void runnable(shared_ptr<Runnable> value) { Thread::runnable(value); } - - void weakRef(shared_ptr<BoostThread> self) { - assert(self.get() == this); - self_ = weak_ptr<BoostThread>(self); - } -}; - -void* BoostThread::threadMain(void* arg) { - shared_ptr<BoostThread> thread = *(shared_ptr<BoostThread>*)arg; - delete reinterpret_cast<shared_ptr<BoostThread>*>(arg); - - if (!thread) { - return (void*)0; - } - - if (thread->state_ != starting) { - return (void*)0; - } - - thread->state_ = started; - thread->runnable()->run(); - - if (thread->state_ != stopping && thread->state_ != stopped) { - thread->state_ = stopping; - } - return (void*)0; -} - -/** - * POSIX Thread factory implementation - */ -class BoostThreadFactory::Impl { - -private: - bool detached_; - -public: - Impl(bool detached) : detached_(detached) {} - - /** - * Creates a new POSIX thread to run the runnable object - * - * @param runnable A runnable object - */ - shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const { - shared_ptr<BoostThread> result = shared_ptr<BoostThread>(new BoostThread(detached_, runnable)); - result->weakRef(result); - runnable->thread(result); - return result; - } - - bool isDetached() const { return detached_; } - - void setDetached(bool value) { detached_ = value; } - - Thread::id_t getCurrentThreadId() const { return boost::this_thread::get_id(); } -}; - -BoostThreadFactory::BoostThreadFactory(bool detached) - : impl_(new BoostThreadFactory::Impl(detached)) { -} - -shared_ptr<Thread> BoostThreadFactory::newThread(shared_ptr<Runnable> runnable) const { - return impl_->newThread(runnable); -} - -bool BoostThreadFactory::isDetached() const { - return impl_->isDetached(); -} - -void BoostThreadFactory::setDetached(bool value) { - impl_->setDetached(value); -} - -Thread::id_t BoostThreadFactory::getCurrentThreadId() const { - return impl_->getCurrentThreadId(); -} -} -} -} // apache::thrift::concurrency - -#endif // USE_BOOST_THREAD http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/72ea8afd/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/BoostThreadFactory.h ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/BoostThreadFactory.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/BoostThreadFactory.h deleted file mode 100644 index e6d1a56..0000000 --- a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/BoostThreadFactory.h +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#ifndef _THRIFT_CONCURRENCY_BOOSTTHREADFACTORY_H_ -#define _THRIFT_CONCURRENCY_BOOSTTHREADFACTORY_H_ 1 - -#include <thrift/concurrency/Thread.h> - -#include <boost/shared_ptr.hpp> - -namespace apache { -namespace thrift { -namespace concurrency { - -/** - * A thread factory to create posix threads - * - * @version $Id:$ - */ -class BoostThreadFactory : public ThreadFactory { - -public: - /** - * Boost thread factory. All threads created by a factory are reference-counted - * via boost::shared_ptr and boost::weak_ptr. The factory guarantees that threads and - * the Runnable tasks they host will be properly cleaned up once the last strong reference - * to both is given up. - * - * Threads are created with the specified boost policy, priority, stack-size. A detachable thread - * is not joinable. - * - * By default threads are not joinable. - */ - - BoostThreadFactory(bool detached = true); - - // From ThreadFactory; - boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const; - - // From ThreadFactory; - Thread::id_t getCurrentThreadId() const; - - /** - * Sets detached mode of threads - */ - virtual void setDetached(bool detached); - - /** - * Gets current detached mode - */ - virtual bool isDetached() const; - -private: - class Impl; - boost::shared_ptr<Impl> impl_; -}; -} -} -} // apache::thrift::concurrency - -#endif // #ifndef _THRIFT_CONCURRENCY_BOOSTTHREADFACTORY_H_ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/72ea8afd/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Exception.h ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Exception.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Exception.h deleted file mode 100644 index 6438fda..0000000 --- a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Exception.h +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#ifndef _THRIFT_CONCURRENCY_EXCEPTION_H_ -#define _THRIFT_CONCURRENCY_EXCEPTION_H_ 1 - -#include <exception> -#include <thrift/Thrift.h> - -namespace apache { -namespace thrift { -namespace concurrency { - -class NoSuchTaskException : public apache::thrift::TException {}; - -class UncancellableTaskException : public apache::thrift::TException {}; - -class InvalidArgumentException : public apache::thrift::TException {}; - -class IllegalStateException : public apache::thrift::TException { -public: - IllegalStateException() {} - IllegalStateException(const std::string& message) : TException(message) {} -}; - -class TimedOutException : public apache::thrift::TException { -public: - TimedOutException() : TException("TimedOutException"){}; - TimedOutException(const std::string& message) : TException(message) {} -}; - -class TooManyPendingTasksException : public apache::thrift::TException { -public: - TooManyPendingTasksException() : TException("TooManyPendingTasksException"){}; - TooManyPendingTasksException(const std::string& message) : TException(message) {} -}; - -class SystemResourceException : public apache::thrift::TException { -public: - SystemResourceException() {} - - SystemResourceException(const std::string& message) : TException(message) {} -}; -} -} -} // apache::thrift::concurrency - -#endif // #ifndef _THRIFT_CONCURRENCY_EXCEPTION_H_ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/72ea8afd/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/FunctionRunner.h ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/FunctionRunner.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/FunctionRunner.h deleted file mode 100644 index b776794..0000000 --- a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/FunctionRunner.h +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#ifndef _THRIFT_CONCURRENCY_FUNCTION_RUNNER_H -#define _THRIFT_CONCURRENCY_FUNCTION_RUNNER_H 1 - -#include <thrift/cxxfunctional.h> -#include <thrift/concurrency/Thread.h> - -namespace apache { -namespace thrift { -namespace concurrency { - -/** - * Convenient implementation of Runnable that will execute arbitrary callbacks. - * Interfaces are provided to accept both a generic 'void(void)' callback, and - * a 'void* (void*)' pthread_create-style callback. - * - * Example use: - * void* my_thread_main(void* arg); - * shared_ptr<ThreadFactory> factory = ...; - * // To create a thread that executes my_thread_main once: - * shared_ptr<Thread> thread = factory->newThread( - * FunctionRunner::create(my_thread_main, some_argument)); - * thread->start(); - * - * bool A::foo(); - * A* a = new A(); - * // To create a thread that executes a.foo() every 100 milliseconds: - * factory->newThread(FunctionRunner::create( - * apache::thrift::stdcxx::bind(&A::foo, a), 100))->start(); - * - */ - -class FunctionRunner : public Runnable { -public: - // This is the type of callback 'pthread_create()' expects. - typedef void* (*PthreadFuncPtr)(void* arg); - // This a fully-generic void(void) callback for custom bindings. - typedef apache::thrift::stdcxx::function<void()> VoidFunc; - - typedef apache::thrift::stdcxx::function<bool()> BoolFunc; - - /** - * Syntactic sugar to make it easier to create new FunctionRunner - * objects wrapped in shared_ptr. - */ - static boost::shared_ptr<FunctionRunner> create(const VoidFunc& cob) { - return boost::shared_ptr<FunctionRunner>(new FunctionRunner(cob)); - } - - static boost::shared_ptr<FunctionRunner> create(PthreadFuncPtr func, void* arg) { - return boost::shared_ptr<FunctionRunner>(new FunctionRunner(func, arg)); - } - -private: - static void pthread_func_wrapper(PthreadFuncPtr func, void* arg) { - // discard return value - func(arg); - } - -public: - /** - * Given a 'pthread_create' style callback, this FunctionRunner will - * execute the given callback. Note that the 'void*' return value is ignored. - */ - FunctionRunner(PthreadFuncPtr func, void* arg) - : func_(apache::thrift::stdcxx::bind(pthread_func_wrapper, func, arg)) {} - - /** - * Given a generic callback, this FunctionRunner will execute it. - */ - FunctionRunner(const VoidFunc& cob) : func_(cob) {} - - /** - * Given a bool foo(...) type callback, FunctionRunner will execute - * the callback repeatedly with 'intervalMs' milliseconds between the calls, - * until it returns false. Note that the actual interval between calls will - * be intervalMs plus execution time of the callback. - */ - FunctionRunner(const BoolFunc& cob, int intervalMs) : repFunc_(cob), intervalMs_(intervalMs) {} - - void run() { - if (repFunc_) { - while (repFunc_()) { - THRIFT_SLEEP_USEC(intervalMs_ * 1000); - } - } else { - func_(); - } - } - -private: - VoidFunc func_; - BoolFunc repFunc_; - int intervalMs_; -}; -} -} -} // apache::thrift::concurrency - -#endif // #ifndef _THRIFT_CONCURRENCY_FUNCTION_RUNNER_H http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/72ea8afd/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Monitor.cpp ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Monitor.cpp b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Monitor.cpp deleted file mode 100644 index 5e713c0..0000000 --- a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Monitor.cpp +++ /dev/null @@ -1,222 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <thrift/concurrency/Monitor.h> -#include <thrift/concurrency/Exception.h> -#include <thrift/concurrency/Util.h> -#include <thrift/transport/PlatformSocket.h> - -#include <boost/scoped_ptr.hpp> - -#include <assert.h> - -#include <iostream> - -#include <pthread.h> - -namespace apache { -namespace thrift { -namespace concurrency { - -using boost::scoped_ptr; - -/** - * Monitor implementation using the POSIX pthread library - * - * @version $Id:$ - */ -class Monitor::Impl { - -public: - Impl() : ownedMutex_(new Mutex()), mutex_(NULL), condInitialized_(false) { - init(ownedMutex_.get()); - } - - Impl(Mutex* mutex) : mutex_(NULL), condInitialized_(false) { init(mutex); } - - Impl(Monitor* monitor) : mutex_(NULL), condInitialized_(false) { init(&(monitor->mutex())); } - - ~Impl() { cleanup(); } - - Mutex& mutex() { return *mutex_; } - void lock() { mutex().lock(); } - void unlock() { mutex().unlock(); } - - /** - * Exception-throwing version of waitForTimeRelative(), called simply - * wait(int64) for historical reasons. Timeout is in milliseconds. - * - * If the condition occurs, this function returns cleanly; on timeout or - * error an exception is thrown. - */ - void wait(int64_t timeout_ms) const { - int result = waitForTimeRelative(timeout_ms); - if (result == THRIFT_ETIMEDOUT) { - // pthread_cond_timedwait has been observed to return early on - // various platforms, so comment out this assert. - // assert(Util::currentTime() >= (now + timeout)); - throw TimedOutException(); - } else if (result != 0) { - throw TException("pthread_cond_wait() or pthread_cond_timedwait() failed"); - } - } - - /** - * Waits until the specified timeout in milliseconds for the condition to - * occur, or waits forever if timeout_ms == 0. - * - * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. - */ - int waitForTimeRelative(int64_t timeout_ms) const { - if (timeout_ms == 0LL) { - return waitForever(); - } - - struct THRIFT_TIMESPEC abstime; - Util::toTimespec(abstime, Util::currentTime() + timeout_ms); - return waitForTime(&abstime); - } - - /** - * Waits until the absolute time specified using struct THRIFT_TIMESPEC. - * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. - */ - int waitForTime(const THRIFT_TIMESPEC* abstime) const { - assert(mutex_); - pthread_mutex_t* mutexImpl = reinterpret_cast<pthread_mutex_t*>(mutex_->getUnderlyingImpl()); - assert(mutexImpl); - - // XXX Need to assert that caller owns mutex - return pthread_cond_timedwait(&pthread_cond_, mutexImpl, abstime); - } - - int waitForTime(const struct timeval* abstime) const { - struct THRIFT_TIMESPEC temp; - temp.tv_sec = abstime->tv_sec; - temp.tv_nsec = abstime->tv_usec * 1000; - return waitForTime(&temp); - } - /** - * Waits forever until the condition occurs. - * Returns 0 if condition occurs, or an error code otherwise. - */ - int waitForever() const { - assert(mutex_); - pthread_mutex_t* mutexImpl = reinterpret_cast<pthread_mutex_t*>(mutex_->getUnderlyingImpl()); - assert(mutexImpl); - return pthread_cond_wait(&pthread_cond_, mutexImpl); - } - - void notify() { - // XXX Need to assert that caller owns mutex - int iret = pthread_cond_signal(&pthread_cond_); - THRIFT_UNUSED_VARIABLE(iret); - assert(iret == 0); - } - - void notifyAll() { - // XXX Need to assert that caller owns mutex - int iret = pthread_cond_broadcast(&pthread_cond_); - THRIFT_UNUSED_VARIABLE(iret); - assert(iret == 0); - } - -private: - void init(Mutex* mutex) { - mutex_ = mutex; - - if (pthread_cond_init(&pthread_cond_, NULL) == 0) { - condInitialized_ = true; - } - - if (!condInitialized_) { - cleanup(); - throw SystemResourceException(); - } - } - - void cleanup() { - if (condInitialized_) { - condInitialized_ = false; - int iret = pthread_cond_destroy(&pthread_cond_); - THRIFT_UNUSED_VARIABLE(iret); - assert(iret == 0); - } - } - - scoped_ptr<Mutex> ownedMutex_; - Mutex* mutex_; - - mutable pthread_cond_t pthread_cond_; - mutable bool condInitialized_; -}; - -Monitor::Monitor() : impl_(new Monitor::Impl()) { -} -Monitor::Monitor(Mutex* mutex) : impl_(new Monitor::Impl(mutex)) { -} -Monitor::Monitor(Monitor* monitor) : impl_(new Monitor::Impl(monitor)) { -} - -Monitor::~Monitor() { - delete impl_; -} - -Mutex& Monitor::mutex() const { - return impl_->mutex(); -} - -void Monitor::lock() const { - impl_->lock(); -} - -void Monitor::unlock() const { - impl_->unlock(); -} - -void Monitor::wait(int64_t timeout) const { - impl_->wait(timeout); -} - -int Monitor::waitForTime(const THRIFT_TIMESPEC* abstime) const { - return impl_->waitForTime(abstime); -} - -int Monitor::waitForTime(const timeval* abstime) const { - return impl_->waitForTime(abstime); -} - -int Monitor::waitForTimeRelative(int64_t timeout_ms) const { - return impl_->waitForTimeRelative(timeout_ms); -} - -int Monitor::waitForever() const { - return impl_->waitForever(); -} - -void Monitor::notify() const { - impl_->notify(); -} - -void Monitor::notifyAll() const { - impl_->notifyAll(); -} -} -} -} // apache::thrift::concurrency http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/72ea8afd/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Monitor.h ---------------------------------------------------------------------- diff --git a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Monitor.h b/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Monitor.h deleted file mode 100644 index 5472f85..0000000 --- a/depends/thirdparty/thrift/lib/cpp/src/thrift/concurrency/Monitor.h +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#ifndef _THRIFT_CONCURRENCY_MONITOR_H_ -#define _THRIFT_CONCURRENCY_MONITOR_H_ 1 - -#include <thrift/concurrency/Exception.h> -#include <thrift/concurrency/Mutex.h> - -#include <boost/utility.hpp> - -namespace apache { -namespace thrift { -namespace concurrency { - -/** - * A monitor is a combination mutex and condition-event. Waiting and - * notifying condition events requires that the caller own the mutex. Mutex - * lock and unlock operations can be performed independently of condition - * events. This is more or less analogous to java.lang.Object multi-thread - * operations. - * - * Note the Monitor can create a new, internal mutex; alternatively, a - * separate Mutex can be passed in and the Monitor will re-use it without - * taking ownership. It's the user's responsibility to make sure that the - * Mutex is not deallocated before the Monitor. - * - * Note that all methods are const. Monitors implement logical constness, not - * bit constness. This allows const methods to call monitor methods without - * needing to cast away constness or change to non-const signatures. - * - * @version $Id:$ - */ -class Monitor : boost::noncopyable { -public: - /** Creates a new mutex, and takes ownership of it. */ - Monitor(); - - /** Uses the provided mutex without taking ownership. */ - explicit Monitor(Mutex* mutex); - - /** Uses the mutex inside the provided Monitor without taking ownership. */ - explicit Monitor(Monitor* monitor); - - /** Deallocates the mutex only if we own it. */ - virtual ~Monitor(); - - Mutex& mutex() const; - - virtual void lock() const; - - virtual void unlock() const; - - /** - * Waits a maximum of the specified timeout in milliseconds for the condition - * to occur, or waits forever if timeout_ms == 0. - * - * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. - */ - int waitForTimeRelative(int64_t timeout_ms) const; - - /** - * Waits until the absolute time specified using struct THRIFT_TIMESPEC. - * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. - */ - int waitForTime(const THRIFT_TIMESPEC* abstime) const; - - /** - * Waits until the absolute time specified using struct timeval. - * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code. - */ - int waitForTime(const struct timeval* abstime) const; - - /** - * Waits forever until the condition occurs. - * Returns 0 if condition occurs, or an error code otherwise. - */ - int waitForever() const; - - /** - * Exception-throwing version of waitForTimeRelative(), called simply - * wait(int64) for historical reasons. Timeout is in milliseconds. - * - * If the condition occurs, this function returns cleanly; on timeout or - * error an exception is thrown. - */ - void wait(int64_t timeout_ms = 0LL) const; - - /** Wakes up one thread waiting on this monitor. */ - virtual void notify() const; - - /** Wakes up all waiting threads on this monitor. */ - virtual void notifyAll() const; - -private: - class Impl; - - Impl* impl_; -}; - -class Synchronized { -public: - Synchronized(const Monitor* monitor) : g(monitor->mutex()) {} - Synchronized(const Monitor& monitor) : g(monitor.mutex()) {} - -private: - Guard g; -}; -} -} -} // apache::thrift::concurrency - -#endif // #ifndef _THRIFT_CONCURRENCY_MONITOR_H_
