Repository: mesos Updated Branches: refs/heads/master 9f13e14b7 -> e52f43fe7
Cleanups to Subprocess usage in Linux perf sampling. Review: https://reviews.apache.org/r/37045 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e52f43fe Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e52f43fe Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e52f43fe Branch: refs/heads/master Commit: e52f43fe7d3d0606b111b8a9c17212caecbde05e Parents: 9f13e14 Author: Paul Brett <[email protected]> Authored: Wed Aug 5 11:47:08 2015 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Wed Aug 5 12:23:50 2015 -0700 ---------------------------------------------------------------------- src/linux/perf.cpp | 112 +++++++++++++++++++++++------------------------- 1 file changed, 54 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/e52f43fe/src/linux/perf.cpp ---------------------------------------------------------------------- diff --git a/src/linux/perf.cpp b/src/linux/perf.cpp index 697b75e..5d4cb61 100644 --- a/src/linux/perf.cpp +++ b/src/linux/perf.cpp @@ -26,6 +26,7 @@ #include <list> #include <ostream> +#include <tuple> #include <vector> #include <process/clock.hpp> @@ -40,14 +41,19 @@ #include <stout/os/signals.hpp> +#include "common/status_utils.hpp" + #include "linux/perf.hpp" using namespace process; +using process::await; + using std::list; using std::ostringstream; using std::set; using std::string; +using std::tuple; using std::vector; namespace perf { @@ -171,7 +177,7 @@ protected: if (duration < Seconds(0)) { promise.fail("Perf sample duration cannot be negative: '" + - stringify(duration.secs()) + "'"); + stringify(duration.secs()) + "'"); terminate(self()); return; } @@ -183,8 +189,6 @@ protected: virtual void finalize() { - discard(output); - // Kill the perf process if it's still running. if (perf.isSome() && perf.get().status().isPending()) { kill(perf.get().pid(), SIGTERM); @@ -288,75 +292,67 @@ private: } perf = _perf.get(); - // Start reading from stdout and stderr now. We don't use stderr - // but must read from it to avoid the subprocess blocking on the - // pipe. - output.push_back(io::read(perf.get().out().get())); - output.push_back(io::read(perf.get().err().get())); - // Wait for the process to exit. - perf.get().status() - .onAny(defer(self(), &Self::_sample, lambda::_1)); - } - - void _sample(const Future<Option<int>>& status) - { - if (!status.isReady()) { - promise.fail("Failed to get exit status of perf process: " + - (status.isFailed() ? status.failure() : "discarded")); - terminate(self()); - return; - } + await(perf.get().status(), + io::read(perf.get().out().get()), + io::read(perf.get().err().get())) + .onReady(defer(self(), [this](const tuple< + Future<Option<int>>, + Future<string>, + Future<string>>& results) { + Future<Option<int>> status = std::get<0>(results); + Future<string> output = std::get<1>(results); + + Option<Error> error = None(); + + if (!status.isReady()) { + error = Error("Failed to execute perf: " + + (status.isFailed() ? status.failure() : "discarded")); + } else if (status.get().isNone()) { + error = Error("Failed to execute perf: failed to reap"); + } else if (status.get().get() != 0) { + error = Error("Failed to collect perf statistics: " + + WSTRINGIFY(status.get().get())); + } else if (!output.isReady()) { + error = Error("Failed to read perf output: " + + (output.isFailed() ? output.failure() : "discarded")); + } - if (status.get().get() != 0) { - promise.fail("Failed to execute perf, exit status: " + - stringify(WEXITSTATUS(status.get().get()))); + if (error.isSome()) { + promise.fail(error.get().message); + terminate(self()); + return; + } - terminate(self()); - return; - } + // Parse output from stdout. + Try<hashmap<string, mesos::PerfStatistics>> parse = + perf::parse(output.get()); - // Wait until we collect all output. - collect(output).onAny(defer(self(), &Self::__sample, lambda::_1)); - } + if (parse.isError()) { + promise.fail("Failed to parse perf output: " + parse.error()); + terminate(self()); + return; + } - void __sample(const Future<list<string>>& future) - { - if (!future.isReady()) { - promise.fail("Failed to collect output of perf process: " + - (future.isFailed() ? future.failure() : "discarded")); - terminate(self()); - return; - } + // Create a non-const copy from the Try<> so we can set the + // timestamp and duration. + hashmap<string, mesos::PerfStatistics> statistics = parse.get(); + foreachvalue (mesos::PerfStatistics& s, statistics) { + s.set_timestamp(start.secs()); + s.set_duration(duration.secs()); + } - // Parse output from stdout. - Try<hashmap<string, mesos::PerfStatistics>> parse = - perf::parse(output.front().get()); - if (parse.isError()) { - promise.fail("Failed to parse perf output: " + parse.error()); + promise.set(statistics); terminate(self()); return; - } - - // Create a non-const copy from the Try<> so we can set the - // timestamp and duration. - hashmap<string, mesos::PerfStatistics> statistics = parse.get(); - foreachvalue (mesos::PerfStatistics& s, statistics) { - s.set_timestamp(start.secs()); - s.set_duration(duration.secs()); - } - - promise.set(statistics); - terminate(self()); - return; - } + })); +} const vector<string> argv; const Duration duration; Time start; Option<Subprocess> perf; Promise<hashmap<string, mesos::PerfStatistics>> promise; - list<Future<string>> output; };
