This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit da6211df5f8df0c53ceedd542b61634f3bab7205 Author: Ádám Bakai <[email protected]> AuthorDate: Fri Nov 8 13:16:18 2024 +0100 [subprocess] KUDU-3624 Fix DoWait thread-safety waitpid() does return with an error if it is called with a pid that was already shut down. So Subprocess::DoWait() stores the return value of previous waitpid execution and returns it instead of running it again. But in EchoSubprocessTest.TestSubprocessMetricsOnError it can happen that SubprocessServer::ExitCheckerThread() and Subprocess::KillAndWait() both call Subprocess::DoWait() and both of them call waitpid. And if ExitCheckerThread() calls it second, then it fails the following check: Check failed: _s.ok() Bad status: Runtime error: Unable to wait on child: No child processes (error 10) To fix this behaviour, wait_mutex_ is added. If a thread runs and calls waitpid(), other threads won't execute it in the same time. If locking is unsuccessful but the WaitMode is NON_BLOCKING, then return as if nothing happened. Unit test SubprocessTest.TestMultiThreadWait was added to verify executing two wait commands concurrently. Change-Id: I1cb540860b439c26e1c8529123c8b29940d9f84f Reviewed-on: http://gerrit.cloudera.org:8080/22056 Tested-by: Alexey Serbin <[email protected]> Reviewed-by: Alexey Serbin <[email protected]> --- src/kudu/util/subprocess-test.cc | 10 ++++++++++ src/kudu/util/subprocess.cc | 12 ++++++++++++ src/kudu/util/subprocess.h | 12 ++++++++++++ 3 files changed, 34 insertions(+) diff --git a/src/kudu/util/subprocess-test.cc b/src/kudu/util/subprocess-test.cc index f7058aea6..9a586582e 100644 --- a/src/kudu/util/subprocess-test.cc +++ b/src/kudu/util/subprocess-test.cc @@ -223,6 +223,16 @@ TEST_F(SubprocessTest, TestGetExitStatusExitSuccess) { ASSERT_STR_CONTAINS(exit_info, "process successfully exited"); } +TEST_F(SubprocessTest, TestMultiThreadWait) { + Subprocess p({ "/bin/sh", "-c", "sleep 1; exit 0" }); + ASSERT_OK(p.Start()); + thread subprocess_thread([&]() { + ASSERT_OK(p.Wait()); + }); + SCOPED_CLEANUP({ subprocess_thread.join(); }); + ASSERT_OK(p.Wait()); +} + TEST_F(SubprocessTest, TestGetExitStatusExitFailure) { static const vector<int> kStatusCodes = { 1, 255 }; for (auto code : kStatusCodes) { diff --git a/src/kudu/util/subprocess.cc b/src/kudu/util/subprocess.cc index 8130c07b9..ff631dee0 100644 --- a/src/kudu/util/subprocess.cc +++ b/src/kudu/util/subprocess.cc @@ -799,6 +799,18 @@ pid_t Subprocess::pid() const { } Status Subprocess::DoWait(int* wait_status, WaitMode mode) { + std::unique_lock lock(wait_mutex_, std::try_to_lock); + if (!lock.owns_lock()) { + // Mutex wasn't locked. If this is a non blocking request, then it's fine to return as nothing + // happened. + if (mode == NON_BLOCKING) { + return Status::TimedOut(""); + } + lock.lock(); + } + + // Now, we are in a locked state. It's important to check state_, because other threads + // may have updated it. if (state_ == kExited) { if (wait_status) { *wait_status = wait_status_; diff --git a/src/kudu/util/subprocess.h b/src/kudu/util/subprocess.h index ed7b5b60c..a24724b02 100644 --- a/src/kudu/util/subprocess.h +++ b/src/kudu/util/subprocess.h @@ -22,6 +22,7 @@ #include <atomic> #include <csignal> #include <map> +#include <mutex> #include <string> #include <vector> @@ -50,6 +51,10 @@ namespace kudu { // // Note that, when the Subprocess object is destructed, the child process // will be forcibly SIGKILLed to avoid orphaning processes. +// Note that DoWait(), WaitNoBlock() and WaitAndCheckExitCode() are thread-safe to +// each other, but Kill(), KillAndWait() and GetExitStatus() are not thread-safe +// with the waiting functions if the subprocess is stopped on its own at approximately +// the same time of the Kill() command. class Subprocess { public: // Constructs a new Subprocess that will execute 'argv' on Start(). @@ -102,6 +107,7 @@ class Subprocess { // NOTE: unlike the standard wait(2) call, this may be called multiple // times. If the process has exited, it will repeatedly return the same // exit code. + // Note: this is thread-safe with WaitNoBlock() and WaitAndCheckExitCode() Status Wait(int* wait_status = nullptr) WARN_UNUSED_RESULT; // Like the above, but does not block. This returns Status::TimedOut @@ -111,10 +117,12 @@ class Subprocess { // NOTE: unlike the standard wait(2) call, this may be called multiple // times. If the process has exited, it will repeatedly return the same // exit code. + // Note: this is thread-safe with Wait() and WaitAndCheckExitCode() Status WaitNoBlock(int* wait_status = nullptr) WARN_UNUSED_RESULT; // Like Wait, but it also checks the exit code is 0. If it's not, or if it's // not a clean exit, it returns RemoteError. + // Note: this is thread-safe with WaitNoBlock() and Wait() Status WaitAndCheckExitCode() WARN_UNUSED_RESULT; // Send a signal to the subprocess. @@ -220,6 +228,10 @@ class Subprocess { // Only valid if state_ == kExited. int wait_status_; + // Mutex to make sure that only one thread is running waitpid() and possibly updating + // Wait()/WaitNoBlock() cache with its result at any given time. + std::mutex wait_mutex_; + // Custom signal to deliver when the subprocess goes out of scope, provided // the process hasn't already been killed. int sig_on_destruct_;
