This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit da6211df5f8df0c53ceedd542b61634f3bab7205
Author: Ádám Bakai <[email protected]>
AuthorDate: Fri Nov 8 13:16:18 2024 +0100

    [subprocess] KUDU-3624 Fix DoWait thread-safety
    
    waitpid() returns an error if it is called with the pid of a child
    that has already been reaped. That's why Subprocess::DoWait() caches
    the result of the previous waitpid() call and returns it instead of
    calling waitpid() again. However, in
    EchoSubprocessTest.TestSubprocessMetricsOnError it can happen that
    SubprocessServer::ExitCheckerThread() and Subprocess::KillAndWait()
    both call Subprocess::DoWait() concurrently and both of them call
    waitpid(). If ExitCheckerThread() calls it second, it fails the
    following check:
    Check failed: _s.ok() Bad status: Runtime error: Unable
    to wait on child: No child processes (error 10)
    
    To fix this behavior, wait_mutex_ is added: while one thread is
    calling waitpid(), other threads cannot call it at the same time.
    If the mutex cannot be acquired and the WaitMode is NON_BLOCKING,
    DoWait() returns Status::TimedOut right away, as if nothing had
    happened. The new unit test SubprocessTest.TestMultiThreadWait
    verifies that two Wait() calls can run concurrently.
    
    Change-Id: I1cb540860b439c26e1c8529123c8b29940d9f84f
    Reviewed-on: http://gerrit.cloudera.org:8080/22056
    Tested-by: Alexey Serbin <[email protected]>
    Reviewed-by: Alexey Serbin <[email protected]>
---
 src/kudu/util/subprocess-test.cc | 10 ++++++++++
 src/kudu/util/subprocess.cc      | 12 ++++++++++++
 src/kudu/util/subprocess.h       | 12 ++++++++++++
 3 files changed, 34 insertions(+)

diff --git a/src/kudu/util/subprocess-test.cc b/src/kudu/util/subprocess-test.cc
index f7058aea6..9a586582e 100644
--- a/src/kudu/util/subprocess-test.cc
+++ b/src/kudu/util/subprocess-test.cc
@@ -223,6 +223,16 @@ TEST_F(SubprocessTest, TestGetExitStatusExitSuccess) {
   ASSERT_STR_CONTAINS(exit_info, "process successfully exited");
 }
 
+TEST_F(SubprocessTest, TestMultiThreadWait) {  // Regression test for KUDU-3624.
+  Subprocess p({ "/bin/sh", "-c", "sleep 1; exit 0" });  // child exits 0 after ~1s
+  ASSERT_OK(p.Start());
+  thread subprocess_thread([&]() {  // second waiter, concurrent with the Wait() below
+    ASSERT_OK(p.Wait());
+  });
+  SCOPED_CLEANUP({ subprocess_thread.join(); });  // join the waiter thread on scope exit
+  ASSERT_OK(p.Wait());  // first waiter, on the test's main thread
+}
+
 TEST_F(SubprocessTest, TestGetExitStatusExitFailure) {
   static const vector<int> kStatusCodes = { 1, 255 };
   for (auto code : kStatusCodes) {
diff --git a/src/kudu/util/subprocess.cc b/src/kudu/util/subprocess.cc
index 8130c07b9..ff631dee0 100644
--- a/src/kudu/util/subprocess.cc
+++ b/src/kudu/util/subprocess.cc
@@ -799,6 +799,18 @@ pid_t Subprocess::pid() const {
 }
 
 Status Subprocess::DoWait(int* wait_status, WaitMode mode) {
+  std::unique_lock lock(wait_mutex_, std::try_to_lock);
+  if (!lock.owns_lock()) {
+    // Another thread holds the mutex. A non-blocking request can return TimedOut
+    // right away, exactly as if the child process had not exited yet.
+    if (mode == NON_BLOCKING) {
+      return Status::TimedOut("");
+    }
+    lock.lock();
+  }
+
+  // The mutex is held now. Re-check state_: another thread may have already called
+  // waitpid() and updated it while this thread was waiting for the lock.
   if (state_ == kExited) {
     if (wait_status) {
       *wait_status = wait_status_;
diff --git a/src/kudu/util/subprocess.h b/src/kudu/util/subprocess.h
index ed7b5b60c..a24724b02 100644
--- a/src/kudu/util/subprocess.h
+++ b/src/kudu/util/subprocess.h
@@ -22,6 +22,7 @@
 #include <atomic>
 #include <csignal>
 #include <map>
+#include <mutex>
 #include <string>
 #include <vector>
 
@@ -50,6 +51,10 @@ namespace kudu {
 //
 // Note that, when the Subprocess object is destructed, the child process
 // will be forcibly SIGKILLed to avoid orphaning processes.
+// Note that Wait(), WaitNoBlock(), WaitAndCheckExitCode() and DoWait() are
+// thread-safe with respect to each other, but Kill(), KillAndWait() and
+// GetExitStatus() are not thread-safe with the waiting functions if the
+// subprocess exits on its own at approximately the same time as Kill() is called.
 class Subprocess {
  public:
   // Constructs a new Subprocess that will execute 'argv' on Start().
@@ -102,6 +107,7 @@ class Subprocess {
   // NOTE: unlike the standard wait(2) call, this may be called multiple
   // times. If the process has exited, it will repeatedly return the same
   // exit code.
+  // Note: this is thread-safe with WaitNoBlock() and WaitAndCheckExitCode()
   Status Wait(int* wait_status = nullptr) WARN_UNUSED_RESULT;
 
   // Like the above, but does not block. This returns Status::TimedOut
@@ -111,10 +117,12 @@ class Subprocess {
   // NOTE: unlike the standard wait(2) call, this may be called multiple
   // times. If the process has exited, it will repeatedly return the same
   // exit code.
+  // Note: this is thread-safe with Wait() and WaitAndCheckExitCode()
   Status WaitNoBlock(int* wait_status = nullptr) WARN_UNUSED_RESULT;
 
   // Like Wait, but it also checks the exit code is 0. If it's not, or if it's
   // not a clean exit, it returns RemoteError.
+  // Note: this is thread-safe with WaitNoBlock() and Wait()
   Status WaitAndCheckExitCode() WARN_UNUSED_RESULT;
 
   // Send a signal to the subprocess.
@@ -220,6 +228,10 @@ class Subprocess {
   // Only valid if state_ == kExited.
   int wait_status_;
 
+  // Mutex to make sure that only one thread at a time runs waitpid() and possibly
+  // updates the Wait()/WaitNoBlock() cache (state_ and wait_status_) with its result.
+  std::mutex wait_mutex_;
+
   // Custom signal to deliver when the subprocess goes out of scope, provided
   // the process hasn't already been killed.
   int sig_on_destruct_;

Reply via email to