http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/subprocess-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/subprocess-test.cc b/be/src/kudu/util/subprocess-test.cc new file mode 100644 index 0000000..fb3d183 --- /dev/null +++ b/be/src/kudu/util/subprocess-test.cc @@ -0,0 +1,264 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/util/subprocess.h" + +#include <unistd.h> + +#include <string> +#include <vector> + +#include <gtest/gtest.h> + +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/test_util.h" + +using std::string; +using std::vector; +using strings::Substitute; + +namespace kudu { + +// Fixture for the Subprocess tests; it adds nothing on top of KuduTest. +class SubprocessTest : public KuduTest {}; + +// Pipes one line through /usr/bin/tr using the child's stdin and stdout pipes, +// then checks the upper-cased output and that the child exited with status 0. 
+TEST_F(SubprocessTest, TestSimplePipe) { + Subprocess p({ "/usr/bin/tr", "a-z", "A-Z" }); + p.ShareParentStdout(false); + ASSERT_OK(p.Start()); + + FILE* out = fdopen(p.ReleaseChildStdinFd(), "w"); + PCHECK(out); + FILE* in = fdopen(p.from_child_stdout_fd(), "r"); + PCHECK(in); + + fprintf(out, "hello world\n"); + // We have to close 'out' or else tr won't write any output, since + // it enters a buffered mode if it detects that its input is a FIFO.
+ fclose(out); + + char buf[1024]; + ASSERT_EQ(buf, fgets(buf, sizeof(buf), in)); + ASSERT_STREQ("HELLO WORLD\n", &buf[0]); + + int wait_status = 0; + ASSERT_OK(p.Wait(&wait_status)); + ASSERT_TRUE(WIFEXITED(wait_status)); + ASSERT_EQ(0, WEXITSTATUS(wait_status)); +} + +// Writes one line to /usr/bin/tee, which copies it to /dev/stderr, and reads +// it back through the child's piped stderr. +TEST_F(SubprocessTest, TestErrPipe) { + Subprocess p({ "/usr/bin/tee", "/dev/stderr" }); + p.ShareParentStderr(false); + ASSERT_OK(p.Start()); + + FILE* out = fdopen(p.ReleaseChildStdinFd(), "w"); + PCHECK(out); + + fprintf(out, "Hello, World\n"); + fclose(out); // same reasoning as above, flush to prevent tee buffering + + FILE* in = fdopen(p.from_child_stderr_fd(), "r"); + PCHECK(in); + + char buf[1024]; + ASSERT_EQ(buf, fgets(buf, sizeof(buf), in)); + ASSERT_STREQ("Hello, World\n", &buf[0]); + + int wait_status = 0; + ASSERT_OK(p.Wait(&wait_status)); + ASSERT_TRUE(WIFEXITED(wait_status)); + ASSERT_EQ(0, WEXITSTATUS(wait_status)); +} + +// Kills a running /bin/cat with SIGKILL and checks that Wait() reports +// termination by that signal, both on the first and on a repeated call. +TEST_F(SubprocessTest, TestKill) { + Subprocess p({ "/bin/cat" }); + ASSERT_OK(p.Start()); + + ASSERT_OK(p.Kill(SIGKILL)); + + int wait_status = 0; + ASSERT_OK(p.Wait(&wait_status)); + ASSERT_TRUE(WIFSIGNALED(wait_status)); + ASSERT_EQ(SIGKILL, WTERMSIG(wait_status)); + + // Test that calling Wait() a second time returns the same + // cached value instead of trying to wait on some other process + // that was assigned the same pid. 
+ wait_status = 0; + ASSERT_OK(p.Wait(&wait_status)); + ASSERT_TRUE(WIFSIGNALED(wait_status)); + ASSERT_EQ(SIGKILL, WTERMSIG(wait_status)); +} + +// Writes enough bytes to stdout and stderr concurrently that if Call() were +// fully reading them one at a time, the test would deadlock. +TEST_F(SubprocessTest, TestReadFromStdoutAndStderr) { + // Set an alarm to break out of any potential deadlocks (if the implementation + // regresses).
+ alarm(60); + + string stdout; + string stderr; + const Status s = Subprocess::Call({ + "/bin/bash", + "-c", + "dd if=/dev/urandom of=/dev/stdout bs=512 count=2048 &" + "dd if=/dev/urandom of=/dev/stderr bs=512 count=2048 &" + "wait" + }, "", &stdout, &stderr); + + // Call() has returned, so cancel the pending alarm before asserting. + // Otherwise SIGALRM could still be delivered after this test has finished, + // while a later test in the same process is running. + alarm(0); + ASSERT_OK(s); +} + +// Test that environment variables can be passed to the subprocess. +TEST_F(SubprocessTest, TestEnvVars) { + Subprocess p({ "/bin/bash", "-c", "echo $FOO" }); + p.SetEnvVars({{"FOO", "bar"}}); + p.ShareParentStdout(false); + ASSERT_OK(p.Start()); + FILE* in = fdopen(p.from_child_stdout_fd(), "r"); + PCHECK(in); + char buf[1024]; + ASSERT_EQ(buf, fgets(buf, sizeof(buf), in)); + ASSERT_STREQ("bar\n", &buf[0]); + ASSERT_OK(p.Wait()); +} + +// Tests writing to the subprocess stdin. +TEST_F(SubprocessTest, TestCallWithStdin) { + string stdout; + ASSERT_OK(Subprocess::Call({ "/bin/bash" }, + "echo \"quick brown fox\"", + &stdout)); + EXPECT_EQ("quick brown fox\n", stdout); +} + +// Test KUDU-1674: '/bin/bash -c "echo"' command below is expected to +// capture a string on stderr. This test validates that passing +// stderr alone doesn't result in SIGSEGV as reported in the bug and +// also check for sanity of stderr in the output. +TEST_F(SubprocessTest, TestReadSingleFD) { + string stderr; + const string str = "ApacheKudu"; + const string cmd_str = Substitute("/bin/echo -n $0 1>&2", str); + ASSERT_OK(Subprocess::Call({"/bin/sh", "-c", cmd_str}, "", nullptr, &stderr)); + ASSERT_EQ(stderr, str); + + // Also sanity check other combinations.
+ string stdout; + ASSERT_OK(Subprocess::Call({"/bin/ls", "/dev/null"}, "", &stdout, nullptr)); + ASSERT_STR_CONTAINS(stdout, "/dev/null"); + + ASSERT_OK(Subprocess::Call({"/bin/ls", "/dev/zero"}, "", nullptr, nullptr)); +} + +// GetExitStatus() reports status 0 and a success message for a child that +// exits with status 0. +TEST_F(SubprocessTest, TestGetExitStatusExitSuccess) { + Subprocess p({ "/bin/sh", "-c", "exit 0" }); + ASSERT_OK(p.Start()); + ASSERT_OK(p.Wait()); + int exit_status; + string exit_info; + ASSERT_OK(p.GetExitStatus(&exit_status, &exit_info)); + ASSERT_EQ(0, exit_status); + ASSERT_STR_CONTAINS(exit_info, "process successfully exited"); +} + +// GetExitStatus() reports the child's non-zero exit code and mentions it in +// the info string. +TEST_F(SubprocessTest, TestGetExitStatusExitFailure) { + static const vector<int> kStatusCodes = { 1, 255 }; + for (auto code : kStatusCodes) { + Subprocess p({ "/bin/sh", "-c", Substitute("exit $0", code) }); + ASSERT_OK(p.Start()); + ASSERT_OK(p.Wait()); + int exit_status; + string exit_info; + ASSERT_OK(p.GetExitStatus(&exit_status, &exit_info)); + ASSERT_EQ(code, exit_status); + ASSERT_STR_CONTAINS(exit_info, + Substitute("process exited with non-zero status $0", + exit_status)); + } +} + +// GetExitStatus() reports the signal number as the exit status for a child +// that was terminated by a signal. 
+TEST_F(SubprocessTest, TestGetExitStatusSignaled) { + static const vector<int> kSignals = { + SIGHUP, + SIGABRT, + SIGKILL, + SIGTERM, + SIGUSR2, + }; + for (auto signum : kSignals) { + Subprocess p({ "/bin/cat" }); + ASSERT_OK(p.Start()); + ASSERT_OK(p.Kill(signum)); + ASSERT_OK(p.Wait()); + int exit_status; + string exit_info; + ASSERT_OK(p.GetExitStatus(&exit_status, &exit_info)); + EXPECT_EQ(signum, exit_status); + ASSERT_STR_CONTAINS(exit_info, Substitute("process exited on signal $0", + signum)); + } +} + +// Lets a running Subprocess go out of scope, once without and once with an +// explicit signal argument, and checks whether the shell's EXIT trap got the +// chance to remove kTestFile. +TEST_F(SubprocessTest, TestSubprocessDestroyWithCustomSignal) { + string kTestFile = GetTestPath("foo"); + + // Start a subprocess that creates kTestFile immediately and deletes it on exit. + // + // Note: it's important that the shell not invoke a command while waiting + // to be killed (i.e. 
"sleep 60"); if it did, the signal could be delivered + // just after the command starts but just before the shell decides to forward + // signals to it, and we wind up with a deadlock. + vector<string> argv = { + "/bin/bash", + "-c", + Substitute( + // Delete kTestFile on exit. + "trap \"rm $0\" EXIT;" + // Create kTestFile on start. + "touch $0;" + // Spin in a tight loop waiting to be killed. + "while true;" + " do FOO=$$((FOO + 1));" + "done", kTestFile) + }; + + // First round: no signal is passed to the constructor. + { + Subprocess s(argv); + ASSERT_OK(s.Start()); + AssertEventually([&]{ + ASSERT_TRUE(env_->FileExists(kTestFile)); + }); + } + + // The subprocess went out of scope and was killed with SIGKILL, so it left + // kTestFile behind. + ASSERT_TRUE(env_->FileExists(kTestFile)); + + // Second round: remove the leftover file and ask for SIGTERM instead. + ASSERT_OK(env_->DeleteFile(kTestFile)); + { + Subprocess s(argv, SIGTERM); + ASSERT_OK(s.Start()); + AssertEventually([&]{ + ASSERT_TRUE(env_->FileExists(kTestFile)); + }); + } + + // The subprocess was killed with SIGTERM, giving it a chance to delete kTestFile. + ASSERT_FALSE(env_->FileExists(kTestFile)); +} + +} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/subprocess.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/subprocess.cc b/be/src/kudu/util/subprocess.cc new file mode 100644 index 0000000..ec032cd --- /dev/null +++ b/be/src/kudu/util/subprocess.cc @@ -0,0 +1,707 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/util/subprocess.h" + +#include <dirent.h> +#include <fcntl.h> +#include <signal.h> +#include <stdio.h> +#if defined(__linux__) +#include <sys/prctl.h> +#endif +#include <sys/types.h> +#include <sys/wait.h> +#include <unistd.h> + +#include <functional> +#include <memory> +#include <sstream> +#include <string> +#include <vector> + +#include <ev++.h> +#include <glog/logging.h> +#include <glog/stl_logging.h> + +#include "kudu/gutil/once.h" +#include "kudu/gutil/port.h" +#include "kudu/gutil/strings/join.h" +#include "kudu/gutil/strings/numbers.h" +#include "kudu/gutil/strings/split.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/debug-util.h" +#include "kudu/util/errno.h" +#include "kudu/util/monotime.h" +#include "kudu/util/path_util.h" +#include "kudu/util/signal.h" +#include "kudu/util/stopwatch.h" +#include "kudu/util/status.h" + +using std::string; +using std::unique_ptr; +using std::vector; +using strings::Split; +using strings::Substitute; +using strings::SubstituteAndAppend; + +namespace kudu { + +// Make glog's STL-compatible operators visible inside this namespace. +using ::operator<<; + +namespace { + +static double kProcessWaitTimeoutSeconds = 5.0; + +static const char* kProcSelfFd = +#if defined(__APPLE__) + "/dev/fd"; +#else + "/proc/self/fd"; +#endif // defined(__APPLE__) + +#if defined(__linux__) +#define READDIR readdir64 +#define DIRENT dirent64 +#else +#define READDIR readdir +#define DIRENT dirent +#endif + +// Opens the directory named by kProcSelfFd and stores the stream in '*dir'. +// Returns IOError (carrying errno) if opendir() fails, in which case '*dir' +// is nullptr. +// +// Since opendir() calls malloc(), this must be called before fork(). +// This function is not async-signal-safe. +Status OpenProcFdDir(DIR** dir) { + *dir = opendir(kProcSelfFd); + // Test the stream returned by opendir(), not the out-parameter itself. 
+ if (PREDICT_FALSE(*dir == nullptr)) { + return Status::IOError(Substitute("opendir(\"$0\") failed", kProcSelfFd), + ErrnoToString(errno), errno); + } + return Status::OK(); +} + +// Close the directory stream opened by OpenProcFdDir(). +// This function is not async-signal-safe.
+void CloseProcFdDir(DIR* dir) { + if (PREDICT_FALSE(closedir(dir) == -1)) { + LOG(WARNING) << "Unable to close fd dir: " + << Status::IOError(Substitute("closedir(\"$0\") failed", kProcSelfFd), + ErrnoToString(errno), errno).ToString(); + } +} + +// Close all open file descriptors other than stdin, stderr, stdout. +// The descriptor backing 'fd_dir' itself is also left open. +// Expects a directory stream created by OpenProcFdDir() as a parameter. +// This function is called after fork() and must not call malloc(). +// The rule of thumb is to only call async-signal-safe functions in such cases +// if at all possible. +void CloseNonStandardFDs(DIR* fd_dir) { + // This is implemented by iterating over the open file descriptors + // rather than using sysconf(_SC_OPEN_MAX) -- the latter is error prone + // since it may not represent the highest open fd if the fd soft limit + // has changed since the process started. This should also be faster + // since iterating over all possible fds is likely to cause 64k+ syscalls + // in typical configurations. + // + // Note also that this doesn't use any of the Env utility functions, to + // make it as lean and mean as possible -- this runs in the subprocess + // after a fork, so there's some possibility that various global locks + // inside malloc() might be held, so allocating memory is a no-no. + PCHECK(fd_dir != nullptr); + int dir_fd = dirfd(fd_dir); + + struct DIRENT* ent; + // readdir64() is not reentrant (it uses a static buffer) and it also + // locks fd_dir->lock, so it must not be called in a multi-threaded + // environment and is certainly not async-signal-safe. + // However, it appears to be safe to call right after fork(), since only one + // thread exists in the child process at that time. It also does not call + // malloc() or free(). 
We could use readdir64_r() instead, but all that + // buys us is reentrancy, and not async-signal-safety, due to the use of + // dir->lock, so seems not worth the added complexity in lifecycle & plumbing.
+ while ((ent = READDIR(fd_dir)) != nullptr) { + uint32_t fd; + if (!safe_strtou32(ent->d_name, &fd)) continue; + if (!(fd == STDIN_FILENO || + fd == STDOUT_FILENO || + fd == STDERR_FILENO || + fd == dir_fd)) { + close(fd); + } + } +} + +void RedirectToDevNull(int fd) { + // We must not close stderr or stdout, because then when a new file descriptor + // gets opened, it might get that fd number. (We always allocate the lowest + // available file descriptor number.) Instead, we reopen that fd as + // /dev/null. + int dev_null = open("/dev/null", O_WRONLY); + if (dev_null < 0) { + PLOG(WARNING) << "failed to open /dev/null"; + } else { + PCHECK(dup2(dev_null, fd)); + } +} + +// Stateful libev watcher to help ReadFdsFully(). +class ReadFdsFullyHelper { + public: + ReadFdsFullyHelper(const string& progname, ev::dynamic_loop* loop, int fd) + : progname_(progname) { + // Bind the watcher to the provided loop, to this functor, and to the + // readable fd. + watcher_.set(*loop); + watcher_.set(this); + watcher_.set(fd, ev::READ); + + // The watcher will now be polled when its loop is run. + watcher_.start(); + } + + void operator() (ev::io &w, int revents) { + DCHECK_EQ(ev::READ, revents); + + char buf[1024]; + ssize_t n = read(w.fd, buf, arraysize(buf)); + if (n == 0) { + // EOF, stop watching. + w.stop(); + } else if (n < 0) { + // Interrupted by a signal, do nothing. + if (errno == EINTR) return; + + // A fatal error. Store it and stop watching. + status_ = Status::IOError("IO error reading from " + progname_, + ErrnoToString(errno), errno); + w.stop(); + } else { + // Add our bytes and keep watching. + output_.append(buf, n); + } + } + + const Status& status() const { return status_; } + const string& output() const { return output_; } + + private: + const string progname_; + + ev::io watcher_; + string output_; + Status status_; +}; + +// Reads from all descriptors in 'fds' until EOF on all of them. If any read +// yields an error, it is returned. 
Otherwise, 'out' contains the bytes read +// for each fd, in the same order as was in 'fds'. +Status ReadFdsFully(const string& progname, + const vector<int>& fds, + vector<string>* out) { + ev::dynamic_loop loop; + + // Set up a watcher for each fd. + vector<unique_ptr<ReadFdsFullyHelper>> helpers; + for (int fd : fds) { + helpers.emplace_back(new ReadFdsFullyHelper(progname, &loop, fd)); + } + + // This will read until all fds return EOF. + loop.run(); + + // Check for failures. + for (const auto& h : helpers) { + if (!h->status().ok()) { + return h->status(); + } + } + + // No failures; write the output to the caller. + for (const auto& h : helpers) { + out->push_back(h->output()); + } + return Status::OK(); +} + +} // anonymous namespace + +// Records the command line and the signal to deliver on destruction. stdin +// starts out PIPED and stdout/stderr start out SHARED. +Subprocess::Subprocess(vector<string> argv, int sig_on_destruct) + : program_(argv.empty() ? string() : argv[0]), + argv_(std::move(argv)), + state_(kNotStarted), + child_pid_(-1), + fd_state_(), + child_fds_(), + sig_on_destruct_(sig_on_destruct) { + // By convention, the first argument in argv is the base name of the program. + // An empty argv is not indexed here, so that Start() can reject it with + // InvalidArgument rather than this constructor reading past the end. + if (!argv_.empty()) { + argv_[0] = BaseName(argv_[0]); + } + + fd_state_[STDIN_FILENO] = PIPED; + fd_state_[STDOUT_FILENO] = SHARED; + fd_state_[STDERR_FILENO] = SHARED; + child_fds_[STDIN_FILENO] = -1; + child_fds_[STDOUT_FILENO] = -1; + child_fds_[STDERR_FILENO] = -1; +} + +Subprocess::~Subprocess() { + if (state_ == kRunning) { + LOG(WARNING) << Substitute( + "Child process $0 ($1) was orphaned.
Sending signal $2...", + child_pid_, JoinStrings(argv_, " "), sig_on_destruct_); + WARN_NOT_OK(KillAndWait(sig_on_destruct_), + Substitute("Failed to KillAndWait() with signal $0", + sig_on_destruct_)); + } + + // Close the parent's ends of any pipes that have not been released. + for (int i = 0; i < 3; ++i) { + if (fd_state_[i] == PIPED && child_fds_[i] >= 0) { + close(child_fds_[i]); + } + } +} + +void Subprocess::DisableStderr() { + CHECK_EQ(state_, kNotStarted); + fd_state_[STDERR_FILENO] = DISABLED; +} + +void Subprocess::DisableStdout() { + CHECK_EQ(state_, kNotStarted); + fd_state_[STDOUT_FILENO] = DISABLED; +} + +#if defined(__APPLE__) +// Fallback used where pipe2() is unavailable: creates a pipe and marks both +// ends close-on-exec. Only 'flags' == O_CLOEXEC is supported. Unlike the real +// pipe2(), the flag is applied after pipe() returns rather than atomically. +// Returns 0 on success and -1 on failure. +static int pipe2(int pipefd[2], int flags) { + DCHECK_EQ(O_CLOEXEC, flags); + + int new_fds[2]; + if (pipe(new_fds) == -1) { + return -1; + } + // F_SETFD takes descriptor flags, so the close-on-exec bit is FD_CLOEXEC. + // O_CLOEXEC is an open()-time flag with a different value. + if (fcntl(new_fds[0], F_SETFD, FD_CLOEXEC) == -1) { + close(new_fds[0]); + close(new_fds[1]); + return -1; + } + if (fcntl(new_fds[1], F_SETFD, FD_CLOEXEC) == -1) { + close(new_fds[0]); + close(new_fds[1]); + return -1; + } + pipefd[0] = new_fds[0]; + pipefd[1] = new_fds[1]; + return 0; +} +#endif + +Status Subprocess::Start() { + VLOG(2) << "Invoking command: " << argv_; + if (state_ != kNotStarted) { + const string err_str = Substitute("$0: illegal sub-process state", state_); + LOG(DFATAL) << err_str; + return Status::IllegalState(err_str); + } + if (argv_.empty()) { + return Status::InvalidArgument("argv must have at least one elem"); + } + + // We explicitly set SIGPIPE to SIG_IGN here because we are using UNIX pipes.
+ IgnoreSigPipe(); + + // Build the nullptr-terminated argument array for execvp(). The pointers + // refer into the strings owned by argv_. + vector<char*> argv_ptrs; + for (const string& arg : argv_) { + argv_ptrs.push_back(const_cast<char*>(arg.c_str())); + } + argv_ptrs.push_back(nullptr); + + // Pipe from caller process to child's stdin + // [0] = stdin for child, [1] = how parent writes to it + int child_stdin[2] = {-1, -1}; + if (fd_state_[STDIN_FILENO] == PIPED) { + PCHECK(pipe2(child_stdin, O_CLOEXEC) == 0); + } + // Pipe from child's stdout back to caller process + // [0] = how parent reads from child's stdout, [1] = how child writes to it + int child_stdout[2] = {-1, -1}; + if (fd_state_[STDOUT_FILENO] == PIPED) { + PCHECK(pipe2(child_stdout, O_CLOEXEC) == 0); + } + // Pipe from child's stderr back to caller process + // [0] = how parent reads from child's stderr, [1] = how child writes to it + int child_stderr[2] = {-1, -1}; + if (fd_state_[STDERR_FILENO] == PIPED) { + PCHECK(pipe2(child_stderr, O_CLOEXEC) == 0); + } + // The synchronization pipe: this trick is to make sure the parent returns + // control only after the child process has invoked execvp(). + int sync_pipe[2]; + PCHECK(pipe2(sync_pipe, O_CLOEXEC) == 0); + + // NOTE(review): the pipe fds created above are not closed on the two early + // returns below (fd dir open failure, fork failure). + DIR* fd_dir = nullptr; + RETURN_NOT_OK_PREPEND(OpenProcFdDir(&fd_dir), "Unable to open fd dir"); + unique_ptr<DIR, std::function<void(DIR*)>> fd_dir_closer(fd_dir, + CloseProcFdDir); + int ret = fork(); + if (ret == -1) { + return Status::RuntimeError("Unable to fork", ErrnoToString(errno), errno); + } + if (ret == 0) { // We are the child + // Send the child a SIGKILL when the parent dies. This is done as early + // as possible in the child's life to prevent any orphaning whatsoever + // (e.g. from KUDU-402). 
+#if defined(__linux__) + // TODO: prctl(PR_SET_PDEATHSIG) is Linux-specific, look into portable ways + // to prevent orphans when parent is killed.
+ prctl(PR_SET_PDEATHSIG, SIGKILL); +#endif + + // stdin: when PIPED, the read end of the stdin pipe becomes the child's + // standard input. + if (fd_state_[STDIN_FILENO] == PIPED) { + PCHECK(dup2(child_stdin[0], STDIN_FILENO) == STDIN_FILENO); + } + + // stdout: PIPED writes into the pipe, DISABLED goes to /dev/null. + switch (fd_state_[STDOUT_FILENO]) { + case PIPED: { + PCHECK(dup2(child_stdout[1], STDOUT_FILENO) == STDOUT_FILENO); + break; + } + case DISABLED: { + RedirectToDevNull(STDOUT_FILENO); + break; + } + default: + // Any other state: the descriptor is left as it is. + break; + } + + // stderr: handled the same way as stdout. + switch (fd_state_[STDERR_FILENO]) { + case PIPED: { + PCHECK(dup2(child_stderr[1], STDERR_FILENO) == STDERR_FILENO); + break; + } + case DISABLED: { + RedirectToDevNull(STDERR_FILENO); + break; + } + default: + // Any other state: the descriptor is left as it is. + break; + } + + // Close the read side of the sync pipe; + // the write side should be closed upon execvp(). + PCHECK(close(sync_pipe[0]) == 0); + + CloseNonStandardFDs(fd_dir); + + // Ensure we are not ignoring or blocking signals in the child process. + ResetAllSignalMasksToUnblocked(); + + // Reset the disposition of SIGPIPE to SIG_DFL because we routinely set its + // disposition to SIG_IGN via IgnoreSigPipe(). At the time of writing, we + // don't explicitly ignore any other signals in Kudu. + ResetSigPipeHandlerToDefault(); + + // Set the environment for the subprocess. This is more portable than + // using execvpe(), which doesn't exist on OS X. We rely on the 'p' + // variant of exec to do $PATH searching if the executable specified + // by the caller isn't an absolute path.
+ // NOTE(review): setenv() may allocate memory, which the comments elsewhere + // in this file advise against after fork() -- confirm this is acceptable. + for (const auto& env : env_) { + ignore_result(setenv(env.first.c_str(), env.second.c_str(), 1 /* overwrite */)); + } + + execvp(program_.c_str(), &argv_ptrs[0]); + // execvp() only returns on failure. Save errno before logging can change + // it, and use it as the child's exit status. + int err = errno; + PLOG(ERROR) << "Couldn't exec " << program_; + _exit(err); + } else { + // We are the parent + child_pid_ = ret; + // Close child's side of the pipes + if (fd_state_[STDIN_FILENO] == PIPED) close(child_stdin[0]); + if (fd_state_[STDOUT_FILENO] == PIPED) close(child_stdout[1]); + if (fd_state_[STDERR_FILENO] == PIPED) close(child_stderr[1]); + // Keep parent's side of the pipes. For a stream that is not PIPED the + // stored value is the -1 the array was initialized with. + child_fds_[STDIN_FILENO] = child_stdin[1]; + child_fds_[STDOUT_FILENO] = child_stdout[0]; + child_fds_[STDERR_FILENO] = child_stderr[0]; + + // Wait for the child process to invoke execvp(). The trick involves + // a pipe with O_CLOEXEC option for its descriptors. The parent process + // performs blocking read from the pipe while the write side of the pipe + // is kept open by the child (it does not write any data, though). The write + // side of the pipe is closed when the child invokes execvp(). At that + // point, the parent should receive EOF, i.e. read() should return 0. + { + // Close the write side of the sync pipe. It's crucial to make sure + // it succeeds otherwise the blocking read() below might wait forever + // even if the child process has closed the pipe. + PCHECK(close(sync_pipe[1]) == 0); + while (true) { + uint8_t buf; + int err = 0; + const int rc = read(sync_pipe[0], &buf, 1); + if (rc == -1) { + err = errno; + if (err == EINTR) { + // Retry in case of a signal. 
+ continue; + } + } + // Any outcome other than EINTR ends the loop, so the read side can be + // closed now. 'err' already holds read()'s errno. + PCHECK(close(sync_pipe[0]) == 0); + if (rc == 0) { + // That's OK -- expecting EOF from the other side of the pipe. + break; + } else if (rc == -1) { + // Other errors besides EINTR are not expected. + return Status::RuntimeError("Unexpected error from the sync pipe", + ErrnoToString(err), err); + } + // No data is expected from the sync pipe.
+ LOG(FATAL) << Substitute("$0: unexpected data from the sync pipe", rc); + } + } + } + + state_ = kRunning; + return Status::OK(); +} + +// Blocking wait for the child; forwards to DoWait() with BLOCKING. +Status Subprocess::Wait(int* wait_status) { + return DoWait(wait_status, BLOCKING); +} + +// Non-blocking wait for the child; forwards to DoWait() with NON_BLOCKING. +Status Subprocess::WaitNoBlock(int* wait_status) { + return DoWait(wait_status, NON_BLOCKING); +} + +// Sends 'signal' to the child with kill(). Returns IllegalState if the child +// is not in the kRunning state and RuntimeError if kill() fails. +Status Subprocess::Kill(int signal) { + if (state_ != kRunning) { + const string err_str = "Sub-process is not running"; + LOG(DFATAL) << err_str; + return Status::IllegalState(err_str); + } + if (kill(child_pid_, signal) != 0) { + return Status::RuntimeError("Unable to kill", + ErrnoToString(errno), + errno); + } + return Status::OK(); +} + +// Sends 'signal' to the child and then waits for it to exit. For SIGKILL the +// wait is blocking. For any other signal the child is polled every 10ms until +// kProcessWaitTimeoutSeconds have elapsed; if it still has not exited, the +// whole procedure is repeated with SIGKILL. +Status Subprocess::KillAndWait(int signal) { + string procname = Substitute("$0 (pid $1)", argv0(), pid()); + + // This is a fatal error because all errors in Kill() are signal-independent, + // so Kill(SIGKILL) is just as likely to fail if this did. 
+ RETURN_NOT_OK_PREPEND( + Kill(signal), Substitute("Failed to send signal $0 to $1", + signal, procname)); + if (signal == SIGKILL) { + RETURN_NOT_OK_PREPEND( + Wait(), Substitute("Failed to wait on $0", procname)); + } else { + Status s; + Stopwatch sw; + sw.start(); + do { + s = WaitNoBlock(); + if (s.ok()) { + break; + } else if (!s.IsTimedOut()) { + // An unexpected error in WaitNoBlock() is likely to manifest repeatedly, + // so there's no point in retrying this.
+ RETURN_NOT_OK_PREPEND( + s, Substitute("Unexpected failure while waiting on $0", procname)); + } + SleepFor(MonoDelta::FromMilliseconds(10)); + } while (sw.elapsed().wall_seconds() < kProcessWaitTimeoutSeconds); + if (s.IsTimedOut()) { + return KillAndWait(SIGKILL); + } + } + return Status::OK(); +} + +Status Subprocess::GetExitStatus(int* exit_status, string* info_str) const { + if (state_ != kExited) { + const string err_str = "Sub-process termination hasn't yet been detected"; + LOG(DFATAL) << err_str; + return Status::IllegalState(err_str); + } + string info; + int status; + if (WIFEXITED(wait_status_)) { + status = WEXITSTATUS(wait_status_); + if (status == 0) { + info = Substitute("$0: process successfully exited", program_); + } else { + info = Substitute("$0: process exited with non-zero status $1", + program_, status); + } + } else if (WIFSIGNALED(wait_status_)) { + // Using signal number as exit status. + status = WTERMSIG(wait_status_); + info = Substitute("$0: process exited on signal $1", program_, status); +#if defined(WCOREDUMP) + if (WCOREDUMP(wait_status_)) { + SubstituteAndAppend(&info, " (core dumped)"); + } +#endif + } else { + status = -1; + info = Substitute("$0: process reported unexpected wait status $1", + program_, wait_status_); + LOG(DFATAL) << info; + } + if (exit_status) { + *exit_status = status; + } + if (info_str) { + *info_str = info; + } + return Status::OK(); +} + +Status Subprocess::Call(const string& arg_str) { + vector<string> argv = Split(arg_str, " "); + return Call(argv, "", nullptr, nullptr); +} + +Status Subprocess::Call(const vector<string>& argv, + const string& stdin_in, + string* stdout_out, + string* stderr_out) { + Subprocess p(argv); + + if (stdout_out) { + p.ShareParentStdout(false); + } + if (stderr_out) { + p.ShareParentStderr(false); + } + RETURN_NOT_OK_PREPEND(p.Start(), + "Unable to fork " + argv[0]); + + if (!stdin_in.empty() && + write(p.to_child_stdin_fd(), stdin_in.data(), stdin_in.size()) < 
stdin_in.size()) { + return Status::IOError("Unable to write to child process stdin", ErrnoToString(errno), errno); + } + + int err = close(p.ReleaseChildStdinFd()); + if (PREDICT_FALSE(err != 0)) { + return Status::IOError("Unable to close child process stdin", ErrnoToString(errno), errno); + } + + vector<int> fds; + if (stdout_out) { + fds.push_back(p.from_child_stdout_fd()); + } + if (stderr_out) { + fds.push_back(p.from_child_stderr_fd()); + } + vector<string> outv; + RETURN_NOT_OK(ReadFdsFully(argv[0], fds, &outv)); + + // Given that ReadFdsFully captures the strings in the order in which we + // had installed 'fds' above, it can be assured that we can receive + // as many strings as there were 'fds' in the vector and in that order. + CHECK_EQ(outv.size(), fds.size()); + if (stdout_out) { + *stdout_out = std::move(outv.front()); + } + if (stderr_out) { + *stderr_out = std::move(outv.back()); + } + + RETURN_NOT_OK_PREPEND(p.Wait(), "Unable to wait() for " + argv[0]); + int exit_status; + string exit_info_str; + RETURN_NOT_OK(p.GetExitStatus(&exit_status, &exit_info_str)); + if (exit_status != 0) { + return Status::RuntimeError(exit_info_str); + } + return Status::OK(); +} + +pid_t Subprocess::pid() const { + CHECK_EQ(state_, kRunning); + return child_pid_; +} + +Status Subprocess::DoWait(int* wait_status, WaitMode mode) { + if (state_ == kExited) { + if (wait_status) { + *wait_status = wait_status_; + } + return Status::OK(); + } + if (state_ != kRunning) { + const string err_str = Substitute("$0: illegal sub-process state", state_); + LOG(DFATAL) << err_str; + return Status::IllegalState(err_str); + } + + const int options = (mode == NON_BLOCKING) ? 
WNOHANG : 0; + int status; + const int rc = waitpid(child_pid_, &status, options); + if (rc == -1) { + return Status::RuntimeError("Unable to wait on child", + ErrnoToString(errno), errno); + } + // With WNOHANG, a return value of 0 means the child has not exited yet. + if (mode == NON_BLOCKING && rc == 0) { + return Status::TimedOut(""); + } + CHECK_EQ(rc, child_pid_); + CHECK(WIFEXITED(status) || WIFSIGNALED(status)); + + // Cache the status so that later calls return it without calling waitpid() + // again. + child_pid_ = -1; + wait_status_ = status; + state_ = kExited; + if (wait_status) { + *wait_status = status; + } + return Status::OK(); +} + +// Replaces any previously stored environment variables with 'env'. +void Subprocess::SetEnvVars(std::map<std::string, std::string> env) { + env_ = std::move(env); +} + +// Marks standard stream 'stdfd' as SHARED or PIPED. CHECK-fails if the +// process has already been started or the stream has been disabled. +void Subprocess::SetFdShared(int stdfd, bool share) { + CHECK_EQ(state_, kNotStarted); + CHECK_NE(fd_state_[stdfd], DISABLED); + fd_state_[stdfd] = share? SHARED : PIPED; +} + +// Returns the stored descriptor for 'stdfd' without giving it up. CHECK-fails +// unless the child is running and the stream is PIPED. +int Subprocess::CheckAndOffer(int stdfd) const { + CHECK_EQ(state_, kRunning); + CHECK_EQ(fd_state_[stdfd], PIPED); + return child_fds_[stdfd]; +} + +// Returns the stored descriptor for 'stdfd' and resets the stored value to +// -1, so this object no longer refers to it. CHECK-fails unless the child is +// running, the stream is PIPED and the descriptor has not been released yet. 
+int Subprocess::ReleaseChildFd(int stdfd) { + CHECK_EQ(state_, kRunning); + CHECK_GE(child_fds_[stdfd], 0); + CHECK_EQ(fd_state_[stdfd], PIPED); + int ret = child_fds_[stdfd]; + child_fds_[stdfd] = -1; + return ret; +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/subprocess.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/subprocess.h b/be/src/kudu/util/subprocess.h new file mode 100644 index 0000000..9834e3a --- /dev/null +++ b/be/src/kudu/util/subprocess.h @@ -0,0 +1,191 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_UTIL_SUBPROCESS_H +#define KUDU_UTIL_SUBPROCESS_H + +#include <signal.h> + +#include <map> +#include <string> +#include <vector> + +#include <glog/logging.h> + +#include "kudu/gutil/macros.h" +#include "kudu/util/status.h" + +namespace kudu { + +// Wrapper around a spawned subprocess. +// +// program will be treated as an absolute path unless it begins with a dot or a +// slash. +// +// This takes care of creating pipes to/from the subprocess and offers +// basic functionality to wait on it or send signals. +// By default, child process only has stdin captured and separate from the parent. +// The stdout/stderr streams are shared with the parent by default. +// +// The process may only be started and waited on/killed once. +// +// Optionally, user may change parent/child stream sharing. Also, a user may disable +// a subprocess stream. A user cannot do both. +// +// Note that, when the Subprocess object is destructed, the child process +// will be forcibly SIGKILLed to avoid orphaning processes. +class Subprocess { + public: + // Constructs a new Subprocess that will execute 'argv' on Start(). + // + // If the process isn't explicitly killed, 'sig_on_destroy' will be delivered + // to it when the Subprocess goes out of scope. + explicit Subprocess(std::vector<std::string> argv, int sig_on_destruct = SIGKILL); + ~Subprocess(); + + // Disable subprocess stream output. Must be called before subprocess starts. + void DisableStderr(); + void DisableStdout(); + + // Share a stream with parent. Must be called before subprocess starts. 
+ // Cannot set sharing at all if stream is disabled + void ShareParentStdin(bool share = true) { SetFdShared(STDIN_FILENO, share); } + void ShareParentStdout(bool share = true) { SetFdShared(STDOUT_FILENO, share); } + void ShareParentStderr(bool share = true) { SetFdShared(STDERR_FILENO, share); } + + // Add environment variables to be set before executing the subprocess. + // + // These environment variables are merged into the existing environment + // of the parent process. In other words, there is no need to prime this + // map with the current environment; instead, just specify any variables + // that should be overridden. + // + // Repeated calls to this function replace earlier calls. + void SetEnvVars(std::map<std::string, std::string> env); + + // Start the subprocess. Can only be called once. + // + // This returns a bad Status if the fork() fails. However, + // note that if the executable path was incorrect such that + // exec() fails, this will still return Status::OK. You must + // use Wait() to check for failure. + Status Start() WARN_UNUSED_RESULT; + + // Wait for the subprocess to exit. If 'wait_status' is non-null, it is + // set to the status reported by the waitpid() syscall (inspect it with + // WIFEXITED(), WIFSIGNALED(), etc.). Only call after starting. + // + // NOTE: unlike the standard wait(2) call, this may be called multiple + // times. If the process has exited, it will repeatedly return the same + // wait status. + Status Wait(int* wait_status = nullptr) WARN_UNUSED_RESULT; + + // Like the above, but does not block. This returns Status::TimedOut + // immediately if the child has not exited. Otherwise returns Status::OK + // and sets *wait_status (if non-null). Only call after starting. + // + // NOTE: unlike the standard wait(2) call, this may be called multiple + // times. If the process has exited, it will repeatedly return the same + // wait status. + Status WaitNoBlock(int* wait_status = nullptr) WARN_UNUSED_RESULT; + + // Send a signal to the subprocess.
+ // Note that this does not reap the process -- you must still Wait() + // in order to reap it. Only call after starting. + Status Kill(int signal) WARN_UNUSED_RESULT; + + // Sends a signal to the subprocess and waits for it to exit. + // + // If the signal is not SIGKILL and the process doesn't appear to be exiting, + // retries with SIGKILL. + Status KillAndWait(int signal); + + // Retrieve exit status of the process awaited by Wait() and/or WaitNoBlock() + // methods. Must be called only after calling Wait()/WaitNoBlock(). + Status GetExitStatus(int* exit_status, std::string* info_str = nullptr) const + WARN_UNUSED_RESULT; + + // Helper method that creates a Subprocess, issues a Start() then a Wait(). + // Expects a blank-separated list of arguments, with the first being the + // full path to the executable. + // The returned Status will only be OK if all steps were successful and + // the return code was 0. + static Status Call(const std::string& arg_str) WARN_UNUSED_RESULT; + + // Same as above, but accepts a vector that includes the path to the + // executable as argv[0] and the arguments to the program in argv[1..n]. + // + // Writes the value of 'stdin_in' to the subprocess' stdin. The length of + // 'stdin_in' should be limited to 64 KiB. + // + // Also collects the output from the child process stdout and stderr into + // 'stdout_out' and 'stderr_out' respectively. + static Status Call(const std::vector<std::string>& argv, + const std::string& stdin_in = "", + std::string* stdout_out = nullptr, + std::string* stderr_out = nullptr) WARN_UNUSED_RESULT; + + // Return the pipe fd to the child's standard stream. + // Stream should not be disabled or shared. + int to_child_stdin_fd() const { return CheckAndOffer(STDIN_FILENO); } + int from_child_stdout_fd() const { return CheckAndOffer(STDOUT_FILENO); } + int from_child_stderr_fd() const { return CheckAndOffer(STDERR_FILENO); } + + // Release control of the file descriptor for the child's stream, only if piped.
+ // Writes to this FD show up on stdin in the subprocess + int ReleaseChildStdinFd() { return ReleaseChildFd(STDIN_FILENO ); } + // Reads from this FD come from stdout of the subprocess + int ReleaseChildStdoutFd() { return ReleaseChildFd(STDOUT_FILENO); } + // Reads from this FD come from stderr of the subprocess + int ReleaseChildStderrFd() { return ReleaseChildFd(STDERR_FILENO); } + + pid_t pid() const; + const std::string& argv0() const { return argv_[0]; } + + private: + enum State { + kNotStarted, + kRunning, + kExited + }; + enum StreamMode {SHARED, DISABLED, PIPED}; + enum WaitMode {BLOCKING, NON_BLOCKING}; + + Status DoWait(int* wait_status, WaitMode mode) WARN_UNUSED_RESULT; + void SetFdShared(int stdfd, bool share); + int CheckAndOffer(int stdfd) const; + int ReleaseChildFd(int stdfd); + + std::string program_; + std::vector<std::string> argv_; + std::map<std::string, std::string> env_; + State state_; + int child_pid_; + enum StreamMode fd_state_[3]; + int child_fds_[3]; + + // The cached wait status if Wait()/WaitNoBlock() has been called. + // Only valid if state_ == kExited. + int wait_status_; + + // Custom signal to deliver when the subprocess goes out of scope, provided + // the process hasn't already been killed. + int sig_on_destruct_; + + DISALLOW_COPY_AND_ASSIGN(Subprocess); +}; + +} // namespace kudu +#endif /* KUDU_UTIL_SUBPROCESS_H */ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/test_graph.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/test_graph.cc b/be/src/kudu/util/test_graph.cc new file mode 100644 index 0000000..052ad9c --- /dev/null +++ b/be/src/kudu/util/test_graph.cc @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/util/test_graph.h" + +#include <glog/logging.h> +#include <mutex> + +#include "kudu/gutil/ref_counted.h" +#include "kudu/gutil/stringprintf.h" +#include "kudu/gutil/walltime.h" +#include "kudu/util/locks.h" +#include "kudu/util/status.h" +#include "kudu/util/thread.h" + +using std::shared_ptr; +using std::string; + +namespace kudu { + +void TimeSeries::AddValue(double val) { + std::lock_guard<simple_spinlock> l(lock_); + val_ += val; +} + +void TimeSeries::SetValue(double val) { + std::lock_guard<simple_spinlock> l(lock_); + val_ = val; +} + +double TimeSeries::value() const { + std::lock_guard<simple_spinlock> l(lock_); + return val_; +} + +TimeSeriesCollector::~TimeSeriesCollector() { + if (started_) { + StopDumperThread(); + } +} + +shared_ptr<TimeSeries> TimeSeriesCollector::GetTimeSeries(const string &key) { + MutexLock l(series_lock_); + SeriesMap::const_iterator it = series_map_.find(key); + if (it == series_map_.end()) { + shared_ptr<TimeSeries> ts(new TimeSeries()); + series_map_[key] = ts; + return ts; + } else { + return (*it).second; + } +} + +void TimeSeriesCollector::StartDumperThread() { + LOG(INFO) << "Starting metrics dumper"; + CHECK(!started_); + exit_latch_.Reset(1); + started_ = true; + CHECK_OK(kudu::Thread::Create("time series", "dumper", + &TimeSeriesCollector::DumperThread, this, &dumper_thread_)); +} + +void 
TimeSeriesCollector::StopDumperThread() { + CHECK(started_); + exit_latch_.CountDown(); + CHECK_OK(ThreadJoiner(dumper_thread_.get()).Join()); + started_ = false; +} + +void TimeSeriesCollector::DumperThread() { + CHECK(started_); + WallTime start_time = WallTime_Now(); + + faststring metrics_str; + while (true) { + metrics_str.clear(); + metrics_str.append("metrics: "); + BuildMetricsString(WallTime_Now() - start_time, &metrics_str); + LOG(INFO) << metrics_str.ToString(); + + // Sleep until next dump time, or return if we should exit + if (exit_latch_.WaitFor(MonoDelta::FromMilliseconds(250))) { + return; + } + } +} + +void TimeSeriesCollector::BuildMetricsString( + WallTime time_since_start, faststring *dst_buf) const { + MutexLock l(series_lock_); + + dst_buf->append(StringPrintf("{ \"scope\": \"%s\", \"time\": %.3f", + scope_.c_str(), time_since_start)); + + for (SeriesMap::const_reference entry : series_map_) { + dst_buf->append(StringPrintf(", \"%s\": %.3f", + entry.first.c_str(), entry.second->value())); + } + dst_buf->append("}"); +} + + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/test_graph.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/test_graph.h b/be/src/kudu/util/test_graph.h new file mode 100644 index 0000000..6ea49ca --- /dev/null +++ b/be/src/kudu/util/test_graph.h @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_TEST_GRAPH_COLLECTOR_H +#define KUDU_TEST_GRAPH_COLLECTOR_H + +#include <memory> +#include <string> +#include <unordered_map> + +#include "kudu/gutil/ref_counted.h" +#include "kudu/gutil/macros.h" +#include "kudu/gutil/walltime.h" +#include "kudu/util/countdown_latch.h" +#include "kudu/util/faststring.h" +#include "kudu/util/locks.h" +#include "kudu/util/thread.h" + +namespace kudu { + +class TimeSeries { + public: + void AddValue(double val); + void SetValue(double val); + + double value() const; + + private: + friend class TimeSeriesCollector; + + DISALLOW_COPY_AND_ASSIGN(TimeSeries); + + TimeSeries() : + val_(0) + {} + + mutable simple_spinlock lock_; + double val_; +}; + +class TimeSeriesCollector { + public: + explicit TimeSeriesCollector(std::string scope) + : scope_(std::move(scope)), exit_latch_(0), started_(false) {} + + ~TimeSeriesCollector(); + + std::shared_ptr<TimeSeries> GetTimeSeries(const std::string &key); + void StartDumperThread(); + void StopDumperThread(); + + private: + DISALLOW_COPY_AND_ASSIGN(TimeSeriesCollector); + + void DumperThread(); + void BuildMetricsString(WallTime time_since_start, faststring *dst_buf) const; + + std::string scope_; + + typedef std::unordered_map<std::string, std::shared_ptr<TimeSeries> > SeriesMap; + SeriesMap series_map_; + mutable Mutex series_lock_; + + scoped_refptr<kudu::Thread> dumper_thread_; + + // Latch used to stop the dumper_thread_. When the thread is started, + // this is set to 1, and when the thread should exit, it is counted down. 
+ CountDownLatch exit_latch_; + + bool started_; +}; + +} // namespace kudu +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/test_macros.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/test_macros.h b/be/src/kudu/util/test_macros.h new file mode 100644 index 0000000..63cae5a --- /dev/null +++ b/be/src/kudu/util/test_macros.h @@ -0,0 +1,123 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_UTIL_TEST_MACROS_H +#define KUDU_UTIL_TEST_MACROS_H + +#include <gmock/gmock.h> +#include <string> + +// ASSERT_NO_FATAL_FAILURE is just too long to type. +#define NO_FATALS(expr) \ + ASSERT_NO_FATAL_FAILURE(expr) + +// Detect fatals in the surrounding scope. NO_FATALS() only checks for fatals +// in the expression passed to it. 
+#define NO_PENDING_FATALS() \ + if (testing::Test::HasFatalFailure()) { return; } + +#define ASSERT_OK(status) do { \ + const Status& _s = status; \ + if (_s.ok()) { \ + SUCCEED(); \ + } else { \ + FAIL() << "Bad status: " << _s.ToString(); \ + } \ +} while (0); + +#define EXPECT_OK(status) do { \ + const Status& _s = status; \ + if (_s.ok()) { \ + SUCCEED(); \ + } else { \ + ADD_FAILURE() << "Bad status: " << _s.ToString(); \ + } \ +} while (0); + +// Like the above, but doesn't record successful +// tests. +#define ASSERT_OK_FAST(status) do { \ + const Status& _s = status; \ + if (!_s.ok()) { \ + FAIL() << "Bad status: " << _s.ToString(); \ + } \ +} while (0); + +// Substring matches. +#define ASSERT_STR_CONTAINS(str, substr) \ + ASSERT_THAT(str, testing::HasSubstr(substr)) + +#define ASSERT_STR_NOT_CONTAINS(str, substr) \ + ASSERT_THAT(str, testing::Not(testing::HasSubstr(substr))) + +// Substring regular expressions in extended regex (POSIX) syntax. +#define ASSERT_STR_MATCHES(str, pattern) \ + ASSERT_THAT(str, testing::ContainsRegex(pattern)) + +#define ASSERT_STR_NOT_MATCHES(str, pattern) \ + ASSERT_THAT(str, testing::Not(testing::ContainsRegex(pattern))) + +// Batched substring regular expressions in extended regex (POSIX) syntax. +// +// All strings must match the pattern. +#define ASSERT_STRINGS_ALL_MATCH(strings, pattern) do { \ + const auto& _strings = (strings); \ + const auto& _pattern = (pattern); \ + int _str_idx = 0; \ + for (const auto& str : _strings) { \ + ASSERT_STR_MATCHES(str, _pattern) \ + << "string " << _str_idx << ": pattern " << _pattern \ + << " does not match string " << str; \ + _str_idx++; \ + } \ +} while (0) + +// Batched substring regular expressions in extended regex (POSIX) syntax. +// +// At least one string must match the pattern. 
+#define ASSERT_STRINGS_ANY_MATCH(strings, pattern) do { \ + const auto& _strings = (strings); \ + const auto& _pattern = (pattern); \ + bool matched = false; \ + for (const auto& str : _strings) { \ + if (testing::internal::RE::PartialMatch(str, testing::internal::RE(_pattern))) { \ + matched = true; \ + break; \ + } \ + } \ + ASSERT_TRUE(matched) \ + << "not one string matched pattern " << _pattern; \ +} while (0) + +#define ASSERT_FILE_EXISTS(env, path) do { \ + const std::string& _s = path; \ + ASSERT_TRUE(env->FileExists(_s)) \ + << "Expected file to exist: " << _s; \ +} while (0); + +#define ASSERT_FILE_NOT_EXISTS(env, path) do { \ + const std::string& _s = path; \ + ASSERT_FALSE(env->FileExists(_s)) \ + << "Expected file not to exist: " << _s; \ +} while (0); + +#define CURRENT_TEST_NAME() \ + ::testing::UnitTest::GetInstance()->current_test_info()->name() + +#define CURRENT_TEST_CASE_NAME() \ + ::testing::UnitTest::GetInstance()->current_test_info()->test_case_name() + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/test_main.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/test_main.cc b/be/src/kudu/util/test_main.cc new file mode 100644 index 0000000..fa88f21 --- /dev/null +++ b/be/src/kudu/util/test_main.cc @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <cstdlib> +#include <thread> + +#include <gflags/gflags.h> +#include <glog/logging.h> +#include <gtest/gtest.h> + +#include "kudu/util/debug/leakcheck_disabler.h" +#include "kudu/util/pstack_watcher.h" +#include "kudu/util/flags.h" +#include "kudu/util/minidump.h" +#include "kudu/util/signal.h" +#include "kudu/util/status.h" +#include "kudu/util/test_util.h" + +DEFINE_int32(test_timeout_after, 0, + "Maximum total seconds allowed for all unit tests in the suite. Default: disabled"); + +DEFINE_int32(stress_cpu_threads, 0, + "Number of threads to start that burn CPU in an attempt to " + "stimulate race conditions"); + +namespace kudu { + +// Start thread that kills the process if --test_timeout_after is exceeded before +// the tests complete. +static void CreateAndStartTimeoutThread() { + if (FLAGS_test_timeout_after == 0) return; + + // KUDU-1995: if running death tests using EXPECT_EXIT()/ASSERT_EXIT(), LSAN + // reports leaks in CreateAndStartTimeoutThread(). Adding a couple of scoped + // leak check disablers as a workaround since right now it's not clear what + // is going on exactly: LSAN does not report those leaks for tests which run + // ASSERT_DEATH(). This does not seem harmful or hiding any potential leaks + // since it's scoped and targeted only for this utility thread. + debug::ScopedLeakCheckDisabler disabler; + std::thread([=](){ + debug::ScopedLeakCheckDisabler disabler; + SleepFor(MonoDelta::FromSeconds(FLAGS_test_timeout_after)); + // Dump a pstack to stdout. 
+ WARN_NOT_OK(PstackWatcher::DumpStacks(), "Unable to print pstack"); + + // ...and abort. + LOG(FATAL) << "Maximum unit test time exceeded (" << FLAGS_test_timeout_after << " sec)"; + }).detach(); +} +} // namespace kudu + + +static void StartStressThreads() { + for (int i = 0; i < FLAGS_stress_cpu_threads; i++) { + std::thread([]{ + while (true) { + // Do something which won't be optimized out. + base::subtle::MemoryBarrier(); + } + }).detach(); + } +} + +int main(int argc, char **argv) { + google::InstallFailureSignalHandler(); + + // We don't use InitGoogleLoggingSafe() because gtest initializes glog, so we + // need to block SIGUSR1 explicitly in order to test minidump generation. + CHECK_OK(kudu::BlockSigUSR1()); + + // Ignore SIGPIPE for all tests so that threads writing to TLS + // sockets do not crash when writing to a closed socket. See KUDU-1910. + kudu::IgnoreSigPipe(); + + // InitGoogleTest() must precede ParseCommandLineFlags(), as the former + // removes gtest-related flags from argv that would trip up the latter. + ::testing::InitGoogleTest(&argc, argv); + kudu::ParseCommandLineFlags(&argc, &argv, true); + + // Create the test-timeout timer. + kudu::CreateAndStartTimeoutThread(); + + StartStressThreads(); + + // This is called by the KuduTest setup method, but in case we have + // any tests that don't inherit from KuduTest, it's helpful to + // cover our bases and call it here too. + kudu::KuduTest::OverrideKrb5Environment(); + + int ret = RUN_ALL_TESTS(); + + return ret; +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/test_util.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/test_util.cc b/be/src/kudu/util/test_util.cc new file mode 100644 index 0000000..50c80c6 --- /dev/null +++ b/be/src/kudu/util/test_util.cc @@ -0,0 +1,285 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "kudu/util/test_util.h" + +#include <map> +#include <string> +#include <vector> + +#include <gflags/gflags.h> +#include <glog/logging.h> +#include <glog/stl_logging.h> +#include <gtest/gtest-spi.h> + +#include "kudu/gutil/strings/strcat.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/gutil/strings/util.h" +#include "kudu/gutil/walltime.h" +#include "kudu/util/env.h" +#include "kudu/util/path_util.h" +#include "kudu/util/random.h" +#include "kudu/util/spinlock_profiling.h" + +DEFINE_string(test_leave_files, "on_failure", + "Whether to leave test files around after the test run. " + " Valid values are 'always', 'on_failure', or 'never'"); + +DEFINE_int32(test_random_seed, 0, "Random seed to use for randomized tests"); + +using std::string; +using std::vector; +using strings::Substitute; + +namespace kudu { + +const char* kInvalidPath = "/dev/invalid-path-for-kudu-tests"; +static const char* const kSlowTestsEnvVariable = "KUDU_ALLOW_SLOW_TESTS"; + +static const uint64 kTestBeganAtMicros = Env::Default()->NowMicros(); + +// Global which production code can check to see if it is running +// in a GTest environment (assuming the test binary links in this module, +// which is typically a good assumption). 
+// +// This can be checked using the 'IsGTest()' function from test_util_prod.cc. +bool g_is_gtest = true; + +/////////////////////////////////////////////////// +// KuduTest +/////////////////////////////////////////////////// + +KuduTest::KuduTest() + : env_(Env::Default()), + test_dir_(GetTestDataDirectory()) { + std::map<const char*, const char*> flags_for_tests = { + // Disabling fsync() speeds up tests dramatically, and it's safe to do as no + // tests rely on cutting power to a machine or equivalent. + {"never_fsync", "true"}, + // Disable log redaction. + {"redact", "flag"}, + // Reduce default RSA key length for faster tests. We are using strong/high + // TLS v1.2 cipher suites, so minimum possible for TLS-related RSA keys is + // 768 bits. However, for the external mini cluster we use 1024 bits because + // Java default security policies require at least 1024 bits for RSA keys + // used in certificates. For uniformity, here 1024 RSA bit keys are used + // as well. As for the TSK keys, 512 bits is the minimum since the SHA256 + // digest is used for token signing/verification. + {"ipki_server_key_size", "1024"}, + {"ipki_ca_key_size", "1024"}, + {"tsk_num_rsa_bits", "512"}, + }; + for (const auto& e : flags_for_tests) { + // We don't check for errors here, because we have some default flags that + // only apply to certain tests. + google::SetCommandLineOptionWithMode(e.first, e.second, google::SET_FLAGS_DEFAULT); + } +} + +KuduTest::~KuduTest() { + // Clean up the test directory in the destructor instead of a TearDown + // method. This is better because it ensures that the child-class + // dtor runs first -- so, if the child class is using a minicluster, etc, + // we will shut that down before we remove files underneath. 
+ if (FLAGS_test_leave_files == "always") { + LOG(INFO) << "-----------------------------------------------"; + LOG(INFO) << "--test_leave_files specified, leaving files in " << test_dir_; + } else if (FLAGS_test_leave_files == "on_failure" && HasFatalFailure()) { + LOG(INFO) << "-----------------------------------------------"; + LOG(INFO) << "Had fatal failures, leaving test files at " << test_dir_; + } else { + VLOG(1) << "Cleaning up temporary test files..."; + WARN_NOT_OK(env_->DeleteRecursively(test_dir_), + "Couldn't remove test files"); + } +} + +void KuduTest::SetUp() { + InitSpinLockContentionProfiling(); + OverrideKrb5Environment(); +} + +string KuduTest::GetTestPath(const string& relative_path) { + return JoinPathSegments(test_dir_, relative_path); +} + +void KuduTest::OverrideKrb5Environment() { + // Set these variables to paths that definitely do not exist and + // couldn't be accidentally created. + // + // Note that if we were to set these to /dev/null, we end up triggering a leak in krb5 + // when it tries to read an empty file as a ticket cache, whereas non-existent files + // don't have this issue. See MIT krb5 bug #8509. + // + // NOTE: we don't simply *unset* the variables, because then we'd still pick up + // the user's /etc/krb5.conf and other default locations. + setenv("KRB5_CONFIG", kInvalidPath, 1); + setenv("KRB5_KTNAME", kInvalidPath, 1); + setenv("KRB5CCNAME", kInvalidPath, 1); + // Enable the workaround for MIT krb5 1.10 bugs from krb5_realm_override.cc. 
+ setenv("KUDU_ENABLE_KRB5_REALM_FIX", "yes", 1); +} + +/////////////////////////////////////////////////// +// Test utility functions +/////////////////////////////////////////////////// + +bool AllowSlowTests() { + char *e = getenv(kSlowTestsEnvVariable); + if ((e == nullptr) || + (strlen(e) == 0) || + (strcasecmp(e, "false") == 0) || + (strcasecmp(e, "0") == 0) || + (strcasecmp(e, "no") == 0)) { + return false; + } + if ((strcasecmp(e, "true") == 0) || + (strcasecmp(e, "1") == 0) || + (strcasecmp(e, "yes") == 0)) { + return true; + } + LOG(FATAL) << "Unrecognized value for " << kSlowTestsEnvVariable << ": " << e; + return false; +} + +void OverrideFlagForSlowTests(const std::string& flag_name, + const std::string& new_value) { + // Ensure that the flag is valid. + google::GetCommandLineFlagInfoOrDie(flag_name.c_str()); + + // If we're not running slow tests, don't override it. + if (!AllowSlowTests()) { + return; + } + google::SetCommandLineOptionWithMode(flag_name.c_str(), new_value.c_str(), + google::SET_FLAG_IF_DEFAULT); +} + +int SeedRandom() { + int seed; + // Initialize random seed + if (FLAGS_test_random_seed == 0) { + // Not specified by user + seed = static_cast<int>(GetCurrentTimeMicros()); + } else { + seed = FLAGS_test_random_seed; + } + LOG(INFO) << "Using random seed: " << seed; + srand(seed); + return seed; +} + +string GetTestDataDirectory() { + const ::testing::TestInfo* const test_info = + ::testing::UnitTest::GetInstance()->current_test_info(); + CHECK(test_info) << "Must be running in a gtest unit test to call this function"; + string dir; + CHECK_OK(Env::Default()->GetTestDirectory(&dir)); + + // The directory name includes some strings for specific reasons: + // - program name: identifies the directory to the test invoker + // - timestamp and pid: disambiguates with prior runs of the same test + // + // e.g. 
"env-test.TestEnv.TestReadFully.1409169025392361-23600" + dir += Substitute("/$0.$1.$2.$3-$4", + StringReplace(google::ProgramInvocationShortName(), "/", "_", true), + StringReplace(test_info->test_case_name(), "/", "_", true), + StringReplace(test_info->name(), "/", "_", true), + kTestBeganAtMicros, + getpid()); + Status s = Env::Default()->CreateDir(dir); + CHECK(s.IsAlreadyPresent() || s.ok()) + << "Could not create directory " << dir << ": " << s.ToString(); + if (s.ok()) { + string metadata; + + StrAppend(&metadata, Substitute("PID=$0\n", getpid())); + + StrAppend(&metadata, Substitute("PPID=$0\n", getppid())); + + char* jenkins_build_id = getenv("BUILD_ID"); + if (jenkins_build_id) { + StrAppend(&metadata, Substitute("BUILD_ID=$0\n", jenkins_build_id)); + } + + CHECK_OK(WriteStringToFile(Env::Default(), metadata, + Substitute("$0/test_metadata", dir))); + } + return dir; +} + +void AssertEventually(const std::function<void(void)>& f, + const MonoDelta& timeout) { + const MonoTime deadline = MonoTime::Now() + timeout; + + for (int attempts = 0; MonoTime::Now() < deadline; attempts++) { + // Capture any assertion failures within this scope (i.e. from their function) + // into 'results' + testing::TestPartResultArray results; + testing::ScopedFakeTestPartResultReporter reporter( + testing::ScopedFakeTestPartResultReporter::INTERCEPT_ONLY_CURRENT_THREAD, + &results); + f(); + + // Determine whether their function produced any new test failure results. + bool has_failures = false; + for (int i = 0; i < results.size(); i++) { + has_failures |= results.GetTestPartResult(i).failed(); + } + if (!has_failures) { + return; + } + + // If they had failures, sleep and try again. + int sleep_ms = (attempts < 10) ? (1 << attempts) : 1000; + SleepFor(MonoDelta::FromMilliseconds(sleep_ms)); + } + + // If we ran out of time looping, run their function one more time + // without capturing its assertions. 
This way the assertions will + // propagate back out to the normal test reporter. Of course it's + // possible that it will pass on this last attempt, but that's OK + // too, since we aren't trying to be that strict about the deadline. + f(); + if (testing::Test::HasFatalFailure()) { + ADD_FAILURE() << "Timed out waiting for assertion to pass."; + } +} + +int CountOpenFds(Env* env) { + static const char* kProcSelfFd = +#if defined(__APPLE__) + "/dev/fd"; +#else + "/proc/self/fd"; +#endif // defined(__APPLE__) + + vector<string> children; + CHECK_OK(env->GetChildren(kProcSelfFd, &children)); + int num_fds = 0; + for (const auto& c : children) { + // Skip '.' and '..'. + if (c == "." || c == "..") { + continue; + } + num_fds++; + } + + // Exclude the fd opened to iterate over kProcSelfFd. + return num_fds - 1; +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/test_util.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/test_util.h b/be/src/kudu/util/test_util.h new file mode 100644 index 0000000..061fb3a --- /dev/null +++ b/be/src/kudu/util/test_util.h @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. +// +// Base test class, with various utility functions. +#ifndef KUDU_UTIL_TEST_UTIL_H +#define KUDU_UTIL_TEST_UTIL_H + +#include <functional> +#include <gtest/gtest.h> +#include <string> + +#include "kudu/gutil/gscoped_ptr.h" +#include "kudu/util/env.h" +#include "kudu/util/monotime.h" +#include "kudu/util/test_macros.h" + +#define ASSERT_EVENTUALLY(expr) do { \ + AssertEventually(expr); \ + NO_PENDING_FATALS(); \ +} while (0) + +namespace kudu { + +extern const char* kInvalidPath; + +class KuduTest : public ::testing::Test { + public: + KuduTest(); + + virtual ~KuduTest(); + + virtual void SetUp() OVERRIDE; + + // Tests assume that they run with no outside-provided kerberos credentials, and if the + // user happened to have some credentials available they might fail due to being already + // kinitted to a different realm, etc. This function overrides the relevant environment + // variables so that we don't pick up the user's credentials. + static void OverrideKrb5Environment(); + + protected: + // Returns absolute path based on a unit test-specific work directory, given + // a relative path. Useful for writing test files that should be deleted after + // the test ends. + std::string GetTestPath(const std::string& relative_path); + + Env* env_; + google::FlagSaver flag_saver_; // Reset flags on every test. + std::string test_dir_; +}; + +// Returns true if slow tests are runtime-enabled. +bool AllowSlowTests(); + +// Override the given gflag to the new value, only in the case that +// slow tests are enabled and the user hasn't otherwise overridden +// it on the command line. 
+// Example usage: +// +// OverrideFlagForSlowTests( +// "client_inserts_per_thread", +// strings::Substitute("$0", FLAGS_client_inserts_per_thread * 100)); +// +void OverrideFlagForSlowTests(const std::string& flag_name, + const std::string& new_value); + +// Call srand() with a random seed based on the current time, reporting +// that seed to the logs. The time-based seed may be overridden by passing +// --test_random_seed= from the CLI in order to reproduce a failed randomized +// test. Returns the seed. +int SeedRandom(); + +// Return a per-test directory in which to store test data. Guaranteed to +// return the same directory every time for a given unit test. +// +// May only be called from within a gtest unit test. Prefer KuduTest::test_dir_ +// if a KuduTest instance is available. +std::string GetTestDataDirectory(); + +// Wait until 'f()' succeeds without adding any GTest 'fatal failures'. +// For example: +// +// AssertEventually([]() { +// ASSERT_GT(ReadValueOfMetric(), 10); +// }); +// +// The function is run in a loop with exponential backoff, capped at once +// a second. +// +// To check whether AssertEventually() eventually succeeded, call +// NO_PENDING_FATALS() afterward, or use ASSERT_EVENTUALLY() which performs +// this check automatically. +void AssertEventually(const std::function<void(void)>& f, + const MonoDelta& timeout = MonoDelta::FromSeconds(30)); + +// Count the number of open file descriptors in use by this process. 
+int CountOpenFds(Env* env); + +} // namespace kudu +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/test_util_prod.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/test_util_prod.cc b/be/src/kudu/util/test_util_prod.cc new file mode 100644 index 0000000..5523bac --- /dev/null +++ b/be/src/kudu/util/test_util_prod.cc @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/util/test_util_prod.h" + +#include <dlfcn.h> + +namespace kudu { + +bool IsGTest() { + return dlsym(RTLD_DEFAULT, "_ZN4kudu10g_is_gtestE") != nullptr; +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/test_util_prod.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/test_util_prod.h b/be/src/kudu/util/test_util_prod.h new file mode 100644 index 0000000..8b7ea61 --- /dev/null +++ b/be/src/kudu/util/test_util_prod.h @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Test-related utility methods that can be called from non-test +// code. This module is part of the 'util' module and is built into +// all binaries, not just tests, whereas 'test_util.cc' is linked +// only into test binaries. + +#pragma once + +namespace kudu { + +// Return true if the current binary is a gtest. More specifically, +// returns true if the 'test_util.cc' module has been linked in +// (either dynamically or statically) to the running process. +bool IsGTest(); + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/thread-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/thread-test.cc b/be/src/kudu/util/thread-test.cc new file mode 100644 index 0000000..a52f2d4 --- /dev/null +++ b/be/src/kudu/util/thread-test.cc @@ -0,0 +1,152 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/util/thread.h" + +#include <gtest/gtest.h> +#include <string> + +#include "kudu/gutil/ref_counted.h" +#include "kudu/util/env.h" +#include "kudu/util/test_util.h" +#include "kudu/util/thread_restrictions.h" + +using std::string; + +namespace kudu { + +class ThreadTest : public KuduTest {}; + +// Join with a thread and emit warnings while waiting to join. +// This has to be manually verified. +TEST_F(ThreadTest, TestJoinAndWarn) { + if (!AllowSlowTests()) { + LOG(INFO) << "Skipping test in quick test mode, since this sleeps"; + return; + } + + scoped_refptr<Thread> holder; + ASSERT_OK(Thread::Create("test", "sleeper thread", usleep, 1000*1000, &holder)); + ASSERT_OK(ThreadJoiner(holder.get()) + .warn_after_ms(10) + .warn_every_ms(100) + .Join()); +} + +TEST_F(ThreadTest, TestFailedJoin) { + if (!AllowSlowTests()) { + LOG(INFO) << "Skipping test in quick test mode, since this sleeps"; + return; + } + + scoped_refptr<Thread> holder; + ASSERT_OK(Thread::Create("test", "sleeper thread", usleep, 1000*1000, &holder)); + Status s = ThreadJoiner(holder.get()) + .give_up_after_ms(50) + .Join(); + ASSERT_STR_CONTAINS(s.ToString(), "Timed out after 50ms joining on sleeper thread"); +} + +static void TryJoinOnSelf() { + Status s = ThreadJoiner(Thread::current_thread()).Join(); + // Use CHECK instead of ASSERT because gtest isn't thread-safe. + CHECK(s.IsInvalidArgument()); +} + +// Try to join on the thread that is currently running. 
+TEST_F(ThreadTest, TestJoinOnSelf) { + scoped_refptr<Thread> holder; + ASSERT_OK(Thread::Create("test", "test", TryJoinOnSelf, &holder)); + holder->Join(); + // Actual assertion is done by the thread spawned above. +} + +TEST_F(ThreadTest, TestDoubleJoinIsNoOp) { + scoped_refptr<Thread> holder; + ASSERT_OK(Thread::Create("test", "sleeper thread", usleep, 0, &holder)); + ThreadJoiner joiner(holder.get()); + ASSERT_OK(joiner.Join()); + ASSERT_OK(joiner.Join()); +} + + +namespace { + +void ExitHandler(string* s, const char* to_append) { + *s += to_append; +} + +void CallAtExitThread(string* s) { + Thread::current_thread()->CallAtExit(Bind(&ExitHandler, s, Unretained("hello 1, "))); + Thread::current_thread()->CallAtExit(Bind(&ExitHandler, s, Unretained("hello 2"))); +} + +} // anonymous namespace + +TEST_F(ThreadTest, TestCallOnExit) { + scoped_refptr<Thread> holder; + string s; + ASSERT_OK(Thread::Create("test", "TestCallOnExit", CallAtExitThread, &s, &holder)); + holder->Join(); + ASSERT_EQ("hello 1, hello 2", s); +} + +// The following tests only run in debug mode, since thread restrictions are no-ops +// in release builds. +#ifndef NDEBUG +TEST_F(ThreadTest, TestThreadRestrictions_IO) { + // Default should be to allow IO + ThreadRestrictions::AssertIOAllowed(); + + ThreadRestrictions::SetIOAllowed(false); + { + ThreadRestrictions::ScopedAllowIO allow_io; + ASSERT_TRUE(Env::Default()->FileExists("/")); + } + ThreadRestrictions::SetIOAllowed(true); + + // Disallow IO - doing IO should crash the process. 
+ ASSERT_DEATH({ + ThreadRestrictions::SetIOAllowed(false); + ignore_result(Env::Default()->FileExists("/")); + }, + "Function marked as IO-only was called from a thread that disallows IO"); +} + +TEST_F(ThreadTest, TestThreadRestrictions_Waiting) { + // Default should be to allow waiting + ThreadRestrictions::AssertWaitAllowed(); + + ThreadRestrictions::SetWaitAllowed(false); + { + ThreadRestrictions::ScopedAllowWait allow_wait; + CountDownLatch l(0); + l.Wait(); + } + ThreadRestrictions::SetWaitAllowed(true); + + // Disallow waiting - blocking on a latch should crash the process. + ASSERT_DEATH({ + ThreadRestrictions::SetWaitAllowed(false); + CountDownLatch l(0); + l.Wait(); + }, + "Waiting is not allowed to be used on this thread"); +} +#endif // NDEBUG + +} // namespace kudu
