This is an automated email from the ASF dual-hosted git repository. awong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 39bd47462d7c3d57f7207e9eebd4a8102484f568
Author: Andrew Wong <[email protected]>
AuthorDate: Tue Mar 10 15:23:58 2020 -0700

    subprocess: maintain a thread for fork/exec

    SubprocessServer::Init() does a fork/exec, so if it is called from a
    short-lived thread, the subprocess will silently be killed when that
    thread exits. Specifically, we saw this happen when initializing the
    CatalogManager, which runs in a short-lived thread, while starting up
    the Ranger client. This surfaced as an EOF in the subprocess receiver
    thread.

    Change-Id: I803b1613ef1a988df1da4c908c2c37e1fbbdcf81
    Reviewed-on: http://gerrit.cloudera.org:8080/15398
    Tested-by: Kudu Jenkins
    Reviewed-by: Grant Henke <[email protected]>
---
 src/kudu/subprocess/server.cc                 | 20 +++++++++++++++++++-
 src/kudu/subprocess/server.h                  |  7 +++++++
 src/kudu/subprocess/subprocess_server-test.cc | 19 +++++++++++++++++++
 3 files changed, 45 insertions(+), 1 deletion(-)

diff --git a/src/kudu/subprocess/server.cc b/src/kudu/subprocess/server.cc
index 3c7607b..6687e3b 100644
--- a/src/kudu/subprocess/server.cc
+++ b/src/kudu/subprocess/server.cc
@@ -100,9 +100,23 @@ SubprocessServer::~SubprocessServer() {
   Shutdown();
 }
 
+void SubprocessServer::StartSubprocessThread(const StdStatusCallback& cb) {
+  Status s = process_->Start();
+  cb(s);
+  if (PREDICT_TRUE(s.ok())) {
+    // If we successfully started the process, we should stay alive until we
+    // shut down.
+    closing_.Wait();
+  }
+}
+
 Status SubprocessServer::Init() {
   VLOG(2) << "Starting the subprocess";
-  RETURN_NOT_OK_PREPEND(process_->Start(), "Failed to start subprocess");
+  Synchronizer sync;
+  auto cb = sync.AsStdStatusCallback();
+  RETURN_NOT_OK(Thread::Create("subprocess", "start", &SubprocessServer::StartSubprocessThread,
+                               this, cb, &start_thread_));
+  RETURN_NOT_OK_PREPEND(sync.Wait(), "Failed to start subprocess");
 
   // Start the message protocol.
   CHECK(!message_protocol_);
@@ -172,6 +186,9 @@ void SubprocessServer::Shutdown() {
   if (deadline_checker_) {
     deadline_checker_->Join();
   }
+  if (start_thread_) {
+    start_thread_->Join();
+  }
   for (const auto& t : responder_threads_) {
     t->Join();
   }
@@ -197,6 +214,7 @@ void SubprocessServer::ReceiveMessagesThread() {
     if (s.IsEndOfFile()) {
       // The underlying pipe was closed. We're likely shutting down.
       DCHECK_EQ(0, closing_.count());
+      LOG(INFO) << "Received an EOF from the subprocess";
       return;
     }
     // TODO(awong): getting an error here indicates that this server and the
diff --git a/src/kudu/subprocess/server.h b/src/kudu/subprocess/server.h
index 512966b..3c33e5b 100644
--- a/src/kudu/subprocess/server.h
+++ b/src/kudu/subprocess/server.h
@@ -230,6 +230,8 @@ class SubprocessServer {
  private:
   FRIEND_TEST(SubprocessServerTest, TestCallsReturnWhenShuttingDown);
 
+  void StartSubprocessThread(const StdStatusCallback& cb);
+
   // Stop the subprocess and stop processing messages.
   void Shutdown();
 
@@ -274,6 +276,11 @@ class SubprocessServer {
   // Protocol with which to send and receive bytes to and from 'process_'.
   std::shared_ptr<SubprocessProtocol> message_protocol_;
 
+  // Thread that runs the subprocess. Since the subprocess is run via
+  // fork/exec, this thread must stay alive for the lifetime of the server.
+  // Otherwise, the OS may silently kill the spawned child process.
+  scoped_refptr<Thread> start_thread_;
+
   // Pulls requests off the request queue and serializes them via the
   // message protocol.
   scoped_refptr<Thread> write_thread_;
diff --git a/src/kudu/subprocess/subprocess_server-test.cc b/src/kudu/subprocess/subprocess_server-test.cc
index d14293f..857d61f 100644
--- a/src/kudu/subprocess/subprocess_server-test.cc
+++ b/src/kudu/subprocess/subprocess_server-test.cc
@@ -33,6 +33,7 @@
 #include "kudu/subprocess/subprocess.pb.h"
 #include "kudu/util/env.h"
 #include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
 #include "kudu/util/path_util.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/status.h"
@@ -260,6 +261,24 @@ TEST_F(SubprocessServerTest, TestCallsReturnWhenShuttingDown) {
   ASSERT_FALSE(s.ok());
 }
 
+// Some usage of a subprocess warrants calling Init() from a short-lived
+// thread. Let's ensure there's no funny business when that happens (e.g.
+// ensure the OS doesn't reap the underlying process when the parent thread
+// exits).
+TEST_F(SubprocessServerTest, TestInitFromThread) {
+  Status s;
+  thread t([&] {
+    s = ResetSubprocessServer();
+  });
+  t.join();
+  ASSERT_OK(s);
+  // Wait a bit to give time for the OS to wreak havoc (though it shouldn't).
+  SleepFor(MonoDelta::FromSeconds(3));
+  SubprocessRequestPB request = CreateEchoSubprocessRequestPB(kHello);
+  SubprocessResponsePB response;
+  ASSERT_OK(server_->Execute(&request, &response));
+}
+
 } // namespace subprocess
 } // namespace kudu
