Repository: thrift Updated Branches: refs/heads/master 87bb771d8 -> 21b685240
THRIFT-3083 consolidate simple and threaded server run loops Project: http://git-wip-us.apache.org/repos/asf/thrift/repo Commit: http://git-wip-us.apache.org/repos/asf/thrift/commit/21b68524 Tree: http://git-wip-us.apache.org/repos/asf/thrift/tree/21b68524 Diff: http://git-wip-us.apache.org/repos/asf/thrift/diff/21b68524 Branch: refs/heads/master Commit: 21b68524084cb47ada51701aa13061d8820d15e5 Parents: 87bb771 Author: Jim King <[email protected]> Authored: Sun Apr 26 18:30:26 2015 -0400 Committer: Roger Meier <[email protected]> Committed: Thu Apr 30 12:41:16 2015 +0200 ---------------------------------------------------------------------- lib/cpp/CMakeLists.txt | 1 + lib/cpp/Makefile.am | 1 + lib/cpp/src/thrift/server/TConnectedClient.cpp | 55 ++--- lib/cpp/src/thrift/server/TConnectedClient.h | 4 - lib/cpp/src/thrift/server/TServerFramework.cpp | 163 ++++++++++++++ lib/cpp/src/thrift/server/TServerFramework.h | 124 +++++++++++ lib/cpp/src/thrift/server/TSimpleServer.cpp | 148 +++++-------- lib/cpp/src/thrift/server/TSimpleServer.h | 91 +++----- lib/cpp/src/thrift/server/TThreadPoolServer.cpp | 192 ++++++---------- lib/cpp/src/thrift/server/TThreadPoolServer.h | 130 ++++------- lib/cpp/src/thrift/server/TThreadedServer.cpp | 221 +++++++------------ lib/cpp/src/thrift/server/TThreadedServer.h | 121 ++++------ 12 files changed, 642 insertions(+), 609 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/thrift/blob/21b68524/lib/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/lib/cpp/CMakeLists.txt b/lib/cpp/CMakeLists.txt index c11fc56..8ea0546 100755 --- a/lib/cpp/CMakeLists.txt +++ b/lib/cpp/CMakeLists.txt @@ -56,6 +56,7 @@ set( thriftcpp_SOURCES src/thrift/transport/TBufferTransports.cpp src/thrift/server/TConnectedClient.cpp src/thrift/server/TServer.cpp + src/thrift/server/TServerFramework.cpp src/thrift/server/TSimpleServer.cpp src/thrift/server/TThreadPoolServer.cpp src/thrift/server/TThreadedServer.cpp http://git-wip-us.apache.org/repos/asf/thrift/blob/21b68524/lib/cpp/Makefile.am ---------------------------------------------------------------------- diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am index cb30bda..28ff7c8 100755 --- a/lib/cpp/Makefile.am +++ b/lib/cpp/Makefile.am @@ -91,6 +91,7 @@ libthrift_la_SOURCES = src/thrift/Thrift.cpp \ src/thrift/transport/TBufferTransports.cpp \ src/thrift/server/TConnectedClient.cpp \ src/thrift/server/TServer.cpp \ + src/thrift/server/TServerFramework.cpp \ src/thrift/server/TSimpleServer.cpp \ src/thrift/server/TThreadPoolServer.cpp \ src/thrift/server/TThreadedServer.cpp \ http://git-wip-us.apache.org/repos/asf/thrift/blob/21b68524/lib/cpp/src/thrift/server/TConnectedClient.cpp ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/server/TConnectedClient.cpp b/lib/cpp/src/thrift/server/TConnectedClient.cpp index 630c28e..86a81e2 100644 --- a/lib/cpp/src/thrift/server/TConnectedClient.cpp +++ b/lib/cpp/src/thrift/server/TConnectedClient.cpp @@ -31,15 +31,13 @@ using apache::thrift::transport::TTransportException; using boost::shared_ptr; using std::string; -TConnectedClient::TConnectedClient(const string& serverType, - const shared_ptr<TProcessor>& processor, +TConnectedClient::TConnectedClient(const shared_ptr<TProcessor>& processor, const shared_ptr<TProtocol>& inputProtocol, const shared_ptr<TProtocol>& outputProtocol, const shared_ptr<TServerEventHandler>& eventHandler, const shared_ptr<TTransport>& client) - : serverType_(serverType), - processor_(processor), + : processor_(processor), inputProtocol_(inputProtocol), outputProtocol_(outputProtocol), eventHandler_(eventHandler), @@ -53,7 +51,7 @@ void TConnectedClient::run() { opaqueContext_ = eventHandler_->createContext(inputProtocol_, outputProtocol_); } - for (;;) { + for (bool done = false; !done; ) { if (eventHandler_) { eventHandler_->processContext(opaqueContext_, client_); } @@ -63,25 +61,30 @@ void TConnectedClient::run() { break; } } catch (const TTransportException& ttx) { - if (ttx.getType() == TTransportException::TIMED_OUT) { - // Receive timeout - continue processing. - continue; - } else if (ttx.getType() == TTransportException::END_OF_FILE || - ttx.getType() == TTransportException::INTERRUPTED) { - // Client disconnected or was interrupted. No logging needed. Done. - break; - } else { - // All other transport exceptions are logged. - // State of connection is unknown. Done. - string errStr = (serverType_ + " client died: ") + ttx.what(); - GlobalOutput(errStr.c_str()); - break; + switch (ttx.getType()) + { + case TTransportException::TIMED_OUT: + // Receive timeout - continue processing. + continue; + + case TTransportException::END_OF_FILE: + case TTransportException::INTERRUPTED: + // Client disconnected or was interrupted. No logging needed. Done. + done = true; + break; + + default: + { + // All other transport exceptions are logged. + // State of connection is unknown. Done. + string errStr = string("TConnectedClient died: ") + ttx.what(); + GlobalOutput(errStr.c_str()); + done = true; + break; + } } } catch (const TException& tex) { - // Some protocols throw this after they send an error response to the client - // They should be trained to return true instead and if they want to log, - // then they should log. - string errStr = (serverType_ + " processing exception: ") + tex.what(); + string errStr = string("TConnectedClient processing exception: ") + tex.what(); GlobalOutput(errStr.c_str()); // Continue processing } @@ -99,19 +102,21 @@ void TConnectedClient::cleanup() try { inputProtocol_->getTransport()->close(); } catch (const TTransportException& ttx) { - string errStr = string(serverType_ + " input close failed: ") + ttx.what(); + string errStr = string("TConnectedClient input close failed: ") + ttx.what(); GlobalOutput(errStr.c_str()); } + try { outputProtocol_->getTransport()->close(); } catch (const TTransportException& ttx) { - string errStr = string(serverType_ + " output close failed: ") + ttx.what(); + string errStr = string("TConnectedClient output close failed: ") + ttx.what(); GlobalOutput(errStr.c_str()); } + try { client_->close(); } catch (const TTransportException& ttx) { - string errStr = string(serverType_ + " client close failed: ") + ttx.what(); + string errStr = string("TConnectedClient client close failed: ") + ttx.what(); GlobalOutput(errStr.c_str()); } } http://git-wip-us.apache.org/repos/asf/thrift/blob/21b68524/lib/cpp/src/thrift/server/TConnectedClient.h ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/server/TConnectedClient.h b/lib/cpp/src/thrift/server/TConnectedClient.h index 6304398..8931335 100644 --- a/lib/cpp/src/thrift/server/TConnectedClient.h +++ b/lib/cpp/src/thrift/server/TConnectedClient.h @@ -43,8 +43,6 @@ class TConnectedClient : public apache::thrift::concurrency::Runnable /** * Constructor. * - * @param[in] serverType the server type as a string, used - * for logging output. * @param[in] processor the TProcessor * @param[in] inputProtocol the input TProtocol * @param[in] outputProtocol the output TProtocol @@ -52,7 +50,6 @@ class TConnectedClient : public apache::thrift::concurrency::Runnable * @param[in] client the TTransport representing the client */ TConnectedClient( - const std::string& serverType, const boost::shared_ptr<apache::thrift::TProcessor>& processor, const boost::shared_ptr<apache::thrift::protocol::TProtocol>& inputProtocol, const boost::shared_ptr<apache::thrift::protocol::TProtocol>& outputProtocol, @@ -96,7 +93,6 @@ class TConnectedClient : public apache::thrift::concurrency::Runnable virtual void cleanup(); private: - std::string serverType_; boost::shared_ptr<apache::thrift::TProcessor> processor_; boost::shared_ptr<apache::thrift::protocol::TProtocol> inputProtocol_; boost::shared_ptr<apache::thrift::protocol::TProtocol> outputProtocol_; http://git-wip-us.apache.org/repos/asf/thrift/blob/21b68524/lib/cpp/src/thrift/server/TServerFramework.cpp ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/server/TServerFramework.cpp b/lib/cpp/src/thrift/server/TServerFramework.cpp new file mode 100644 index 0000000..8adb29a --- /dev/null +++ b/lib/cpp/src/thrift/server/TServerFramework.cpp @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <boost/bind.hpp> +#include <thrift/server/TServerFramework.h> + +namespace apache { +namespace thrift { +namespace server { + +using apache::thrift::transport::TServerTransport; +using apache::thrift::transport::TTransport; +using apache::thrift::transport::TTransportException; +using apache::thrift::transport::TTransportFactory; +using apache::thrift::protocol::TProtocol; +using apache::thrift::protocol::TProtocolFactory; +using boost::bind; +using boost::shared_ptr; +using std::string; + +TServerFramework::TServerFramework( + const shared_ptr<TProcessorFactory>& processorFactory, + const shared_ptr<TServerTransport>& serverTransport, + const shared_ptr<TTransportFactory>& transportFactory, + const shared_ptr<TProtocolFactory>& protocolFactory) + : TServer(processorFactory, serverTransport, transportFactory, protocolFactory) {} + +TServerFramework::TServerFramework( + const shared_ptr<TProcessor>& processor, + const shared_ptr<TServerTransport>& serverTransport, + const shared_ptr<TTransportFactory>& transportFactory, + const shared_ptr<TProtocolFactory>& protocolFactory) + : TServer(processor, serverTransport, transportFactory, protocolFactory) {} + +TServerFramework::TServerFramework( + const shared_ptr<TProcessorFactory>& processorFactory, + const shared_ptr<TServerTransport>& serverTransport, + const shared_ptr<TTransportFactory>& inputTransportFactory, + const shared_ptr<TTransportFactory>& outputTransportFactory, + const shared_ptr<TProtocolFactory>& inputProtocolFactory, + const shared_ptr<TProtocolFactory>& outputProtocolFactory) + : TServer(processorFactory, serverTransport, + inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory) {} + +TServerFramework::TServerFramework( + const shared_ptr<TProcessor>& processor, + const shared_ptr<TServerTransport>& serverTransport, + const shared_ptr<TTransportFactory>& inputTransportFactory, + const shared_ptr<TTransportFactory>& outputTransportFactory, + const shared_ptr<TProtocolFactory>& inputProtocolFactory, + const shared_ptr<TProtocolFactory>& outputProtocolFactory) + : TServer(processor, serverTransport, + inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory) {} + +TServerFramework::~TServerFramework() {} + +template<typename T> +static void releaseOneDescriptor(const string& name, T& pTransport) { + if (pTransport) { + try { + pTransport->close(); + } catch (const TTransportException& ttx) { + string errStr = string("TServerFramework " + name + " close failed: ") + ttx.what(); + GlobalOutput(errStr.c_str()); + } + } +} + +void TServerFramework::serve() { + shared_ptr<TTransport> client; + shared_ptr<TTransport> inputTransport; + shared_ptr<TTransport> outputTransport; + shared_ptr<TProtocol> inputProtocol; + shared_ptr<TProtocol> outputProtocol; + + // Start the server listening + serverTransport_->listen(); + + // Run the preServe event to indicate server is now listening + // and that it is safe to connect. + if (eventHandler_) { + eventHandler_->preServe(); + } + + // Fetch client from server + for (;;) { + try { + // Dereference any resources from any previous client creation + // such that a blocking accept does not hold them indefinitely. + outputProtocol.reset(); + inputProtocol.reset(); + outputTransport.reset(); + inputTransport.reset(); + client.reset(); + + client = serverTransport_->accept(); + + inputTransport = inputTransportFactory_->getTransport(client); + outputTransport = outputTransportFactory_->getTransport(client); + inputProtocol = inputProtocolFactory_->getProtocol(inputTransport); + outputProtocol = outputProtocolFactory_->getProtocol(outputTransport); + + onClientConnected( + shared_ptr<TConnectedClient>( + new TConnectedClient(getProcessor(inputProtocol, outputProtocol, client), + inputProtocol, outputProtocol, eventHandler_, client), + bind(&TServerFramework::disposeConnectedClient, this, _1))); + } catch (TTransportException& ttx) { + releaseOneDescriptor("inputTransport", inputTransport); + releaseOneDescriptor("outputTransport", outputTransport); + releaseOneDescriptor("client", client); + if (ttx.getType() == TTransportException::TIMED_OUT) { + // Accept timeout - continue processing. + continue; + } else if (ttx.getType() == TTransportException::END_OF_FILE || + ttx.getType() == TTransportException::INTERRUPTED) { + // Server was interrupted. This only happens when stopping. + break; + } else { + // All other transport exceptions are logged. + // State of connection is unknown. Done. + string errStr = string("TServerTransport died: ") + ttx.what(); + GlobalOutput(errStr.c_str()); + break; + } + } + } + + releaseOneDescriptor("serverTransport", serverTransport_); +} + +void TServerFramework::stop() { + serverTransport_->interrupt(); + serverTransport_->interruptChildren(); +} + +void TServerFramework::disposeConnectedClient(TConnectedClient *pClient) { + onClientDisconnected(pClient); + delete pClient; +} + +} +} +} // apache::thrift::server + http://git-wip-us.apache.org/repos/asf/thrift/blob/21b68524/lib/cpp/src/thrift/server/TServerFramework.h ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/server/TServerFramework.h b/lib/cpp/src/thrift/server/TServerFramework.h new file mode 100644 index 0000000..67d5420 --- /dev/null +++ b/lib/cpp/src/thrift/server/TServerFramework.h @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_SERVER_TSERVERFRAMEWORK_H_ +#define _THRIFT_SERVER_TSERVERFRAMEWORK_H_ 1 + +#include <boost/shared_ptr.hpp> +#include <thrift/TProcessor.h> +#include <thrift/server/TConnectedClient.h> +#include <thrift/server/TServer.h> +#include <thrift/transport/TServerTransport.h> +#include <thrift/transport/TTransport.h> + +namespace apache { +namespace thrift { +namespace server { + +/** + * TServerFramework provides a single consolidated processing loop for + * servers. By having a single processing loop, behavior between servers + * is more predictable and maintenance cost is lowered. Implementations + * of TServerFramework must provide a method to deal with a client that + * connects and one that disconnects. + * + * While this functionality could be rolled directly into TServer, and + * probably should be, it would break the TServer interface contract so + * to maintain backwards compatibility for third party servers, no TServers + * were harmed in the making of this class. + */ +class TServerFramework : public TServer { +public: + TServerFramework( + const boost::shared_ptr<apache::thrift::TProcessorFactory>& processorFactory, + const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport, + const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& transportFactory, + const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& protocolFactory); + + TServerFramework( + const boost::shared_ptr<apache::thrift::TProcessor>& processor, + const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport, + const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& transportFactory, + const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& protocolFactory); + + TServerFramework( + const boost::shared_ptr<apache::thrift::TProcessorFactory>& processorFactory, + const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport, + const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& inputTransportFactory, + const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& outputTransportFactory, + const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& inputProtocolFactory, + const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& outputProtocolFactory); + + TServerFramework( + const boost::shared_ptr<apache::thrift::TProcessor>& processor, + const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport, + const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& inputTransportFactory, + const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& outputTransportFactory, + const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& inputProtocolFactory, + const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& outputProtocolFactory); + + virtual ~TServerFramework(); + + /** + * Accept clients from the TServerTransport and add them for processing. + * Call stop() on another thread to interrupt processing + * and return control to the caller. + * Post-conditions (return guarantees): + * The serverTransport will be closed. + */ + virtual void serve(); + + /** + * Interrupt serve() so that it meets post-conditions and returns. + */ + virtual void stop(); + +protected: + /** + * A client has connected. The implementation is responsible for storing + * and processing the client. This is called during the serve() thread, + * therefore a failure to return quickly will result in new client connection + * delays. + * + * \param[in] pClient the newly connected client + */ + virtual void onClientConnected(const boost::shared_ptr<TConnectedClient>& pClient) = 0; + + /** + * A client has disconnected. + * The client TTransport has already been closed. + * The implementation must not delete the pointer. + * + * \param[in] pClient the disconnected client + */ + virtual void onClientDisconnected(TConnectedClient *pClient) = 0; + +private: + /** + * Smart pointer client deletion. + * Calls onClientDisconnected and then deletes pClient. + */ + void disposeConnectedClient(TConnectedClient *pClient); +}; + +} +} +} // apache::thrift::server + +#endif // #ifndef _THRIFT_SERVER_TSERVERFRAMEWORK_H_ http://git-wip-us.apache.org/repos/asf/thrift/blob/21b68524/lib/cpp/src/thrift/server/TSimpleServer.cpp ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/server/TSimpleServer.cpp b/lib/cpp/src/thrift/server/TSimpleServer.cpp index b63c45e..a133c0d 100644 --- a/lib/cpp/src/thrift/server/TSimpleServer.cpp +++ b/lib/cpp/src/thrift/server/TSimpleServer.cpp @@ -17,116 +17,74 @@ * under the License. */ -#include <thrift/server/TConnectedClient.h> #include <thrift/server/TSimpleServer.h> -#include <thrift/transport/TTransportException.h> -#include <string> -#include <iostream> namespace apache { namespace thrift { namespace server { -using namespace std; -using namespace apache::thrift; -using namespace apache::thrift::protocol; -using namespace apache::thrift::transport; +using apache::thrift::protocol::TProtocol; +using apache::thrift::protocol::TProtocolFactory; +using apache::thrift::transport::TServerTransport; +using apache::thrift::transport::TTransport; +using apache::thrift::transport::TTransportException; +using apache::thrift::transport::TTransportFactory; using boost::shared_ptr; +using std::string; -/** - * A simple single-threaded application server. Perfect for unit tests! - * - */ -void TSimpleServer::serve() { +TSimpleServer::TSimpleServer( + const shared_ptr<TProcessorFactory>& processorFactory, + const shared_ptr<TServerTransport>& serverTransport, + const shared_ptr<TTransportFactory>& transportFactory, + const shared_ptr<TProtocolFactory>& protocolFactory) + : TServerFramework(processorFactory, serverTransport, + transportFactory, protocolFactory) {} - shared_ptr<TTransport> client; - shared_ptr<TTransport> inputTransport; - shared_ptr<TTransport> outputTransport; - shared_ptr<TProtocol> inputProtocol; - shared_ptr<TProtocol> outputProtocol; +TSimpleServer::TSimpleServer( + const shared_ptr<TProcessor>& processor, + const shared_ptr<TServerTransport>& serverTransport, + const shared_ptr<TTransportFactory>& transportFactory, + const shared_ptr<TProtocolFactory>& protocolFactory) + : TServerFramework(processor, serverTransport, + transportFactory, protocolFactory) {} - // Start the server listening - serverTransport_->listen(); +TSimpleServer::TSimpleServer( + const shared_ptr<TProcessorFactory>& processorFactory, + const shared_ptr<TServerTransport>& serverTransport, + const shared_ptr<TTransportFactory>& inputTransportFactory, + const shared_ptr<TTransportFactory>& outputTransportFactory, + const shared_ptr<TProtocolFactory>& inputProtocolFactory, + const shared_ptr<TProtocolFactory>& outputProtocolFactory) + : TServerFramework(processorFactory, serverTransport, + inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory) {} - // Run the preServe event - if (eventHandler_) { - eventHandler_->preServe(); - } +TSimpleServer::TSimpleServer( + const shared_ptr<TProcessor>& processor, + const shared_ptr<TServerTransport>& serverTransport, + const shared_ptr<TTransportFactory>& inputTransportFactory, + const shared_ptr<TTransportFactory>& outputTransportFactory, + const shared_ptr<TProtocolFactory>& inputProtocolFactory, + const shared_ptr<TProtocolFactory>& outputProtocolFactory) + : TServerFramework(processor, serverTransport, + inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory) {} - // Fetch client from server - while (!stop_) { - try { - client = serverTransport_->accept(); - inputTransport = inputTransportFactory_->getTransport(client); - outputTransport = outputTransportFactory_->getTransport(client); - inputProtocol = inputProtocolFactory_->getProtocol(inputTransport); - outputProtocol = outputProtocolFactory_->getProtocol(outputTransport); - } catch (TTransportException& ttx) { - if (inputTransport) { - inputTransport->close(); - } - if (outputTransport) { - outputTransport->close(); - } - if (client) { - client->close(); - } - if (ttx.getType() != TTransportException::INTERRUPTED) { - string errStr = string("TServerTransport died on accept: ") + ttx.what(); - GlobalOutput(errStr.c_str()); - } - if (stop_) break; else continue; - } catch (TException& tx) { - if (inputTransport) { - inputTransport->close(); - } - if (outputTransport) { - outputTransport->close(); - } - if (client) { - client->close(); - } - string errStr = string("Some kind of accept exception: ") + tx.what(); - GlobalOutput(errStr.c_str()); - continue; - } catch (const string& s) { - if (inputTransport) { - inputTransport->close(); - } - if (outputTransport) { - outputTransport->close(); - } - if (client) { - client->close(); - } - string errStr = string("Some kind of accept exception: ") + s; - GlobalOutput(errStr.c_str()); - break; - } +TSimpleServer::~TSimpleServer() {} - TConnectedClient("TSimpleServer", - getProcessor(inputProtocol, outputProtocol, client), - inputProtocol, outputProtocol, eventHandler_, client).run(); - } - - if (stop_) { - try { - serverTransport_->close(); - } catch (TTransportException& ttx) { - string errStr = string("TServerTransport failed on close: ") + ttx.what(); - GlobalOutput(errStr.c_str()); - } - stop_ = false; - } +/** + * The main body of customized implementation for TSimpleServer is quite simple: + * When a client connects, use the serve() thread to drive it to completion thus + * blocking new connections. + */ +void TSimpleServer::onClientConnected(const shared_ptr<TConnectedClient>& pClient) { + pClient->run(); } -void TSimpleServer::stop() { - if (!stop_) { - stop_ = true; - serverTransport_->interrupt(); - serverTransport_->interruptChildren(); - } -} +/** + * TSimpleServer does not track clients so there is nothing to do here. + */ +void TSimpleServer::onClientDisconnected(TConnectedClient *pClient) {} } } http://git-wip-us.apache.org/repos/asf/thrift/blob/21b68524/lib/cpp/src/thrift/server/TSimpleServer.h ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/server/TSimpleServer.h b/lib/cpp/src/thrift/server/TSimpleServer.h index 7b8677d..51b00e4 100644 --- a/lib/cpp/src/thrift/server/TSimpleServer.h +++ b/lib/cpp/src/thrift/server/TSimpleServer.h @@ -20,8 +20,7 @@ #ifndef _THRIFT_SERVER_TSIMPLESERVER_H_ #define _THRIFT_SERVER_TSIMPLESERVER_H_ 1 -#include <thrift/server/TServer.h> -#include <thrift/transport/TServerTransport.h> +#include <thrift/server/TServerFramework.h> namespace apache { namespace thrift { @@ -30,77 +29,41 @@ namespace server { /** * This is the most basic simple server. It is single-threaded and runs a * continuous loop of accepting a single connection, processing requests on - * that connection until it closes, and then repeating. It is a good example - * of how to extend the TServer interface. + * that connection until it closes, and then repeating. */ -class TSimpleServer : public TServer { +class TSimpleServer : public TServerFramework { public: - template <typename ProcessorFactory> - TSimpleServer(const boost::shared_ptr<ProcessorFactory>& processorFactory, - const boost::shared_ptr<TServerTransport>& serverTransport, - const boost::shared_ptr<TTransportFactory>& transportFactory, - const boost::shared_ptr<TProtocolFactory>& protocolFactory, - THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) - : TServer(processorFactory, serverTransport, transportFactory, protocolFactory), stop_(false) {} + TSimpleServer(const boost::shared_ptr<apache::thrift::TProcessorFactory>& processorFactory, + const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport, + const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& transportFactory, + const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& protocolFactory); - template <typename Processor> - TSimpleServer(const boost::shared_ptr<Processor>& processor, - const boost::shared_ptr<TServerTransport>& serverTransport, - const boost::shared_ptr<TTransportFactory>& transportFactory, - const boost::shared_ptr<TProtocolFactory>& protocolFactory, - THRIFT_OVERLOAD_IF(Processor, TProcessor)) - : TServer(processor, serverTransport, transportFactory, protocolFactory), stop_(false) {} + TSimpleServer(const boost::shared_ptr<apache::thrift::TProcessor>& processor, + const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport, + const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& transportFactory, + const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& protocolFactory); - template <typename ProcessorFactory> - TSimpleServer(const boost::shared_ptr<ProcessorFactory>& processorFactory, - const boost::shared_ptr<TServerTransport>& serverTransport, - const boost::shared_ptr<TTransportFactory>& inputTransportFactory, - const boost::shared_ptr<TTransportFactory>& outputTransportFactory, - const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory, - const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory, - THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) - : TServer(processorFactory, - serverTransport, - inputTransportFactory, - outputTransportFactory, - inputProtocolFactory, - outputProtocolFactory), - stop_(false) {} + TSimpleServer(const boost::shared_ptr<apache::thrift::TProcessorFactory>& processorFactory, + const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport, + const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& inputTransportFactory, + const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& outputTransportFactory, + const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& inputProtocolFactory, + const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& outputProtocolFactory); - template <typename Processor> - TSimpleServer(const boost::shared_ptr<Processor>& processor, - const boost::shared_ptr<TServerTransport>& serverTransport, - const boost::shared_ptr<TTransportFactory>& inputTransportFactory, - const boost::shared_ptr<TTransportFactory>& outputTransportFactory, - const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory, - const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory, - THRIFT_OVERLOAD_IF(Processor, TProcessor)) - : TServer(processor, - serverTransport, - inputTransportFactory, - outputTransportFactory, - inputProtocolFactory, - outputProtocolFactory), - stop_(false) {} + TSimpleServer(const boost::shared_ptr<apache::thrift::TProcessor>& processor, + const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport, + const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& inputTransportFactory, + const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& outputTransportFactory, + const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& inputProtocolFactory, + const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& outputProtocolFactory); - /** - * Process one connection at a time using the caller's thread. - * Call stop() on another thread to interrupt processing and - * return control to the caller. - * Post-conditions (return guarantees): - * The serverTransport will be closed. - * There will be no connected client. - */ - void serve(); - - /** - * Interrupt serve() so that it meets post-conditions and returns. - */ - void stop(); + virtual ~TSimpleServer(); protected: - bool stop_; + virtual void onClientConnected(const boost::shared_ptr<TConnectedClient>& pClient) /* override */; + virtual void onClientDisconnected(TConnectedClient *pClient) /* override */; }; + } } } // apache::thrift::server http://git-wip-us.apache.org/repos/asf/thrift/blob/21b68524/lib/cpp/src/thrift/server/TThreadPoolServer.cpp ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/server/TThreadPoolServer.cpp b/lib/cpp/src/thrift/server/TThreadPoolServer.cpp index f8ed6cf..a5f8c76 100644 --- a/lib/cpp/src/thrift/server/TThreadPoolServer.cpp +++ b/lib/cpp/src/thrift/server/TThreadPoolServer.cpp @@ -17,136 +17,83 @@ * under the License. */ -#include <thrift/thrift-config.h> - -#include <thrift/server/TConnectedClient.h> #include <thrift/server/TThreadPoolServer.h> -#include <thrift/transport/TTransportException.h> -#include <thrift/concurrency/Thread.h> -#include <thrift/concurrency/ThreadManager.h> -#include <string> -#include <iostream> -#include <boost/make_shared.hpp> namespace apache { namespace thrift { namespace server { +using apache::thrift::concurrency::ThreadManager; +using apache::thrift::protocol::TProtocol; +using apache::thrift::protocol::TProtocolFactory; +using apache::thrift::transport::TServerTransport; +using apache::thrift::transport::TTransport; +using apache::thrift::transport::TTransportException; +using apache::thrift::transport::TTransportFactory; using boost::shared_ptr; -using namespace std; -using namespace apache::thrift; -using namespace apache::thrift::concurrency; -using namespace apache::thrift::protocol; -using namespace apache::thrift::transport; +using std::string; + +TThreadPoolServer::TThreadPoolServer( + const shared_ptr<TProcessorFactory>& processorFactory, + const shared_ptr<TServerTransport>& serverTransport, + const shared_ptr<TTransportFactory>& transportFactory, + const shared_ptr<TProtocolFactory>& protocolFactory, + const shared_ptr<ThreadManager>& threadManager) + : TServerFramework(processorFactory, serverTransport, + transportFactory, protocolFactory), + threadManager_(threadManager), + timeout_(0), + taskExpiration_(0) {} + +TThreadPoolServer::TThreadPoolServer( + const shared_ptr<TProcessor>& processor, + const shared_ptr<TServerTransport>& serverTransport, + const shared_ptr<TTransportFactory>& transportFactory, + const shared_ptr<TProtocolFactory>& protocolFactory, + const shared_ptr<ThreadManager>& threadManager) + : TServerFramework(processor, serverTransport, + transportFactory, protocolFactory), + threadManager_(threadManager), + timeout_(0), + taskExpiration_(0) {} + +TThreadPoolServer::TThreadPoolServer( + const shared_ptr<TProcessorFactory>& processorFactory, + const shared_ptr<TServerTransport>& serverTransport, + const shared_ptr<TTransportFactory>& inputTransportFactory, + const shared_ptr<TTransportFactory>& outputTransportFactory, + const shared_ptr<TProtocolFactory>& inputProtocolFactory, + const shared_ptr<TProtocolFactory>& outputProtocolFactory, + const shared_ptr<ThreadManager>& threadManager) + : TServerFramework(processorFactory, serverTransport, + inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory), + threadManager_(threadManager), + stop_(false), + timeout_(0), + taskExpiration_(0) {} + +TThreadPoolServer::TThreadPoolServer( + const shared_ptr<TProcessor>& processor, + const shared_ptr<TServerTransport>& serverTransport, + const shared_ptr<TTransportFactory>& inputTransportFactory, + const shared_ptr<TTransportFactory>& outputTransportFactory, + const shared_ptr<TProtocolFactory>& inputProtocolFactory, + const shared_ptr<TProtocolFactory>& outputProtocolFactory, + const shared_ptr<ThreadManager>& threadManager) + : TServerFramework(processor, serverTransport, + inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory), + threadManager_(threadManager), + stop_(false), + timeout_(0), + taskExpiration_(0) {} TThreadPoolServer::~TThreadPoolServer() {} void TThreadPoolServer::serve() { - shared_ptr<TTransport> client; - shared_ptr<TTransport> inputTransport; - shared_ptr<TTransport> outputTransport; - shared_ptr<TProtocol> inputProtocol; - shared_ptr<TProtocol> outputProtocol; - - // Start the server listening - serverTransport_->listen(); - - // Run the preServe event - if (eventHandler_) { - eventHandler_->preServe(); - } - - while (!stop_) { - try { - client.reset(); - inputTransport.reset(); - outputTransport.reset(); - inputProtocol.reset(); - outputProtocol.reset(); - - // Fetch client from server - client = serverTransport_->accept(); - - // Make IO transports - inputTransport = inputTransportFactory_->getTransport(client); - outputTransport = outputTransportFactory_->getTransport(client); - inputProtocol = inputProtocolFactory_->getProtocol(inputTransport); - outputProtocol = outputProtocolFactory_->getProtocol(outputTransport); - - shared_ptr<TProcessor> processor = getProcessor(inputProtocol, outputProtocol, client); - - // Add to threadmanager pool - threadManager_->add( - boost::make_shared<TConnectedClient>( - "TThreadPoolServer", - getProcessor(inputProtocol, outputProtocol, client), - inputProtocol, outputProtocol, eventHandler_, client), - timeout_, - taskExpiration_); - - } catch (TTransportException& ttx) { - if (inputTransport) { - inputTransport->close(); - } - if (outputTransport) { - outputTransport->close(); - } - if (client) { - client->close(); - } - if (ttx.getType() != TTransportException::INTERRUPTED) { - string errStr = string("TThreadPoolServer: TServerTransport died on accept: ") + ttx.what(); - GlobalOutput(errStr.c_str()); - } - if (stop_) break; else continue; - } catch (TException& tx) { - if (inputTransport) { - inputTransport->close(); - } - if (outputTransport) { - outputTransport->close(); - } - if (client) { - client->close(); - } - string errStr = string("TThreadPoolServer: Caught TException: ") + tx.what(); - GlobalOutput(errStr.c_str()); - continue; - } catch (const string& s) { - if (inputTransport) { - inputTransport->close(); - } - if (outputTransport) { - outputTransport->close(); - } - if (client) { - client->close(); - } - string errStr = "TThreadPoolServer: Unknown exception: " + s; - GlobalOutput(errStr.c_str()); - break; - } - } - - // If stopped manually, join the existing threads - if (stop_) { - try { - serverTransport_->close(); - threadManager_->join(); - } catch (TException& tx) { - string errStr = string("TThreadPoolServer: Exception shutting down: ") + tx.what(); - GlobalOutput(errStr.c_str()); - } - stop_ = false; - } -} - -void TThreadPoolServer::stop() { - if (!stop_) { - stop_ = true; - serverTransport_->interrupt(); - serverTransport_->interruptChildren(); - } + TServerFramework::serve(); + threadManager_->join(); } int64_t TThreadPoolServer::getTimeout() const { @@ -164,6 +111,13 @@ int64_t TThreadPoolServer::getTaskExpiration() const { void TThreadPoolServer::setTaskExpiration(int64_t value) { taskExpiration_ = value; } + +void TThreadPoolServer::onClientConnected(const shared_ptr<TConnectedClient>& pClient) { + threadManager_->add(pClient, timeout_, taskExpiration_); +} + +void TThreadPoolServer::onClientDisconnected(TConnectedClient *pClient) {} + } } } // apache::thrift::server http://git-wip-us.apache.org/repos/asf/thrift/blob/21b68524/lib/cpp/src/thrift/server/TThreadPoolServer.h ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/server/TThreadPoolServer.h b/lib/cpp/src/thrift/server/TThreadPoolServer.h index 2f93463..29e9aaf 100644 --- a/lib/cpp/src/thrift/server/TThreadPoolServer.h +++ b/lib/cpp/src/thrift/server/TThreadPoolServer.h @@ -21,115 +21,68 @@ #define _THRIFT_SERVER_TTHREADPOOLSERVER_H_ 1 #include <thrift/concurrency/ThreadManager.h> -#include <thrift/server/TServer.h> -#include <thrift/transport/TServerTransport.h> - -#include <boost/shared_ptr.hpp> +#include <thrift/server/TServerFramework.h> namespace apache { namespace thrift { namespace server { -using apache::thrift::concurrency::ThreadManager; -using apache::thrift::protocol::TProtocolFactory; -using apache::thrift::transport::TServerTransport; -using apache::thrift::transport::TTransportFactory; - -class TThreadPoolServer : public TServer { +/** + * Manage clients using a thread pool. + */ +class TThreadPoolServer : public TServerFramework { public: - template <typename ProcessorFactory> - TThreadPoolServer(const boost::shared_ptr<ProcessorFactory>& processorFactory, - const boost::shared_ptr<TServerTransport>& serverTransport, - const boost::shared_ptr<TTransportFactory>& transportFactory, - const boost::shared_ptr<TProtocolFactory>& protocolFactory, - const boost::shared_ptr<ThreadManager>& threadManager, - THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) - : TServer(processorFactory, serverTransport, transportFactory, protocolFactory), - threadManager_(threadManager), - stop_(false), - timeout_(0), - taskExpiration_(0) {} - - template <typename Processor> - TThreadPoolServer(const boost::shared_ptr<Processor>& processor, - const boost::shared_ptr<TServerTransport>& serverTransport, - const boost::shared_ptr<TTransportFactory>& transportFactory, - const boost::shared_ptr<TProtocolFactory>& protocolFactory, - const boost::shared_ptr<ThreadManager>& threadManager, - THRIFT_OVERLOAD_IF(Processor, TProcessor)) - : TServer(processor, serverTransport, transportFactory, protocolFactory), - threadManager_(threadManager), - stop_(false), - timeout_(0), - taskExpiration_(0) {} - - template <typename ProcessorFactory> - TThreadPoolServer(const boost::shared_ptr<ProcessorFactory>& processorFactory, - const boost::shared_ptr<TServerTransport>& serverTransport, - const boost::shared_ptr<TTransportFactory>& inputTransportFactory, - const boost::shared_ptr<TTransportFactory>& outputTransportFactory, - const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory, - const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory, - const boost::shared_ptr<ThreadManager>& threadManager, - THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) - : TServer(processorFactory, - serverTransport, - inputTransportFactory, - outputTransportFactory, - inputProtocolFactory, - outputProtocolFactory), - threadManager_(threadManager), - stop_(false), - timeout_(0), - taskExpiration_(0) {} - - template <typename Processor> - TThreadPoolServer(const boost::shared_ptr<Processor>& processor, - const boost::shared_ptr<TServerTransport>& serverTransport, - const boost::shared_ptr<TTransportFactory>& inputTransportFactory, - const boost::shared_ptr<TTransportFactory>& outputTransportFactory, - const boost::shared_ptr<TProtocolFactory>& inputProtocolFactory, - const boost::shared_ptr<TProtocolFactory>& outputProtocolFactory, - const boost::shared_ptr<ThreadManager>& threadManager, - THRIFT_OVERLOAD_IF(Processor, TProcessor)) - : TServer(processor, - serverTransport, - inputTransportFactory, - outputTransportFactory, - inputProtocolFactory, - outputProtocolFactory), - threadManager_(threadManager), - stop_(false), - timeout_(0), - taskExpiration_(0) {} + TThreadPoolServer( + const boost::shared_ptr<apache::thrift::TProcessorFactory>& processorFactory, + const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport, + const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& transportFactory, + const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& protocolFactory, + const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager); + + TThreadPoolServer( + const boost::shared_ptr<apache::thrift::TProcessor>& processor, + const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport, + const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& transportFactory, + const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& protocolFactory, + const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager); + + TThreadPoolServer( + const boost::shared_ptr<apache::thrift::TProcessorFactory>& processorFactory, + const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport, + const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& inputTransportFactory, + const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& outputTransportFactory, + const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& inputProtocolFactory, + const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& outputProtocolFactory, + const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager); + + TThreadPoolServer( + const boost::shared_ptr<apache::thrift::TProcessor>& processor, + const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport, + const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& inputTransportFactory, + const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& outputTransportFactory, + const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& inputProtocolFactory, + const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& outputProtocolFactory, + const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager); virtual ~TThreadPoolServer(); /** - * Process all connections that arrive using a thread pool. - * Call stop() on another thread to interrupt processing and - * return control to the caller. * Post-conditions (return guarantees): - * The serverTransport will be closed. - * There will be no connected clients. + * There will be no clients connected. */ virtual void serve(); - /** - * Interrupt serve() so that it meets post-conditions and returns. - */ - virtual void stop(); - virtual int64_t getTimeout() const; - virtual void setTimeout(int64_t value); virtual int64_t getTaskExpiration() const; - virtual void setTaskExpiration(int64_t value); protected: - boost::shared_ptr<ThreadManager> threadManager_; + virtual void onClientConnected(const boost::shared_ptr<TConnectedClient>& pClient) /* override */; + virtual void onClientDisconnected(TConnectedClient *pClient) /* override */; + + boost::shared_ptr<apache::thrift::concurrency::ThreadManager> threadManager_; volatile bool stop_; @@ -137,6 +90,7 @@ protected: volatile int64_t taskExpiration_; }; + } } } // apache::thrift::server http://git-wip-us.apache.org/repos/asf/thrift/blob/21b68524/lib/cpp/src/thrift/server/TThreadedServer.cpp ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/server/TThreadedServer.cpp b/lib/cpp/src/thrift/server/TThreadedServer.cpp index 4dcdb44..440cede 100644 --- a/lib/cpp/src/thrift/server/TThreadedServer.cpp +++ b/lib/cpp/src/thrift/server/TThreadedServer.cpp @@ -17,166 +17,109 @@ * under the License. */ -#include <boost/bind.hpp> -#include <thrift/server/TConnectedClient.h> -#include <thrift/server/TThreadedServer.h> -#include <thrift/transport/TTransportException.h> #include <thrift/concurrency/PlatformThreadFactory.h> - -#include <string> -#include <iostream> - -#ifdef HAVE_UNISTD_H -#include <unistd.h> -#endif +#include <thrift/server/TThreadedServer.h> namespace apache { namespace thrift { namespace server { +using apache::thrift::concurrency::Synchronized; +using apache::thrift::concurrency::Thread; +using apache::thrift::concurrency::ThreadFactory; +using apache::thrift::protocol::TProtocol; +using apache::thrift::protocol::TProtocolFactory; +using apache::thrift::transport::TServerTransport; +using apache::thrift::transport::TTransport; +using apache::thrift::transport::TTransportException; +using apache::thrift::transport::TTransportFactory; using boost::shared_ptr; -using namespace std; -using namespace apache::thrift; -using namespace apache::thrift::protocol; -using namespace apache::thrift::transport; -using namespace apache::thrift::concurrency; +using std::string; + +TThreadedServer::TThreadedServer( + const shared_ptr<TProcessorFactory>& processorFactory, + const shared_ptr<TServerTransport>& serverTransport, + const shared_ptr<TTransportFactory>& transportFactory, + const shared_ptr<TProtocolFactory>& protocolFactory, + const shared_ptr<ThreadFactory>& threadFactory) + : TServerFramework(processorFactory, serverTransport, transportFactory, protocolFactory), + threadFactory_(threadFactory) {} + + +TThreadedServer::TThreadedServer( + const shared_ptr<TProcessor>& processor, + const shared_ptr<TServerTransport>& serverTransport, + const shared_ptr<TTransportFactory>& transportFactory, + const shared_ptr<TProtocolFactory>& protocolFactory, + const shared_ptr<ThreadFactory>& threadFactory) + : TServerFramework(processor, serverTransport, transportFactory, protocolFactory), + threadFactory_(threadFactory) {} + +TThreadedServer::TThreadedServer( + const shared_ptr<TProcessorFactory>& processorFactory, + const shared_ptr<TServerTransport>& serverTransport, + const shared_ptr<TTransportFactory>& inputTransportFactory, + const shared_ptr<TTransportFactory>& outputTransportFactory, + const shared_ptr<TProtocolFactory>& inputProtocolFactory, + const shared_ptr<TProtocolFactory>& outputProtocolFactory, + const shared_ptr<ThreadFactory>& threadFactory) + : TServerFramework(processorFactory, serverTransport, + inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory), + threadFactory_(threadFactory) {} + +TThreadedServer::TThreadedServer( + const shared_ptr<TProcessor>& processor, + const shared_ptr<TServerTransport>& serverTransport, + const shared_ptr<TTransportFactory>& inputTransportFactory, + const shared_ptr<TTransportFactory>& outputTransportFactory, + const shared_ptr<TProtocolFactory>& inputProtocolFactory, + const shared_ptr<TProtocolFactory>& outputProtocolFactory, + const shared_ptr<ThreadFactory>& threadFactory) + : TServerFramework(processor, serverTransport, + inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory), + threadFactory_(threadFactory) {} TThreadedServer::~TThreadedServer() {} void TThreadedServer::serve() { + TServerFramework::serve(); - shared_ptr<TTransport> client; - shared_ptr<TTransport> inputTransport; - shared_ptr<TTransport> outputTransport; - shared_ptr<TProtocol> inputProtocol; - shared_ptr<TProtocol> outputProtocol; - - // Start the server listening - serverTransport_->listen(); - - // Run the preServe event - if (eventHandler_) { - eventHandler_->preServe(); - } - - while (!stop_) { - try { - client.reset(); - inputTransport.reset(); - outputTransport.reset(); - inputProtocol.reset(); - outputProtocol.reset(); - - // Fetch client from server - client = serverTransport_->accept(); - - // Make IO transports - inputTransport = inputTransportFactory_->getTransport(client); - outputTransport = outputTransportFactory_->getTransport(client); - inputProtocol = inputProtocolFactory_->getProtocol(inputTransport); - outputProtocol = outputProtocolFactory_->getProtocol(outputTransport); - - shared_ptr<TConnectedClient> pClient( - new TConnectedClient("TThreadedServer", - getProcessor(inputProtocol, outputProtocol, client), - inputProtocol, outputProtocol, eventHandler_, client), - boost::bind(&TThreadedServer::disposeClient, this, _1)); - - // Create a thread for this client - shared_ptr<Thread> thread = shared_ptr<Thread>(threadFactory_->newThread(pClient)); - - // Insert thread into the set of threads - { - Synchronized s(clientsMonitor_); - clients_.insert(pClient.get()); - } - - // Start the thread! - thread->start(); - - } catch (TTransportException& ttx) { - if (inputTransport) { - inputTransport->close(); - } - if (outputTransport) { - outputTransport->close(); - } - if (client) { - client->close(); - } - if (ttx.getType() != TTransportException::INTERRUPTED) { - string errStr = string("TThreadedServer: TServerTransport died on accept: ") + ttx.what(); - GlobalOutput(errStr.c_str()); - } - if (stop_) break; else continue; - } catch (TException& tx) { - if (inputTransport) { - inputTransport->close(); - } - if (outputTransport) { - outputTransport->close(); - } - if (client) { - client->close(); - } - string errStr = string("TThreadedServer: Caught TException: ") + tx.what(); - GlobalOutput(errStr.c_str()); - continue; - } catch (const string& s) { - if (inputTransport) { - inputTransport->close(); - } - if (outputTransport) { - outputTransport->close(); - } - if (client) { - client->close(); - } - string errStr = "TThreadedServer: Unknown exception: " + s; - GlobalOutput(errStr.c_str()); - break; - } - } - - // If stopped manually, make sure to close server transport - if (stop_) { - try { - serverTransport_->close(); - } catch (TException& tx) { - string errStr = string("TThreadedServer: Exception shutting down: ") + tx.what(); - GlobalOutput(errStr.c_str()); - } - try { - Synchronized s(clientsMonitor_); - while (!clients_.empty()) { - clientsMonitor_.wait(); - } - } catch (TException& tx) { - string errStr = string("TThreadedServer: Exception joining workers: ") + tx.what(); - GlobalOutput(errStr.c_str()); + // Drain all clients - no more will arrive + try { + Synchronized s(clientsMonitor_); + while (!clients_.empty()) { + clientsMonitor_.wait(); } - stop_ = false; + } catch (TException& tx) { + string errStr = string("TThreadedServer: Exception joining workers: ") + tx.what(); + GlobalOutput(errStr.c_str()); } } -void TThreadedServer::stop() { - if (!stop_) { - stop_ = true; - serverTransport_->interrupt(); - serverTransport_->interruptChildren(); +void TThreadedServer::onClientConnected(const shared_ptr<TConnectedClient>& pClient) +{ + // Create a thread for this client + shared_ptr<Thread> thread = shared_ptr<Thread>(threadFactory_->newThread(pClient)); + + // Insert thread into the set of threads + { + Synchronized s(clientsMonitor_); + clients_.insert(pClient.get()); } + + // Start the thread! + thread->start(); } -void TThreadedServer::disposeClient(TConnectedClient *pClient) { +void TThreadedServer::onClientDisconnected(TConnectedClient *pClient) { // Remove this task from parent bookkeeping - { - Synchronized s(clientsMonitor_); - clients_.erase(pClient); - if (clients_.empty()) { - clientsMonitor_.notify(); - } + Synchronized s(clientsMonitor_); + clients_.erase(pClient); + if (clients_.empty()) { + clientsMonitor_.notify(); } - delete pClient; } } http://git-wip-us.apache.org/repos/asf/thrift/blob/21b68524/lib/cpp/src/thrift/server/TThreadedServer.h ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/server/TThreadedServer.h b/lib/cpp/src/thrift/server/TThreadedServer.h index 5d510d6..7b66f1d 100644 --- a/lib/cpp/src/thrift/server/TThreadedServer.h +++ b/lib/cpp/src/thrift/server/TThreadedServer.h @@ -20,101 +20,72 @@ #ifndef _THRIFT_SERVER_TTHREADEDSERVER_H_ #define _THRIFT_SERVER_TTHREADEDSERVER_H_ 1 -#include <set> -#include <thrift/server/TServer.h> -#include <thrift/transport/TServerTransport.h> #include <thrift/concurrency/Monitor.h> #include <thrift/concurrency/PlatformThreadFactory.h> #include <thrift/concurrency/Thread.h> - -#include <boost/shared_ptr.hpp> +#include <thrift/server/TServerFramework.h> namespace apache { namespace thrift { namespace server { -using apache::thrift::TProcessor; -using apache::thrift::transport::TServerTransport; -using apache::thrift::transport::TTransportFactory; -using apache::thrift::concurrency::Monitor; -using apache::thrift::concurrency::PlatformThreadFactory; -using apache::thrift::concurrency::ThreadFactory; - -class TConnectedClient; +#define THRIFT_DEFAULT_THREAD_FACTORY -class TThreadedServer : public TServer { +/** + * Manage clients using a thread pool. + */ +class TThreadedServer : public TServerFramework { public: - template <typename ProcessorFactory> - TThreadedServer(const boost::shared_ptr<ProcessorFactory>& processorFactory, - const boost::shared_ptr<TServerTransport>& serverTransport, - const boost::shared_ptr<TTransportFactory>& transportFactory, - const boost::shared_ptr<TProtocolFactory>& protocolFactory, - THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) - : TServer(processorFactory, serverTransport, transportFactory, protocolFactory), - threadFactory_(new PlatformThreadFactory), - stop_(false) {} - - template <typename ProcessorFactory> - TThreadedServer(const boost::shared_ptr<ProcessorFactory>& processorFactory, - const boost::shared_ptr<TServerTransport>& serverTransport, - const boost::shared_ptr<TTransportFactory>& transportFactory, - const boost::shared_ptr<TProtocolFactory>& protocolFactory, - const boost::shared_ptr<ThreadFactory>& threadFactory, - THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory)) - : TServer(processorFactory, serverTransport, transportFactory, protocolFactory), - threadFactory_(threadFactory), - stop_(false) {} - - template <typename Processor> - TThreadedServer(const boost::shared_ptr<Processor>& processor, - const boost::shared_ptr<TServerTransport>& serverTransport, - const boost::shared_ptr<TTransportFactory>& transportFactory, - const boost::shared_ptr<TProtocolFactory>& protocolFactory, - THRIFT_OVERLOAD_IF(Processor, TProcessor)) - : TServer(processor, serverTransport, transportFactory, protocolFactory), - threadFactory_(new PlatformThreadFactory), - stop_(false) {} - - template <typename Processor> - TThreadedServer(const boost::shared_ptr<Processor>& processor, - const boost::shared_ptr<TServerTransport>& serverTransport, - const boost::shared_ptr<TTransportFactory>& transportFactory, - const boost::shared_ptr<TProtocolFactory>& protocolFactory, - const boost::shared_ptr<ThreadFactory>& threadFactory, - THRIFT_OVERLOAD_IF(Processor, TProcessor)) - : TServer(processor, serverTransport, transportFactory, protocolFactory), - threadFactory_(threadFactory), - stop_(false) {} + TThreadedServer(const boost::shared_ptr<apache::thrift::TProcessorFactory>& processorFactory, + const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport, + const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& transportFactory, + const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& protocolFactory, + const boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>& threadFactory = + boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>( + new apache::thrift::concurrency::PlatformThreadFactory)); + + TThreadedServer(const boost::shared_ptr<apache::thrift::TProcessor>& processor, + const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport, + const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& transportFactory, + const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& protocolFactory, + const boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>& threadFactory = + boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>( + new apache::thrift::concurrency::PlatformThreadFactory)); + + TThreadedServer(const boost::shared_ptr<apache::thrift::TProcessorFactory>& processorFactory, + const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport, + const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& inputTransportFactory, + const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& outputTransportFactory, + const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& inputProtocolFactory, + const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& outputProtocolFactory, + const boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>& threadFactory = + boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>( + new apache::thrift::concurrency::PlatformThreadFactory)); + + TThreadedServer(const boost::shared_ptr<apache::thrift::TProcessor>& processor, + const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport, + const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& inputTransportFactory, + const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& outputTransportFactory, + const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& inputProtocolFactory, + const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& outputProtocolFactory, + const boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>& threadFactory = + boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>( + new apache::thrift::concurrency::PlatformThreadFactory)); virtual ~TThreadedServer(); /** - * Process all connections that arrive, each on their own - * dedicated thread. There is no limit to the number of - * threads or connections (see THRIFT-3084). - * Call stop() on another thread to interrupt processing and - * return control to the caller. * Post-conditions (return guarantees): - * The serverTransport will be closed. - * There will be no connected clients. + * There will be no clients connected. */ virtual void serve(); - /** - * Interrupt serve() so that it meets post-conditions and returns. - */ - virtual void stop(); - protected: - /** - * Smart pointer release method - */ - virtual void disposeClient(TConnectedClient *pClient); - - boost::shared_ptr<ThreadFactory> threadFactory_; - volatile bool stop_; + virtual void onClientConnected(const boost::shared_ptr<TConnectedClient>& pClient) /* override */; + virtual void onClientDisconnected(TConnectedClient *pClient) /* override */; - Monitor clientsMonitor_; + boost::shared_ptr<apache::thrift::concurrency::ThreadFactory> threadFactory_; + apache::thrift::concurrency::Monitor clientsMonitor_; std::set<TConnectedClient*> clients_; };
