Repository: thrift Updated Branches: refs/heads/master 811d279d5 -> 5ec805b22
THRIFT-3081 consolidate client processing loop in Simple, Threaded, and Thread Pool servers Project: http://git-wip-us.apache.org/repos/asf/thrift/repo Commit: http://git-wip-us.apache.org/repos/asf/thrift/commit/5ec805b2 Tree: http://git-wip-us.apache.org/repos/asf/thrift/tree/5ec805b2 Diff: http://git-wip-us.apache.org/repos/asf/thrift/diff/5ec805b2 Branch: refs/heads/master Commit: 5ec805b22b81001b1b785cd7f85eb8647fde60df Parents: 811d279 Author: Jim King <[email protected]> Authored: Sun Apr 26 07:52:40 2015 -0400 Committer: Roger Meier <[email protected]> Committed: Sun Apr 26 20:58:17 2015 +0200 ---------------------------------------------------------------------- lib/cpp/CMakeLists.txt | 1 + lib/cpp/Makefile.am | 1 + lib/cpp/src/thrift/server/TConnectedClient.cpp | 121 +++++++++++++++++ lib/cpp/src/thrift/server/TConnectedClient.h | 116 +++++++++++++++++ lib/cpp/src/thrift/server/TSimpleServer.cpp | 56 +------- lib/cpp/src/thrift/server/TSimpleServer.h | 2 +- lib/cpp/src/thrift/server/TThreadPoolServer.cpp | 84 ++---------- lib/cpp/src/thrift/server/TThreadPoolServer.h | 13 +- lib/cpp/src/thrift/server/TThreadedServer.cpp | 129 ++++--------------- lib/cpp/src/thrift/server/TThreadedServer.h | 96 ++++++-------- 10 files changed, 333 insertions(+), 286 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/thrift/blob/5ec805b2/lib/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/lib/cpp/CMakeLists.txt b/lib/cpp/CMakeLists.txt index a965593..c11fc56 100755 --- a/lib/cpp/CMakeLists.txt +++ b/lib/cpp/CMakeLists.txt @@ -54,6 +54,7 @@ set( thriftcpp_SOURCES src/thrift/transport/TServerSocket.cpp src/thrift/transport/TTransportUtils.cpp src/thrift/transport/TBufferTransports.cpp + src/thrift/server/TConnectedClient.cpp src/thrift/server/TServer.cpp src/thrift/server/TSimpleServer.cpp src/thrift/server/TThreadPoolServer.cpp http://git-wip-us.apache.org/repos/asf/thrift/blob/5ec805b2/lib/cpp/Makefile.am ---------------------------------------------------------------------- diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am index e6a6015..cb30bda 100755 --- a/lib/cpp/Makefile.am +++ b/lib/cpp/Makefile.am @@ -89,6 +89,7 @@ libthrift_la_SOURCES = src/thrift/Thrift.cpp \ src/thrift/transport/TSSLServerSocket.cpp \ src/thrift/transport/TTransportUtils.cpp \ src/thrift/transport/TBufferTransports.cpp \ + src/thrift/server/TConnectedClient.cpp \ src/thrift/server/TServer.cpp \ src/thrift/server/TSimpleServer.cpp \ src/thrift/server/TThreadPoolServer.cpp \ http://git-wip-us.apache.org/repos/asf/thrift/blob/5ec805b2/lib/cpp/src/thrift/server/TConnectedClient.cpp ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/server/TConnectedClient.cpp b/lib/cpp/src/thrift/server/TConnectedClient.cpp new file mode 100644 index 0000000..630c28e --- /dev/null +++ b/lib/cpp/src/thrift/server/TConnectedClient.cpp @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <thrift/server/TConnectedClient.h> + +namespace apache { +namespace thrift { +namespace server { + +using apache::thrift::TProcessor; +using apache::thrift::protocol::TProtocol; +using apache::thrift::server::TServerEventHandler; +using apache::thrift::transport::TTransport; +using apache::thrift::transport::TTransportException; +using boost::shared_ptr; +using std::string; + +TConnectedClient::TConnectedClient(const string& serverType, + const shared_ptr<TProcessor>& processor, + const shared_ptr<TProtocol>& inputProtocol, + const shared_ptr<TProtocol>& outputProtocol, + const shared_ptr<TServerEventHandler>& eventHandler, + const shared_ptr<TTransport>& client) + + : serverType_(serverType), + processor_(processor), + inputProtocol_(inputProtocol), + outputProtocol_(outputProtocol), + eventHandler_(eventHandler), + client_(client), + opaqueContext_(0) {} + +TConnectedClient::~TConnectedClient() {} + +void TConnectedClient::run() { + if (eventHandler_) { + opaqueContext_ = eventHandler_->createContext(inputProtocol_, outputProtocol_); + } + + for (;;) { + if (eventHandler_) { + eventHandler_->processContext(opaqueContext_, client_); + } + + try { + if (!processor_->process(inputProtocol_, outputProtocol_, opaqueContext_)) { + break; + } + } catch (const TTransportException& ttx) { + if (ttx.getType() == TTransportException::TIMED_OUT) { + // Receive timeout - continue processing. + continue; + } else if (ttx.getType() == TTransportException::END_OF_FILE || + ttx.getType() == TTransportException::INTERRUPTED) { + // Client disconnected or was interrupted. No logging needed. Done. + break; + } else { + // All other transport exceptions are logged. + // State of connection is unknown. Done. + string errStr = (serverType_ + " client died: ") + ttx.what(); + GlobalOutput(errStr.c_str()); + break; + } + } catch (const TException& tex) { + // Some protocols throw this after they send an error response to the client + // They should be trained to return true instead and if they want to log, + // then they should log. + string errStr = (serverType_ + " processing exception: ") + tex.what(); + GlobalOutput(errStr.c_str()); + // Continue processing + } + } + + cleanup(); +} + +void TConnectedClient::cleanup() +{ + if (eventHandler_) { + eventHandler_->deleteContext(opaqueContext_, inputProtocol_, outputProtocol_); + } + + try { + inputProtocol_->getTransport()->close(); + } catch (const TTransportException& ttx) { + string errStr = string(serverType_ + " input close failed: ") + ttx.what(); + GlobalOutput(errStr.c_str()); + } + try { + outputProtocol_->getTransport()->close(); + } catch (const TTransportException& ttx) { + string errStr = string(serverType_ + " output close failed: ") + ttx.what(); + GlobalOutput(errStr.c_str()); + } + try { + client_->close(); + } catch (const TTransportException& ttx) { + string errStr = string(serverType_ + " client close failed: ") + ttx.what(); + GlobalOutput(errStr.c_str()); + } +} + +} +} +} // apache::thrift::server http://git-wip-us.apache.org/repos/asf/thrift/blob/5ec805b2/lib/cpp/src/thrift/server/TConnectedClient.h ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/server/TConnectedClient.h b/lib/cpp/src/thrift/server/TConnectedClient.h new file mode 100644 index 0000000..6304398 --- /dev/null +++ b/lib/cpp/src/thrift/server/TConnectedClient.h @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_SERVER_TCONNECTEDCLIENT_H_ +#define _THRIFT_SERVER_TCONNECTEDCLIENT_H_ 1 + +#include <boost/shared_ptr.hpp> +#include <thrift/TProcessor.h> +#include <thrift/protocol/TProtocol.h> +#include <thrift/server/TServer.h> +#include <thrift/transport/TTransport.h> + +namespace apache { +namespace thrift { +namespace server { + +/** + * This represents a client connected to a TServer. The + * processing loop for a client must provide some required + * functionality common to all implementations so it is + * encapsulated here. + */ + +class TConnectedClient : public apache::thrift::concurrency::Runnable +{ + public: + /** + * Constructor. + * + * @param[in] serverType the server type as a string, used + * for logging output. + * @param[in] processor the TProcessor + * @param[in] inputProtocol the input TProtocol + * @param[in] outputProtocol the output TProtocol + * @param[in] eventHandler the server event handler + * @param[in] client the TTransport representing the client + */ + TConnectedClient( + const std::string& serverType, + const boost::shared_ptr<apache::thrift::TProcessor>& processor, + const boost::shared_ptr<apache::thrift::protocol::TProtocol>& inputProtocol, + const boost::shared_ptr<apache::thrift::protocol::TProtocol>& outputProtocol, + const boost::shared_ptr<apache::thrift::server::TServerEventHandler>& eventHandler, + const boost::shared_ptr<apache::thrift::transport::TTransport>& client); + + /** + * Destructor. + */ + virtual ~TConnectedClient(); + + /** + * Drive the client until it is done. + * The client processing loop is: + * + * [optional] call eventHandler->createContext once + * [optional] call eventHandler->processContext per request + * call processor->process per request + * handle expected transport exceptions: + * END_OF_FILE means the client is gone + * INTERRUPTED means the client was interrupted + * by TServerTransport::interruptChildren() + * handle unexpected transport exceptions by logging + * handle standard exceptions by logging + * handle unexpected exceptions by logging + * cleanup() + */ + virtual void run() /* override */; + + protected: + /** + * Cleanup after a client. This happens if the client disconnects, + * or if the server is stopped, or if an exception occurs. + * + * The cleanup processing is: + * [optional] call eventHandler->deleteContext once + * close the inputProtocol's TTransport + * close the outputProtocol's TTransport + * close the client + */ + virtual void cleanup(); + + private: + std::string serverType_; + boost::shared_ptr<apache::thrift::TProcessor> processor_; + boost::shared_ptr<apache::thrift::protocol::TProtocol> inputProtocol_; + boost::shared_ptr<apache::thrift::protocol::TProtocol> outputProtocol_; + boost::shared_ptr<apache::thrift::server::TServerEventHandler> eventHandler_; + boost::shared_ptr<apache::thrift::transport::TTransport> client_; + + /** + * Context acquired from the eventHandler_ if one exists. + */ + void *opaqueContext_; +}; + +} +} +} + +#endif // #ifndef _THRIFT_SERVER_TCONNECTEDCLIENT_H_ http://git-wip-us.apache.org/repos/asf/thrift/blob/5ec805b2/lib/cpp/src/thrift/server/TSimpleServer.cpp ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/server/TSimpleServer.cpp b/lib/cpp/src/thrift/server/TSimpleServer.cpp index 19f44ac..b63c45e 100644 --- a/lib/cpp/src/thrift/server/TSimpleServer.cpp +++ b/lib/cpp/src/thrift/server/TSimpleServer.cpp @@ -17,6 +17,7 @@ * under the License. */ +#include <thrift/server/TConnectedClient.h> #include <thrift/server/TSimpleServer.h> #include <thrift/transport/TTransportException.h> #include <string> @@ -103,58 +104,9 @@ void TSimpleServer::serve() { break; } - // Get the processor - shared_ptr<TProcessor> processor = getProcessor(inputProtocol, outputProtocol, client); - - void* connectionContext = NULL; - if (eventHandler_) { - connectionContext = eventHandler_->createContext(inputProtocol, outputProtocol); - } - try { - for (;;) { - if (eventHandler_) { - eventHandler_->processContext(connectionContext, client); - } - if (!processor->process(inputProtocol, outputProtocol, connectionContext) || - // Peek ahead, is the remote side closed? - !inputProtocol->getTransport()->peek()) { - break; - } - } - } catch (const TTransportException& ttx) { - if (ttx.getType() != TTransportException::END_OF_FILE && - ttx.getType() != TTransportException::INTERRUPTED) - { - string errStr = string("TSimpleServer client died: ") + ttx.what(); - GlobalOutput(errStr.c_str()); - } - } catch (const std::exception& x) { - GlobalOutput.printf("TSimpleServer exception: %s: %s", typeid(x).name(), x.what()); - } catch (...) { - GlobalOutput("TSimpleServer uncaught exception."); - } - if (eventHandler_) { - eventHandler_->deleteContext(connectionContext, inputProtocol, outputProtocol); - } - - try { - inputTransport->close(); - } catch (const TTransportException& ttx) { - string errStr = string("TSimpleServer input close failed: ") + ttx.what(); - GlobalOutput(errStr.c_str()); - } - try { - outputTransport->close(); - } catch (const TTransportException& ttx) { - string errStr = string("TSimpleServer output close failed: ") + ttx.what(); - GlobalOutput(errStr.c_str()); - } - try { - client->close(); - } catch (const TTransportException& ttx) { - string errStr = string("TSimpleServer client close failed: ") + ttx.what(); - GlobalOutput(errStr.c_str()); - } + TConnectedClient("TSimpleServer", + getProcessor(inputProtocol, outputProtocol, client), + inputProtocol, outputProtocol, eventHandler_, client).run(); } if (stop_) { http://git-wip-us.apache.org/repos/asf/thrift/blob/5ec805b2/lib/cpp/src/thrift/server/TSimpleServer.h ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/server/TSimpleServer.h b/lib/cpp/src/thrift/server/TSimpleServer.h index 941f12b..7b8677d 100644 --- a/lib/cpp/src/thrift/server/TSimpleServer.h +++ b/lib/cpp/src/thrift/server/TSimpleServer.h @@ -94,7 +94,7 @@ public: void serve(); /** - * Interrupt serve() so that it meets post-conditions. + * Interrupt serve() so that it meets post-conditions and returns. */ void stop(); http://git-wip-us.apache.org/repos/asf/thrift/blob/5ec805b2/lib/cpp/src/thrift/server/TThreadPoolServer.cpp ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/server/TThreadPoolServer.cpp b/lib/cpp/src/thrift/server/TThreadPoolServer.cpp index 58cfe3e..f8ed6cf 100644 --- a/lib/cpp/src/thrift/server/TThreadPoolServer.cpp +++ b/lib/cpp/src/thrift/server/TThreadPoolServer.cpp @@ -19,12 +19,14 @@ #include <thrift/thrift-config.h> +#include <thrift/server/TConnectedClient.h> #include <thrift/server/TThreadPoolServer.h> #include <thrift/transport/TTransportException.h> #include <thrift/concurrency/Thread.h> #include <thrift/concurrency/ThreadManager.h> #include <string> #include <iostream> +#include <boost/make_shared.hpp> namespace apache { namespace thrift { @@ -37,78 +39,6 @@ using namespace apache::thrift::concurrency; using namespace apache::thrift::protocol; using namespace apache::thrift::transport; -class TThreadPoolServer::Task : public Runnable { - -public: - Task(TThreadPoolServer& server, - shared_ptr<TProcessor> processor, - shared_ptr<TProtocol> input, - shared_ptr<TProtocol> output, - shared_ptr<TTransport> transport) - : server_(server), - processor_(processor), - input_(input), - output_(output), - transport_(transport) {} - - ~Task() {} - - void run() { - boost::shared_ptr<TServerEventHandler> eventHandler = server_.getEventHandler(); - void* connectionContext = NULL; - if (eventHandler) { - connectionContext = eventHandler->createContext(input_, output_); - } - try { - for (;;) { - if (eventHandler) { - eventHandler->processContext(connectionContext, transport_); - } - if (!processor_->process(input_, output_, connectionContext) - || !input_->getTransport()->peek()) { - break; - } - } - } catch (const TTransportException& ttx) { - if (ttx.getType() != TTransportException::END_OF_FILE && - ttx.getType() != TTransportException::INTERRUPTED) { - string errStr = string("TThreadPoolServer::Task client died: ") + ttx.what(); - GlobalOutput(errStr.c_str()); - } - } catch (const std::exception& x) { - GlobalOutput.printf("TThreadPoolServer exception %s: %s", typeid(x).name(), x.what()); - } catch (...) { - GlobalOutput( - "TThreadPoolServer, unexpected exception in " - "TThreadPoolServer::Task::run()"); - } - - if (eventHandler) { - eventHandler->deleteContext(connectionContext, input_, output_); - } - - try { - input_->getTransport()->close(); - } catch (TTransportException& ttx) { - string errStr = string("TThreadPoolServer input close failed: ") + ttx.what(); - GlobalOutput(errStr.c_str()); - } - try { - output_->getTransport()->close(); - } catch (TTransportException& ttx) { - string errStr = string("TThreadPoolServer output close failed: ") + ttx.what(); - GlobalOutput(errStr.c_str()); - } - } - -private: - TServer& server_; - shared_ptr<TProcessor> processor_; - shared_ptr<TProtocol> input_; - shared_ptr<TProtocol> output_; - shared_ptr<TTransport> transport_; -}; - TThreadPoolServer::~TThreadPoolServer() {} void TThreadPoolServer::serve() { @@ -146,9 +76,13 @@ void TThreadPoolServer::serve() { shared_ptr<TProcessor> processor = getProcessor(inputProtocol, outputProtocol, client); // Add to threadmanager pool - shared_ptr<TThreadPoolServer::Task> task( - new TThreadPoolServer::Task(*this, processor, inputProtocol, outputProtocol, client)); - threadManager_->add(task, timeout_, taskExpiration_); + threadManager_->add( + boost::make_shared<TConnectedClient>( + "TThreadPoolServer", + getProcessor(inputProtocol, outputProtocol, client), + inputProtocol, outputProtocol, eventHandler_, client), + timeout_, + taskExpiration_); } catch (TTransportException& ttx) { if (inputTransport) { http://git-wip-us.apache.org/repos/asf/thrift/blob/5ec805b2/lib/cpp/src/thrift/server/TThreadPoolServer.h ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/server/TThreadPoolServer.h b/lib/cpp/src/thrift/server/TThreadPoolServer.h index 1696700..2f93463 100644 --- a/lib/cpp/src/thrift/server/TThreadPoolServer.h +++ b/lib/cpp/src/thrift/server/TThreadPoolServer.h @@ -37,8 +37,6 @@ using apache::thrift::transport::TTransportFactory; class TThreadPoolServer : public TServer { public: - class Task; - template <typename ProcessorFactory> TThreadPoolServer(const boost::shared_ptr<ProcessorFactory>& processorFactory, const boost::shared_ptr<TServerTransport>& serverTransport, @@ -107,8 +105,19 @@ public: virtual ~TThreadPoolServer(); + /** + * Process all connections that arrive using a thread pool. + * Call stop() on another thread to interrupt processing and + * return control to the caller. + * Post-conditions (return guarantees): + * The serverTransport will be closed. + * There will be no connected clients. + */ virtual void serve(); + /** + * Interrupt serve() so that it meets post-conditions and returns. + */ virtual void stop(); virtual int64_t getTimeout() const; http://git-wip-us.apache.org/repos/asf/thrift/blob/5ec805b2/lib/cpp/src/thrift/server/TThreadedServer.cpp ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/server/TThreadedServer.cpp b/lib/cpp/src/thrift/server/TThreadedServer.cpp index 118c9cb..4dcdb44 100644 --- a/lib/cpp/src/thrift/server/TThreadedServer.cpp +++ b/lib/cpp/src/thrift/server/TThreadedServer.cpp @@ -17,6 +17,8 @@ * under the License. */ +#include <boost/bind.hpp> +#include <thrift/server/TConnectedClient.h> #include <thrift/server/TThreadedServer.h> #include <thrift/transport/TTransportException.h> #include <thrift/concurrency/PlatformThreadFactory.h> @@ -39,94 +41,6 @@ using namespace apache::thrift::protocol; using namespace apache::thrift::transport; using namespace apache::thrift::concurrency; -class TThreadedServer::Task : public Runnable { - -public: - Task(TThreadedServer& server, - shared_ptr<TProcessor> processor, - shared_ptr<TProtocol> input, - shared_ptr<TProtocol> output, - shared_ptr<TTransport> transport) - : server_(server), - processor_(processor), - input_(input), - output_(output), - transport_(transport) {} - - ~Task() {} - - void run() { - boost::shared_ptr<TServerEventHandler> eventHandler = server_.getEventHandler(); - void* connectionContext = NULL; - if (eventHandler) { - connectionContext = eventHandler->createContext(input_, output_); - } - try { - for (;;) { - if (eventHandler) { - eventHandler->processContext(connectionContext, transport_); - } - if (!processor_->process(input_, output_, connectionContext) - || !input_->getTransport()->peek()) { - break; - } - } - } catch (const TTransportException& ttx) { - if (ttx.getType() != TTransportException::END_OF_FILE && - ttx.getType() != TTransportException::INTERRUPTED) { - string errStr = string("TThreadedServer client died: ") + ttx.what(); - GlobalOutput(errStr.c_str()); - } - } catch (const std::exception& x) { - GlobalOutput.printf("TThreadedServer exception: %s: %s", typeid(x).name(), x.what()); - } catch (...) { - GlobalOutput("TThreadedServer uncaught exception."); - } - if (eventHandler) { - eventHandler->deleteContext(connectionContext, input_, output_); - } - - try { - input_->getTransport()->close(); - } catch (TTransportException& ttx) { - string errStr = string("TThreadedServer input close failed: ") + ttx.what(); - GlobalOutput(errStr.c_str()); - } - try { - output_->getTransport()->close(); - } catch (TTransportException& ttx) { - string errStr = string("TThreadedServer output close failed: ") + ttx.what(); - GlobalOutput(errStr.c_str()); - } - - // Remove this task from parent bookkeeping - { - Synchronized s(server_.tasksMonitor_); - server_.tasks_.erase(this); - if (server_.tasks_.empty()) { - server_.tasksMonitor_.notify(); - } - } - } - -private: - TThreadedServer& server_; - friend class TThreadedServer; - - shared_ptr<TProcessor> processor_; - shared_ptr<TProtocol> input_; - shared_ptr<TProtocol> output_; - shared_ptr<TTransport> transport_; -}; - -void TThreadedServer::init() { - stop_ = false; - - if (!threadFactory_) { - threadFactory_.reset(new PlatformThreadFactory); - } -} - TThreadedServer::~TThreadedServer() {} void TThreadedServer::serve() { @@ -162,21 +76,19 @@ void TThreadedServer::serve() { inputProtocol = inputProtocolFactory_->getProtocol(inputTransport); outputProtocol = outputProtocolFactory_->getProtocol(outputTransport); - shared_ptr<TProcessor> processor = getProcessor(inputProtocol, outputProtocol, client); - - TThreadedServer::Task* task - = new TThreadedServer::Task(*this, processor, inputProtocol, outputProtocol, client); + shared_ptr<TConnectedClient> pClient( + new TConnectedClient("TThreadedServer", + getProcessor(inputProtocol, outputProtocol, client), + inputProtocol, outputProtocol, eventHandler_, client), + boost::bind(&TThreadedServer::disposeClient, this, _1)); - // Create a task - shared_ptr<Runnable> runnable = shared_ptr<Runnable>(task); - - // Create a thread for this task - shared_ptr<Thread> thread = shared_ptr<Thread>(threadFactory_->newThread(runnable)); + // Create a thread for this client + shared_ptr<Thread> thread = shared_ptr<Thread>(threadFactory_->newThread(pClient)); // Insert thread into the set of threads { - Synchronized s(tasksMonitor_); - tasks_.insert(task); + Synchronized s(clientsMonitor_); + clients_.insert(pClient.get()); } // Start the thread! @@ -235,9 +147,9 @@ void TThreadedServer::serve() { GlobalOutput(errStr.c_str()); } try { - Synchronized s(tasksMonitor_); - while (!tasks_.empty()) { - tasksMonitor_.wait(); + Synchronized s(clientsMonitor_); + while (!clients_.empty()) { + clientsMonitor_.wait(); } } catch (TException& tx) { string errStr = string("TThreadedServer: Exception joining workers: ") + tx.what(); @@ -254,6 +166,19 @@ void TThreadedServer::stop() { serverTransport_->interruptChildren(); } } + +void TThreadedServer::disposeClient(TConnectedClient *pClient) { + // Remove this task from parent bookkeeping + { + Synchronized s(clientsMonitor_); + clients_.erase(pClient); + if (clients_.empty()) { + clientsMonitor_.notify(); + } + } + delete pClient; +} + } } } // apache::thrift::server http://git-wip-us.apache.org/repos/asf/thrift/blob/5ec805b2/lib/cpp/src/thrift/server/TThreadedServer.h ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/server/TThreadedServer.h b/lib/cpp/src/thrift/server/TThreadedServer.h index b9b24fe..5d510d6 100644 --- a/lib/cpp/src/thrift/server/TThreadedServer.h +++ b/lib/cpp/src/thrift/server/TThreadedServer.h @@ -20,9 +20,11 @@ #ifndef _THRIFT_SERVER_TTHREADEDSERVER_H_ #define _THRIFT_SERVER_TTHREADEDSERVER_H_ 1 +#include <set> #include <thrift/server/TServer.h> #include <thrift/transport/TServerTransport.h> #include <thrift/concurrency/Monitor.h> +#include <thrift/concurrency/PlatformThreadFactory.h> #include <thrift/concurrency/Thread.h> #include <boost/shared_ptr.hpp> @@ -35,19 +37,22 @@ using apache::thrift::TProcessor; using apache::thrift::transport::TServerTransport; using apache::thrift::transport::TTransportFactory; using apache::thrift::concurrency::Monitor; +using apache::thrift::concurrency::PlatformThreadFactory; using apache::thrift::concurrency::ThreadFactory; -class TThreadedServer : public TServer { +class TConnectedClient; +class TThreadedServer : public TServer { public: - class Task; - template <typename ProcessorFactory> TThreadedServer(const boost::shared_ptr<ProcessorFactory>& processorFactory, const boost::shared_ptr<TServerTransport>& serverTransport, const boost::shared_ptr<TTransportFactory>& transportFactory, const boost::shared_ptr<TProtocolFactory>& protocolFactory, - THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)); + THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) + : TServer(processorFactory, serverTransport, transportFactory, protocolFactory), + threadFactory_(new PlatformThreadFactory), + stop_(false) {} template <typename ProcessorFactory> TThreadedServer(const boost::shared_ptr<ProcessorFactory>& processorFactory, @@ -55,14 +60,20 @@ public: const boost::shared_ptr<TTransportFactory>& transportFactory, const boost::shared_ptr<TProtocolFactory>& protocolFactory, const boost::shared_ptr<ThreadFactory>& threadFactory, - THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)); + THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) + : TServer(processorFactory, serverTransport, transportFactory, protocolFactory), + threadFactory_(threadFactory), + stop_(false) {} template <typename Processor> TThreadedServer(const boost::shared_ptr<Processor>& processor, const boost::shared_ptr<TServerTransport>& serverTransport, const boost::shared_ptr<TTransportFactory>& transportFactory, const boost::shared_ptr<TProtocolFactory>& protocolFactory, - THRIFT_OVERLOAD_IF(Processor, TProcessor)); + THRIFT_OVERLOAD_IF(Processor, TProcessor)) + : TServer(processor, serverTransport, transportFactory, protocolFactory), + threadFactory_(new PlatformThreadFactory), + stop_(false) {} template <typename Processor> TThreadedServer(const boost::shared_ptr<Processor>& processor, @@ -70,66 +81,43 @@ public: const boost::shared_ptr<TTransportFactory>& transportFactory, const boost::shared_ptr<TProtocolFactory>& protocolFactory, const boost::shared_ptr<ThreadFactory>& threadFactory, - THRIFT_OVERLOAD_IF(Processor, TProcessor)); + THRIFT_OVERLOAD_IF(Processor, TProcessor)) + : TServer(processor, serverTransport, transportFactory, protocolFactory), + threadFactory_(threadFactory), + stop_(false) {} virtual ~TThreadedServer(); + /** + * Process all connections that arrive, each on their own + * dedicated thread. There is no limit to the number of + * threads or connections (see THRIFT-3084). + * Call stop() on another thread to interrupt processing and + * return control to the caller. + * Post-conditions (return guarantees): + * The serverTransport will be closed. + * There will be no connected clients. + */ virtual void serve(); - void stop(); + + /** + * Interrupt serve() so that it meets post-conditions and returns. + */ + virtual void stop(); protected: - void init(); + /** + * Smart pointer release method + */ + virtual void disposeClient(TConnectedClient *pClient); boost::shared_ptr<ThreadFactory> threadFactory_; volatile bool stop_; - Monitor tasksMonitor_; - std::set<Task*> tasks_; + Monitor clientsMonitor_; + std::set<TConnectedClient*> clients_; }; -template <typename ProcessorFactory> -TThreadedServer::TThreadedServer(const boost::shared_ptr<ProcessorFactory>& processorFactory, - const boost::shared_ptr<TServerTransport>& serverTransport, - const boost::shared_ptr<TTransportFactory>& transportFactory, - const boost::shared_ptr<TProtocolFactory>& protocolFactory, - THRIFT_OVERLOAD_IF_DEFN(ProcessorFactory, TProcessorFactory)) - : TServer(processorFactory, serverTransport, transportFactory, protocolFactory) { - init(); -} - -template <typename ProcessorFactory> -TThreadedServer::TThreadedServer(const boost::shared_ptr<ProcessorFactory>& processorFactory, - const boost::shared_ptr<TServerTransport>& serverTransport, - const boost::shared_ptr<TTransportFactory>& transportFactory, - const boost::shared_ptr<TProtocolFactory>& protocolFactory, - const boost::shared_ptr<ThreadFactory>& threadFactory, - THRIFT_OVERLOAD_IF_DEFN(ProcessorFactory, TProcessorFactory)) - : TServer(processorFactory, serverTransport, transportFactory, protocolFactory), - threadFactory_(threadFactory) { - init(); -} - -template <typename Processor> -TThreadedServer::TThreadedServer(const boost::shared_ptr<Processor>& processor, - const boost::shared_ptr<TServerTransport>& serverTransport, - const boost::shared_ptr<TTransportFactory>& transportFactory, - const boost::shared_ptr<TProtocolFactory>& protocolFactory, - THRIFT_OVERLOAD_IF_DEFN(Processor, TProcessor)) - : TServer(processor, serverTransport, transportFactory, protocolFactory) { - init(); -} - -template <typename Processor> -TThreadedServer::TThreadedServer(const boost::shared_ptr<Processor>& processor, - const boost::shared_ptr<TServerTransport>& serverTransport, - const boost::shared_ptr<TTransportFactory>& transportFactory, - const boost::shared_ptr<TProtocolFactory>& protocolFactory, - const boost::shared_ptr<ThreadFactory>& threadFactory, - THRIFT_OVERLOAD_IF_DEFN(Processor, TProcessor)) - : TServer(processor, serverTransport, transportFactory, protocolFactory), - threadFactory_(threadFactory) { - init(); -} } } } // apache::thrift::server
