This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new fd98e9e73 KUDU-3504 Crash master on subprocess death
fd98e9e73 is described below

commit fd98e9e7331a0e8fc6b091faa2d2744a7787e6d7
Author: Attila Bukor <[email protected]>
AuthorDate: Tue Aug 15 16:55:39 2023 +0200

    KUDU-3504 Crash master on subprocess death
    
    In the past, there were several instances when the Ranger subprocess
    crashed.  In these cases, the master happily went on, but failed to
    authorize requests. Since there's no way to restart the subprocess
    without restarting the master anyway, it's better to crash the master as
    well to make sure the failure of the subprocess is detected in time and
    can be addressed.
    
    As there are multiple concurrent calls to Subprocess::DoWait() now, this
    commit also changes some member variables to atomic to make sure it's
    thread-safe as TSAN complained about a data race.
    
    Change-Id: Iec516f3d684f152bd29874b60b810c526ee5a184
    Reviewed-on: http://gerrit.cloudera.org:8080/20365
    Tested-by: Kudu Jenkins
    Reviewed-by: Marton Greber <[email protected]>
    Reviewed-by: Alexey Serbin <[email protected]>
---
 src/kudu/ranger/ranger_client.cc       |  9 +++++++--
 src/kudu/subprocess/server.cc          | 33 +++++++++++++++++++++++++++++----
 src/kudu/subprocess/server.h           | 16 ++++++++++++++--
 src/kudu/subprocess/subprocess_proxy.h |  6 +++---
 src/kudu/util/subprocess.cc            | 13 ++++++++-----
 src/kudu/util/subprocess.h             |  7 ++++---
 6 files changed, 65 insertions(+), 19 deletions(-)

