Repository: thrift Updated Branches: refs/heads/master 4bf9399ca -> 79c9911b8
THRIFT-3084 add optional concurrent client limit enforcement to lib/cpp threaded servers Project: http://git-wip-us.apache.org/repos/asf/thrift/repo Commit: http://git-wip-us.apache.org/repos/asf/thrift/commit/79c9911b Tree: http://git-wip-us.apache.org/repos/asf/thrift/tree/79c9911b Diff: http://git-wip-us.apache.org/repos/asf/thrift/diff/79c9911b Branch: refs/heads/master Commit: 79c9911b8780d1f9d7c2c17623d269f0671d1723 Parents: 4bf9399 Author: Jim King <[email protected]> Authored: Thu Apr 30 07:10:08 2015 -0400 Committer: Roger Meier <[email protected]> Committed: Thu Apr 30 19:48:15 2015 +0200 ---------------------------------------------------------------------- lib/c_glib/test/testthrifttestclient.cpp | 2 + lib/cpp/CMakeLists.txt | 6 +- lib/cpp/Makefile.am | 8 +- lib/cpp/src/thrift/protocol/TDenseProtocol.cpp | 1 - lib/cpp/src/thrift/server/TServerFramework.cpp | 78 ++++++++- lib/cpp/src/thrift/server/TServerFramework.h | 60 +++++++ lib/cpp/src/thrift/server/TSimpleServer.cpp | 23 ++- lib/cpp/src/thrift/server/TSimpleServer.h | 3 + lib/cpp/src/thrift/server/TThreadPoolServer.cpp | 4 + lib/cpp/src/thrift/server/TThreadPoolServer.h | 14 +- lib/cpp/src/thrift/server/TThreadedServer.cpp | 23 +-- lib/cpp/src/thrift/server/TThreadedServer.h | 3 - lib/cpp/test/Makefile.am | 2 +- lib/cpp/test/TServerIntegrationTest.cpp | 174 +++++++++++++++---- lib/cpp/test/ZlibTest.cpp | 1 - 15 files changed, 328 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/c_glib/test/testthrifttestclient.cpp ---------------------------------------------------------------------- diff --git a/lib/c_glib/test/testthrifttestclient.cpp b/lib/c_glib/test/testthrifttestclient.cpp index 4f7bc08..d387396 100755 --- a/lib/c_glib/test/testthrifttestclient.cpp +++ b/lib/c_glib/test/testthrifttestclient.cpp @@ -317,6 +317,8 @@ class TestHandler : public ThriftTestIf { // C CLIENT extern "C" { +#undef THRIFT_SOCKET /* from lib/cpp */ + #include "t_test_thrift_test.h" #include "t_test_thrift_test_types.h" #include <thrift/c_glib/transport/thrift_socket.h> http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/lib/cpp/CMakeLists.txt b/lib/cpp/CMakeLists.txt index 8ea0546..b444c35 100755 --- a/lib/cpp/CMakeLists.txt +++ b/lib/cpp/CMakeLists.txt @@ -35,9 +35,11 @@ set( thriftcpp_SOURCES src/thrift/Thrift.cpp src/thrift/TApplicationException.cpp src/thrift/VirtualProfiling.cpp + src/thrift/async/TAsyncChannel.cpp src/thrift/concurrency/ThreadManager.cpp src/thrift/concurrency/TimerManager.cpp src/thrift/concurrency/Util.cpp + src/thrift/processor/PeekProcessor.cpp src/thrift/protocol/TDebugProtocol.cpp src/thrift/protocol/TDenseProtocol.cpp src/thrift/protocol/TJSONProtocol.cpp @@ -60,8 +62,6 @@ set( thriftcpp_SOURCES src/thrift/server/TSimpleServer.cpp src/thrift/server/TThreadPoolServer.cpp src/thrift/server/TThreadedServer.cpp - src/thrift/async/TAsyncChannel.cpp - src/thrift/processor/PeekProcessor.cpp ) # This files don't work on Windows CE as there is no pipe support @@ -185,6 +185,8 @@ if(MSVC) add_definitions("-DUNICODE -D_UNICODE") endif() +add_definitions("-D__STDC_LIMIT_MACROS") + # Install the headers install(DIRECTORY "src/thrift" DESTINATION "${INCLUDE_INSTALL_DIR}" FILES_MATCHING PATTERN "*.h" PATTERN "*.tcc") http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/Makefile.am ---------------------------------------------------------------------- diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am index 28ff7c8..0de8dc7 100755 --- a/lib/cpp/Makefile.am +++ b/lib/cpp/Makefile.am @@ -57,7 +57,7 @@ pkgconfig_DATA += thrift-qt5.pc endif AM_CXXFLAGS = -Wall -Wextra -pedantic -AM_CPPFLAGS = $(BOOST_CPPFLAGS) $(OPENSSL_INCLUDES) -I$(srcdir)/src +AM_CPPFLAGS = $(BOOST_CPPFLAGS) $(OPENSSL_INCLUDES) -I$(srcdir)/src -D__STDC_LIMIT_MACROS AM_LDFLAGS = $(BOOST_LDFLAGS) $(OPENSSL_LDFLAGS) # Define the source files for the module @@ -65,9 +65,11 @@ AM_LDFLAGS = $(BOOST_LDFLAGS) $(OPENSSL_LDFLAGS) libthrift_la_SOURCES = src/thrift/Thrift.cpp \ src/thrift/TApplicationException.cpp \ src/thrift/VirtualProfiling.cpp \ + src/thrift/async/TAsyncChannel.cpp \ src/thrift/concurrency/ThreadManager.cpp \ src/thrift/concurrency/TimerManager.cpp \ src/thrift/concurrency/Util.cpp \ + src/thrift/processor/PeekProcessor.cpp \ src/thrift/protocol/TDebugProtocol.cpp \ src/thrift/protocol/TDenseProtocol.cpp \ src/thrift/protocol/TJSONProtocol.cpp \ @@ -94,9 +96,7 @@ libthrift_la_SOURCES = src/thrift/Thrift.cpp \ src/thrift/server/TServerFramework.cpp \ src/thrift/server/TSimpleServer.cpp \ src/thrift/server/TThreadPoolServer.cpp \ - src/thrift/server/TThreadedServer.cpp \ - src/thrift/async/TAsyncChannel.cpp \ - src/thrift/processor/PeekProcessor.cpp + src/thrift/server/TThreadedServer.cpp if WITH_BOOSTTHREADS libthrift_la_SOURCES += src/thrift/concurrency/BoostThreadFactory.cpp \ http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/src/thrift/protocol/TDenseProtocol.cpp ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/protocol/TDenseProtocol.cpp b/lib/cpp/src/thrift/protocol/TDenseProtocol.cpp index 583b630..259c68e 100644 --- a/lib/cpp/src/thrift/protocol/TDenseProtocol.cpp +++ b/lib/cpp/src/thrift/protocol/TDenseProtocol.cpp @@ -87,7 +87,6 @@ Optional fields are a little tricky also. We write a zero byte if they are absent and prefix them with an 0x01 byte if they are present */ -#define __STDC_LIMIT_MACROS #include <stdint.h> #include <thrift/protocol/TDenseProtocol.h> #include <thrift/TReflectionLocal.h> http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/src/thrift/server/TServerFramework.cpp ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/server/TServerFramework.cpp b/lib/cpp/src/thrift/server/TServerFramework.cpp index 8adb29a..36dab5b 100644 --- a/lib/cpp/src/thrift/server/TServerFramework.cpp +++ b/lib/cpp/src/thrift/server/TServerFramework.cpp @@ -18,12 +18,15 @@ */ #include <boost/bind.hpp> +#include <stdexcept> +#include <stdint.h> #include <thrift/server/TServerFramework.h> namespace apache { namespace thrift { namespace server { +using apache::thrift::concurrency::Synchronized; using apache::thrift::transport::TServerTransport; using apache::thrift::transport::TTransport; using apache::thrift::transport::TTransportException; @@ -39,14 +42,20 @@ TServerFramework::TServerFramework( const shared_ptr<TServerTransport>& serverTransport, const shared_ptr<TTransportFactory>& transportFactory, const shared_ptr<TProtocolFactory>& protocolFactory) - : TServer(processorFactory, serverTransport, transportFactory, protocolFactory) {} + : TServer(processorFactory, serverTransport, transportFactory, protocolFactory), + clients_(0), + hwm_(0), + limit_(INT64_MAX) {} TServerFramework::TServerFramework( const shared_ptr<TProcessor>& processor, const shared_ptr<TServerTransport>& serverTransport, const shared_ptr<TTransportFactory>& transportFactory, const shared_ptr<TProtocolFactory>& protocolFactory) - : TServer(processor, serverTransport, transportFactory, protocolFactory) {} + : TServer(processor, serverTransport, transportFactory, protocolFactory), + clients_(0), + hwm_(0), + limit_(INT64_MAX) {} TServerFramework::TServerFramework( const shared_ptr<TProcessorFactory>& processorFactory, @@ -57,7 +66,10 @@ TServerFramework::TServerFramework( const shared_ptr<TProtocolFactory>& outputProtocolFactory) : TServer(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory, - inputProtocolFactory, outputProtocolFactory) {} + inputProtocolFactory, outputProtocolFactory), + clients_(0), + hwm_(0), + limit_(INT64_MAX) {} TServerFramework::TServerFramework( const shared_ptr<TProcessor>& processor, @@ -68,7 +80,10 @@ TServerFramework::TServerFramework( const shared_ptr<TProtocolFactory>& outputProtocolFactory) : TServer(processor, serverTransport, inputTransportFactory, outputTransportFactory, - inputProtocolFactory, outputProtocolFactory) {} + inputProtocolFactory, outputProtocolFactory), + clients_(0), + hwm_(0), + limit_(INT64_MAX) {} TServerFramework::~TServerFramework() {} @@ -111,6 +126,16 @@ void TServerFramework::serve() { inputTransport.reset(); client.reset(); + // If we have reached the limit on the number of concurrent + // clients allowed, wait for one or more clients to drain before + // accepting another. + { + Synchronized sync(mon_); + while (clients_ >= limit_) { + mon_.wait(); + } + } + client = serverTransport_->accept(); inputTransport = inputTransportFactory_->getTransport(client); @@ -118,11 +143,12 @@ void TServerFramework::serve() { inputProtocol = inputProtocolFactory_->getProtocol(inputTransport); outputProtocol = outputProtocolFactory_->getProtocol(outputTransport); - onClientConnected( + newlyConnectedClient( shared_ptr<TConnectedClient>( new TConnectedClient(getProcessor(inputProtocol, outputProtocol, client), inputProtocol, outputProtocol, eventHandler_, client), bind(&TServerFramework::disposeConnectedClient, this, _1))); + } catch (TTransportException& ttx) { releaseOneDescriptor("inputTransport", inputTransport); releaseOneDescriptor("outputTransport", outputTransport); @@ -147,12 +173,54 @@ void TServerFramework::serve() { releaseOneDescriptor("serverTransport", serverTransport_); } +int64_t TServerFramework::getConcurrentClientLimit() const { + Synchronized sync(mon_); + return limit_; +} + +int64_t TServerFramework::getConcurrentClientCount() const { + Synchronized sync(mon_); + return clients_; +} + +int64_t TServerFramework::getConcurrentClientCountHWM() const { + Synchronized sync(mon_); + return hwm_; +} + +void TServerFramework::setConcurrentClientLimit(int64_t newLimit) { + if (newLimit < 1) { + throw std::invalid_argument("newLimit must be greater than zero"); + } + Synchronized sync(mon_); + limit_ = newLimit; + if (limit_ - clients_ > 0) { + mon_.notify(); + } +} + void TServerFramework::stop() { serverTransport_->interrupt(); serverTransport_->interruptChildren(); } +void TServerFramework::newlyConnectedClient(const boost::shared_ptr<TConnectedClient>& pClient) { + onClientConnected(pClient); + + // Count a concurrent client added. + Synchronized sync(mon_); + ++clients_; + hwm_ = std::max(hwm_, clients_); +} + void TServerFramework::disposeConnectedClient(TConnectedClient *pClient) { + { + // Count a concurrent client removed. + Synchronized sync(mon_); + if (limit_ - --clients_ > 0) { + mon_.notify(); + } + } onClientDisconnected(pClient); delete pClient; } http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/src/thrift/server/TServerFramework.h ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/server/TServerFramework.h b/lib/cpp/src/thrift/server/TServerFramework.h index 67d5420..3f16dd1 100644 --- a/lib/cpp/src/thrift/server/TServerFramework.h +++ b/lib/cpp/src/thrift/server/TServerFramework.h @@ -21,7 +21,9 @@ #define _THRIFT_SERVER_TSERVERFRAMEWORK_H_ 1 #include <boost/shared_ptr.hpp> +#include <stdint.h> #include <thrift/TProcessor.h> +#include <thrift/concurrency/Monitor.h> #include <thrift/server/TConnectedClient.h> #include <thrift/server/TServer.h> #include <thrift/transport/TServerTransport.h> @@ -89,6 +91,36 @@ public: */ virtual void stop(); + /** + * Get the concurrent client limit. + * \returns the concurrent client limit + */ + virtual int64_t getConcurrentClientLimit() const; + + /** + * Get the number of currently connected clients. + * \returns the number of currently connected clients + */ + virtual int64_t getConcurrentClientCount() const; + + /** + * Get the highest number of concurrent clients. + * \returns the highest number of concurrent clients + */ + virtual int64_t getConcurrentClientCountHWM() const; + + /** + * Set the concurrent client limit. This can be changed while + * the server is serving however it will not necessarily be + * enforced until the next client is accepted and added. If the + * limit is lowered below the number of connected clients, no + * action is taken to disconnect the clients. + * The default value used if this is not called is INT64_MAX. + * \param[in] newLimit the new limit of concurrent clients + * \throws std::invalid_argument if newLimit is less than 1 + */ + virtual void setConcurrentClientLimit(int64_t newLimit); + protected: /** * A client has connected. The implementation is responsible for storing @@ -102,6 +134,7 @@ protected: /** * A client has disconnected. + * The server no longer tracks the client. * The client TTransport has already been closed. * The implementation must not delete the pointer. * @@ -111,10 +144,37 @@ protected: private: /** + * Common handling for new connected clients. Implements concurrent + * client rate limiting after onClientConnected returns by blocking the + * serve() thread if the limit has been reached. + */ + void newlyConnectedClient(const boost::shared_ptr<TConnectedClient>& pClient); + + /** * Smart pointer client deletion. * Calls onClientDisconnected and then deletes pClient. */ void disposeConnectedClient(TConnectedClient *pClient); + + /** + * Monitor for limiting the number of concurrent clients. + */ + apache::thrift::concurrency::Monitor mon_; + + /** + * The number of concurrent clients. + */ + int64_t clients_; + + /** + * The high water mark of concurrent clients. + */ + int64_t hwm_; + + /** + * The limit on the number of concurrent clients. + */ + int64_t limit_; }; } http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/src/thrift/server/TSimpleServer.cpp ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/server/TSimpleServer.cpp b/lib/cpp/src/thrift/server/TSimpleServer.cpp index a133c0d..adcedc8 100644 --- a/lib/cpp/src/thrift/server/TSimpleServer.cpp +++ b/lib/cpp/src/thrift/server/TSimpleServer.cpp @@ -38,7 +38,9 @@ TSimpleServer::TSimpleServer( const shared_ptr<TTransportFactory>& transportFactory, const shared_ptr<TProtocolFactory>& protocolFactory) : TServerFramework(processorFactory, serverTransport, - transportFactory, protocolFactory) {} + transportFactory, protocolFactory) { + TServerFramework::setConcurrentClientLimit(1); +} TSimpleServer::TSimpleServer( const shared_ptr<TProcessor>& processor, @@ -46,7 +48,9 @@ TSimpleServer::TSimpleServer( const shared_ptr<TTransportFactory>& transportFactory, const shared_ptr<TProtocolFactory>& protocolFactory) : TServerFramework(processor, serverTransport, - transportFactory, protocolFactory) {} + transportFactory, protocolFactory) { + TServerFramework::setConcurrentClientLimit(1); +} TSimpleServer::TSimpleServer( const shared_ptr<TProcessorFactory>& processorFactory, @@ -57,7 +61,9 @@ TSimpleServer::TSimpleServer( const shared_ptr<TProtocolFactory>& outputProtocolFactory) : TServerFramework(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory, - inputProtocolFactory, outputProtocolFactory) {} + inputProtocolFactory, outputProtocolFactory) { + TServerFramework::setConcurrentClientLimit(1); +} TSimpleServer::TSimpleServer( const shared_ptr<TProcessor>& processor, @@ -68,7 +74,9 @@ TSimpleServer::TSimpleServer( const shared_ptr<TProtocolFactory>& outputProtocolFactory) : TServerFramework(processor, serverTransport, inputTransportFactory, outputTransportFactory, - inputProtocolFactory, outputProtocolFactory) {} + inputProtocolFactory, outputProtocolFactory) { + TServerFramework::setConcurrentClientLimit(1); +} TSimpleServer::~TSimpleServer() {} @@ -86,6 +94,13 @@ void TSimpleServer::onClientConnected(const shared_ptr<TConnectedClient>& pClien */ void TSimpleServer::onClientDisconnected(TConnectedClient *pClient) {} +/** + * This makes little sense to the simple server because it is not capable + * of having more than one client at a time, so we hide it. + */ +void TSimpleServer::setConcurrentClientLimit(int64_t newLimit) {} + + } } } // apache::thrift::server http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/src/thrift/server/TSimpleServer.h ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/server/TSimpleServer.h b/lib/cpp/src/thrift/server/TSimpleServer.h index 51b00e4..30d5046 100644 --- a/lib/cpp/src/thrift/server/TSimpleServer.h +++ b/lib/cpp/src/thrift/server/TSimpleServer.h @@ -62,6 +62,9 @@ public: protected: virtual void onClientConnected(const boost::shared_ptr<TConnectedClient>& pClient) /* override */; virtual void onClientDisconnected(TConnectedClient *pClient) /* override */; + +private: + void setConcurrentClientLimit(int64_t newLimit); // hide }; } http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/src/thrift/server/TThreadPoolServer.cpp ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/server/TThreadPoolServer.cpp b/lib/cpp/src/thrift/server/TThreadPoolServer.cpp index a5f8c76..5b9b01d 100644 --- a/lib/cpp/src/thrift/server/TThreadPoolServer.cpp +++ b/lib/cpp/src/thrift/server/TThreadPoolServer.cpp @@ -112,6 +112,10 @@ void TThreadPoolServer::setTaskExpiration(int64_t value) { taskExpiration_ = value; } +boost::shared_ptr<apache::thrift::concurrency::ThreadManager> TThreadPoolServer::getThreadManager() const { + return threadManager_; +} + void TThreadPoolServer::onClientConnected(const shared_ptr<TConnectedClient>& pClient) { threadManager_->add(pClient, timeout_, taskExpiration_); } http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/src/thrift/server/TThreadPoolServer.h ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/server/TThreadPoolServer.h b/lib/cpp/src/thrift/server/TThreadPoolServer.h index 29e9aaf..267dbad 100644 --- a/lib/cpp/src/thrift/server/TThreadPoolServer.h +++ b/lib/cpp/src/thrift/server/TThreadPoolServer.h @@ -37,14 +37,16 @@ public: const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport, const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& transportFactory, const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& protocolFactory, - const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager); + const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager = + apache::thrift::concurrency::ThreadManager::newSimpleThreadManager()); TThreadPoolServer( const boost::shared_ptr<apache::thrift::TProcessor>& processor, const boost::shared_ptr<apache::thrift::transport::TServerTransport>& serverTransport, const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& transportFactory, const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& protocolFactory, - const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager); + const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager = + apache::thrift::concurrency::ThreadManager::newSimpleThreadManager()); TThreadPoolServer( const boost::shared_ptr<apache::thrift::TProcessorFactory>& processorFactory, @@ -53,7 +55,8 @@ public: const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& outputTransportFactory, const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& inputProtocolFactory, const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& outputProtocolFactory, - const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager); + const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager = + apache::thrift::concurrency::ThreadManager::newSimpleThreadManager()); TThreadPoolServer( const boost::shared_ptr<apache::thrift::TProcessor>& processor, @@ -62,7 +65,8 @@ public: const boost::shared_ptr<apache::thrift::transport::TTransportFactory>& outputTransportFactory, const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& inputProtocolFactory, const boost::shared_ptr<apache::thrift::protocol::TProtocolFactory>& outputProtocolFactory, - const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager); + const boost::shared_ptr<apache::thrift::concurrency::ThreadManager>& threadManager = + apache::thrift::concurrency::ThreadManager::newSimpleThreadManager()); virtual ~TThreadPoolServer(); @@ -78,6 +82,8 @@ public: virtual int64_t getTaskExpiration() const; virtual void setTaskExpiration(int64_t value); + virtual boost::shared_ptr<apache::thrift::concurrency::ThreadManager> getThreadManager() const; + protected: virtual void onClientConnected(const boost::shared_ptr<TConnectedClient>& pClient) /* override */; virtual void onClientDisconnected(TConnectedClient *pClient) /* override */; http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/src/thrift/server/TThreadedServer.cpp ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/server/TThreadedServer.cpp b/lib/cpp/src/thrift/server/TThreadedServer.cpp index 440cede..b0b22c3 100644 --- a/lib/cpp/src/thrift/server/TThreadedServer.cpp +++ b/lib/cpp/src/thrift/server/TThreadedServer.cpp @@ -89,7 +89,7 @@ void TThreadedServer::serve() { // Drain all clients - no more will arrive try { Synchronized s(clientsMonitor_); - while (!clients_.empty()) { + while (getConcurrentClientCount() > 0) { clientsMonitor_.wait(); } } catch (TException& tx) { @@ -98,27 +98,14 @@ void TThreadedServer::serve() { } } -void TThreadedServer::onClientConnected(const shared_ptr<TConnectedClient>& pClient) -{ - // Create a thread for this client - shared_ptr<Thread> thread = shared_ptr<Thread>(threadFactory_->newThread(pClient)); - - // Insert thread into the set of threads - { - Synchronized s(clientsMonitor_); - clients_.insert(pClient.get()); - } - - // Start the thread! - thread->start(); +void TThreadedServer::onClientConnected(const shared_ptr<TConnectedClient>& pClient) { + threadFactory_->newThread(pClient)->start(); } void TThreadedServer::onClientDisconnected(TConnectedClient *pClient) { - // Remove this task from parent bookkeeping Synchronized s(clientsMonitor_); - clients_.erase(pClient); - if (clients_.empty()) { - clientsMonitor_.notify(); + if (getConcurrentClientCount() == 0) { + clientsMonitor_.notify(); } } http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/src/thrift/server/TThreadedServer.h ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/server/TThreadedServer.h b/lib/cpp/src/thrift/server/TThreadedServer.h index 7b66f1d..21b6a28 100644 --- a/lib/cpp/src/thrift/server/TThreadedServer.h +++ b/lib/cpp/src/thrift/server/TThreadedServer.h @@ -29,8 +29,6 @@ namespace apache { namespace thrift { namespace server { -#define THRIFT_DEFAULT_THREAD_FACTORY - /** * Manage clients using a thread pool. */ @@ -86,7 +84,6 @@ protected: boost::shared_ptr<apache::thrift::concurrency::ThreadFactory> threadFactory_; apache::thrift::concurrency::Monitor clientsMonitor_; - std::set<TConnectedClient*> clients_; }; } http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/test/Makefile.am ---------------------------------------------------------------------- diff --git a/lib/cpp/test/Makefile.am b/lib/cpp/test/Makefile.am index 0cd1c67..3470abb 100755 --- a/lib/cpp/test/Makefile.am +++ b/lib/cpp/test/Makefile.am @@ -323,7 +323,7 @@ gen-cpp/SecondService.cpp gen-cpp/ThriftTest_constants.cpp gen-cpp/ThriftTest.cp gen-cpp/ChildService.cpp gen-cpp/ChildService.h gen-cpp/ParentService.cpp gen-cpp/ParentService.h gen-cpp/proc_types.cpp gen-cpp/proc_types.h: processor/proc.thrift $(THRIFT) --gen cpp:templates,cob_style $< -AM_CPPFLAGS = $(BOOST_CPPFLAGS) -I$(top_srcdir)/lib/cpp/src +AM_CPPFLAGS = $(BOOST_CPPFLAGS) -I$(top_srcdir)/lib/cpp/src -D__STDC_LIMIT_MACROS AM_LDFLAGS = $(BOOST_LDFLAGS) AM_CXXFLAGS = -Wall -Wextra -pedantic http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/test/TServerIntegrationTest.cpp ---------------------------------------------------------------------- diff --git a/lib/cpp/test/TServerIntegrationTest.cpp b/lib/cpp/test/TServerIntegrationTest.cpp index 9edeb19..73bcdba 100644 --- a/lib/cpp/test/TServerIntegrationTest.cpp +++ b/lib/cpp/test/TServerIntegrationTest.cpp @@ -20,9 +20,12 @@ #define BOOST_TEST_MODULE TServerIntegrationTest #include <boost/test/auto_unit_test.hpp> #include <boost/bind.hpp> +#include <boost/foreach.hpp> #include <boost/format.hpp> #include <boost/shared_ptr.hpp> #include <boost/thread.hpp> +#include <thrift/server/TSimpleServer.h> +#include <thrift/server/TThreadPoolServer.h> #include <thrift/server/TThreadedServer.h> #include <thrift/protocol/TBinaryProtocol.h> #include <thrift/transport/TServerSocket.h> @@ -44,12 +47,17 @@ using apache::thrift::transport::TServerSocket; using apache::thrift::transport::TServerTransport; using apache::thrift::transport::TSocket; using apache::thrift::transport::TTransport; +using apache::thrift::transport::TTransportException; using apache::thrift::transport::TTransportFactory; +using apache::thrift::server::TServer; using apache::thrift::server::TServerEventHandler; +using apache::thrift::server::TSimpleServer; +using apache::thrift::server::TThreadPoolServer; using apache::thrift::server::TThreadedServer; using apache::thrift::test::ParentServiceClient; using apache::thrift::test::ParentServiceIf; using apache::thrift::test::ParentServiceProcessor; +using boost::posix_time::milliseconds; /** * preServe runs after listen() is successful, when we can connect @@ -81,7 +89,10 @@ private: uint64_t accepted_; }; -class ParentHandler : virtual public ParentServiceIf { +/** + * Reusing another generated test, just something to serve up + */ +class ParentHandler : public ParentServiceIf { public: ParentHandler() : generation_(0) {} @@ -123,11 +134,17 @@ protected: std::vector<std::string> strings_; }; +void autoSocketCloser(TSocket *pSock) { + pSock->close(); + delete pSock; +} + +template<class TServerType> class TServerIntegrationTestFixture : public TestPortFixture { public: TServerIntegrationTestFixture() : - pServer(new TThreadedServer( + pServer(new TServerType( boost::shared_ptr<ParentServiceProcessor>(new ParentServiceProcessor( boost::shared_ptr<ParentServiceIf>(new ParentHandler))), boost::shared_ptr<TServerTransport>(new TServerSocket("localhost", m_serverPort)), @@ -139,7 +156,7 @@ public: } void startServer() { - pServerThread.reset(new boost::thread(boost::bind(&TThreadedServer::serve, pServer.get()))); + pServerThread.reset(new boost::thread(boost::bind(&TServerType::serve, pServer.get()))); // block until listen() completes so clients will be able to connect Synchronized sync(*(pEventHandler.get())); @@ -160,52 +177,117 @@ public: } void stopServer() { - pServer->stop(); - BOOST_MESSAGE("server stop completed"); - pServerThread->join(); - BOOST_MESSAGE("server thread joined"); + if (pServerThread) { + pServer->stop(); + BOOST_MESSAGE("server stop completed"); + + pServerThread->join(); + BOOST_MESSAGE("server thread joined"); + pServerThread.reset(); + } } ~TServerIntegrationTestFixture() { stopServer(); } - void delayClose(boost::shared_ptr<TTransport> toClose) { - boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); + void delayClose(boost::shared_ptr<TTransport> toClose, boost::posix_time::time_duration after) { + boost::this_thread::sleep(after); toClose->close(); } - boost::shared_ptr<TThreadedServer> pServer; + void baseline(int64_t numToMake, int64_t expectedHWM) { + startServer(); + std::vector<boost::shared_ptr<TSocket> > holdSockets; + std::vector<boost::shared_ptr<boost::thread> > holdThreads; + + for (int64_t i = 0; i < numToMake; ++i) { + boost::shared_ptr<TSocket> pClientSock(new TSocket("localhost", m_serverPort), autoSocketCloser); + holdSockets.push_back(pClientSock); + boost::shared_ptr<TProtocol> pClientProtocol(new TBinaryProtocol(pClientSock)); + ParentServiceClient client(pClientProtocol); + pClientSock->open(); + client.incrementGeneration(); + holdThreads.push_back( + boost::shared_ptr<boost::thread>( + new boost::thread( + boost::bind(&TServerIntegrationTestFixture::delayClose, this, + pClientSock, milliseconds(100 * numToMake))))); + } + + BOOST_CHECK_EQUAL(expectedHWM, pServer->getConcurrentClientCountHWM()); + stopServer(); + BOOST_FOREACH(boost::shared_ptr<boost::thread> pThread, holdThreads) { + pThread->join(); + } + holdThreads.clear(); + holdSockets.clear(); + } + + boost::shared_ptr<TServerType> pServer; boost::shared_ptr<TServerReadyEventHandler> pEventHandler; boost::shared_ptr<boost::thread> pServerThread; }; -BOOST_FIXTURE_TEST_SUITE ( TServerIntegrationTest, TServerIntegrationTestFixture ) +BOOST_FIXTURE_TEST_SUITE( Baseline, TestPortFixture ) -BOOST_AUTO_TEST_CASE(test_execute_one_request_and_close) +BOOST_FIXTURE_TEST_CASE(test_simple, TServerIntegrationTestFixture<TSimpleServer>) { - // this test establishes some basic sanity + baseline(3, 1); +} - startServer(); - boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort)); - boost::shared_ptr<TProtocol> pClientProtocol1(new TBinaryProtocol(pClientSock1)); - ParentServiceClient client1(pClientProtocol1); - pClientSock1->open(); - client1.incrementGeneration(); - pClientSock1->close(); - stopServer(); +BOOST_FIXTURE_TEST_CASE(test_threaded, TServerIntegrationTestFixture<TThreadedServer>) +{ + baseline(10, 10); +} + +BOOST_FIXTURE_TEST_CASE(test_threaded_bound, TServerIntegrationTestFixture<TThreadedServer>) +{ + pServer->setConcurrentClientLimit(4); + baseline(10, 4); +} + +BOOST_FIXTURE_TEST_CASE(test_threadpool, TServerIntegrationTestFixture<TThreadPoolServer>) +{ + pServer->getThreadManager()->threadFactory( + boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>( + new apache::thrift::concurrency::PlatformThreadFactory)); + pServer->getThreadManager()->start(); + + // thread factory has 4 threads as a default + // thread factory however is a bad way to limit concurrent clients + // as accept() will be called to grab a 5th client socket, in this case + // and then the thread factory will block adding the thread to manage + // that client. + baseline(10, 5); } +BOOST_FIXTURE_TEST_CASE(test_threadpool_bound, TServerIntegrationTestFixture<TThreadPoolServer>) +{ + pServer->getThreadManager()->threadFactory( + boost::shared_ptr<apache::thrift::concurrency::ThreadFactory>( + new apache::thrift::concurrency::PlatformThreadFactory)); + pServer->getThreadManager()->start(); + pServer->setConcurrentClientLimit(4); + + baseline(10, 4); +} + +BOOST_AUTO_TEST_SUITE_END() + + +BOOST_FIXTURE_TEST_SUITE ( TServerIntegrationTest, TServerIntegrationTestFixture<TThreadedServer> ) + BOOST_AUTO_TEST_CASE(test_stop_with_interruptable_clients_connected) { // This tests THRIFT-2441 new behavior: stopping the server disconnects clients startServer(); - boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort)); + boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort), autoSocketCloser); pClientSock1->open(); - boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort)); + boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort), autoSocketCloser); pClientSock2->open(); // Ensure they have been accepted @@ -219,8 +301,6 @@ BOOST_AUTO_TEST_CASE(test_stop_with_interruptable_clients_connected) uint8_t buf[1]; BOOST_CHECK_EQUAL(0, pClientSock1->read(&buf[0], 1)); // 0 = disconnected BOOST_CHECK_EQUAL(0, pClientSock2->read(&buf[0], 1)); // 0 = disconnected - pClientSock1->close(); - pClientSock2->close(); } BOOST_AUTO_TEST_CASE(test_stop_with_uninterruptable_clients_connected) @@ -230,24 +310,56 @@ BOOST_AUTO_TEST_CASE(test_stop_with_uninterruptable_clients_connected) boost::dynamic_pointer_cast<TServerSocket>(pServer->getServerTransport())-> setInterruptableChildren(false); // returns to pre-THRIFT-2441 behavior + startServer(); - boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort)); + boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort), autoSocketCloser); pClientSock1->open(); - boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort)); + boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort), autoSocketCloser); pClientSock2->open(); // Ensure they have been accepted blockUntilAccepted(2); - boost::thread t1(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock1)); - boost::thread t2(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock2)); + boost::thread t1(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock1, milliseconds(250))); + boost::thread t2(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock2, milliseconds(250))); // Once the clients disconnect the server will stop stopServer(); + t1.join(); + t2.join(); +} + +BOOST_AUTO_TEST_CASE(test_concurrent_client_limit) +{ + startServer(); + + BOOST_CHECK_EQUAL(INT64_MAX, pServer->getConcurrentClientLimit()); + pServer->setConcurrentClientLimit(2); + BOOST_CHECK_EQUAL(0, pServer->getConcurrentClientCount()); + BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientLimit()); + + boost::shared_ptr<TSocket> pClientSock1(new TSocket("localhost", m_serverPort), autoSocketCloser); + pClientSock1->open(); + blockUntilAccepted(1); + BOOST_CHECK_EQUAL(1, pServer->getConcurrentClientCount()); + + boost::shared_ptr<TSocket> pClientSock2(new TSocket("localhost", m_serverPort), autoSocketCloser); + pClientSock2->open(); + blockUntilAccepted(2); + BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCount()); + + // a third client cannot connect until one of the other two closes + boost::thread t2(boost::bind(&TServerIntegrationTestFixture::delayClose, this, pClientSock2, milliseconds(250))); + boost::shared_ptr<TSocket> pClientSock3(new TSocket("localhost", m_serverPort), autoSocketCloser); + pClientSock2->open(); + blockUntilAccepted(2); + BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCount()); + BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCountHWM()); - pClientSock1->close(); - pClientSock2->close(); + stopServer(); + t2.join(); } + BOOST_AUTO_TEST_SUITE_END() http://git-wip-us.apache.org/repos/asf/thrift/blob/79c9911b/lib/cpp/test/ZlibTest.cpp ---------------------------------------------------------------------- diff --git a/lib/cpp/test/ZlibTest.cpp b/lib/cpp/test/ZlibTest.cpp index bafacf9..465e12d 100644 --- a/lib/cpp/test/ZlibTest.cpp +++ b/lib/cpp/test/ZlibTest.cpp @@ -17,7 +17,6 @@ * under the License. */ -#define __STDC_LIMIT_MACROS #define __STDC_FORMAT_MACROS #ifndef _GNU_SOURCE
