Author: roger Date: Tue Jun 7 17:59:07 2011 New Revision: 1133116 URL: http://svn.apache.org/viewvc?rev=1133116&view=rev Log: THRIFT-1198 C++ TestClient and Server Improvements (add Unix Domain Socket, HTTP, JSON)
Modified: thrift/trunk/test/cpp/Thrift-test.mk thrift/trunk/test/cpp/src/TestClient.cpp thrift/trunk/test/cpp/src/TestServer.cpp Modified: thrift/trunk/test/cpp/Thrift-test.mk URL: http://svn.apache.org/viewvc/thrift/trunk/test/cpp/Thrift-test.mk?rev=1133116&r1=1133115&r2=1133116&view=diff ============================================================================== --- thrift/trunk/test/cpp/Thrift-test.mk (original) +++ thrift/trunk/test/cpp/Thrift-test.mk Tue Jun 7 17:59:07 2011 @@ -46,8 +46,8 @@ CC = g++ LD = g++ # Compiler flags -DCFL = -Wall -O3 -g -I. -I./gen-cpp $(include_flags) -L$(thrift_home)/lib/cpp/.libs -lthrift -lthriftnb -levent -LFL = -L$(thrift_home)/lib/cpp/.libs -lthrift -lthriftnb -levent +DCFL = -Wall -O3 -g -I. -I./gen-cpp $(include_flags) -L$(thrift_home)/lib/cpp/.libs -lthrift -lthriftnb -levent -lboost_program_options +LFL = -L$(thrift_home)/lib/cpp/.libs -lthrift -lthriftnb -levent -lboost_program_options CCFL = -Wall -O3 -I. -I./gen-cpp $(include_flags) CFL = $(CCFL) $(LFL) Modified: thrift/trunk/test/cpp/src/TestClient.cpp URL: http://svn.apache.org/viewvc/thrift/trunk/test/cpp/src/TestClient.cpp?rev=1133116&r1=1133115&r2=1133116&view=diff ============================================================================== --- thrift/trunk/test/cpp/src/TestClient.cpp (original) +++ thrift/trunk/test/cpp/src/TestClient.cpp Tue Jun 7 17:59:07 2011 @@ -17,15 +17,19 @@ * under the License. */ -#include <stdio.h> +#include <iostream> #include <unistd.h> #include <sys/time.h> #include <protocol/TBinaryProtocol.h> +#include <protocol/TJSONProtocol.h> +#include <transport/THttpClient.h> #include <transport/TTransportUtils.h> #include <transport/TSocket.h> #include <transport/TSSLSocket.h> #include <boost/shared_ptr.hpp> +#include <boost/program_options.hpp> + #include "ThriftTest.h" #define __STDC_FORMAT_MACROS @@ -56,30 +60,66 @@ int main(int argc, char** argv) { string host = "localhost"; int port = 9090; int numTests = 1; - bool framed = false; bool ssl = false; + string transport_type = "buffered"; + string protocol_type = "binary"; + string domain_socket = ""; + + program_options::options_description desc("Allowed options"); + desc.add_options() + ("help,h", "produce help message") + ("host", program_options::value<string>(&host)->default_value(host), "Host to connect") + ("port", program_options::value<int>(&port)->default_value(port), "Port number to connect") + ("domain-socket", program_options::value<string>(&domain_socket)->default_value(domain_socket), "Domain Socket (e.g. /tmp/ThriftTest.thrift), instead of host and port") + ("transport", program_options::value<string>(&transport_type)->default_value(transport_type), "Transport: buffered, framed, http") + ("protocol", program_options::value<string>(&protocol_type)->default_value(protocol_type), "Protocol: binary, json") + ("ssl", "Encrypted Transport using SSL") + ("testloops,n", program_options::value<int>(&numTests)->default_value(numTests), "Number of Tests") + ; + + program_options::variables_map vm; + program_options::store(program_options::parse_command_line(argc, argv, desc), vm); + program_options::notify(vm); + + if (vm.count("help")) { + cout << desc << "\n"; + return 1; + } - for (int i = 0; i < argc; ++i) { - if (strcmp(argv[i], "-h") == 0) { - char* pch = strtok(argv[++i], ":"); - if (pch != NULL) { - host = string(pch); + try { + if (!protocol_type.empty()) { + if (protocol_type == "binary") { + } else if (protocol_type == "json") { + } else { + throw invalid_argument("Unknown protocol type "+protocol_type); } - pch = strtok(NULL, ":"); - if (pch != NULL) { - port = atoi(pch); + } + + if (!transport_type.empty()) { + if (transport_type == "buffered") { + } else if (transport_type == "framed") { + } else if (transport_type == "http") { + } else { + throw invalid_argument("Unknown transport type "+transport_type); } - } else if (strcmp(argv[i], "-n") == 0) { - numTests = atoi(argv[++i]); - } else if (strcmp(argv[i], "-f") == 0) { - framed = true; - } else if (strcmp(argv[i], "--ssl") == 0) { - ssl = true; } + + } catch (std::exception& e) { + cerr << e.what() << endl; + cout << desc << "\n"; + return 1; } + if (vm.count("ssl")) { + ssl = true; + } + + shared_ptr<TTransport> transport; + shared_ptr<TProtocol> protocol; + shared_ptr<TSocket> socket; shared_ptr<TSSLSocketFactory> factory; + if (ssl) { factory = shared_ptr<TSSLSocketFactory>(new TSSLSocketFactory()); factory->ciphers("ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH"); @@ -87,22 +127,42 @@ int main(int argc, char** argv) { factory->authenticate(true); socket = factory->createSocket(host, port); } else { - socket = shared_ptr<TSocket>(new TSocket(host, port)); + if (domain_socket != "") { + socket = shared_ptr<TSocket>(new TSocket(domain_socket)); + port = 0; + } + else { + socket = shared_ptr<TSocket>(new TSocket(host, port)); + } } - shared_ptr<TBufferBase> transport; - - if (framed) { + if (transport_type.compare("http") == 0) { + shared_ptr<TTransport> httpSocket(new THttpClient(socket, host, "/service")); + transport = httpSocket; + } else if (transport_type.compare("framed") == 0){ shared_ptr<TFramedTransport> framedSocket(new TFramedTransport(socket)); transport = framedSocket; - } else { + } else{ shared_ptr<TBufferedTransport> bufferedSocket(new TBufferedTransport(socket)); transport = bufferedSocket; } - shared_ptr< TBinaryProtocolT<TBufferBase> > protocol( - new TBinaryProtocolT<TBufferBase>(transport)); - ThriftTestClientT< TBinaryProtocolT<TBufferBase> > testClient(protocol); + if (protocol_type.compare("json") == 0) { + shared_ptr<TProtocol> jsonProtocol(new TJSONProtocol(transport)); + protocol = jsonProtocol; + } else{ + shared_ptr<TBinaryProtocol> binaryProtocol(new TBinaryProtocol(transport)); + protocol = binaryProtocol; + } + + // Connection info + cout << "Connecting (" << transport_type << "/" << protocol_type << ") to: " << domain_socket; + if (port != 0) { + cout << host << ":" << port; + } + cout << endl; + + ThriftTestClient testClient(protocol); uint64_t time_min = 0; uint64_t time_max = 0; Modified: thrift/trunk/test/cpp/src/TestServer.cpp URL: http://svn.apache.org/viewvc/thrift/trunk/test/cpp/src/TestServer.cpp?rev=1133116&r1=1133115&r2=1133116&view=diff ============================================================================== --- thrift/trunk/test/cpp/src/TestServer.cpp (original) +++ thrift/trunk/test/cpp/src/TestServer.cpp Tue Jun 7 17:59:07 2011 @@ -20,6 +20,7 @@ #include <concurrency/ThreadManager.h> #include <concurrency/PosixThreadFactory.h> #include <protocol/TBinaryProtocol.h> +#include <protocol/TJSONProtocol.h> #include <server/TSimpleServer.h> #include <server/TThreadedServer.h> #include <server/TThreadPoolServer.h> @@ -27,6 +28,8 @@ #include <transport/TServerSocket.h> #include <transport/TSSLServerSocket.h> #include <transport/TSSLSocket.h> +#include <transport/THttpServer.h> +#include <transport/THttpTransport.h> #include <transport/TTransportUtils.h> #include "ThriftTest.h" @@ -34,6 +37,8 @@ #include <stdexcept> #include <sstream> +#include <boost/program_options.hpp> + #define __STDC_FORMAT_MACROS #include <inttypes.h> #include <signal.h> @@ -324,97 +329,100 @@ class TestProcessorEventHandler : public int main(int argc, char **argv) { - int port = 9090; - string serverType = "simple"; - string protocolType = "binary"; - size_t workerCount = 4; bool ssl = false; - - ostringstream usage; - - usage << - argv[0] << " [--port=<port number>] [--server-type=<server-type>] [--protocol-type=<protocol-type>] [--workers=<worker-count>] [--processor-events]" << endl << - - "\t\tserver-type\t\ttype of server, \"simple\", \"thread-pool\", \"threaded\", or \"nonblocking\". Default is " << serverType << endl << - - "\t\tprotocol-type\t\ttype of protocol, \"binary\", \"ascii\", or \"xml\". Default is " << protocolType << endl << - - "\t\tworkers\t\tNumber of thread pools workers. Only valid for thread-pool server type. Default is " << workerCount << endl; - - map<string, string> args; - - for (int ix = 1; ix < argc; ix++) { - string arg(argv[ix]); - if (arg.compare(0,2, "--") == 0) { - size_t end = arg.find_first_of("=", 2); - if (end != string::npos) { - args[string(arg, 2, end - 2)] = string(arg, end + 1); - } else { - args[string(arg, 2)] = "true"; - } - } else { - throw invalid_argument("Unexcepted command line token: "+arg); - } + string transport_type = "buffered"; + string protocol_type = "binary"; + string server_type = "simple"; + string domain_socket = ""; + size_t workers = 4; + + + program_options::options_description desc("Allowed options"); + desc.add_options() + ("help,h", "produce help message") + ("port", program_options::value<int>(&port)->default_value(port), "Port number to listen") + ("domain-socket", program_options::value<string>(&domain_socket)->default_value(domain_socket), + "Unix Domain Socket (e.g. /tmp/ThriftTest.thrift)") + ("server-type", program_options::value<string>(&server_type)->default_value(server_type), + "type of server, \"simple\", \"thread-pool\", \"threaded\", or \"nonblocking\"") + ("transport", program_options::value<string>(&transport_type)->default_value(transport_type), + "transport: buffered, framed, http") + ("protocol", program_options::value<string>(&protocol_type)->default_value(protocol_type), + "protocol: binary, json") + ("ssl", "Encrypted Transport using SSL") + ("processor-events", "processor-events") + ("workers,n", program_options::value<size_t>(&workers)->default_value(workers), + "Number of thread pools workers. Only valid for thread-pool server type") + ; + + program_options::variables_map vm; + program_options::store(program_options::parse_command_line(argc, argv, desc), vm); + program_options::notify(vm); + + if (vm.count("help")) { + cout << desc << "\n"; + return 1; } - + try { - - if (!args["port"].empty()) { - port = atoi(args["port"].c_str()); + if (!server_type.empty()) { + if (server_type == "simple") { + } else if (server_type == "thread-pool") { + } else if (server_type == "threaded") { + } else if (server_type == "nonblocking") { + } else { + throw invalid_argument("Unknown server type "+server_type); + } } - - if (!args["server-type"].empty()) { - serverType = args["server-type"]; - if (serverType == "simple") { - } else if (serverType == "thread-pool") { - } else if (serverType == "threaded") { - } else if (serverType == "nonblocking") { + + if (!protocol_type.empty()) { + if (protocol_type == "binary") { + } else if (protocol_type == "json") { } else { - throw invalid_argument("Unknown server type "+serverType); + throw invalid_argument("Unknown protocol type "+protocol_type); } } - if (!args["protocol-type"].empty()) { - protocolType = args["protocol-type"]; - if (protocolType == "binary") { - } else if (protocolType == "ascii") { - throw invalid_argument("ASCII protocol not supported"); - } else if (protocolType == "xml") { - throw invalid_argument("XML protocol not supported"); + if (!transport_type.empty()) { + if (transport_type == "buffered") { + } else if (transport_type == "framed") { + } else if (transport_type == "http") { } else { - throw invalid_argument("Unknown protocol type "+protocolType); + throw invalid_argument("Unknown transport type "+transport_type); } } - if (!args["workers"].empty()) { - workerCount = atoi(args["workers"].c_str()); - } } catch (std::exception& e) { cerr << e.what() << endl; - cerr << usage; + cout << desc << "\n"; + return 1; } - if (args["ssl"] == "true") { + if (vm.count("ssl")) { ssl = true; signal(SIGPIPE, SIG_IGN); } // Dispatcher - shared_ptr<TProtocolFactory> protocolFactory( - new TBinaryProtocolFactoryT<TBufferBase>()); + shared_ptr<TProtocolFactory> protocolFactory; + if (protocol_type == "json") { + shared_ptr<TProtocolFactory> jsonProtocolFactory(new TJSONProtocolFactory()); + protocolFactory = jsonProtocolFactory; + } else { + shared_ptr<TProtocolFactory> binaryProtocolFactory(new TBinaryProtocolFactoryT<TBufferBase>()); + protocolFactory = binaryProtocolFactory; + } + // Processor shared_ptr<TestHandler> testHandler(new TestHandler()); - - shared_ptr<TProcessor> testProcessor( - new ThriftTestProcessorT< TBinaryProtocolT<TBufferBase> >(testHandler)); - - - if (!args["processor-events"].empty()) { + shared_ptr<ThriftTestProcessor> testProcessor(new ThriftTestProcessor(testHandler)); + + if (vm.count("processor-events")) { testProcessor->setEventHandler(shared_ptr<TProcessorEventHandler>( new TestProcessorEventHandler())); } - + // Transport shared_ptr<TSSLSocketFactory> sslSocketFactory; shared_ptr<TServerSocket> serverSocket; @@ -426,26 +434,51 @@ int main(int argc, char **argv) { sslSocketFactory->ciphers("ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH"); serverSocket = shared_ptr<TServerSocket>(new TSSLServerSocket(port, sslSocketFactory)); } else { - serverSocket = shared_ptr<TServerSocket>(new TServerSocket(port)); + if (domain_socket != "") { + unlink(domain_socket.c_str()); + serverSocket = shared_ptr<TServerSocket>(new TServerSocket(domain_socket)); + port = 0; + } + else { + serverSocket = shared_ptr<TServerSocket>(new TServerSocket(port)); + } } + // Factory - shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory()); + shared_ptr<TTransportFactory> transportFactory; + + if (transport_type == "http") { + shared_ptr<TTransportFactory> httpTransportFactory(new THttpServerTransportFactory()); + transportFactory = httpTransportFactory; + } else if (transport_type == "framed") { + shared_ptr<TTransportFactory> framedTransportFactory(new TFramedTransportFactory()); + transportFactory = framedTransportFactory; + } else { + shared_ptr<TTransportFactory> bufferedTransportFactory(new TBufferedTransportFactory()); + transportFactory = bufferedTransportFactory; + } - if (serverType == "simple") { + // Server Info + cout << "Starting \"" << server_type << "\" server (" + << transport_type << "/" << protocol_type << ") listen on: " << domain_socket; + if (port != 0) { + cout << port; + } + cout << endl; - // Server + // Server + if (server_type == "simple") { TSimpleServer simpleServer(testProcessor, - serverSocket, + serverSocket, transportFactory, protocolFactory); - printf("Starting the server on port %d...\n", port); simpleServer.serve(); - } else if (serverType == "thread-pool") { + } else if (server_type == "thread-pool") { shared_ptr<ThreadManager> threadManager = - ThreadManager::newSimpleThreadManager(workerCount); + ThreadManager::newSimpleThreadManager(workers); shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory()); @@ -455,30 +488,27 @@ int main(int argc, char **argv) { threadManager->start(); TThreadPoolServer threadPoolServer(testProcessor, - serverSocket, + serverSocket, transportFactory, protocolFactory, - threadManager); + threadManager); - printf("Starting the server on port %d...\n", port); threadPoolServer.serve(); - } else if (serverType == "threaded") { + } else if (server_type == "threaded") { TThreadedServer threadedServer(testProcessor, serverSocket, transportFactory, protocolFactory); - printf("Starting the server on port %d...\n", port); threadedServer.serve(); - } else if (serverType == "nonblocking") { + } else if (server_type == "nonblocking") { TNonblockingServer nonblockingServer(testProcessor, port); - printf("Starting the nonblocking server on port %d...\n", port); nonblockingServer.serve(); } - printf("done.\n"); + cout << "done." << endl; return 0; }