Repository: mesos
Updated Branches:
  refs/heads/master 5348496aa -> 3c01efd9a


Added a libprocess network benchmark.

The benchmark forks and launches multiple "libprocess" processes in
each child. They are aware of the "master"'s (parent) address and all
play ping pong with it. This benchmark measures throughput in terms of
the number of RPCs handled per second using persistent (linked)
connections.

A new test file (benchmarks) is introduced because we want to fork
before libprocess is initialized. This allows us to prevent
short-circuiting of message passing between processes under the same
ProcessManager. This way we force the execution path of the underlying
event management system.

Review: https://reviews.apache.org/r/26150


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3c01efd9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3c01efd9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3c01efd9

Branch: refs/heads/master
Commit: 3c01efd9a809e4e6bda6d18f13b4957a5e39dd36
Parents: 5348496
Author: Joris Van Remoortere <[email protected]>
Authored: Thu Oct 23 20:41:25 2014 +0000
Committer: Niklas Q. Nielsen <[email protected]>
Committed: Thu Oct 23 21:12:16 2014 +0000

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am              |  21 +-
 3rdparty/libprocess/src/tests/benchmarks.cpp | 283 ++++++++++++++++++++++
 2 files changed, 302 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3c01efd9/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 616618e..a9f6178 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -87,7 +87,7 @@ libprocess_la_LIBADD += $(GPERFTOOLS)/libprofiler.la
 endif
 
 # Tests.
-check_PROGRAMS = tests
+check_PROGRAMS = tests benchmarks
 
 tests_SOURCES =                                                        \
   src/tests/decoder_tests.cpp                                  \
@@ -122,12 +122,29 @@ tests_LDADD =                             \
   $(HTTP_PARSER_LIB)                   \
   $(LIBEV_LIB)
 
+benchmarks_SOURCES =                   \
+  src/tests/benchmarks.cpp
+
+benchmarks_CPPFLAGS =                  \
+  -I$(top_srcdir)/src                  \
+  -I$(GTEST)/include                   \
+  -I$(GMOCK)/include                   \
+  $(libprocess_la_CPPFLAGS)
+
+benchmarks_LDADD =                     \
+  3rdparty/libgmock.la                 \
+  libprocess.la                                \
+  $(LIBGLOG)                           \
+  $(HTTP_PARSER_LIB)                   \
+  $(LIBEV_LIB)
+
 # We use a check-local target for now to avoid the parallel test
 # runner that ships with newer versions of autotools.
 # See the following discussion for the workaround:
 # http://lists.gnu.org/archive/html/automake/2013-01/msg00051.html
-check-local: tests
+check-local: tests benchmarks
        ./tests
