subprocess: allow Call() to read both stdout and stderr

I'm going to use this in a new integration test for the tool.

Since the parent is now reading from two pipes, it needs to do so more
carefully. For example, if it read from stdout fully before looking at
stderr, both it and the child would deadlock if the child wrote 64k bytes
to the stderr pipe, hit the kernel limit, and got blocked.

I played around with an implementation based on poll(), but ultimately found
this one (based on libev) to be simpler.

Change-Id: If5f2be94c2e5cc0644a5bb2340adc4a71d844247
Reviewed-on: http://gerrit.cloudera.org:8080/4057
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/666ae1ed
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/666ae1ed
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/666ae1ed

Branch: refs/heads/master
Commit: 666ae1ed1f47fab7a67f0f53f48ccaeee7f11654
Parents: effcd2c
Author: Adar Dembo <[email protected]>
Authored: Thu Aug 18 20:45:55 2016 -0700
Committer: Todd Lipcon <[email protected]>
Committed: Thu Aug 25 05:32:48 2016 +0000

----------------------------------------------------------------------
 src/kudu/util/CMakeLists.txt     |   1 +
 src/kudu/util/subprocess-test.cc |  26 ++++-
 src/kudu/util/subprocess.cc      | 181 ++++++++++++++++++++++++----------
 src/kudu/util/subprocess.h       |  13 +--
 4 files changed, 160 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/666ae1ed/src/kudu/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt
index a4527b9..82e7b91 100644
--- a/src/kudu/util/CMakeLists.txt
+++ b/src/kudu/util/CMakeLists.txt
@@ -197,6 +197,7 @@ set(UTIL_LIBS
   glog
   gutil
   histogram_proto
+  libev
   maintenance_manager_proto
   pb_util_proto
   protobuf

http://git-wip-us.apache.org/repos/asf/kudu/blob/666ae1ed/src/kudu/util/subprocess-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/subprocess-test.cc b/src/kudu/util/subprocess-test.cc
index 6397f7c..31b47fa 100644
--- a/src/kudu/util/subprocess-test.cc
+++ b/src/kudu/util/subprocess-test.cc
@@ -15,11 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <vector>
+#include "kudu/util/subprocess.h"
+
+#include <unistd.h>
+
 #include <string>
+#include <vector>
 
 #include <gtest/gtest.h>
-#include "kudu/util/subprocess.h"
+
 #include "kudu/util/test_util.h"
 
 using std::string;
@@ -105,4 +109,22 @@ TEST_F(SubprocessTest, TestKill) {
   ASSERT_EQ(SIGKILL, WTERMSIG(wait_status));
 }
 
+// Writes enough bytes to stdout and stderr concurrently that if Call() were
+// fully reading them one at a time, the test would deadlock.
+TEST_F(SubprocessTest, TestReadFromStdoutAndStderr) {
+  // Set an alarm to break out of any potential deadlocks (if the 
implementation
+  // regresses).
+  alarm(60);
+
+  string stdout;
+  string stderr;
+  ASSERT_OK(Subprocess::Call({
+    "/bin/bash",
+    "-c",
+    "dd if=/dev/urandom of=/dev/stdout bs=512 count=2048 &"
+    "dd if=/dev/urandom of=/dev/stderr bs=512 count=2048 &"
+    "wait"
+  }, &stdout, &stderr));
+}
+
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/666ae1ed/src/kudu/util/subprocess.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/subprocess.cc b/src/kudu/util/subprocess.cc
index a00b66c..01425a2 100644
--- a/src/kudu/util/subprocess.cc
+++ b/src/kudu/util/subprocess.cc
@@ -19,19 +19,21 @@
 
 #include <dirent.h>
 #include <fcntl.h>
-#include <glog/logging.h>
-#include <glog/stl_logging.h>
-#include <memory>
 #include <signal.h>
-#include <string>
+#if defined(__linux__)
+#include <sys/prctl.h>
+#endif
 #include <sys/types.h>
 #include <sys/wait.h>
 #include <unistd.h>
+
+#include <memory>
+#include <string>
 #include <vector>
 
-#if defined(__linux__)
-#include <sys/prctl.h>
-#endif
+#include <ev++.h>
+#include <glog/logging.h>
+#include <glog/stl_logging.h>
 
 #include "kudu/gutil/once.h"
 #include "kudu/gutil/port.h"
@@ -43,6 +45,7 @@
 #include "kudu/util/errno.h"
 #include "kudu/util/status.h"
 
+using std::unique_ptr;
 using std::shared_ptr;
 using std::string;
 using std::vector;
@@ -144,6 +147,98 @@ void CloseNonStandardFDs(DIR* fd_dir) {
   }
 }
 
