http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/subprocess-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/subprocess-test.cc 
b/be/src/kudu/util/subprocess-test.cc
new file mode 100644
index 0000000..fb3d183
--- /dev/null
+++ b/be/src/kudu/util/subprocess-test.cc
@@ -0,0 +1,264 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/subprocess.h"
+
+#include <unistd.h>
+
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/test_util.h"
+
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+class SubprocessTest : public KuduTest {};
+
+TEST_F(SubprocessTest, TestSimplePipe) {
+  Subprocess p({ "/usr/bin/tr", "a-z", "A-Z" });
+  p.ShareParentStdout(false);
+  ASSERT_OK(p.Start());
+
+  FILE* out = fdopen(p.ReleaseChildStdinFd(), "w");
+  PCHECK(out);
+  FILE* in = fdopen(p.from_child_stdout_fd(), "r");
+  PCHECK(in);
+
+  fprintf(out, "hello world\n");
+  // We have to close 'out' or else tr won't write any output, since
+  // it enters a buffered mode if it detects that its input is a FIFO.
+  fclose(out);
+
+  char buf[1024];
+  ASSERT_EQ(buf, fgets(buf, sizeof(buf), in));
+  ASSERT_STREQ("HELLO WORLD\n", &buf[0]);
+
+  int wait_status = 0;
+  ASSERT_OK(p.Wait(&wait_status));
+  ASSERT_TRUE(WIFEXITED(wait_status));
+  ASSERT_EQ(0, WEXITSTATUS(wait_status));
+}
+
+TEST_F(SubprocessTest, TestErrPipe) {
+  Subprocess p({ "/usr/bin/tee", "/dev/stderr" });
+  p.ShareParentStderr(false);
+  ASSERT_OK(p.Start());
+
+  FILE* out = fdopen(p.ReleaseChildStdinFd(), "w");
+  PCHECK(out);
+
+  fprintf(out, "Hello, World\n");
+  fclose(out); // same reasoning as above, flush to prevent tee buffering
+
+  FILE* in = fdopen(p.from_child_stderr_fd(), "r");
+  PCHECK(in);
+
+  char buf[1024];
+  ASSERT_EQ(buf, fgets(buf, sizeof(buf), in));
+  ASSERT_STREQ("Hello, World\n", &buf[0]);
+
+  int wait_status = 0;
+  ASSERT_OK(p.Wait(&wait_status));
+  ASSERT_TRUE(WIFEXITED(wait_status));
+  ASSERT_EQ(0, WEXITSTATUS(wait_status));
+}
+
+TEST_F(SubprocessTest, TestKill) {
+  Subprocess p({ "/bin/cat" });
+  ASSERT_OK(p.Start());
+
+  ASSERT_OK(p.Kill(SIGKILL));
+
+  int wait_status = 0;
+  ASSERT_OK(p.Wait(&wait_status));
+  ASSERT_TRUE(WIFSIGNALED(wait_status));
+  ASSERT_EQ(SIGKILL, WTERMSIG(wait_status));
+
+  // Test that calling Wait() a second time returns the same
+  // cached value instead of trying to wait on some other process
+  // that was assigned the same pid.
+  wait_status = 0;
+  ASSERT_OK(p.Wait(&wait_status));
+  ASSERT_TRUE(WIFSIGNALED(wait_status));
+  ASSERT_EQ(SIGKILL, WTERMSIG(wait_status));
+}
+
+// Writes enough bytes to stdout and stderr concurrently that if Call() were
+// fully reading them one at a time, the test would deadlock.
+TEST_F(SubprocessTest, TestReadFromStdoutAndStderr) {
+  // Set an alarm to break out of any potential deadlocks (if the 
implementation
+  // regresses).
+  alarm(60);
+
+  string stdout;
+  string stderr;
+  ASSERT_OK(Subprocess::Call({
+    "/bin/bash",
+    "-c",
+    "dd if=/dev/urandom of=/dev/stdout bs=512 count=2048 &"
+    "dd if=/dev/urandom of=/dev/stderr bs=512 count=2048 &"
+    "wait"
+  }, "", &stdout, &stderr));
+}
+
+// Test that environment variables can be passed to the subprocess.
+TEST_F(SubprocessTest, TestEnvVars) {
+  Subprocess p({ "/bin/bash", "-c", "echo $FOO" });
+  p.SetEnvVars({{"FOO", "bar"}});
+  p.ShareParentStdout(false);
+  ASSERT_OK(p.Start());
+  FILE* in = fdopen(p.from_child_stdout_fd(), "r");
+  PCHECK(in);
+  char buf[1024];
+  ASSERT_EQ(buf, fgets(buf, sizeof(buf), in));
+  ASSERT_STREQ("bar\n", &buf[0]);
+  ASSERT_OK(p.Wait());
+}
+
+// Tests writing to the subprocess stdin.
+TEST_F(SubprocessTest, TestCallWithStdin) {
+  string stdout;
+  ASSERT_OK(Subprocess::Call({ "/bin/bash" },
+                             "echo \"quick brown fox\"",
+                             &stdout));
+  EXPECT_EQ("quick brown fox\n", stdout);
+}
+
+// Test KUDU-1674: '/bin/bash -c "echo"' command below is expected to
+// capture a string on stderr. This test validates that passing
+// stderr alone doesn't result in SIGSEGV as reported in the bug and
+// also check for sanity of stderr in the output.
+TEST_F(SubprocessTest, TestReadSingleFD) {
+  string stderr;
+  const string str = "ApacheKudu";
+  const string cmd_str = Substitute("/bin/echo -n $0 1>&2", str);
+  ASSERT_OK(Subprocess::Call({"/bin/sh", "-c", cmd_str}, "", nullptr, 
&stderr));
+  ASSERT_EQ(stderr, str);
+
+  // Also sanity check other combinations.
+  string stdout;
+  ASSERT_OK(Subprocess::Call({"/bin/ls", "/dev/null"}, "", &stdout, nullptr));
+  ASSERT_STR_CONTAINS(stdout, "/dev/null");
+
+  ASSERT_OK(Subprocess::Call({"/bin/ls", "/dev/zero"}, "", nullptr, nullptr));
+}
+
+TEST_F(SubprocessTest, TestGetExitStatusExitSuccess) {
+  Subprocess p({ "/bin/sh", "-c", "exit 0" });
+  ASSERT_OK(p.Start());
+  ASSERT_OK(p.Wait());
+  int exit_status;
+  string exit_info;
+  ASSERT_OK(p.GetExitStatus(&exit_status, &exit_info));
+  ASSERT_EQ(0, exit_status);
+  ASSERT_STR_CONTAINS(exit_info, "process successfully exited");
+}
+
+TEST_F(SubprocessTest, TestGetExitStatusExitFailure) {
+  static const vector<int> kStatusCodes = { 1, 255 };
+  for (auto code : kStatusCodes) {
+    Subprocess p({ "/bin/sh", "-c", Substitute("exit $0", code) });
+    ASSERT_OK(p.Start());
+    ASSERT_OK(p.Wait());
+    int exit_status;
+    string exit_info;
+    ASSERT_OK(p.GetExitStatus(&exit_status, &exit_info));
+    ASSERT_EQ(code, exit_status);
+    ASSERT_STR_CONTAINS(exit_info,
+                        Substitute("process exited with non-zero status $0",
+                                   exit_status));
+  }
+}
+
+TEST_F(SubprocessTest, TestGetExitStatusSignaled) {
+  static const vector<int> kSignals = {
+    SIGHUP,
+    SIGABRT,
+    SIGKILL,
+    SIGTERM,
+    SIGUSR2,
+  };
+  for (auto signum : kSignals) {
+    Subprocess p({ "/bin/cat" });
+    ASSERT_OK(p.Start());
+    ASSERT_OK(p.Kill(signum));
+    ASSERT_OK(p.Wait());
+    int exit_status;
+    string exit_info;
+    ASSERT_OK(p.GetExitStatus(&exit_status, &exit_info));
+    EXPECT_EQ(signum, exit_status);
+    ASSERT_STR_CONTAINS(exit_info, Substitute("process exited on signal $0",
+                                              signum));
+  }
+}
+
+TEST_F(SubprocessTest, TestSubprocessDestroyWithCustomSignal) {
+  string kTestFile = GetTestPath("foo");
+
+  // Start a subprocess that creates kTestFile immediately and deletes it on 
exit.
+  //
+  // Note: it's important that the shell not invoke a command while waiting
+  // to be killed (i.e. "sleep 60"); if it did, the signal could be delivered
+  // just after the command starts but just before the shell decides to forward
+  // signals to it, and we wind up with a deadlock.
+  vector<string> argv = {
+      "/bin/bash",
+      "-c",
+      Substitute(
+          // Delete kTestFile on exit.
+          "trap \"rm $0\" EXIT;"
+          // Create kTestFile on start.
+          "touch $0;"
+          // Spin in a tight loop waiting to be killed.
+          "while true;"
+          "  do FOO=$$((FOO + 1));"
+          "done", kTestFile)
+  };
+
+  {
+    Subprocess s(argv);
+    ASSERT_OK(s.Start());
+    AssertEventually([&]{
+        ASSERT_TRUE(env_->FileExists(kTestFile));
+    });
+  }
+
+  // The subprocess went out of scope and was killed with SIGKILL, so it left
+  // kTestFile behind.
+  ASSERT_TRUE(env_->FileExists(kTestFile));
+
+  ASSERT_OK(env_->DeleteFile(kTestFile));
+  {
+    Subprocess s(argv, SIGTERM);
+    ASSERT_OK(s.Start());
+    AssertEventually([&]{
+        ASSERT_TRUE(env_->FileExists(kTestFile));
+    });
+  }
+
+  // The subprocess was killed with SIGTERM, giving it a chance to delete 
kTestFile.
+  ASSERT_FALSE(env_->FileExists(kTestFile));
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/subprocess.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/subprocess.cc b/be/src/kudu/util/subprocess.cc
new file mode 100644
index 0000000..ec032cd
--- /dev/null
+++ b/be/src/kudu/util/subprocess.cc
@@ -0,0 +1,707 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/subprocess.h"
+
+#include <dirent.h>
+#include <fcntl.h>
+#include <signal.h>
+#include <stdio.h>
+#if defined(__linux__)
+#include <sys/prctl.h>
+#endif
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <unistd.h>
+
+#include <functional>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include <ev++.h>
+#include <glog/logging.h>
+#include <glog/stl_logging.h>
+
+#include "kudu/gutil/once.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/numbers.h"
+#include "kudu/gutil/strings/split.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/debug-util.h"
+#include "kudu/util/errno.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/signal.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/status.h"
+
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using strings::Split;
+using strings::Substitute;
+using strings::SubstituteAndAppend;
+
+namespace kudu {
+
+// Make glog's STL-compatible operators visible inside this namespace.
+using ::operator<<;
+
+namespace {
+
+static double kProcessWaitTimeoutSeconds = 5.0;
+
+static const char* kProcSelfFd =
+#if defined(__APPLE__)
+  "/dev/fd";
+#else
+  "/proc/self/fd";
+#endif // defined(__APPLE__)
+
+#if defined(__linux__)
+#define READDIR readdir64
+#define DIRENT dirent64
+#else
+#define READDIR readdir
+#define DIRENT dirent
+#endif
+
+// Since opendir() calls malloc(), this must be called before fork().
+// This function is not async-signal-safe.
+Status OpenProcFdDir(DIR** dir) {
+  *dir = opendir(kProcSelfFd);
+  if (PREDICT_FALSE(dir == nullptr)) {
+    return Status::IOError(Substitute("opendir(\"$0\") failed", kProcSelfFd),
+                           ErrnoToString(errno), errno);
+  }
+  return Status::OK();
+}
+
+// Close the directory stream opened by OpenProcFdDir().
+// This function is not async-signal-safe.
+void CloseProcFdDir(DIR* dir) {
+  if (PREDICT_FALSE(closedir(dir) == -1)) {
+    LOG(WARNING) << "Unable to close fd dir: "
+                 << Status::IOError(Substitute("closedir(\"$0\") failed", 
kProcSelfFd),
+                                    ErrnoToString(errno), errno).ToString();
+  }
+}
+
+// Close all open file descriptors other than stdin, stderr, stdout.
+// Expects a directory stream created by OpenProdFdDir() as a parameter.
+// This function is called after fork() and must not call malloc().
+// The rule of thumb is to only call async-signal-safe functions in such cases
+// if at all possible.
+void CloseNonStandardFDs(DIR* fd_dir) {
+  // This is implemented by iterating over the open file descriptors
+  // rather than using sysconf(SC_OPEN_MAX) -- the latter is error prone
+  // since it may not represent the highest open fd if the fd soft limit
+  // has changed since the process started. This should also be faster
+  // since iterating over all possible fds is likely to cause 64k+ syscalls
+  // in typical configurations.
+  //
+  // Note also that this doesn't use any of the Env utility functions, to
+  // make it as lean and mean as possible -- this runs in the subprocess
+  // after a fork, so there's some possibility that various global locks
+  // inside malloc() might be held, so allocating memory is a no-no.
+  PCHECK(fd_dir != nullptr);
+  int dir_fd = dirfd(fd_dir);
+
+  struct DIRENT* ent;
+  // readdir64() is not reentrant (it uses a static buffer) and it also
+  // locks fd_dir->lock, so it must not be called in a multi-threaded
+  // environment and is certainly not async-signal-safe.
+  // However, it appears to be safe to call right after fork(), since only one
+  // thread exists in the child process at that time. It also does not call
+  // malloc() or free(). We could use readdir64_r() instead, but all that
+  // buys us is reentrancy, and not async-signal-safety, due to the use of
+  // dir->lock, so seems not worth the added complexity in lifecycle & 
plumbing.
+  while ((ent = READDIR(fd_dir)) != nullptr) {
+    uint32_t fd;
+    if (!safe_strtou32(ent->d_name, &fd)) continue;
+    if (!(fd == STDIN_FILENO  ||
+          fd == STDOUT_FILENO ||
+          fd == STDERR_FILENO ||
+          fd == dir_fd))  {
+      close(fd);
+    }
+  }
+}
+
+void RedirectToDevNull(int fd) {
+  // We must not close stderr or stdout, because then when a new file 
descriptor
+  // gets opened, it might get that fd number.  (We always allocate the lowest
+  // available file descriptor number.)  Instead, we reopen that fd as
+  // /dev/null.
+  int dev_null = open("/dev/null", O_WRONLY);
+  if (dev_null < 0) {
+    PLOG(WARNING) << "failed to open /dev/null";
+  } else {
+    PCHECK(dup2(dev_null, fd));
+  }
+}
+
+// Stateful libev watcher to help ReadFdsFully().
+class ReadFdsFullyHelper {
+ public:
+  ReadFdsFullyHelper(const string& progname, ev::dynamic_loop* loop, int fd)
+      : progname_(progname) {
+    // Bind the watcher to the provided loop, to this functor, and to the
+    // readable fd.
+    watcher_.set(*loop);
+    watcher_.set(this);
+    watcher_.set(fd, ev::READ);
+
+    // The watcher will now be polled when its loop is run.
+    watcher_.start();
+  }
+
+  void operator() (ev::io &w, int revents) {
+    DCHECK_EQ(ev::READ, revents);
+
+    char buf[1024];
+    ssize_t n = read(w.fd, buf, arraysize(buf));
+    if (n == 0) {
+      // EOF, stop watching.
+      w.stop();
+    } else if (n < 0) {
+      // Interrupted by a signal, do nothing.
+      if (errno == EINTR) return;
+
+      // A fatal error. Store it and stop watching.
+      status_ = Status::IOError("IO error reading from " + progname_,
+                                ErrnoToString(errno), errno);
+      w.stop();
+    } else {
+      // Add our bytes and keep watching.
+      output_.append(buf, n);
+    }
+  }
+
+  const Status& status() const { return status_; }
+  const string& output() const { return output_; }
+
+ private:
+  const string progname_;
+
+  ev::io watcher_;
+  string output_;
+  Status status_;
+};
+
+// Reads from all descriptors in 'fds' until EOF on all of them. If any read
+// yields an error, it is returned. Otherwise, 'out' contains the bytes read
+// for each fd, in the same order as was in 'fds'.
+Status ReadFdsFully(const string& progname,
+                    const vector<int>& fds,
+                    vector<string>* out) {
+  ev::dynamic_loop loop;
+
+  // Set up a watcher for each fd.
+  vector<unique_ptr<ReadFdsFullyHelper>> helpers;
+  for (int fd : fds) {
+    helpers.emplace_back(new ReadFdsFullyHelper(progname, &loop, fd));
+  }
+
+  // This will read until all fds return EOF.
+  loop.run();
+
+  // Check for failures.
+  for (const auto& h : helpers) {
+    if (!h->status().ok()) {
+      return h->status();
+    }
+  }
+
+  // No failures; write the output to the caller.
+  for (const auto& h : helpers) {
+    out->push_back(h->output());
+  }
+  return Status::OK();
+}
+
+} // anonymous namespace
+
+Subprocess::Subprocess(vector<string> argv, int sig_on_destruct)
+    : program_(argv[0]),
+      argv_(std::move(argv)),
+      state_(kNotStarted),
+      child_pid_(-1),
+      fd_state_(),
+      child_fds_(),
+      sig_on_destruct_(sig_on_destruct) {
+  // By convention, the first argument in argv is the base name of the program.
+  argv_[0] = BaseName(argv_[0]);
+
+  fd_state_[STDIN_FILENO]   = PIPED;
+  fd_state_[STDOUT_FILENO]  = SHARED;
+  fd_state_[STDERR_FILENO]  = SHARED;
+  child_fds_[STDIN_FILENO]  = -1;
+  child_fds_[STDOUT_FILENO] = -1;
+  child_fds_[STDERR_FILENO] = -1;
+}
+
+Subprocess::~Subprocess() {
+  if (state_ == kRunning) {
+    LOG(WARNING) << Substitute(
+        "Child process $0 ($1) was orphaned. Sending signal $2...",
+        child_pid_, JoinStrings(argv_, " "), sig_on_destruct_);
+    WARN_NOT_OK(KillAndWait(sig_on_destruct_),
+                Substitute("Failed to KillAndWait() with signal $0",
+                           sig_on_destruct_));
+  }
+
+  for (int i = 0; i < 3; ++i) {
+    if (fd_state_[i] == PIPED && child_fds_[i] >= 0) {
+      close(child_fds_[i]);
+    }
+  }
+}
+
+void Subprocess::DisableStderr() {
+  CHECK_EQ(state_, kNotStarted);
+  fd_state_[STDERR_FILENO] = DISABLED;
+}
+
+void Subprocess::DisableStdout() {
+  CHECK_EQ(state_, kNotStarted);
+  fd_state_[STDOUT_FILENO] = DISABLED;
+}
+
+#if defined(__APPLE__)
+static int pipe2(int pipefd[2], int flags) {
+  DCHECK_EQ(O_CLOEXEC, flags);
+
+  int new_fds[2];
+  if (pipe(new_fds) == -1) {
+    return -1;
+  }
+  if (fcntl(new_fds[0], F_SETFD, O_CLOEXEC) == -1) {
+    close(new_fds[0]);
+    close(new_fds[1]);
+    return -1;
+  }
+  if (fcntl(new_fds[1], F_SETFD, O_CLOEXEC) == -1) {
+    close(new_fds[0]);
+    close(new_fds[1]);
+    return -1;
+  }
+  pipefd[0] = new_fds[0];
+  pipefd[1] = new_fds[1];
+  return 0;
+}
+#endif
+
+Status Subprocess::Start() {
+  VLOG(2) << "Invoking command: " << argv_;
+  if (state_ != kNotStarted) {
+    const string err_str = Substitute("$0: illegal sub-process state", state_);
+    LOG(DFATAL) << err_str;
+    return Status::IllegalState(err_str);
+  }
+  if (argv_.empty()) {
+    return Status::InvalidArgument("argv must have at least one elem");
+  }
+
+  // We explicitly set SIGPIPE to SIG_IGN here because we are using UNIX pipes.
+  IgnoreSigPipe();
+
+  vector<char*> argv_ptrs;
+  for (const string& arg : argv_) {
+    argv_ptrs.push_back(const_cast<char*>(arg.c_str()));
+  }
+  argv_ptrs.push_back(nullptr);
+
+  // Pipe from caller process to child's stdin
+  // [0] = stdin for child, [1] = how parent writes to it
+  int child_stdin[2] = {-1, -1};
+  if (fd_state_[STDIN_FILENO] == PIPED) {
+    PCHECK(pipe2(child_stdin, O_CLOEXEC) == 0);
+  }
+  // Pipe from child's stdout back to caller process
+  // [0] = how parent reads from child's stdout, [1] = how child writes to it
+  int child_stdout[2] = {-1, -1};
+  if (fd_state_[STDOUT_FILENO] == PIPED) {
+    PCHECK(pipe2(child_stdout, O_CLOEXEC) == 0);
+  }
+  // Pipe from child's stderr back to caller process
+  // [0] = how parent reads from child's stderr, [1] = how child writes to it
+  int child_stderr[2] = {-1, -1};
+  if (fd_state_[STDERR_FILENO] == PIPED) {
+    PCHECK(pipe2(child_stderr, O_CLOEXEC) == 0);
+  }
+  // The synchronization pipe: this trick is to make sure the parent returns
+  // control only after the child process has invoked execvp().
+  int sync_pipe[2];
+  PCHECK(pipe2(sync_pipe, O_CLOEXEC) == 0);
+
+  DIR* fd_dir = nullptr;
+  RETURN_NOT_OK_PREPEND(OpenProcFdDir(&fd_dir), "Unable to open fd dir");
+  unique_ptr<DIR, std::function<void(DIR*)>> fd_dir_closer(fd_dir,
+                                                           CloseProcFdDir);
+  int ret = fork();
+  if (ret == -1) {
+    return Status::RuntimeError("Unable to fork", ErrnoToString(errno), errno);
+  }
+  if (ret == 0) { // We are the child
+    // Send the child a SIGTERM when the parent dies. This is done as early
+    // as possible in the child's life to prevent any orphaning whatsoever
+    // (e.g. from KUDU-402).
+#if defined(__linux__)
+    // TODO: prctl(PR_SET_PDEATHSIG) is Linux-specific, look into portable ways
+    // to prevent orphans when parent is killed.
+    prctl(PR_SET_PDEATHSIG, SIGKILL);
+#endif
+
+    // stdin
+    if (fd_state_[STDIN_FILENO] == PIPED) {
+      PCHECK(dup2(child_stdin[0], STDIN_FILENO) == STDIN_FILENO);
+    }
+
+    // stdout
+    switch (fd_state_[STDOUT_FILENO]) {
+      case PIPED: {
+        PCHECK(dup2(child_stdout[1], STDOUT_FILENO) == STDOUT_FILENO);
+        break;
+      }
+      case DISABLED: {
+        RedirectToDevNull(STDOUT_FILENO);
+        break;
+      }
+      default:
+        break;
+    }
+
+    // stderr
+    switch (fd_state_[STDERR_FILENO]) {
+      case PIPED: {
+        PCHECK(dup2(child_stderr[1], STDERR_FILENO) == STDERR_FILENO);
+        break;
+      }
+      case DISABLED: {
+        RedirectToDevNull(STDERR_FILENO);
+        break;
+      }
+      default:
+        break;
+    }
+
+    // Close the read side of the sync pipe;
+    // the write side should be closed upon execvp().
+    PCHECK(close(sync_pipe[0]) == 0);
+
+    CloseNonStandardFDs(fd_dir);
+
+    // Ensure we are not ignoring or blocking signals in the child process.
+    ResetAllSignalMasksToUnblocked();
+
+    // Reset the disposition of SIGPIPE to SIG_DFL because we routinely set its
+    // disposition to SIG_IGN via IgnoreSigPipe(). At the time of writing, we
+    // don't explicitly ignore any other signals in Kudu.
+    ResetSigPipeHandlerToDefault();
+
+    // Set the environment for the subprocess. This is more portable than
+    // using execvpe(), which doesn't exist on OS X. We rely on the 'p'
+    // variant of exec to do $PATH searching if the executable specified
+    // by the caller isn't an absolute path.
+    for (const auto& env : env_) {
+      ignore_result(setenv(env.first.c_str(), env.second.c_str(), 1 /* 
overwrite */));
+    }
+
+    execvp(program_.c_str(), &argv_ptrs[0]);
+    int err = errno;
+    PLOG(ERROR) << "Couldn't exec " << program_;
+    _exit(err);
+  } else {
+    // We are the parent
+    child_pid_ = ret;
+    // Close child's side of the pipes
+    if (fd_state_[STDIN_FILENO]  == PIPED) close(child_stdin[0]);
+    if (fd_state_[STDOUT_FILENO] == PIPED) close(child_stdout[1]);
+    if (fd_state_[STDERR_FILENO] == PIPED) close(child_stderr[1]);
+    // Keep parent's side of the pipes
+    child_fds_[STDIN_FILENO]  = child_stdin[1];
+    child_fds_[STDOUT_FILENO] = child_stdout[0];
+    child_fds_[STDERR_FILENO] = child_stderr[0];
+
+    // Wait for the child process to invoke execvp(). The trick involves
+    // a pipe with O_CLOEXEC option for its descriptors. The parent process
+    // performs blocking read from the pipe while the write side of the pipe
+    // is kept open by the child (it does not write any data, though). The 
write
+    // side of the pipe is closed when the child invokes execvp(). At that
+    // point, the parent should receive EOF, i.e. read() should return 0.
+    {
+      // Close the write side of the sync pipe. It's crucial to make sure
+      // it succeeds otherwise the blocking read() below might wait forever
+      // even if the child process has closed the pipe.
+      PCHECK(close(sync_pipe[1]) == 0);
+      while (true) {
+        uint8_t buf;
+        int err = 0;
+        const int rc = read(sync_pipe[0], &buf, 1);
+        if (rc == -1) {
+          err = errno;
+          if (err == EINTR) {
+            // Retry in case of a signal.
+            continue;
+          }
+        }
+        PCHECK(close(sync_pipe[0]) == 0);
+        if (rc == 0) {
+          // That's OK -- expecting EOF from the other side of the pipe.
+          break;
+        } else if (rc == -1) {
+          // Other errors besides EINTR are not expected.
+          return Status::RuntimeError("Unexpected error from the sync pipe",
+                                      ErrnoToString(err), err);
+        }
+        // No data is expected from the sync pipe.
+        LOG(FATAL) << Substitute("$0: unexpected data from the sync pipe", rc);
+      }
+    }
+  }
+
+  state_ = kRunning;
+  return Status::OK();
+}
+
+Status Subprocess::Wait(int* wait_status) {
+  return DoWait(wait_status, BLOCKING);
+}
+
+Status Subprocess::WaitNoBlock(int* wait_status) {
+  return DoWait(wait_status, NON_BLOCKING);
+}
+
+Status Subprocess::Kill(int signal) {
+  if (state_ != kRunning) {
+    const string err_str = "Sub-process is not running";
+    LOG(DFATAL) << err_str;
+    return Status::IllegalState(err_str);
+  }
+  if (kill(child_pid_, signal) != 0) {
+    return Status::RuntimeError("Unable to kill",
+                                ErrnoToString(errno),
+                                errno);
+  }
+  return Status::OK();
+}
+
+Status Subprocess::KillAndWait(int signal) {
+  string procname = Substitute("$0 (pid $1)", argv0(), pid());
+
+  // This is a fatal error because all errors in Kill() are signal-independent,
+  // so Kill(SIGKILL) is just as likely to fail if this did.
+  RETURN_NOT_OK_PREPEND(
+      Kill(signal), Substitute("Failed to send signal $0 to $1",
+                               signal, procname));
+  if (signal == SIGKILL) {
+    RETURN_NOT_OK_PREPEND(
+        Wait(), Substitute("Failed to wait on $0", procname));
+  } else {
+    Status s;
+    Stopwatch sw;
+    sw.start();
+    do {
+      s = WaitNoBlock();
+      if (s.ok()) {
+        break;
+      } else if (!s.IsTimedOut()) {
+        // An unexpected error in WaitNoBlock() is likely to manifest 
repeatedly,
+        // so there's no point in retrying this.
+        RETURN_NOT_OK_PREPEND(
+            s, Substitute("Unexpected failure while waiting on $0", procname));
+      }
+      SleepFor(MonoDelta::FromMilliseconds(10));
+    } while (sw.elapsed().wall_seconds() < kProcessWaitTimeoutSeconds);
+    if (s.IsTimedOut()) {
+      return KillAndWait(SIGKILL);
+    }
+  }
+  return Status::OK();
+}
+
+Status Subprocess::GetExitStatus(int* exit_status, string* info_str) const {
+  if (state_ != kExited) {
+    const string err_str = "Sub-process termination hasn't yet been detected";
+    LOG(DFATAL) << err_str;
+    return Status::IllegalState(err_str);
+  }
+  string info;
+  int status;
+  if (WIFEXITED(wait_status_)) {
+    status = WEXITSTATUS(wait_status_);
+    if (status == 0) {
+      info = Substitute("$0: process successfully exited", program_);
+    } else {
+      info = Substitute("$0: process exited with non-zero status $1",
+                        program_, status);
+    }
+  } else if (WIFSIGNALED(wait_status_)) {
+    // Using signal number as exit status.
+    status = WTERMSIG(wait_status_);
+    info = Substitute("$0: process exited on signal $1", program_, status);
+#if defined(WCOREDUMP)
+    if (WCOREDUMP(wait_status_)) {
+      SubstituteAndAppend(&info, " (core dumped)");
+    }
+#endif
+  } else {
+    status = -1;
+    info = Substitute("$0: process reported unexpected wait status $1",
+                      program_, wait_status_);
+    LOG(DFATAL) << info;
+  }
+  if (exit_status) {
+    *exit_status = status;
+  }
+  if (info_str) {
+    *info_str = info;
+  }
+  return Status::OK();
+}
+
+Status Subprocess::Call(const string& arg_str) {
+  vector<string> argv = Split(arg_str, " ");
+  return Call(argv, "", nullptr, nullptr);
+}
+
+Status Subprocess::Call(const vector<string>& argv,
+                        const string& stdin_in,
+                        string* stdout_out,
+                        string* stderr_out) {
+  Subprocess p(argv);
+
+  if (stdout_out) {
+    p.ShareParentStdout(false);
+  }
+  if (stderr_out) {
+    p.ShareParentStderr(false);
+  }
+  RETURN_NOT_OK_PREPEND(p.Start(),
+                        "Unable to fork " + argv[0]);
+
+  if (!stdin_in.empty() &&
+      write(p.to_child_stdin_fd(), stdin_in.data(), stdin_in.size()) < 
stdin_in.size()) {
+    return Status::IOError("Unable to write to child process stdin", 
ErrnoToString(errno), errno);
+  }
+
+  int err = close(p.ReleaseChildStdinFd());
+  if (PREDICT_FALSE(err != 0)) {
+    return Status::IOError("Unable to close child process stdin", 
ErrnoToString(errno), errno);
+  }
+
+  vector<int> fds;
+  if (stdout_out) {
+    fds.push_back(p.from_child_stdout_fd());
+  }
+  if (stderr_out) {
+    fds.push_back(p.from_child_stderr_fd());
+  }
+  vector<string> outv;
+  RETURN_NOT_OK(ReadFdsFully(argv[0], fds, &outv));
+
+  // Given that ReadFdsFully captures the strings in the order in which we
+  // had installed 'fds' above, it can be assured that we can receive
+  // as many strings as there were 'fds' in the vector and in that order.
+  CHECK_EQ(outv.size(), fds.size());
+  if (stdout_out) {
+    *stdout_out = std::move(outv.front());
+  }
+  if (stderr_out) {
+    *stderr_out = std::move(outv.back());
+  }
+
+  RETURN_NOT_OK_PREPEND(p.Wait(), "Unable to wait() for " + argv[0]);
+  int exit_status;
+  string exit_info_str;
+  RETURN_NOT_OK(p.GetExitStatus(&exit_status, &exit_info_str));
+  if (exit_status != 0) {
+    return Status::RuntimeError(exit_info_str);
+  }
+  return Status::OK();
+}
+
+pid_t Subprocess::pid() const {
+  CHECK_EQ(state_, kRunning);
+  return child_pid_;
+}
+
+Status Subprocess::DoWait(int* wait_status, WaitMode mode) {
+  if (state_ == kExited) {
+    if (wait_status) {
+      *wait_status = wait_status_;
+    }
+    return Status::OK();
+  }
+  if (state_ != kRunning) {
+    const string err_str = Substitute("$0: illegal sub-process state", state_);
+    LOG(DFATAL) << err_str;
+    return Status::IllegalState(err_str);
+  }
+
+  const int options = (mode == NON_BLOCKING) ? WNOHANG : 0;
+  int status;
+  const int rc = waitpid(child_pid_, &status, options);
+  if (rc == -1) {
+    return Status::RuntimeError("Unable to wait on child",
+                                ErrnoToString(errno), errno);
+  }
+  if (mode == NON_BLOCKING && rc == 0) {
+    return Status::TimedOut("");
+  }
+  CHECK_EQ(rc, child_pid_);
+  CHECK(WIFEXITED(status) || WIFSIGNALED(status));
+
+  child_pid_ = -1;
+  wait_status_ = status;
+  state_ = kExited;
+  if (wait_status) {
+    *wait_status = status;
+  }
+  return Status::OK();
+}
+
+void Subprocess::SetEnvVars(std::map<std::string, std::string> env) {
+  env_ = std::move(env);
+}
+
+void Subprocess::SetFdShared(int stdfd, bool share) {
+  CHECK_EQ(state_, kNotStarted);
+  CHECK_NE(fd_state_[stdfd], DISABLED);
+  fd_state_[stdfd] = share? SHARED : PIPED;
+}
+
+int Subprocess::CheckAndOffer(int stdfd) const {
+  CHECK_EQ(state_, kRunning);
+  CHECK_EQ(fd_state_[stdfd], PIPED);
+  return child_fds_[stdfd];
+}
+
+int Subprocess::ReleaseChildFd(int stdfd) {
+  CHECK_EQ(state_, kRunning);
+  CHECK_GE(child_fds_[stdfd], 0);
+  CHECK_EQ(fd_state_[stdfd], PIPED);
+  int ret = child_fds_[stdfd];
+  child_fds_[stdfd] = -1;
+  return ret;
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/subprocess.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/subprocess.h b/be/src/kudu/util/subprocess.h
new file mode 100644
index 0000000..9834e3a
--- /dev/null
+++ b/be/src/kudu/util/subprocess.h
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_SUBPROCESS_H
+#define KUDU_UTIL_SUBPROCESS_H
+
+#include <signal.h>
+
+#include <map>
+#include <string>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+// Wrapper around a spawned subprocess.
+//
+// program will be treated as an absolute path unless it begins with a dot or a
+// slash.
+//
+// This takes care of creating pipes to/from the subprocess and offers
+// basic functionality to wait on it or send signals.
+// By default, child process only has stdin captured and separate from the 
parent.
+// The stdout/stderr streams are shared with the parent by default.
+//
+// The process may only be started and waited on/killed once.
+//
+// Optionally, user may change parent/child stream sharing. Also, a user may 
disable
+// a subprocess stream. A user cannot do both.
+//
+// Note that, when the Subprocess object is destructed, the child process
+// will be forcibly SIGKILLed to avoid orphaning processes.
+class Subprocess {
+ public:
+  // Constructs a new Subprocess that will execute 'argv' on Start().
+  //
+  // If the process isn't explicitly killed, 'sig_on_destroy' will be delivered
+  // to it when the Subprocess goes out of scope.
+  explicit Subprocess(std::vector<std::string> argv, int sig_on_destruct = 
SIGKILL);
+  ~Subprocess();
+
+  // Disable subprocess stream output.  Must be called before subprocess 
starts.
+  void DisableStderr();
+  void DisableStdout();
+
+  // Share a stream with parent. Must be called before subprocess starts.
+  // Cannot set sharing at all if stream is disabled
+  void ShareParentStdin(bool  share = true) { SetFdShared(STDIN_FILENO,  
share); }
+  void ShareParentStdout(bool share = true) { SetFdShared(STDOUT_FILENO, 
share); }
+  void ShareParentStderr(bool share = true) { SetFdShared(STDERR_FILENO, 
share); }
+
+  // Add environment variables to be set before executing the subprocess.
+  //
+  // These environment variables are merged into the existing environment
+  // of the parent process. In other words, there is no need to prime this
+  // map with the current environment; instead, just specify any variables
+  // that should be overridden.
+  //
+  // Repeated calls to this function replace earlier calls.
+  void SetEnvVars(std::map<std::string, std::string> env);
+
+  // Start the subprocess. Can only be called once.
+  //
+  // This returns a bad Status if the fork() fails. However,
+  // note that if the executable path was incorrect such that
+  // exec() fails, this will still return Status::OK. You must
+  // use Wait() to check for failure.
+  Status Start() WARN_UNUSED_RESULT;
+
+  // Wait for the subprocess to exit. The return value is the same as
+  // that of the waitpid() syscall. Only call after starting.
+  //
+  // NOTE: unlike the standard wait(2) call, this may be called multiple
+  // times. If the process has exited, it will repeatedly return the same
+  // exit code.
+  Status Wait(int* wait_status = nullptr) WARN_UNUSED_RESULT;
+
+  // Like the above, but does not block. This returns Status::TimedOut
+  // immediately if the child has not exited. Otherwise returns Status::OK
+  // and sets *ret. Only call after starting.
+  //
+  // NOTE: unlike the standard wait(2) call, this may be called multiple
+  // times. If the process has exited, it will repeatedly return the same
+  // exit code.
+  Status WaitNoBlock(int* wait_status = nullptr) WARN_UNUSED_RESULT;
+
+  // Send a signal to the subprocess.
+  // Note that this does not reap the process -- you must still Wait()
+  // in order to reap it. Only call after starting.
+  Status Kill(int signal) WARN_UNUSED_RESULT;
+
+  // Sends a signal to the subprocess and waits for it to exit.
+  //
+  // If the signal is not SIGKILL and the process doesn't appear to be exiting,
+  // retries with SIGKILL.
+  Status KillAndWait(int signal);
+
+  // Retrieve exit status of the process awaited by Wait() and/or WaitNoBlock()
+  // methods. Must be called only after calling Wait()/WaitNoBlock().
+  Status GetExitStatus(int* exit_status, std::string* info_str = nullptr) const
+      WARN_UNUSED_RESULT;
+
+  // Helper method that creates a Subprocess, issues a Start() then a Wait().
+  // Expects a blank-separated list of arguments, with the first being the
+  // full path to the executable.
+  // The returned Status will only be OK if all steps were successful and
+  // the return code was 0.
+  static Status Call(const std::string& arg_str) WARN_UNUSED_RESULT;
+
+  // Same as above, but accepts a vector that includes the path to the
+  // executable as argv[0] and the arguments to the program in argv[1..n].
+  //
+  // Writes the value of 'stdin_in' to the subprocess' stdin. The length of
+  // 'stdin_in' should be limited to 64kib.
+  //
+  // Also collects the output from the child process stdout and stderr into
+  // 'stdout_out' and 'stderr_out' respectively.
+  static Status Call(const std::vector<std::string>& argv,
+                     const std::string& stdin_in = "",
+                     std::string* stdout_out = nullptr,
+                     std::string* stderr_out = nullptr) WARN_UNUSED_RESULT;
+
+  // Return the pipe fd to the child's standard stream.
+  // Stream should not be disabled or shared.
+  int to_child_stdin_fd()    const { return CheckAndOffer(STDIN_FILENO); }
+  int from_child_stdout_fd() const { return CheckAndOffer(STDOUT_FILENO); }
+  int from_child_stderr_fd() const { return CheckAndOffer(STDERR_FILENO); }
+
+  // Release control of the file descriptor for the child's stream, only if 
piped.
+  // Writes to this FD show up on stdin in the subprocess
+  int ReleaseChildStdinFd()  { return ReleaseChildFd(STDIN_FILENO ); }
+  // Reads from this FD come from stdout of the subprocess
+  int ReleaseChildStdoutFd() { return ReleaseChildFd(STDOUT_FILENO); }
+  // Reads from this FD come from stderr of the subprocess
+  int ReleaseChildStderrFd() { return ReleaseChildFd(STDERR_FILENO); }
+
+  pid_t pid() const;
+  const std::string& argv0() const { return argv_[0]; }
+
+ private:
+  enum State {
+    kNotStarted,
+    kRunning,
+    kExited
+  };
+  enum StreamMode {SHARED, DISABLED, PIPED};
+  enum WaitMode {BLOCKING, NON_BLOCKING};
+
+  Status DoWait(int* wait_status, WaitMode mode) WARN_UNUSED_RESULT;
+  void SetFdShared(int stdfd, bool share);
+  int CheckAndOffer(int stdfd) const;
+  int ReleaseChildFd(int stdfd);
+
+  std::string program_;
+  std::vector<std::string> argv_;
+  std::map<std::string, std::string> env_;
+  State state_;
+  int child_pid_;
+  enum StreamMode fd_state_[3];
+  int child_fds_[3];
+
+  // The cached wait status if Wait()/WaitNoBlock() has been called.
+  // Only valid if state_ == kExited.
+  int wait_status_;
+
+  // Custom signal to deliver when the subprocess goes out of scope, provided
+  // the process hasn't already been killed.
+  int sig_on_destruct_;
+
+  DISALLOW_COPY_AND_ASSIGN(Subprocess);
+};
+
+} // namespace kudu
+#endif /* KUDU_UTIL_SUBPROCESS_H */

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/test_graph.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/test_graph.cc b/be/src/kudu/util/test_graph.cc
new file mode 100644
index 0000000..052ad9c
--- /dev/null
+++ b/be/src/kudu/util/test_graph.cc
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/test_graph.h"
+
+#include <glog/logging.h>
+#include <mutex>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread.h"
+
+using std::shared_ptr;
+using std::string;
+
+namespace kudu {
+
+void TimeSeries::AddValue(double val) {
+  std::lock_guard<simple_spinlock> l(lock_);
+  val_ += val;
+}
+
+void TimeSeries::SetValue(double val) {
+  std::lock_guard<simple_spinlock> l(lock_);
+  val_ = val;
+}
+
+double TimeSeries::value() const {
+  std::lock_guard<simple_spinlock> l(lock_);
+  return val_;
+}
+
+TimeSeriesCollector::~TimeSeriesCollector() {
+  if (started_) {
+    StopDumperThread();
+  }
+}
+
+shared_ptr<TimeSeries> TimeSeriesCollector::GetTimeSeries(const string &key) {
+  MutexLock l(series_lock_);
+  SeriesMap::const_iterator it = series_map_.find(key);
+  if (it == series_map_.end()) {
+    shared_ptr<TimeSeries> ts(new TimeSeries());
+    series_map_[key] = ts;
+    return ts;
+  } else {
+    return (*it).second;
+  }
+}
+
+void TimeSeriesCollector::StartDumperThread() {
+  LOG(INFO) << "Starting metrics dumper";
+  CHECK(!started_);
+  exit_latch_.Reset(1);
+  started_ = true;
+  CHECK_OK(kudu::Thread::Create("time series", "dumper",
+      &TimeSeriesCollector::DumperThread, this, &dumper_thread_));
+}
+
+void TimeSeriesCollector::StopDumperThread() {
+  CHECK(started_);
+  exit_latch_.CountDown();
+  CHECK_OK(ThreadJoiner(dumper_thread_.get()).Join());
+  started_ = false;
+}
+
+void TimeSeriesCollector::DumperThread() {
+  CHECK(started_);
+  WallTime start_time = WallTime_Now();
+
+  faststring metrics_str;
+  while (true) {
+    metrics_str.clear();
+    metrics_str.append("metrics: ");
+    BuildMetricsString(WallTime_Now() - start_time, &metrics_str);
+    LOG(INFO) << metrics_str.ToString();
+
+    // Sleep until next dump time, or return if we should exit
+    if (exit_latch_.WaitFor(MonoDelta::FromMilliseconds(250))) {
+      return;
+    }
+  }
+}
+
+void TimeSeriesCollector::BuildMetricsString(
+  WallTime time_since_start, faststring *dst_buf) const {
+  MutexLock l(series_lock_);
+
+  dst_buf->append(StringPrintf("{ \"scope\": \"%s\", \"time\": %.3f",
+                               scope_.c_str(), time_since_start));
+
+  for (SeriesMap::const_reference entry : series_map_) {
+    dst_buf->append(StringPrintf(", \"%s\": %.3f",
+                                 entry.first.c_str(),  entry.second->value()));
+  }
+  dst_buf->append("}");
+}
+
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/test_graph.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/test_graph.h b/be/src/kudu/util/test_graph.h
new file mode 100644
index 0000000..6ea49ca
--- /dev/null
+++ b/be/src/kudu/util/test_graph.h
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_TEST_GRAPH_COLLECTOR_H
+#define KUDU_TEST_GRAPH_COLLECTOR_H
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/thread.h"
+
+namespace kudu {
+
+class TimeSeries {
+ public:
+  void AddValue(double val);
+  void SetValue(double val);
+
+  double value() const;
+
+ private:
+  friend class TimeSeriesCollector;
+
+  DISALLOW_COPY_AND_ASSIGN(TimeSeries);
+
+  TimeSeries() :
+    val_(0)
+  {}
+
+  mutable simple_spinlock lock_;
+  double val_;
+};
+
+class TimeSeriesCollector {
+ public:
+  explicit TimeSeriesCollector(std::string scope)
+      : scope_(std::move(scope)), exit_latch_(0), started_(false) {}
+
+  ~TimeSeriesCollector();
+
+  std::shared_ptr<TimeSeries> GetTimeSeries(const std::string &key);
+  void StartDumperThread();
+  void StopDumperThread();
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(TimeSeriesCollector);
+
+  void DumperThread();
+  void BuildMetricsString(WallTime time_since_start, faststring *dst_buf) 
const;
+
+  std::string scope_;
+
+  typedef std::unordered_map<std::string, std::shared_ptr<TimeSeries> > 
SeriesMap;
+  SeriesMap series_map_;
+  mutable Mutex series_lock_;
+
+  scoped_refptr<kudu::Thread> dumper_thread_;
+
+  // Latch used to stop the dumper_thread_. When the thread is started,
+  // this is set to 1, and when the thread should exit, it is counted down.
+  CountDownLatch exit_latch_;
+
+  bool started_;
+};
+
+} // namespace kudu
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/test_macros.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/test_macros.h b/be/src/kudu/util/test_macros.h
new file mode 100644
index 0000000..63cae5a
--- /dev/null
+++ b/be/src/kudu/util/test_macros.h
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_TEST_MACROS_H
+#define KUDU_UTIL_TEST_MACROS_H
+
+#include <gmock/gmock.h>
+#include <string>
+
+// ASSERT_NO_FATAL_FAILURE is just too long to type.
+#define NO_FATALS(expr) \
+  ASSERT_NO_FATAL_FAILURE(expr)
+
+// Detect fatals in the surrounding scope. NO_FATALS() only checks for fatals
+// in the expression passed to it.
+#define NO_PENDING_FATALS() \
+  if (testing::Test::HasFatalFailure()) { return; }
+
+#define ASSERT_OK(status) do { \
+  const Status& _s = status;        \
+  if (_s.ok()) { \
+    SUCCEED(); \
+  } else { \
+    FAIL() << "Bad status: " << _s.ToString();  \
+  } \
+} while (0);
+
+#define EXPECT_OK(status) do { \
+  const Status& _s = status; \
+  if (_s.ok()) { \
+    SUCCEED(); \
+  } else { \
+    ADD_FAILURE() << "Bad status: " << _s.ToString();  \
+  } \
+} while (0);
+
+// Like the above, but doesn't record successful
+// tests.
+#define ASSERT_OK_FAST(status) do { \
+  const Status& _s = status; \
+  if (!_s.ok()) { \
+    FAIL() << "Bad status: " << _s.ToString(); \
+  } \
+} while (0);
+
+// Substring matches.
+#define ASSERT_STR_CONTAINS(str, substr) \
+  ASSERT_THAT(str, testing::HasSubstr(substr))
+
+#define ASSERT_STR_NOT_CONTAINS(str, substr) \
+  ASSERT_THAT(str, testing::Not(testing::HasSubstr(substr)))
+
+// Substring regular expressions in extended regex (POSIX) syntax.
+#define ASSERT_STR_MATCHES(str, pattern) \
+  ASSERT_THAT(str, testing::ContainsRegex(pattern))
+
+#define ASSERT_STR_NOT_MATCHES(str, pattern) \
+  ASSERT_THAT(str, testing::Not(testing::ContainsRegex(pattern)))
+
+// Batched substring regular expressions in extended regex (POSIX) syntax.
+//
+// All strings must match the pattern.
+#define ASSERT_STRINGS_ALL_MATCH(strings, pattern) do { \
+  const auto& _strings = (strings); \
+  const auto& _pattern = (pattern); \
+  int _str_idx = 0; \
+  for (const auto& str : _strings) { \
+    ASSERT_STR_MATCHES(str, _pattern) \
+        << "string " << _str_idx << ": pattern " << _pattern \
+        << " does not match string " << str; \
+    _str_idx++; \
+  } \
+} while (0)
+
+// Batched substring regular expressions in extended regex (POSIX) syntax.
+//
+// At least one string must match the pattern.
+#define ASSERT_STRINGS_ANY_MATCH(strings, pattern) do { \
+  const auto& _strings = (strings); \
+  const auto& _pattern = (pattern); \
+  bool matched = false; \
+  for (const auto& str : _strings) { \
+    if (testing::internal::RE::PartialMatch(str, 
testing::internal::RE(_pattern))) { \
+      matched = true; \
+      break; \
+    } \
+  } \
+  ASSERT_TRUE(matched) \
+      << "not one string matched pattern " << _pattern; \
+} while (0)
+
+#define ASSERT_FILE_EXISTS(env, path) do { \
+  const std::string& _s = path; \
+  ASSERT_TRUE(env->FileExists(_s)) \
+    << "Expected file to exist: " << _s; \
+} while (0);
+
+#define ASSERT_FILE_NOT_EXISTS(env, path) do { \
+  const std::string& _s = path; \
+  ASSERT_FALSE(env->FileExists(_s)) \
+    << "Expected file not to exist: " << _s; \
+} while (0);
+
+#define CURRENT_TEST_NAME() \
+  ::testing::UnitTest::GetInstance()->current_test_info()->name()
+
+#define CURRENT_TEST_CASE_NAME() \
+  ::testing::UnitTest::GetInstance()->current_test_info()->test_case_name()
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/test_main.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/test_main.cc b/be/src/kudu/util/test_main.cc
new file mode 100644
index 0000000..fa88f21
--- /dev/null
+++ b/be/src/kudu/util/test_main.cc
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdlib>
+#include <thread>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/util/debug/leakcheck_disabler.h"
+#include "kudu/util/pstack_watcher.h"
+#include "kudu/util/flags.h"
+#include "kudu/util/minidump.h"
+#include "kudu/util/signal.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_util.h"
+
+DEFINE_int32(test_timeout_after, 0,
+             "Maximum total seconds allowed for all unit tests in the suite. 
Default: disabled");
+
+DEFINE_int32(stress_cpu_threads, 0,
+             "Number of threads to start that burn CPU in an attempt to "
+             "stimulate race conditions");
+
+namespace kudu {
+
+// Start thread that kills the process if --test_timeout_after is exceeded 
before
+// the tests complete.
+static void CreateAndStartTimeoutThread() {
+  if (FLAGS_test_timeout_after == 0) return;
+
+  // KUDU-1995: if running death tests using EXPECT_EXIT()/ASSERT_EXIT(), LSAN
+  // reports leaks in CreateAndStartTimeoutThread(). Adding a couple of scoped
+  // leak check disablers as a workaround since right now it's not clear what
+  // is going on exactly: LSAN does not report those leaks for tests which run
+  // ASSERT_DEATH(). This does not seem harmful or hiding any potential leaks
+  // since it's scoped and targeted only for this utility thread.
+  debug::ScopedLeakCheckDisabler disabler;
+  std::thread([=](){
+      debug::ScopedLeakCheckDisabler disabler;
+      SleepFor(MonoDelta::FromSeconds(FLAGS_test_timeout_after));
+      // Dump a pstack to stdout.
+      WARN_NOT_OK(PstackWatcher::DumpStacks(), "Unable to print pstack");
+
+      // ...and abort.
+      LOG(FATAL) << "Maximum unit test time exceeded (" << 
FLAGS_test_timeout_after << " sec)";
+    }).detach();
+}
+} // namespace kudu
+
+
+static void StartStressThreads() {
+  for (int i = 0; i < FLAGS_stress_cpu_threads; i++) {
+    std::thread([]{
+        while (true) {
+          // Do something which won't be optimized out.
+          base::subtle::MemoryBarrier();
+        }
+      }).detach();
+  }
+}
+
+int main(int argc, char **argv) {
+  google::InstallFailureSignalHandler();
+
+  // We don't use InitGoogleLoggingSafe() because gtest initializes glog, so we
+  // need to block SIGUSR1 explicitly in order to test minidump generation.
+  CHECK_OK(kudu::BlockSigUSR1());
+
+  // Ignore SIGPIPE for all tests so that threads writing to TLS
+  // sockets do not crash when writing to a closed socket. See KUDU-1910.
+  kudu::IgnoreSigPipe();
+
+  // InitGoogleTest() must precede ParseCommandLineFlags(), as the former
+  // removes gtest-related flags from argv that would trip up the latter.
+  ::testing::InitGoogleTest(&argc, argv);
+  kudu::ParseCommandLineFlags(&argc, &argv, true);
+
+  // Create the test-timeout timer.
+  kudu::CreateAndStartTimeoutThread();
+
+  StartStressThreads();
+
+  // This is called by the KuduTest setup method, but in case we have
+  // any tests that don't inherit from KuduTest, it's helpful to
+  // cover our bases and call it here too.
+  kudu::KuduTest::OverrideKrb5Environment();
+
+  int ret = RUN_ALL_TESTS();
+
+  return ret;
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/test_util.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/test_util.cc b/be/src/kudu/util/test_util.cc
new file mode 100644
index 0000000..50c80c6
--- /dev/null
+++ b/be/src/kudu/util/test_util.cc
@@ -0,0 +1,285 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "kudu/util/test_util.h"
+
+#include <map>
+#include <string>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <glog/stl_logging.h>
+#include <gtest/gtest-spi.h>
+
+#include "kudu/gutil/strings/strcat.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/strings/util.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/env.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/random.h"
+#include "kudu/util/spinlock_profiling.h"
+
+DEFINE_string(test_leave_files, "on_failure",
+              "Whether to leave test files around after the test run. "
+              " Valid values are 'always', 'on_failure', or 'never'");
+
+DEFINE_int32(test_random_seed, 0, "Random seed to use for randomized tests");
+
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+const char* kInvalidPath = "/dev/invalid-path-for-kudu-tests";
+static const char* const kSlowTestsEnvVariable = "KUDU_ALLOW_SLOW_TESTS";
+
+static const uint64 kTestBeganAtMicros = Env::Default()->NowMicros();
+
+// Global which production code can check to see if it is running
+// in a GTest environment (assuming the test binary links in this module,
+// which is typically a good assumption).
+//
+// This can be checked using the 'IsGTest()' function from test_util_prod.cc.
+bool g_is_gtest = true;
+
+///////////////////////////////////////////////////
+// KuduTest
+///////////////////////////////////////////////////
+
+KuduTest::KuduTest()
+  : env_(Env::Default()),
+    test_dir_(GetTestDataDirectory()) {
+  std::map<const char*, const char*> flags_for_tests = {
+    // Disabling fsync() speeds up tests dramatically, and it's safe to do as 
no
+    // tests rely on cutting power to a machine or equivalent.
+    {"never_fsync", "true"},
+    // Disable log redaction.
+    {"redact", "flag"},
+    // Reduce default RSA key length for faster tests. We are using strong/high
+    // TLS v1.2 cipher suites, so minimum possible for TLS-related RSA keys is
+    // 768 bits. However, for the external mini cluster we use 1024 bits 
because
+    // Java default security policies require at least 1024 bits for RSA keys
+    // used in certificates. For uniformity, here 1024 RSA bit keys are used
+    // as well. As for the TSK keys, 512 bits is the minimum since the SHA256
+    // digest is used for token signing/verification.
+    {"ipki_server_key_size", "1024"},
+    {"ipki_ca_key_size", "1024"},
+    {"tsk_num_rsa_bits", "512"},
+  };
+  for (const auto& e : flags_for_tests) {
+    // We don't check for errors here, because we have some default flags that
+    // only apply to certain tests.
+    google::SetCommandLineOptionWithMode(e.first, e.second, 
google::SET_FLAGS_DEFAULT);
+  }
+}
+
+KuduTest::~KuduTest() {
+  // Clean up the test directory in the destructor instead of a TearDown
+  // method. This is better because it ensures that the child-class
+  // dtor runs first -- so, if the child class is using a minicluster, etc,
+  // we will shut that down before we remove files underneath.
+  if (FLAGS_test_leave_files == "always") {
+    LOG(INFO) << "-----------------------------------------------";
+    LOG(INFO) << "--test_leave_files specified, leaving files in " << 
test_dir_;
+  } else if (FLAGS_test_leave_files == "on_failure" && HasFatalFailure()) {
+    LOG(INFO) << "-----------------------------------------------";
+    LOG(INFO) << "Had fatal failures, leaving test files at " << test_dir_;
+  } else {
+    VLOG(1) << "Cleaning up temporary test files...";
+    WARN_NOT_OK(env_->DeleteRecursively(test_dir_),
+                "Couldn't remove test files");
+  }
+}
+
+void KuduTest::SetUp() {
+  InitSpinLockContentionProfiling();
+  OverrideKrb5Environment();
+}
+
+string KuduTest::GetTestPath(const string& relative_path) {
+  return JoinPathSegments(test_dir_, relative_path);
+}
+
+void KuduTest::OverrideKrb5Environment() {
+  // Set these variables to paths that definitely do not exist and
+  // couldn't be accidentally created.
+  //
+  // Note that if we were to set these to /dev/null, we end up triggering a 
leak in krb5
+  // when it tries to read an empty file as a ticket cache, whereas 
non-existent files
+  // don't have this issue. See MIT krb5 bug #8509.
+  //
+  // NOTE: we don't simply *unset* the variables, because then we'd still pick 
up
+  // the user's /etc/krb5.conf and other default locations.
+  setenv("KRB5_CONFIG", kInvalidPath, 1);
+  setenv("KRB5_KTNAME", kInvalidPath, 1);
+  setenv("KRB5CCNAME", kInvalidPath, 1);
+  // Enable the workaround for MIT krb5 1.10 bugs from krb5_realm_override.cc.
+  setenv("KUDU_ENABLE_KRB5_REALM_FIX", "yes", 1);
+}
+
+///////////////////////////////////////////////////
+// Test utility functions
+///////////////////////////////////////////////////
+
+bool AllowSlowTests() {
+  char *e = getenv(kSlowTestsEnvVariable);
+  if ((e == nullptr) ||
+      (strlen(e) == 0) ||
+      (strcasecmp(e, "false") == 0) ||
+      (strcasecmp(e, "0") == 0) ||
+      (strcasecmp(e, "no") == 0)) {
+    return false;
+  }
+  if ((strcasecmp(e, "true") == 0) ||
+      (strcasecmp(e, "1") == 0) ||
+      (strcasecmp(e, "yes") == 0)) {
+    return true;
+  }
+  LOG(FATAL) << "Unrecognized value for " << kSlowTestsEnvVariable << ": " << 
e;
+  return false;
+}
+
+void OverrideFlagForSlowTests(const std::string& flag_name,
+                              const std::string& new_value) {
+  // Ensure that the flag is valid.
+  google::GetCommandLineFlagInfoOrDie(flag_name.c_str());
+
+  // If we're not running slow tests, don't override it.
+  if (!AllowSlowTests()) {
+    return;
+  }
+  google::SetCommandLineOptionWithMode(flag_name.c_str(), new_value.c_str(),
+                                       google::SET_FLAG_IF_DEFAULT);
+}
+
+int SeedRandom() {
+  int seed;
+  // Initialize random seed
+  if (FLAGS_test_random_seed == 0) {
+    // Not specified by user
+    seed = static_cast<int>(GetCurrentTimeMicros());
+  } else {
+    seed = FLAGS_test_random_seed;
+  }
+  LOG(INFO) << "Using random seed: " << seed;
+  srand(seed);
+  return seed;
+}
+
+string GetTestDataDirectory() {
+  const ::testing::TestInfo* const test_info =
+    ::testing::UnitTest::GetInstance()->current_test_info();
+  CHECK(test_info) << "Must be running in a gtest unit test to call this 
function";
+  string dir;
+  CHECK_OK(Env::Default()->GetTestDirectory(&dir));
+
+  // The directory name includes some strings for specific reasons:
+  // - program name: identifies the directory to the test invoker
+  // - timestamp and pid: disambiguates with prior runs of the same test
+  //
+  // e.g. "env-test.TestEnv.TestReadFully.1409169025392361-23600"
+  dir += Substitute("/$0.$1.$2.$3-$4",
+    StringReplace(google::ProgramInvocationShortName(), "/", "_", true),
+    StringReplace(test_info->test_case_name(), "/", "_", true),
+    StringReplace(test_info->name(), "/", "_", true),
+    kTestBeganAtMicros,
+    getpid());
+  Status s = Env::Default()->CreateDir(dir);
+  CHECK(s.IsAlreadyPresent() || s.ok())
+    << "Could not create directory " << dir << ": " << s.ToString();
+  if (s.ok()) {
+    string metadata;
+
+    StrAppend(&metadata, Substitute("PID=$0\n", getpid()));
+
+    StrAppend(&metadata, Substitute("PPID=$0\n", getppid()));
+
+    char* jenkins_build_id = getenv("BUILD_ID");
+    if (jenkins_build_id) {
+      StrAppend(&metadata, Substitute("BUILD_ID=$0\n", jenkins_build_id));
+    }
+
+    CHECK_OK(WriteStringToFile(Env::Default(), metadata,
+                               Substitute("$0/test_metadata", dir)));
+  }
+  return dir;
+}
+
+void AssertEventually(const std::function<void(void)>& f,
+                      const MonoDelta& timeout) {
+  const MonoTime deadline = MonoTime::Now() + timeout;
+
+  for (int attempts = 0; MonoTime::Now() < deadline; attempts++) {
+    // Capture any assertion failures within this scope (i.e. from their 
function)
+    // into 'results'
+    testing::TestPartResultArray results;
+    testing::ScopedFakeTestPartResultReporter reporter(
+        
testing::ScopedFakeTestPartResultReporter::INTERCEPT_ONLY_CURRENT_THREAD,
+        &results);
+    f();
+
+    // Determine whether their function produced any new test failure results.
+    bool has_failures = false;
+    for (int i = 0; i < results.size(); i++) {
+      has_failures |= results.GetTestPartResult(i).failed();
+    }
+    if (!has_failures) {
+      return;
+    }
+
+    // If they had failures, sleep and try again.
+    int sleep_ms = (attempts < 10) ? (1 << attempts) : 1000;
+    SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
+  }
+
+  // If we ran out of time looping, run their function one more time
+  // without capturing its assertions. This way the assertions will
+  // propagate back out to the normal test reporter. Of course it's
+  // possible that it will pass on this last attempt, but that's OK
+  // too, since we aren't trying to be that strict about the deadline.
+  f();
+  if (testing::Test::HasFatalFailure()) {
+    ADD_FAILURE() << "Timed out waiting for assertion to pass.";
+  }
+}
+
+int CountOpenFds(Env* env) {
+  static const char* kProcSelfFd =
+#if defined(__APPLE__)
+    "/dev/fd";
+#else
+    "/proc/self/fd";
+#endif // defined(__APPLE__)
+
+  vector<string> children;
+  CHECK_OK(env->GetChildren(kProcSelfFd, &children));
+  int num_fds = 0;
+  for (const auto& c : children) {
+    // Skip '.' and '..'.
+    if (c == "." || c == "..") {
+      continue;
+    }
+    num_fds++;
+  }
+
+  // Exclude the fd opened to iterate over kProcSelfFd.
+  return num_fds - 1;
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/test_util.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/test_util.h b/be/src/kudu/util/test_util.h
new file mode 100644
index 0000000..061fb3a
--- /dev/null
+++ b/be/src/kudu/util/test_util.h
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Base test class, with various utility functions.
+#ifndef KUDU_UTIL_TEST_UTIL_H
+#define KUDU_UTIL_TEST_UTIL_H
+
+#include <functional>
+#include <gtest/gtest.h>
+#include <string>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/util/env.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/test_macros.h"
+
+#define ASSERT_EVENTUALLY(expr) do { \
+  AssertEventually(expr); \
+  NO_PENDING_FATALS(); \
+} while (0)
+
+namespace kudu {
+
+extern const char* kInvalidPath;
+
+class KuduTest : public ::testing::Test {
+ public:
+  KuduTest();
+
+  virtual ~KuduTest();
+
+  virtual void SetUp() OVERRIDE;
+
+  // Tests assume that they run with no outside-provided kerberos credentials, 
and if the
+  // user happened to have some credentials available they might fail due to 
being already
+  // kinitted to a different realm, etc. This function overrides the relevant 
environment
+  // variables so that we don't pick up the user's credentials.
+  static void OverrideKrb5Environment();
+
+ protected:
+  // Returns absolute path based on a unit test-specific work directory, given
+  // a relative path. Useful for writing test files that should be deleted 
after
+  // the test ends.
+  std::string GetTestPath(const std::string& relative_path);
+
+  Env* env_;
+  google::FlagSaver flag_saver_;  // Reset flags on every test.
+  std::string test_dir_;
+};
+
+// Returns true if slow tests are runtime-enabled.
+bool AllowSlowTests();
+
+// Override the given gflag to the new value, only in the case that
+// slow tests are enabled and the user hasn't otherwise overridden
+// it on the command line.
+// Example usage:
+//
+// OverrideFlagForSlowTests(
+//     "client_inserts_per_thread",
+//     strings::Substitute("$0", FLAGS_client_inserts_per_thread * 100));
+//
+void OverrideFlagForSlowTests(const std::string& flag_name,
+                              const std::string& new_value);
+
+// Call srand() with a random seed based on the current time, reporting
+// that seed to the logs. The time-based seed may be overridden by passing
+// --test_random_seed= from the CLI in order to reproduce a failed randomized
+// test. Returns the seed.
+int SeedRandom();
+
+// Return a per-test directory in which to store test data. Guaranteed to
+// return the same directory every time for a given unit test.
+//
+// May only be called from within a gtest unit test. Prefer KuduTest::test_dir_
+// if a KuduTest instance is available.
+std::string GetTestDataDirectory();
+
+// Wait until 'f()' succeeds without adding any GTest 'fatal failures'.
+// For example:
+//
+//   AssertEventually([]() {
+//     ASSERT_GT(ReadValueOfMetric(), 10);
+//   });
+//
+// The function is run in a loop with exponential backoff, capped at once
+// a second.
+//
+// To check whether AssertEventually() eventually succeeded, call
+// NO_PENDING_FATALS() afterward, or use ASSERT_EVENTUALLY() which performs
+// this check automatically.
+void AssertEventually(const std::function<void(void)>& f,
+                      const MonoDelta& timeout = MonoDelta::FromSeconds(30));
+
+// Count the number of open file descriptors in use by this process.
+int CountOpenFds(Env* env);
+
+} // namespace kudu
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/test_util_prod.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/test_util_prod.cc 
b/be/src/kudu/util/test_util_prod.cc
new file mode 100644
index 0000000..5523bac
--- /dev/null
+++ b/be/src/kudu/util/test_util_prod.cc
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/test_util_prod.h"
+
+#include <dlfcn.h>
+
+namespace kudu {
+
+bool IsGTest() {
+  return dlsym(RTLD_DEFAULT, "_ZN4kudu10g_is_gtestE") != nullptr;
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/test_util_prod.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/test_util_prod.h 
b/be/src/kudu/util/test_util_prod.h
new file mode 100644
index 0000000..8b7ea61
--- /dev/null
+++ b/be/src/kudu/util/test_util_prod.h
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Test-related utility methods that can be called from non-test
+// code. This module is part of the 'util' module and is built into
+// all binaries, not just tests, whereas 'test_util.cc' is linked
+// only into test binaries.
+
+#pragma once
+
+namespace kudu {
+
+// Return true if the current binary is a gtest. More specifically,
+// returns true if the 'test_util.cc' module has been linked in
+// (either dynamically or statically) to the running process.
+bool IsGTest();
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/thread-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/thread-test.cc b/be/src/kudu/util/thread-test.cc
new file mode 100644
index 0000000..a52f2d4
--- /dev/null
+++ b/be/src/kudu/util/thread-test.cc
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/thread.h"
+
+#include <gtest/gtest.h>
+#include <string>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/env.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/thread_restrictions.h"
+
+using std::string;
+
+namespace kudu {
+
+class ThreadTest : public KuduTest {};
+
+// Join with a thread and emit warnings while waiting to join.
+// This has to be manually verified.
+TEST_F(ThreadTest, TestJoinAndWarn) {
+  if (!AllowSlowTests()) {
+    LOG(INFO) << "Skipping test in quick test mode, since this sleeps";
+    return;
+  }
+
+  scoped_refptr<Thread> holder;
+  ASSERT_OK(Thread::Create("test", "sleeper thread", usleep, 1000*1000, 
&holder));
+  ASSERT_OK(ThreadJoiner(holder.get())
+                   .warn_after_ms(10)
+                   .warn_every_ms(100)
+                   .Join());
+}
+
+TEST_F(ThreadTest, TestFailedJoin) {
+  if (!AllowSlowTests()) {
+    LOG(INFO) << "Skipping test in quick test mode, since this sleeps";
+    return;
+  }
+
+  scoped_refptr<Thread> holder;
+  ASSERT_OK(Thread::Create("test", "sleeper thread", usleep, 1000*1000, 
&holder));
+  Status s = ThreadJoiner(holder.get())
+    .give_up_after_ms(50)
+    .Join();
+  ASSERT_STR_CONTAINS(s.ToString(), "Timed out after 50ms joining on sleeper 
thread");
+}
+
+static void TryJoinOnSelf() {
+  Status s = ThreadJoiner(Thread::current_thread()).Join();
+  // Use CHECK instead of ASSERT because gtest isn't thread-safe.
+  CHECK(s.IsInvalidArgument());
+}
+
+// Try to join on the thread that is currently running.
+TEST_F(ThreadTest, TestJoinOnSelf) {
+  scoped_refptr<Thread> holder;
+  ASSERT_OK(Thread::Create("test", "test", TryJoinOnSelf, &holder));
+  holder->Join();
+  // Actual assertion is done by the thread spawned above.
+}
+
+TEST_F(ThreadTest, TestDoubleJoinIsNoOp) {
+  scoped_refptr<Thread> holder;
+  ASSERT_OK(Thread::Create("test", "sleeper thread", usleep, 0, &holder));
+  ThreadJoiner joiner(holder.get());
+  ASSERT_OK(joiner.Join());
+  ASSERT_OK(joiner.Join());
+}
+
+
+namespace {
+
+void ExitHandler(string* s, const char* to_append) {
+  *s += to_append;
+}
+
+void CallAtExitThread(string* s) {
+  Thread::current_thread()->CallAtExit(Bind(&ExitHandler, s, Unretained("hello 
1, ")));
+  Thread::current_thread()->CallAtExit(Bind(&ExitHandler, s, Unretained("hello 
2")));
+}
+
+} // anonymous namespace
+
+TEST_F(ThreadTest, TestCallOnExit) {
+  scoped_refptr<Thread> holder;
+  string s;
+  ASSERT_OK(Thread::Create("test", "TestCallOnExit", CallAtExitThread, &s, 
&holder));
+  holder->Join();
+  ASSERT_EQ("hello 1, hello 2", s);
+}
+
+// The following tests only run in debug mode, since thread restrictions are 
no-ops
+// in release builds.
+#ifndef NDEBUG
+TEST_F(ThreadTest, TestThreadRestrictions_IO) {
+  // Default should be to allow IO
+  ThreadRestrictions::AssertIOAllowed();
+
+  ThreadRestrictions::SetIOAllowed(false);
+  {
+    ThreadRestrictions::ScopedAllowIO allow_io;
+    ASSERT_TRUE(Env::Default()->FileExists("/"));
+  }
+  ThreadRestrictions::SetIOAllowed(true);
+
+  // Disallow IO - doing IO should crash the process.
+  ASSERT_DEATH({
+      ThreadRestrictions::SetIOAllowed(false);
+      ignore_result(Env::Default()->FileExists("/"));
+    },
+    "Function marked as IO-only was called from a thread that disallows IO");
+}
+
+TEST_F(ThreadTest, TestThreadRestrictions_Waiting) {
+  // Default should be to allow IO
+  ThreadRestrictions::AssertWaitAllowed();
+
+  ThreadRestrictions::SetWaitAllowed(false);
+  {
+    ThreadRestrictions::ScopedAllowWait allow_wait;
+    CountDownLatch l(0);
+    l.Wait();
+  }
+  ThreadRestrictions::SetWaitAllowed(true);
+
+  // Disallow waiting - blocking on a latch should crash the process.
+  ASSERT_DEATH({
+      ThreadRestrictions::SetWaitAllowed(false);
+      CountDownLatch l(0);
+      l.Wait();
+    },
+    "Waiting is not allowed to be used on this thread");
+}
+#endif // NDEBUG
+
+} // namespace kudu

Reply via email to