subprocess: allow Call() to read both stdout and stderr

I'm going to use this in a new integration test for the tool.
Since the parent is now reading from two pipes, it needs to do so more carefully. For example, if it read from stdout fully before looking at stderr, both it and the child would deadlock if the child wrote 64k bytes to the stderr pipe, hit the kernel limit, and got blocked. I played around with an implementation based on poll(), but ultimately found this one (based on libev) to be simpler. Change-Id: If5f2be94c2e5cc0644a5bb2340adc4a71d844247 Reviewed-on: http://gerrit.cloudera.org:8080/4057 Tested-by: Kudu Jenkins Reviewed-by: Todd Lipcon <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/666ae1ed Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/666ae1ed Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/666ae1ed Branch: refs/heads/master Commit: 666ae1ed1f47fab7a67f0f53f48ccaeee7f11654 Parents: effcd2c Author: Adar Dembo <[email protected]> Authored: Thu Aug 18 20:45:55 2016 -0700 Committer: Todd Lipcon <[email protected]> Committed: Thu Aug 25 05:32:48 2016 +0000 ---------------------------------------------------------------------- src/kudu/util/CMakeLists.txt | 1 + src/kudu/util/subprocess-test.cc | 26 ++++- src/kudu/util/subprocess.cc | 181 ++++++++++++++++++++++++---------- src/kudu/util/subprocess.h | 13 +-- 4 files changed, 160 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/666ae1ed/src/kudu/util/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt index a4527b9..82e7b91 100644 --- a/src/kudu/util/CMakeLists.txt +++ b/src/kudu/util/CMakeLists.txt @@ -197,6 +197,7 @@ set(UTIL_LIBS glog gutil histogram_proto + libev maintenance_manager_proto pb_util_proto protobuf http://git-wip-us.apache.org/repos/asf/kudu/blob/666ae1ed/src/kudu/util/subprocess-test.cc 
---------------------------------------------------------------------- diff --git a/src/kudu/util/subprocess-test.cc b/src/kudu/util/subprocess-test.cc index 6397f7c..31b47fa 100644 --- a/src/kudu/util/subprocess-test.cc +++ b/src/kudu/util/subprocess-test.cc @@ -15,11 +15,15 @@ // specific language governing permissions and limitations // under the License. -#include <vector> +#include "kudu/util/subprocess.h" + +#include <unistd.h> + #include <string> +#include <vector> #include <gtest/gtest.h> -#include "kudu/util/subprocess.h" + #include "kudu/util/test_util.h" using std::string; @@ -105,4 +109,22 @@ TEST_F(SubprocessTest, TestKill) { ASSERT_EQ(SIGKILL, WTERMSIG(wait_status)); } +// Writes enough bytes to stdout and stderr concurrently that if Call() were +// fully reading them one at a time, the test would deadlock. +TEST_F(SubprocessTest, TestReadFromStdoutAndStderr) { + // Set an alarm to break out of any potential deadlocks (if the implementation + // regresses). + alarm(60); + + string stdout; + string stderr; + ASSERT_OK(Subprocess::Call({ + "/bin/bash", + "-c", + "dd if=/dev/urandom of=/dev/stdout bs=512 count=2048 &" + "dd if=/dev/urandom of=/dev/stderr bs=512 count=2048 &" + "wait" + }, &stdout, &stderr)); +} + } // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/666ae1ed/src/kudu/util/subprocess.cc ---------------------------------------------------------------------- diff --git a/src/kudu/util/subprocess.cc b/src/kudu/util/subprocess.cc index a00b66c..01425a2 100644 --- a/src/kudu/util/subprocess.cc +++ b/src/kudu/util/subprocess.cc @@ -19,19 +19,21 @@ #include <dirent.h> #include <fcntl.h> -#include <glog/logging.h> -#include <glog/stl_logging.h> -#include <memory> #include <signal.h> -#include <string> +#if defined(__linux__) +#include <sys/prctl.h> +#endif #include <sys/types.h> #include <sys/wait.h> #include <unistd.h> + +#include <memory> +#include <string> #include <vector> -#if defined(__linux__) -#include <sys/prctl.h> -#endif 
+#include <ev++.h> +#include <glog/logging.h> +#include <glog/stl_logging.h> #include "kudu/gutil/once.h" #include "kudu/gutil/port.h" @@ -43,6 +45,7 @@ #include "kudu/util/errno.h" #include "kudu/util/status.h" +using std::unique_ptr; using std::shared_ptr; using std::string; using std::vector; @@ -144,6 +147,98 @@ void CloseNonStandardFDs(DIR* fd_dir) { } } +void RedirectToDevNull(int fd) { + // We must not close stderr or stdout, because then when a new file descriptor + // gets opened, it might get that fd number. (We always allocate the lowest + // available file descriptor number.) Instead, we reopen that fd as + // /dev/null. + int dev_null = open("/dev/null", O_WRONLY); + if (dev_null < 0) { + PLOG(WARNING) << "failed to open /dev/null"; + } else { + PCHECK(dup2(dev_null, fd)); + } +} + +// Stateful libev watcher to help ReadFdsFully(). +class ReadFdsFullyHelper { + public: + ReadFdsFullyHelper(const string& progname, ev::dynamic_loop* loop, int fd) + : progname_(progname) { + // Bind the watcher to the provided loop, to this functor, and to the + // readable fd. + watcher_.set(*loop); + watcher_.set(this); + watcher_.set(fd, ev::READ); + + // The watcher will now be polled when its loop is run. + watcher_.start(); + } + + void operator() (ev::io &w, int revents) { + DCHECK_EQ(ev::READ, revents); + + char buf[1024]; + ssize_t n = read(w.fd, buf, arraysize(buf)); + if (n == 0) { + // EOF, stop watching. + w.stop(); + } else if (n < 0) { + // Interrupted by a signal, do nothing. + if (errno == EINTR) return; + + // A fatal error. Store it and stop watching. + status_ = Status::IOError("IO error reading from " + progname_, + ErrnoToString(errno), errno); + w.stop(); + } else { + // Add our bytes and keep watching. 
+ output_.append(buf, n); + } + } + + const Status& status() const { return status_; } + const string& output() const { return output_; } + + private: + const string progname_; + + ev::io watcher_; + string output_; + Status status_; +}; + +// Reads from all descriptors in 'fds' until EOF on all of them. If any read +// yields an error, it is returned. Otherwise, 'out' contains the bytes read +// for each fd, in the same order as was in 'fds'. +Status ReadFdsFully(const string& progname, + const vector<int>& fds, + vector<string>* out) { + ev::dynamic_loop loop; + + // Set up a watcher for each fd. + vector<unique_ptr<ReadFdsFullyHelper>> helpers; + for (int fd : fds) { + helpers.emplace_back(new ReadFdsFullyHelper(progname, &loop, fd)); + } + + // This will read until all fds return EOF. + loop.run(); + + // Check for failures. + for (const auto& h : helpers) { + if (!h->status().ok()) { + return h->status(); + } + } + + // No failures; write the output to the caller. + for (const auto& h : helpers) { + out->push_back(h->output()); + } + return Status::OK(); +} + } // anonymous namespace Subprocess::Subprocess(string program, vector<string> argv) @@ -194,19 +289,6 @@ void Subprocess::DisableStdout() { fd_state_[STDOUT_FILENO] = DISABLED; } -static void RedirectToDevNull(int fd) { - // We must not close stderr or stdout, because then when a new file descriptor - // gets opened, it might get that fd number. (We always allocate the lowest - // available file descriptor number.) Instead, we reopen that fd as - // /dev/null. 
- int dev_null = open("/dev/null", O_WRONLY); - if (dev_null < 0) { - PLOG(WARNING) << "failed to open /dev/null"; - } else { - PCHECK(dup2(dev_null, fd)); - } -} - #if defined(__APPLE__) static int pipe2(int pipefd[2], int flags) { DCHECK_EQ(O_CLOEXEC, flags); @@ -369,50 +451,43 @@ Status Subprocess::Kill(int signal) { Status Subprocess::Call(const string& arg_str) { vector<string> argv = Split(arg_str, " "); - return Call(argv); + return Call(argv, nullptr, nullptr); } -Status Subprocess::Call(const vector<string>& argv) { +Status Subprocess::Call(const vector<string>& argv, + string* stdout_out, + string* stderr_out) { VLOG(2) << "Invoking command: " << argv; - Subprocess proc(argv[0], argv); - RETURN_NOT_OK(proc.Start()); - int retcode; - RETURN_NOT_OK(proc.Wait(&retcode)); + Subprocess p(argv[0], argv); - if (retcode == 0) { - return Status::OK(); - } else { - return Status::RuntimeError(Substitute( - "Subprocess '$0' terminated with non-zero exit status $1", - argv[0], - retcode)); + if (stdout_out) { + p.ShareParentStdout(false); } -} - -Status Subprocess::Call(const vector<string>& argv, string* stdout_out) { - VLOG(2) << "Invoking command: " << argv; - Subprocess p(argv[0], argv); - p.ShareParentStdout(false); - RETURN_NOT_OK_PREPEND(p.Start(), "Unable to fork " + argv[0]); + if (stderr_out) { + p.ShareParentStderr(false); + } + RETURN_NOT_OK_PREPEND(p.Start(), + "Unable to fork " + argv[0]); int err = close(p.ReleaseChildStdinFd()); if (PREDICT_FALSE(err != 0)) { return Status::IOError("Unable to close child process stdin", ErrnoToString(errno), errno); } - stdout_out->clear(); - char buf[1024]; - while (true) { - ssize_t n = read(p.from_child_stdout_fd(), buf, arraysize(buf)); - if (n == 0) { - // EOF - break; - } - if (n < 0) { - if (errno == EINTR) continue; - return Status::IOError("IO error reading from " + argv[0], ErrnoToString(errno), errno); - } + vector<int> fds; + if (stdout_out) { + fds.push_back(p.from_child_stdout_fd()); + } + if 
(stderr_out) { + fds.push_back(p.from_child_stderr_fd()); + } + vector<string> out; + RETURN_NOT_OK(ReadFdsFully(argv[0], fds, &out)); - stdout_out->append(buf, n); + if (stdout_out) { + *stdout_out = std::move(out[0]); + } + if (stderr_out) { + *stderr_out = std::move(out[1]); } int retcode; http://git-wip-us.apache.org/repos/asf/kudu/blob/666ae1ed/src/kudu/util/subprocess.h ---------------------------------------------------------------------- diff --git a/src/kudu/util/subprocess.h b/src/kudu/util/subprocess.h index 6889b6a..1b70b1a 100644 --- a/src/kudu/util/subprocess.h +++ b/src/kudu/util/subprocess.h @@ -17,10 +17,11 @@ #ifndef KUDU_UTIL_SUBPROCESS_H #define KUDU_UTIL_SUBPROCESS_H -#include <glog/logging.h> #include <string> #include <vector> +#include <glog/logging.h> + #include "kudu/gutil/macros.h" #include "kudu/util/status.h" @@ -97,12 +98,12 @@ class Subprocess { // Same as above, but accepts a vector that includes the path to the // executable as argv[0] and the arguments to the program in argv[1..n]. - static Status Call(const std::vector<std::string>& argv); - - // Same as above, but collects the output from the child process stdout into - // 'stdout_out'. + // + // Also collects the output from the child process stdout and stderr into + // 'stdout_out' and 'stderr_out' respectively. static Status Call(const std::vector<std::string>& argv, - std::string* stdout_out); + std::string* stdout_out = nullptr, + std::string* stderr_out = nullptr); // Return the pipe fd to the child's standard stream. // Stream should not be disabled or shared.