+void RedirectToDevNull(int fd) {
+  // We must not close stderr or stdout, because then when a new file 
descriptor
+  // gets opened, it might get that fd number.  (We always allocate the lowest
+  // available file descriptor number.)  Instead, we reopen that fd as
+  // /dev/null.
+  int dev_null = open("/dev/null", O_WRONLY);
+  if (dev_null < 0) {
+    PLOG(WARNING) << "failed to open /dev/null";
+  } else {
+    PCHECK(dup2(dev_null, fd));
+  }
+}
+
+// Stateful libev watcher to help ReadFdsFully().
+class ReadFdsFullyHelper {
+ public:
+  ReadFdsFullyHelper(const string& progname, ev::dynamic_loop* loop, int fd)
+      : progname_(progname) {
+    // Bind the watcher to the provided loop, to this functor, and to the
+    // readable fd.
+    watcher_.set(*loop);
+    watcher_.set(this);
+    watcher_.set(fd, ev::READ);
+
+    // The watcher will now be polled when its loop is run.
+    watcher_.start();
+  }
+
+  void operator() (ev::io &w, int revents) {
+    DCHECK_EQ(ev::READ, revents);
+
+    char buf[1024];
+    ssize_t n = read(w.fd, buf, arraysize(buf));
+    if (n == 0) {
+      // EOF, stop watching.
+      w.stop();
+    } else if (n < 0) {
+      // Interrupted by a signal, do nothing.
+      if (errno == EINTR) return;
+
+      // A fatal error. Store it and stop watching.
+      status_ = Status::IOError("IO error reading from " + progname_,
+                                ErrnoToString(errno), errno);
+      w.stop();
+    } else {
+      // Add our bytes and keep watching.
+      output_.append(buf, n);
+    }
+  }
+
+  const Status& status() const { return status_; }
+  const string& output() const { return output_; }
+
+ private:
+  const string progname_;
+
+  ev::io watcher_;
+  string output_;
+  Status status_;
+};
+
+// Reads from all descriptors in 'fds' until EOF on all of them. If any read
+// yields an error, it is returned. Otherwise, 'out' contains the bytes read
+// for each fd, in the same order as was in 'fds'.
+Status ReadFdsFully(const string& progname,
+                    const vector<int>& fds,
+                    vector<string>* out) {
+  ev::dynamic_loop loop;
+
+  // Set up a watcher for each fd.
+  vector<unique_ptr<ReadFdsFullyHelper>> helpers;
+  for (int fd : fds) {
+    helpers.emplace_back(new ReadFdsFullyHelper(progname, &loop, fd));
+  }
+
+  // This will read until all fds return EOF.
+  loop.run();
+
+  // Check for failures.
+  for (const auto& h : helpers) {
+    if (!h->status().ok()) {
+      return h->status();
+    }
+  }
+
+  // No failures; write the output to the caller.
+  for (const auto& h : helpers) {
+    out->push_back(h->output());
+  }
+  return Status::OK();
+}
+
 } // anonymous namespace
 
 Subprocess::Subprocess(string program, vector<string> argv)
@@ -194,19 +289,6 @@ void Subprocess::DisableStdout() {
   fd_state_[STDOUT_FILENO] = DISABLED;
 }
 