diff --git a/src/kudu/ranger/ranger_client.cc b/src/kudu/ranger/ranger_client.cc
index 318c0568f..7baac4c99 100644
--- a/src/kudu/ranger/ranger_client.cc
+++ b/src/kudu/ranger/ranger_client.cc
@@ -104,6 +104,10 @@ DEFINE_bool(ranger_logtostdout, false,
 TAG_FLAG(ranger_logtostdout, advanced);
 TAG_FLAG(ranger_logtostdout, evolving);
 
+DEFINE_bool(ranger_crash_master_on_subprocess_failure, true,
+            "Whether to crash the Master if the Ranger subprocess crashes.");
+TAG_FLAG(ranger_crash_master_on_subprocess_failure, advanced);
+
 DECLARE_int32(max_log_files);
 DECLARE_uint32(max_log_size);
 DECLARE_uint32(subprocess_max_message_size_bytes);
@@ -422,8 +426,9 @@ Status RangerClient::Start() {
   const string fifo_path = SubprocessServer::FifoPath(RangerFifoBase());
   vector<string> argv;
   RETURN_NOT_OK(BuildArgv(fifo_path, log_properties_path, &argv));
-  subprocess_.reset(new RangerSubprocess(env_, fifo_path, std::move(argv), metric_entity_,
-                                         "Ranger client subprocess"));
+  subprocess_.reset(new RangerSubprocess(env_, fifo_path, argv, metric_entity_,
+                                         "Ranger client subprocess",
+                                         FLAGS_ranger_crash_master_on_subprocess_failure));
   return subprocess_->Start();
 }
 
diff --git a/src/kudu/subprocess/server.cc b/src/kudu/subprocess/server.cc
index 94f2d9488..a0f0f1a1b 100644
--- a/src/kudu/subprocess/server.cc
+++ b/src/kudu/subprocess/server.cc
@@ -20,6 +20,7 @@
 #include <unistd.h>
 
 #include <csignal>
+#include <cstdlib>
 #include <memory>
 #include <ostream>
 #include <string>
@@ -102,19 +103,21 @@ string SubprocessServer::FifoPath(const string& base) {
   return Substitute("$0.$1.$2", base, getpid(), Thread::CurrentThreadId());
 }
 
-SubprocessServer::SubprocessServer(Env* env, const string& receiver_file,
+SubprocessServer::SubprocessServer(Env* env, string receiver_file,
                                    vector<string> subprocess_argv,
-                                   SubprocessMetrics metrics)
+                                   SubprocessMetrics metrics,
+                                   bool exit_on_failure)
     : call_timeout_(MonoDelta::FromSeconds(FLAGS_subprocess_timeout_secs)),
       max_message_size_bytes_(FLAGS_subprocess_max_message_size_bytes),
       next_id_(1),
       closing_(1),
       env_(env),
-      receiver_file_(receiver_file),
+      receiver_file_(std::move(receiver_file)),
       process_(make_shared<Subprocess>(std::move(subprocess_argv))),
       outbound_call_queue_(FLAGS_subprocess_request_queue_size_bytes),
       inbound_response_queue_(FLAGS_subprocess_response_queue_size_bytes),
-      metrics_(std::move(metrics)) {
+      metrics_(std::move(metrics)),
+      exit_on_failure_(exit_on_failure) {
   process_->ShareParentStdin(false);
   process_->ShareParentStdout(true);
   process_->ShareParentStderr(true);
@@ -143,6 +146,9 @@ Status SubprocessServer::Init() {
                                [this, &cb]() { this->StartSubprocessThread(cb); },
                                &read_thread_));
   RETURN_NOT_OK_PREPEND(sync.Wait(), "Failed to start subprocess");
+  RETURN_NOT_OK(Thread::Create("subprocess", "exit-checker",
+                               [this]() { this->ExitCheckerThread(); },
+                               &exit_checker_));
 
   // NOTE: callers should try to ensure each receiver file path is used by a
   // single subprocess.
@@ -211,6 +217,9 @@ void SubprocessServer::Shutdown() {
   // don't init there. Shutdown() is still called in this case from the
   // destructor though so these checks are necessary.
   if (process_->IsStarted()) {
+    // We're intentionally killing the process so the parent process shouldn't
+    // die when the child gets killed.
+    exit_on_failure_ = false;
     WARN_NOT_OK(process_->KillAndWait(SIGTERM), "failed to stop subprocess");
   }
   inbound_response_queue_.Shutdown();
@@ -230,6 +239,9 @@ void SubprocessServer::Shutdown() {
   if (start_thread_) {
     start_thread_->Join();
   }
+  if (exit_checker_) {
+    exit_checker_->Join();
+  }
   for (const auto& t : responder_threads_) {
     t->Join();
   }
@@ -384,6 +396,19 @@ void SubprocessServer::CheckDeadlinesThread() {
   }
 }
 
+void SubprocessServer::ExitCheckerThread() {
+  int status;
+  CHECK_OK(process_->Wait(&status));
+  string message = Substitute("The subprocess has exited with status $0",
+                              WIFEXITED(status) ? WEXITSTATUS(status) : WTERMSIG(status));
+
+  if (exit_on_failure_) {
+    LOG(FATAL) << message;
+  } else {
+    LOG(WARNING) << message;
+  }
+}
+
 void SubprocessServer::SendMessagesThread() {
   DCHECK(message_protocol_) << "message protocol is not initialized";
   Status s;
diff --git a/src/kudu/subprocess/server.h b/src/kudu/subprocess/server.h
index a5222e910..01af2fcaa 100644
--- a/src/kudu/subprocess/server.h
+++ b/src/kudu/subprocess/server.h
@@ -224,8 +224,9 @@ class SubprocessServer {
   // collisions between subprocesses started in different process and threads.
   static std::string FifoPath(const std::string& base);
 
-  SubprocessServer(Env* env, const std::string& receiver_file,
-      std::vector<std::string> subprocess_argv, SubprocessMetrics metrics);
+  SubprocessServer(Env* env, std::string receiver_file,
+      std::vector<std::string> subprocess_argv, SubprocessMetrics metrics,
+      bool exit_on_failure = false);
   virtual ~SubprocessServer();
 
   // Initialize the server, starting the subprocess and worker threads.
@@ -270,6 +271,9 @@ class SubprocessServer {
   // inbound response queue.
   void ReceiveMessagesThread();
 
+  // Starts the exit checker thread.
+  void ExitCheckerThread();
+
   // Fixed timeout to be used for each call.
   const MonoDelta call_timeout_;
 
@@ -312,6 +316,10 @@ class SubprocessServer {
   // and triggers their callbacks.
   scoped_refptr<Thread> deadline_checker_;
 
+  // Waits for the process to exit. If it exits with a non-zero exit code, it
+  // kills the main process if exit_on_failure_ is set to true.
+  scoped_refptr<Thread> exit_checker_;
+
   // Pull work off the response queue and trigger the associated callbacks if
   // appropriate.
   std::vector<scoped_refptr<Thread>> responder_threads_;
@@ -334,6 +342,10 @@ class SubprocessServer {
   // call's callback.
   simple_spinlock in_flight_lock_;
   std::map<CallId, std::shared_ptr<SubprocessCall>> call_by_id_;
+
+  // If set to true, kills the main process if the subprocess exits with a
+  // non-zero exit code.
+  std::atomic<bool> exit_on_failure_;
 };
 
 } // namespace subprocess
diff --git a/src/kudu/subprocess/subprocess_proxy.h b/src/kudu/subprocess/subprocess_proxy.h
index 37f668740..586c246b4 100644
--- a/src/kudu/subprocess/subprocess_proxy.h
+++ b/src/kudu/subprocess/subprocess_proxy.h
@@ -54,9 +54,9 @@ template<class ReqPB, class RespPB, class MetricsPB>
 class SubprocessProxy {
  public:
   SubprocessProxy(Env* env, const std::string& receiver_file,
-                  std::vector<std::string> argv, const scoped_refptr<MetricEntity>& entity,
-                  std::string subprocess_name = "subprocess")
-      : server_(new SubprocessServer(env, receiver_file, std::move(argv), MetricsPB(entity))),
+                  const std::vector<std::string>& argv, const scoped_refptr<MetricEntity>& entity,
+                  std::string subprocess_name = "subprocess", bool exit_on_failure = true)
+      : server_(new SubprocessServer(env, receiver_file, argv, MetricsPB(entity), exit_on_failure)),
         subprocess_name_(std::move(subprocess_name)) {}
 
   // Starts the underlying subprocess.
diff --git a/src/kudu/util/subprocess.cc b/src/kudu/util/subprocess.cc
index c629e600e..2228b98f6 100644
--- a/src/kudu/util/subprocess.cc
+++ b/src/kudu/util/subprocess.cc
@@ -19,7 +19,6 @@
 
 #include <dirent.h>
 #include <fcntl.h>
-#include <signal.h>
 #if defined(__linux__)
 #include <sys/prctl.h>
 #endif
@@ -278,7 +277,7 @@ Subprocess::~Subprocess() {
   if (state_ == kRunning) {
     LOG(WARNING) << Substitute(
         "Child process $0 ($1) was orphaned. Sending signal $2...",
-        child_pid_, JoinStrings(argv_, " "), sig_on_destruct_);
+        child_pid_.load(), JoinStrings(argv_, " "), sig_on_destruct_);
     WARN_NOT_OK(KillAndWait(sig_on_destruct_),
                 Substitute("Failed to KillAndWait() with signal $0",
                            sig_on_destruct_));
@@ -321,7 +320,7 @@ static int pipe2(int pipefd[2], int flags) {
 Status Subprocess::Start() {
   VLOG(2) << "Invoking command: " << argv_;
   if (state_ != kNotStarted) {
-    const string err_str = Substitute("$0: illegal sub-process state", state_);
+    const string err_str = Substitute("$0: illegal sub-process state", state_.load());
     LOG(DFATAL) << err_str;
     return Status::IllegalState(err_str);
   }
@@ -603,11 +602,15 @@ Status Subprocess::Kill(int signal) {
     LOG(DFATAL) << err_str;
     return Status::IllegalState(err_str);
   }
-  if (kill(child_pid_, signal) != 0) {
+  int ret = kill(child_pid_, signal);
+  if (ret != 0 && ret != ESRCH) {
     return Status::RuntimeError("Unable to kill",
                                 ErrnoToString(errno),
                                 errno);
   }
+  if (ret == ESRCH) {
+    LOG(WARNING) << Substitute("Process $0 ($1) has already exited", program_, child_pid_.load());
+  }
 
   // Signal delivery is often asynchronous. For some signals, we try to wait
   // for the process to actually change state, using /proc/<pid>/stat as a
@@ -804,7 +807,7 @@ Status Subprocess::DoWait(int* wait_status, WaitMode mode) {
     return Status::OK();
   }
   if (state_ != kRunning) {
-    const string err_str = Substitute("$0: illegal sub-process state", state_);
+    const string err_str = Substitute("$0: illegal sub-process state", state_.load());
     LOG(DFATAL) << err_str;
     return Status::IllegalState(err_str);
   }
diff --git a/src/kudu/util/subprocess.h b/src/kudu/util/subprocess.h
index d0561ef45..ed7b5b60c 100644
--- a/src/kudu/util/subprocess.h
+++ b/src/kudu/util/subprocess.h
@@ -17,9 +17,10 @@
 #ifndef KUDU_UTIL_SUBPROCESS_H
 #define KUDU_UTIL_SUBPROCESS_H
 
-#include <signal.h>
 #include <unistd.h>
 
+#include <atomic>
+#include <csignal>
 #include <map>
 #include <string>
 #include <vector>
@@ -209,8 +210,8 @@ class Subprocess {
   std::string program_;
   std::vector<std::string> argv_;
   std::map<std::string, std::string> env_;
-  State state_;
-  int child_pid_;
+  std::atomic<State> state_;
+  std::atomic<int> child_pid_;
   enum StreamMode fd_state_[3];
   int child_fds_[3];
   std::string cwd_;

Reply via email to