Repository: mesos
Updated Branches:
  refs/heads/master 9f13e14b7 -> e52f43fe7


Cleanups to Subprocess usage in Linux perf sampling.

Review: https://reviews.apache.org/r/37045


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e52f43fe
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e52f43fe
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e52f43fe

Branch: refs/heads/master
Commit: e52f43fe7d3d0606b111b8a9c17212caecbde05e
Parents: 9f13e14
Author: Paul Brett <[email protected]>
Authored: Wed Aug 5 11:47:08 2015 -0700
Committer: Benjamin Mahler <[email protected]>
Committed: Wed Aug 5 12:23:50 2015 -0700

----------------------------------------------------------------------
 src/linux/perf.cpp | 112 +++++++++++++++++++++++-------------------------
 1 file changed, 54 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e52f43fe/src/linux/perf.cpp
----------------------------------------------------------------------
diff --git a/src/linux/perf.cpp b/src/linux/perf.cpp
index 697b75e..5d4cb61 100644
--- a/src/linux/perf.cpp
+++ b/src/linux/perf.cpp
@@ -26,6 +26,7 @@
 
 #include <list>
 #include <ostream>
+#include <tuple>
 #include <vector>
 
 #include <process/clock.hpp>
@@ -40,14 +41,19 @@
 
 #include <stout/os/signals.hpp>
 
+#include "common/status_utils.hpp"
+
 #include "linux/perf.hpp"
 
 using namespace process;
 
+using process::await;
+
 using std::list;
 using std::ostringstream;
 using std::set;
 using std::string;
+using std::tuple;
 using std::vector;
 
 namespace perf {
@@ -171,7 +177,7 @@ protected:
 
     if (duration < Seconds(0)) {
       promise.fail("Perf sample duration cannot be negative: '" +
-                    stringify(duration.secs()) + "'");
+                   stringify(duration.secs()) + "'");
       terminate(self());
       return;
     }
@@ -183,8 +189,6 @@ protected:
 
   virtual void finalize()
   {
-    discard(output);
-
     // Kill the perf process if it's still running.
     if (perf.isSome() && perf.get().status().isPending()) {
       kill(perf.get().pid(), SIGTERM);
@@ -288,75 +292,67 @@ private:
     }
     perf = _perf.get();
 
-    // Start reading from stdout and stderr now. We don't use stderr
-    // but must read from it to avoid the subprocess blocking on the
-    // pipe.
-    output.push_back(io::read(perf.get().out().get()));
-    output.push_back(io::read(perf.get().err().get()));
-
     // Wait for the process to exit.
-    perf.get().status()
-      .onAny(defer(self(), &Self::_sample, lambda::_1));
-  }
-
-  void _sample(const Future<Option<int>>& status)
-  {
-    if (!status.isReady()) {
-      promise.fail("Failed to get exit status of perf process: " +
-                   (status.isFailed() ? status.failure() : "discarded"));
-      terminate(self());
-      return;
-    }
+    await(perf.get().status(),
+          io::read(perf.get().out().get()),
+          io::read(perf.get().err().get()))
+    .onReady(defer(self(), [this](const tuple<
+        Future<Option<int>>,
+        Future<string>,
+        Future<string>>& results) {
+      Future<Option<int>> status = std::get<0>(results);
+      Future<string> output = std::get<1>(results);
+
+      Option<Error> error = None();
+
+      if (!status.isReady()) {
+        error = Error("Failed to execute perf: " +
+                      (status.isFailed() ? status.failure() : "discarded"));
+      } else if (status.get().isNone()) {
+        error = Error("Failed to execute perf: failed to reap");
+      } else if (status.get().get() != 0) {
+        error = Error("Failed to collect perf statistics: " +
+                      WSTRINGIFY(status.get().get()));
+      } else if (!output.isReady()) {
+        error = Error("Failed to read perf output: " +
+                      (output.isFailed() ? output.failure() : "discarded"));
+      }
 
-    if (status.get().get() != 0) {
-      promise.fail("Failed to execute perf, exit status: " +
-                    stringify(WEXITSTATUS(status.get().get())));
+      if (error.isSome()) {
+        promise.fail(error.get().message);
+        terminate(self());
+        return;
+      }
 
-      terminate(self());
-      return;
-    }
+      // Parse output from stdout.
+      Try<hashmap<string, mesos::PerfStatistics>> parse =
+          perf::parse(output.get());
 
-    // Wait until we collect all output.
-    collect(output).onAny(defer(self(), &Self::__sample, lambda::_1));
-  }
+      if (parse.isError()) {
+        promise.fail("Failed to parse perf output: " + parse.error());
+        terminate(self());
+        return;
+      }
 
-  void  __sample(const Future<list<string>>& future)
-  {
-    if (!future.isReady()) {
-      promise.fail("Failed to collect output of perf process: " +
-                   (future.isFailed() ? future.failure() : "discarded"));
-      terminate(self());
-      return;
-    }
+      // Create a non-const copy from the Try<> so we can set the
+      // timestamp and duration.
+      hashmap<string, mesos::PerfStatistics> statistics = parse.get();
+      foreachvalue (mesos::PerfStatistics& s, statistics) {
+        s.set_timestamp(start.secs());
+        s.set_duration(duration.secs());
+      }
 
-    // Parse output from stdout.
-    Try<hashmap<string, mesos::PerfStatistics>> parse =
-      perf::parse(output.front().get());
-    if (parse.isError()) {
-      promise.fail("Failed to parse perf output: " + parse.error());
+      promise.set(statistics);
       terminate(self());
       return;
-    }
-
-    // Create a non-const copy from the Try<> so we can set the
-    // timestamp and duration.
-    hashmap<string, mesos::PerfStatistics> statistics = parse.get();
-    foreachvalue (mesos::PerfStatistics& s, statistics) {
-      s.set_timestamp(start.secs());
-      s.set_duration(duration.secs());
-    }
-
-    promise.set(statistics);
-    terminate(self());
-    return;
-  }
+  }));
+}
 
   const vector<string> argv;
   const Duration duration;
   Time start;
   Option<Subprocess> perf;
   Promise<hashmap<string, mesos::PerfStatistics>> promise;
-  list<Future<string>> output;
 };
 
 

Reply via email to