-static void RedirectToDevNull(int fd) {
-  // We must not close stderr or stdout, because then when a new file 
descriptor
-  // gets opened, it might get that fd number.  (We always allocate the lowest
-  // available file descriptor number.)  Instead, we reopen that fd as
-  // /dev/null.
-  int dev_null = open("/dev/null", O_WRONLY);
-  if (dev_null < 0) {
-    PLOG(WARNING) << "failed to open /dev/null";
-  } else {
-    PCHECK(dup2(dev_null, fd));
-  }
-}
-
 #if defined(__APPLE__)
 static int pipe2(int pipefd[2], int flags) {
   DCHECK_EQ(O_CLOEXEC, flags);
@@ -369,50 +451,43 @@ Status Subprocess::Kill(int signal) {
 
 Status Subprocess::Call(const string& arg_str) {
   vector<string> argv = Split(arg_str, " ");
-  return Call(argv);
+  return Call(argv, nullptr, nullptr);
 }
 
-Status Subprocess::Call(const vector<string>& argv) {
+Status Subprocess::Call(const vector<string>& argv,
+                        string* stdout_out,
+                        string* stderr_out) {
   VLOG(2) << "Invoking command: " << argv;
-  Subprocess proc(argv[0], argv);
-  RETURN_NOT_OK(proc.Start());
-  int retcode;
-  RETURN_NOT_OK(proc.Wait(&retcode));
+  Subprocess p(argv[0], argv);
 
-  if (retcode == 0) {
-    return Status::OK();
-  } else {
-    return Status::RuntimeError(Substitute(
-        "Subprocess '$0' terminated with non-zero exit status $1",
-        argv[0],
-        retcode));
+  if (stdout_out) {
+    p.ShareParentStdout(false);
   }
-}
-
-Status Subprocess::Call(const vector<string>& argv, string* stdout_out) {
-  VLOG(2) << "Invoking command: " << argv;
-  Subprocess p(argv[0], argv);
-  p.ShareParentStdout(false);
-  RETURN_NOT_OK_PREPEND(p.Start(), "Unable to fork " + argv[0]);
+  if (stderr_out) {
+    p.ShareParentStderr(false);
+  }
+  RETURN_NOT_OK_PREPEND(p.Start(),
+                        "Unable to fork " + argv[0]);
   int err = close(p.ReleaseChildStdinFd());
   if (PREDICT_FALSE(err != 0)) {
     return Status::IOError("Unable to close child process stdin", 
ErrnoToString(errno), errno);
   }
 
-  stdout_out->clear();
-  char buf[1024];
-  while (true) {
-    ssize_t n = read(p.from_child_stdout_fd(), buf, arraysize(buf));
-    if (n == 0) {
-      // EOF
-      break;
-    }
-    if (n < 0) {
-      if (errno == EINTR) continue;
-      return Status::IOError("IO error reading from " + argv[0], 
ErrnoToString(errno), errno);
-    }
+  vector<int> fds;
+  if (stdout_out) {
+    fds.push_back(p.from_child_stdout_fd());
+  }
+  if (stderr_out) {
+    fds.push_back(p.from_child_stderr_fd());
+  }
+  vector<string> out;
+  RETURN_NOT_OK(ReadFdsFully(argv[0], fds, &out));
 
-    stdout_out->append(buf, n);
+  if (stdout_out) {
+    *stdout_out = std::move(out[0]);
+  }
+  if (stderr_out) {
+    *stderr_out = std::move(out[1]);
   }
 
   int retcode;

http://git-wip-us.apache.org/repos/asf/kudu/blob/666ae1ed/src/kudu/util/subprocess.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/subprocess.h b/src/kudu/util/subprocess.h
index 6889b6a..1b70b1a 100644
--- a/src/kudu/util/subprocess.h
+++ b/src/kudu/util/subprocess.h
@@ -17,10 +17,11 @@
 #ifndef KUDU_UTIL_SUBPROCESS_H
 #define KUDU_UTIL_SUBPROCESS_H
 
-#include <glog/logging.h>
 #include <string>
 #include <vector>
 
+#include <glog/logging.h>
+
 #include "kudu/gutil/macros.h"
 #include "kudu/util/status.h"
 
@@ -97,12 +98,12 @@ class Subprocess {
 
   // Same as above, but accepts a vector that includes the path to the
   // executable as argv[0] and the arguments to the program in argv[1..n].
-  static Status Call(const std::vector<std::string>& argv);
-
-  // Same as above, but collects the output from the child process stdout into
-  // 'stdout_out'.
+  //
+  // Also collects the output from the child process stdout and stderr into
+  // 'stdout_out' and 'stderr_out' respectively.
   static Status Call(const std::vector<std::string>& argv,
-                     std::string* stdout_out);
+                     std::string* stdout_out = nullptr,
+                     std::string* stderr_out = nullptr);
 
   // Return the pipe fd to the child's standard stream.
   // Stream should not be disabled or shared.

Reply via email to