+       ./benchmarks
 
 # TODO(benh): Fix shared builds (tests need libglog, libev, etc).
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3c01efd9/3rdparty/libprocess/src/tests/benchmarks.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/benchmarks.cpp 
b/3rdparty/libprocess/src/tests/benchmarks.cpp
new file mode 100644
index 0000000..79a650b
--- /dev/null
+++ b/3rdparty/libprocess/src/tests/benchmarks.cpp
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+
+#include <gmock/gmock.h>
+
+#include <memory>
+#include <unordered_set>
+
+#include <process/gmock.hpp>
+#include <process/gtest.hpp>
+#include <process/process.hpp>
+
+using namespace process;
+
+using std::function;
+using std::istringstream;
+using std::ostringstream;
+using std::string;
+using std::unique_ptr;
+using std::unordered_set;
+using std::vector;
+
+int main(int argc, char** argv)
+{
+  // Initialize Google Mock/Test.
+  testing::InitGoogleMock(&argc, argv);
+
+  // Add the libprocess test event listeners.
+  ::testing::TestEventListeners& listeners =
+    ::testing::UnitTest::GetInstance()->listeners();
+
+  listeners.Append(process::ClockTestEventListener::instance());
+  listeners.Append(process::FilterTestEventListener::instance());
+
+  return RUN_ALL_TESTS();
+}
+
+
+class BenchmarkProcess : public Process<BenchmarkProcess>
+{
+public:
+  BenchmarkProcess(
+      int _iterations = 1,
+      int _maxOutstanding = 1,
+      const Option<UPID>& _other = Option<UPID>())
+    : other(_other),
+      counter(0),
+      iterations(_iterations),
+      maxOutstanding(_maxOutstanding),
+      outstanding(0),
+      sent(0)
+  {
+    if (other.isSome()) {
+      setLink(other.get());
+    }
+  }
+
+  virtual ~BenchmarkProcess() {}
+
+  virtual void initialize()
+  {
+    install("ping", &BenchmarkProcess::ping);
+    install("pong", &BenchmarkProcess::pong);
+  }
+
+  void setLink(const UPID& that)
+  {
+    link(that);
+  }
+
+  void start()
+  {
+    watch.start();
+    sendRemaining();
+  }
+
+  // Returns the number of rpcs performed per second.
+  int await()
+  {
+    latch.await();
+    double elapsed = watch.elapsed().secs();
+    return iterations / elapsed;
+  }
+
+private:
+  void ping(const UPID& from, const string& body)
+  {
+    if (linkedPorts.find(from.port) == linkedPorts.end()) {
+      setLink(from);
+      linkedPorts.insert(from.port);
+    }
+    static const string message("hi");
+    send(from, "pong", message.c_str(), message.size());
+  }
+
+  void pong(const UPID& from, const string& body)
+  {
+    ++counter;
+    --outstanding;
+    if (counter >= iterations) {
+      latch.trigger();
+      watch.stop();
+    }
+    sendRemaining();
+  }
+
+  void sendRemaining()
+  {
+    static const string message("hi");
+    for (; outstanding < maxOutstanding && sent < iterations;
+         ++outstanding, ++sent) {
+      send(other.get(), "ping", message.c_str(), message.size());
+    }
+  }
+
+  Option<UPID> other;
+
+  Latch latch;
+  Stopwatch watch;
+
+  int counter;
+
+  const int iterations;
+  const int maxOutstanding;
+  int outstanding;
+  int sent;
+  unordered_set<int> linkedPorts;
+};
+
+
+// Launch numberOfProcesses processes, each with clients 'client'
+// Actors. Play ping pong back and forth between these actors and the
+// main 'server' actor. Each 'client' can have queueDepth ping
+// requests outstanding to the 'server' actor.
+TEST(Process, Process_BENCHMARK_Test)
+{
+  const int iterations = 2500;
+  const int queueDepth = 250;
+  const int clients = 8;
+  const int numberOfProcesses = 4;
+
+  vector<int> outPipes;
+  vector<int> inPipes;
+  vector<pid_t> pids;
+  for (int moreToLaunch = numberOfProcesses;
+       moreToLaunch > 0; --moreToLaunch) {
+    // fork in order to get numberOfProcesses seperate
+    // ProcessManagers. This avoids the short-circuit built into
+    // ProcessManager for processes communicating in the same manager.
+    int pipes[2];
+    int resultPipes[2];
+    pid_t pid = -1;
+    if(pipe2(pipes, O_CLOEXEC) < 0) {
+      perror("pipe failed");
+      abort();
+    }
+    if(pipe2(resultPipes, O_CLOEXEC) < 0) {
+      perror("pipe failed");
+      abort();
+    }
+    pid = fork();
+
+    if (pid < 0) {
+      perror("fork() failed");
+      abort();
+    } else if (pid == 0) {
+      // Child.
+
+      // Read the number of bytes about to be parsed.
+      int stringSize = 0;
+      ssize_t result = read(pipes[0], &stringSize, sizeof(stringSize));
+      EXPECT_EQ(result, sizeof(stringSize));
+      char buffer[stringSize + 1];
+      memset(&buffer, 0, stringSize + 1);
+
+      // Read in the upid of the 'server' actor.
+      result = read(pipes[0], &buffer, stringSize);
+      EXPECT_EQ(result, stringSize);
+      istringstream inStream(buffer);
+      UPID other;
+      inStream >> other;
+
+      // Launch a thread for each client that backs an actor.
+      vector<unique_ptr<BenchmarkProcess>> benchmarkProcesses;
+      for (int i = 0; i < clients; ++i) {
+        BenchmarkProcess* process = new BenchmarkProcess(
+            iterations,
+            queueDepth,
+            other);
+        benchmarkProcesses.push_back(unique_ptr<BenchmarkProcess>(process));
+        spawn(process);
+        process->start();
+      }
+
+      // Compute the total rpcs per second for this process, write the
+      // computation back to the server end of the fork.
+      int totalRpcPerSecond = 0;
+      foreach (const auto& process, benchmarkProcesses) {
+        int rpcPerSecond = process->await();
+        totalRpcPerSecond += rpcPerSecond;
+        terminate(*process);
+        wait(*process);
+      }
+
+      result = write(
+          resultPipes[1],
+          &totalRpcPerSecond,
+          sizeof(totalRpcPerSecond));
+      EXPECT_EQ(result, sizeof(totalRpcPerSecond));
+      close(pipes[0]);
+      exit(0);
+    } else {
+      // Parent.
+
+      // Keep track of the pipes to the child forks. This way the
+      // results of their rpc / sec computations can be read back and
+      // aggregated.
+      outPipes.push_back(pipes[1]);
+      inPipes.push_back(resultPipes[0]);
+      pids.push_back(pid);
+
+      // If this is the last child launched, then let the parent
+      // become the 'server' actor.
+      if (moreToLaunch == 1) {
+        BenchmarkProcess process(iterations, queueDepth);
+        const UPID pid = spawn(&process);
+
+        // Stringify the server pid to send to the child processes.
+        ostringstream outStream;
+        outStream << pid;
+        int stringSize = outStream.str().size();
+
+        // For each child, write the size of the stringified pid as
+        // well as the stringified pid to the pipe.
+        foreach (int fd, outPipes) {
+          ssize_t result = write(fd, &stringSize, sizeof(stringSize));
+          EXPECT_EQ(result, sizeof(stringSize));
+          result = write(fd, outStream.str().c_str(), stringSize);
+          EXPECT_EQ(result, stringSize);
+          close(fd);
+        }
+
+        // Read the resulting rpcs / second from the child processes
+        // and aggregate the results.
+        int totalRpcsPerSecond = 0;
+        foreach (int fd, inPipes) {
+          int rpcs = 0;
+          ssize_t result = read(fd, &rpcs, sizeof(rpcs));
+          EXPECT_EQ(result, sizeof(rpcs));
+          if (result != sizeof(rpcs)) {
+            abort();
+          }
+          totalRpcsPerSecond += rpcs;
+        }
+
+        // Wait for all the child forks to terminately gracefully.
+        foreach (const auto& p, pids) {
+          ::waitpid(p, NULL, 0);
+        }
+        printf("Total: [%d] rpcs / s\n", totalRpcsPerSecond);
+        terminate(process);
+        wait(process);
+      }
+    }
+  }
+}

Reply via email to