Repository: mesos Updated Branches: refs/heads/master 61f08f8c0 -> c0bf80e2b
Cleaned up the libprocess ping/pong benchmark. Review: https://reviews.apache.org/r/27113 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c0bf80e2 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c0bf80e2 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c0bf80e2 Branch: refs/heads/master Commit: c0bf80e2b25046ac5cf17fd824d9d69983fb6fa9 Parents: 61f08f8 Author: Joris Van Remoortere <[email protected]> Authored: Fri Apr 17 14:19:23 2015 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Fri Apr 17 15:47:00 2015 -0700 ---------------------------------------------------------------------- 3rdparty/libprocess/src/tests/benchmarks.cpp | 398 +++++++++++----------- 1 file changed, 192 insertions(+), 206 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/c0bf80e2/3rdparty/libprocess/src/tests/benchmarks.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/tests/benchmarks.cpp b/3rdparty/libprocess/src/tests/benchmarks.cpp index a927e4e..0d67148 100644 --- a/3rdparty/libprocess/src/tests/benchmarks.cpp +++ b/3rdparty/libprocess/src/tests/benchmarks.cpp @@ -22,25 +22,28 @@ #include <iostream> #include <memory> -#include <unordered_set> +#include <string> #include <vector> +#include <process/collect.hpp> +#include <process/future.hpp> #include <process/gmock.hpp> #include <process/gtest.hpp> +#include <process/owned.hpp> #include <process/process.hpp> +#include <stout/duration.hpp> +#include <stout/gtest.hpp> +#include <stout/hashset.hpp> #include <stout/stopwatch.hpp> using namespace process; using std::cout; using std::endl; -using std::function; -using std::istringstream; +using std::list; using std::ostringstream; using std::string; -using std::unique_ptr; -using std::unordered_set; using std::vector; int main(int argc, char** argv) @@ -58,250 +61,233 @@ int main(int argc, char** argv) return RUN_ALL_TESTS(); } +// TODO(jmlvanre): Factor out the client / server behavior so that we +// can make separate binaries for the client and server. This is +// useful to attach performance tools to them separately. -class BenchmarkProcess : public Process<BenchmarkProcess> +// A process that emulates the 'client' side of a ping pong game. +// An HTTP '/run' request performs a run and returns the time elapsed. +class ClientProcess : public Process<ClientProcess> { public: - BenchmarkProcess( - int _iterations = 1, - int _maxOutstanding = 1, - const Option<UPID>& _other = Option<UPID>()) - : other(_other), - counter(0), - iterations(_iterations), - maxOutstanding(_maxOutstanding), - outstanding(0), - sent(0) - { - if (other.isSome()) { - setLink(other.get()); - } - } + ClientProcess() + : running(false), + requests(0), + responses(0), + totalRequests(0), + concurrency(0) {} - virtual ~BenchmarkProcess() {} + virtual ~ClientProcess() {} +protected: virtual void initialize() { - install("ping", &BenchmarkProcess::ping); - install("pong", &BenchmarkProcess::pong); + install("pong", &ClientProcess::pong); + + route("/run", None(), &ClientProcess::run); } - void setLink(const UPID& that) +private: + Future<http::Response> run(const http::Request& request) { - link(that); + if (duration.get() != NULL) { + return http::BadRequest("A run is already in progress"); + } + + hashmap<string, Option<string>> parameters { + {"server", request.query.get("server")}, + {"messageSize", request.query.get("messageSize")}, + {"requests", request.query.get("requests")}, + {"concurrency", request.query.get("concurrency")}, + }; + + // Ensure all parameters were provided. + foreachpair (const string& parameter, + const Option<string>& value, + parameters) { + if (value.isNone()) { + return http::BadRequest("Missing '" + parameter + "' parameter"); + } + } + + server = UPID(parameters["server"].get()); + link(server); + + Try<Bytes> messageSize = Bytes::parse(parameters["messageSize"].get()); + if (messageSize.isError()) { + return http::BadRequest("Invalid 'messageSize': " + messageSize.error()); + } + message = string(messageSize.get().bytes(), '1'); + + Try<size_t> numify_ = numify<size_t>(parameters["requests"].get()); + if (numify_.isError()) { + return http::BadRequest("Invalid 'requests': " + numify_.error()); + } + totalRequests = numify_.get(); + + numify_ = numify<size_t>(parameters["concurrency"].get()); + if (numify_.isError()) { + return http::BadRequest("Invalid 'concurrency': " + numify_.error()); + } + concurrency = numify_.get(); + + if (concurrency > totalRequests) { + concurrency = totalRequests; + } + + return _run() + .then(lambda::bind(&Self::__run, lambda::_1)); } - void start() + Future<Duration> _run() { + duration = Owned<Promise<Duration> >(new Promise<Duration>()); + watch.start(); - sendRemaining(); - } - // Returns the number of rpcs performed per second. - int await() - { - latch.await(); - double elapsed = watch.elapsed().secs(); - return iterations / elapsed; + while (requests < concurrency) { + send(server, "ping", message.c_str(), message.size()); + ++requests; + } + + return duration->future(); } -private: - void ping(const UPID& from, const string& body) + // TODO(jmlvanre): convert to c++11 lambda. + static Future<http::Response> __run(const Duration& duration) { - if (linkedPorts.find(from.address.port) == linkedPorts.end()) { - setLink(from); - linkedPorts.insert(from.address.port); - } - static const string message("hi"); - send(from, "pong", message.c_str(), message.size()); + return http::OK(stringify(duration)); } void pong(const UPID& from, const string& body) { - ++counter; - --outstanding; - if (counter >= iterations) { - latch.trigger(); - watch.stop(); + ++responses; + + if (responses == totalRequests) { + duration->set(watch.elapsed()); + duration.reset(); + } else if (requests < totalRequests) { + send(server, "ping", message.c_str(), message.size()); + ++requests; } - sendRemaining(); } - void sendRemaining() - { - static const string message("hi"); - for (; outstanding < maxOutstanding && sent < iterations; - ++outstanding, ++sent) { - send(other.get(), "ping", message.c_str(), message.size()); - } - } + bool running; - Option<UPID> other; + // The address of the ponger (server). + UPID server; - Latch latch; Stopwatch watch; - int counter; + Owned<Promise<Duration>> duration; - const int iterations; - const int maxOutstanding; - int outstanding; - int sent; - unordered_set<int> linkedPorts; -}; + string message; + size_t requests; + size_t responses; -typedef int pipes[2]; + size_t totalRequests; + size_t concurrency; +}; -void createPipes(pipes& _pipes) +// A process that emulates the 'server' side of a ping pong game. +// Note that the server links to any clients communicating to it. +class ServerProcess : public Process<ServerProcess> { - if (pipe(_pipes) < 0) { - perror("Pipe failed"); - abort(); - } - Try<Nothing> cloexec = os::cloexec(_pipes[0]); - if (cloexec.isError()) { - perror("Cloexec failed on pipe"); - abort(); +public: + virtual ~ServerProcess() {} + +protected: + virtual void initialize() + { + install("ping", &ServerProcess::ping); } - cloexec = os::cloexec(_pipes[1]); - if (cloexec.isError()) { - perror("Cloexec failed on pipe"); - abort(); + +private: + void ping(const UPID& from, const string& body) + { + if (!links.contains(from)) { + link(from); + links.insert(from); + } + + send(from, "pong", body.c_str(), body.size()); } -} + hashset<UPID> links; +}; + +// TODO(bmahler): Since there is no forking here, libprocess +// avoids going through sockets for local messages. Either fork +// or have the ability to disable local messages in libprocess. -// Launch numberOfProcesses processes, each with clients 'client' -// Actors. Play ping pong back and forth between these actors and the -// main 'server' actor. Each 'client' can have queueDepth ping -// requests outstanding to the 'server' actor. -TEST(Process, Process_BENCHMARK_Test) +// Launches many clients against a central server and measures +// client throughput. +TEST(Process, Process_BENCHMARK_ClientServer) { - const int iterations = 2500; - const int queueDepth = 250; - const int clients = 8; - const int numberOfProcesses = 4; - - vector<int> outPipes; - vector<int> inPipes; - vector<pid_t> pids; - for (int moreToLaunch = numberOfProcesses; - moreToLaunch > 0; --moreToLaunch) { - // fork in order to get numberOfProcesses seperate - // ProcessManagers. This avoids the short-circuit built into - // ProcessManager for processes communicating in the same manager. - int requestPipes[2]; - int resultPipes[2]; - pid_t pid = -1; - createPipes(requestPipes); - createPipes(resultPipes); - pid = fork(); - - if (pid < 0) { - perror("fork() failed"); - abort(); - } else if (pid == 0) { - // Child. - - // Read the number of bytes about to be parsed. - int stringSize = 0; - ssize_t result = read(requestPipes[0], &stringSize, sizeof(stringSize)); - EXPECT_EQ(result, sizeof(stringSize)); - char buffer[stringSize + 1]; - memset(&buffer, 0, stringSize + 1); - - // Read in the upid of the 'server' actor. - result = read(requestPipes[0], &buffer, stringSize); - EXPECT_EQ(result, stringSize); - istringstream inStream(buffer); - UPID other; - inStream >> other; - - // Launch a thread for each client that backs an actor. - vector<unique_ptr<BenchmarkProcess>> benchmarkProcesses; - for (int i = 0; i < clients; ++i) { - BenchmarkProcess* process = new BenchmarkProcess( - iterations, - queueDepth, - other); - benchmarkProcesses.push_back(unique_ptr<BenchmarkProcess>(process)); - spawn(process); - process->start(); - } + const size_t numRequests = 10000; + const size_t concurrency = 250; + const size_t numClients = 8; + const Bytes messageSize = Bytes(3); + + ServerProcess server; + const UPID serverPid = spawn(&server); + + // Launch the clients. + vector<Owned<ClientProcess>> clients; + for (size_t i = 0; i < numClients; i++) { + clients.push_back(Owned<ClientProcess>(new ClientProcess())); + spawn(clients.back().get()); + } - // Compute the total rpcs per second for this process, write the - // computation back to the server end of the fork. - int totalRpcPerSecond = 0; - foreach (const auto& process, benchmarkProcesses) { - int rpcPerSecond = process->await(); - totalRpcPerSecond += rpcPerSecond; - terminate(*process); - wait(*process); - } + // Start the ping / pongs! + const string query = strings::join( + "&", + "server=" + stringify(serverPid), + "requests=" + stringify(numRequests), + "concurrency=" + stringify(concurrency), + "messageSize=" + stringify(messageSize)); - result = write( - resultPipes[1], - &totalRpcPerSecond, - sizeof(totalRpcPerSecond)); - EXPECT_EQ(result, sizeof(totalRpcPerSecond)); - close(requestPipes[0]); - exit(0); - } else { - // Parent. - - // Keep track of the pipes to the child forks. This way the - // results of their rpc / sec computations can be read back and - // aggregated. - outPipes.push_back(requestPipes[1]); - inPipes.push_back(resultPipes[0]); - pids.push_back(pid); - - // If this is the last child launched, then let the parent - // become the 'server' actor. - if (moreToLaunch == 1) { - BenchmarkProcess process(iterations, queueDepth); - const UPID pid = spawn(&process); - - // Stringify the server pid to send to the child processes. - ostringstream outStream; - outStream << pid; - int stringSize = outStream.str().size(); - - // For each child, write the size of the stringified pid as - // well as the stringified pid to the pipe. - foreach (int fd, outPipes) { - ssize_t result = write(fd, &stringSize, sizeof(stringSize)); - EXPECT_EQ(result, sizeof(stringSize)); - result = write(fd, outStream.str().c_str(), stringSize); - EXPECT_EQ(result, stringSize); - close(fd); - } - - // Read the resulting rpcs / second from the child processes - // and aggregate the results. - int totalRpcsPerSecond = 0; - foreach (int fd, inPipes) { - int rpcs = 0; - ssize_t result = read(fd, &rpcs, sizeof(rpcs)); - EXPECT_EQ(result, sizeof(rpcs)); - if (result != sizeof(rpcs)) { - abort(); - } - totalRpcsPerSecond += rpcs; - } - - // Wait for all the child forks to terminately gracefully. - foreach (const auto& p, pids) { - ::waitpid(p, NULL, 0); - } - printf("Total: [%d] rpcs / s\n", totalRpcsPerSecond); - terminate(process); - wait(process); - } - } + Stopwatch watch; + watch.start(); + + list<Future<http::Response>> futures; + foreach (const Owned<ClientProcess>& client, clients) { + futures.push_back(http::get(client->self(), "run", query)); + } + + Future<list<http::Response>> responses = collect(futures); + AWAIT_READY(responses); + + Duration elapsed = watch.elapsed(); + + // Print the throughput of each client. + size_t i = 0; + foreach (const http::Response& response, responses.get()) { + ASSERT_EQ(http::statuses[200], response.status); + + Try<Duration> elapsed = Duration::parse(response.body); + ASSERT_SOME(elapsed); + double throughput = numRequests / elapsed.get().secs(); + + cout << "Client " << i << ": " << throughput << " rpcs / sec" << endl; + + i++; + } + + double throughput = (numRequests * numClients) / elapsed.secs(); + cout << "Estimated Total: " << throughput << " rpcs / sec" << endl; + + foreach (const Owned<ClientProcess>& client, clients) { + terminate(*client); + wait(*client); } + + terminate(server); + wait(server); + + return; }
