Refactored the gRPC client runtime wrapper in libprocess. The refactoring does the following: 1. Manages the gRPC completion queue and the looper thread in the runtime process, which gets rid of a lock in `Runtime::Data`. 2. Moves the work of sending a request into the runtime process. 3. Lets libprocess manage the runtime process automatically instead of managing its lifecycle in `Runtime::Data`.
Review: https://reviews.apache.org/r/67157 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/db9b1738 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/db9b1738 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/db9b1738 Branch: refs/heads/master Commit: db9b17389bf4e8da62a5aa99958c16acd72bdb12 Parents: 4bfe2db Author: Chun-Hung Hsiao <[email protected]> Authored: Wed May 16 11:24:01 2018 -0700 Committer: Chun-Hung Hsiao <[email protected]> Committed: Fri Jun 8 15:09:20 2018 -0700 ---------------------------------------------------------------------- 3rdparty/libprocess/include/process/grpc.hpp | 203 +++++++++++++--------- 3rdparty/libprocess/src/grpc.cpp | 113 ++++++++---- 2 files changed, 196 insertions(+), 120 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/db9b1738/3rdparty/libprocess/include/process/grpc.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/grpc.hpp b/3rdparty/libprocess/include/process/grpc.hpp index cf165f0..0ff8184 100644 --- a/3rdparty/libprocess/include/process/grpc.hpp +++ b/3rdparty/libprocess/include/process/grpc.hpp @@ -13,9 +13,9 @@ #ifndef __PROCESS_GRPC_HPP__ #define __PROCESS_GRPC_HPP__ -#include <atomic> #include <chrono> #include <memory> +#include <string> #include <thread> #include <type_traits> #include <utility> @@ -24,13 +24,15 @@ #include <grpcpp/grpcpp.h> +#include <process/check.hpp> +#include <process/dispatch.hpp> #include <process/future.hpp> -#include <process/owned.hpp> +#include <process/pid.hpp> #include <process/process.hpp> -#include <stout/duration.hpp> +#include <stout/error.hpp> #include <stout/lambda.hpp> -#include <stout/synchronized.hpp> +#include <stout/nothing.hpp> #include <stout/try.hpp> @@ -111,17 +113,16 @@ public: /** - * A copyable interface to manage an internal 
gRPC runtime instance for - * asynchronous gRPC calls. A gRPC runtime instance includes a gRPC - * `CompletionQueue` to manage outstanding requests, a looper thread to - * wait for any incoming responses from the `CompletionQueue`, and a - * process to handle the responses. All `Runtime` copies share the same - * gRPC runtime instance. Usually we only need a single gRPC runtime - * instance to handle all gRPC calls, but multiple instances can be - * instantiated for more parallelism or isolation. - * NOTE: The destruction of the internal gRPC runtime instance is a - * blocking operation: it waits for the managed process to terminate. - * The user should ensure that this only happens at shutdown. + * A copyable interface to manage an internal runtime process for asynchronous + * gRPC calls. A runtime process keeps a gRPC `CompletionQueue` to manage + * outstanding requests, a looper thread to wait for any incoming responses from + * the `CompletionQueue`, and handles the requests and responses. All `Runtime` + * copies share the same runtime process. Usually we only need a single runtime + * process to handle all gRPC calls, but multiple runtime processes can be + * instantiated for better parallelism and isolation. + * + * NOTE: The caller must call `terminate` to drain the `CompletionQueue` before + * finalizing libprocess to gracefully terminate the gRPC runtime. */ class Runtime { @@ -138,7 +139,7 @@ public: * returned for the call, so the caller can handle the error programmatically. * * @param connection A connection to a gRPC server. - * @param rpc The asynchronous gRPC call to make. This can be obtained + * @param method The asynchronous gRPC call to make. This should be obtained * by the `GRPC_CLIENT_METHOD(service, rpc)` macro. * @param request The request protobuf for the gRPC call. * @return a `Future` of `Try` waiting for a response protobuf or an error. 
@@ -148,60 +149,72 @@ public: typename Request = typename internal::MethodTraits<Method>::request_type, typename Response = - typename internal::MethodTraits<Method>::response_type> + typename internal::MethodTraits<Method>::response_type, + typename std::enable_if< + std::is_convertible< + typename std::decay<Request>::type*, + google::protobuf::Message*>::value, + int>::type = 0> Future<Try<Response, StatusError>> call( const Connection& connection, Method&& method, - const Request& request) + Request&& request) { - static_assert( - std::is_convertible<Request*, google::protobuf::Message*>::value, - "Request must be a protobuf message"); - - synchronized (data->lock) { - if (data->terminating) { - return Failure("Runtime has been terminated."); - } - - std::shared_ptr<::grpc::ClientContext> context( - new ::grpc::ClientContext()); - - // TODO(chhsiao): Allow the caller to specify a timeout. - context->set_deadline( - std::chrono::system_clock::now() + std::chrono::seconds(5)); - - // Enable the gRPC wait-for-ready semantics by default. See: - // https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md - // TODO(chhsiao): Allow the caller to set the option. - context->set_wait_for_ready(true); - - // Create a `Promise` and a callback lambda as a tag and invokes - // an asynchronous gRPC call through the `CompletionQueue` - // managed by `data`. The `Promise` will be set by the callback - // upon server response. 
- std::shared_ptr<Promise<Try<Response, StatusError>>> promise( - new Promise<Try<Response, StatusError>>); - - promise->future().onDiscard([=] { context->TryCancel(); }); - - std::shared_ptr<Response> response(new Response()); - std::shared_ptr<::grpc::Status> status(new ::grpc::Status()); - - std::shared_ptr<::grpc::ClientAsyncResponseReader<Response>> reader = - (typename internal::MethodTraits<Method>::stub_type( - connection.channel).*method)(context.get(), request, &data->queue); - - reader->StartCall(); - reader->Finish( - response.get(), - status.get(), - new lambda::function<void()>( - // NOTE: `context` and `reader` need to be held on in - // order to get updates for the ongoing RPC, and thus - // are captured here. The lambda itself will later be - // retrieved and managed in `Data::loop()`. + // Create a `Promise` that will be set upon receiving a response. + // TODO(chhsiao): The `Promise` in the `shared_ptr` is not shared, but only + // to be captured by the lambda below. Use a `unique_ptr` once we get C++14. + std::shared_ptr<Promise<Try<Response, StatusError>>> promise( + new Promise<Try<Response, StatusError>>); + Future<Try<Response, StatusError>> future = promise->future(); + + // Send the request in the internal runtime process. + // TODO(chhsiao): We use `std::bind` here to forward `request` to avoid an + // extra copy. We should capture it by forwarding once we get C++14. + dispatch(data->pid, &RuntimeProcess::send, std::bind( + [connection, method, promise]( + const Request& request, + bool terminating, + ::grpc::CompletionQueue* queue) { + if (terminating) { + promise->fail("Runtime has been terminated"); + return; + } + + // TODO(chhsiao): The `shared_ptr`s here aren't shared, but only to be + // captured by the lambda below. Use `unique_ptr`s once we get C++14. + std::shared_ptr<::grpc::ClientContext> context( + new ::grpc::ClientContext()); + + // TODO(chhsiao): Allow the caller to specify a timeout. 
+ context->set_deadline( + std::chrono::system_clock::now() + std::chrono::seconds(5)); + + // Enable the gRPC wait-for-ready semantics by default. See: + // https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md + // TODO(chhsiao): Allow the caller to set the option. + context->set_wait_for_ready(true); + + promise->future().onDiscard([=] { context->TryCancel(); }); + + std::shared_ptr<Response> response(new Response()); + std::shared_ptr<::grpc::Status> status(new ::grpc::Status()); + + std::shared_ptr<::grpc::ClientAsyncResponseReader<Response>> reader = + (typename internal::MethodTraits<Method>::stub_type( + connection.channel).*method)(context.get(), request, queue); + + reader->StartCall(); + + // Create a `ReceiveCallback` as a tag in the `CompletionQueue` for + // the current asynchronous gRPC call. The callback will set up the + // above `Promise` upon receiving a response. + // NOTE: `context` and `reader` need to be held on in order to get + // updates for the ongoing RPC, and thus are captured here. The + // callback itself will later be retrieved and managed in the + // looper thread. + void* tag = new ReceiveCallback( [context, reader, response, status, promise]() { - CHECK(promise->future().isPending()); + CHECK_PENDING(promise->future()); if (promise->future().hasDiscard()) { promise->discard(); } else { @@ -209,44 +222,70 @@ public: ? std::move(*response) : Try<Response, StatusError>::error(std::move(*status))); } - })); + }); - return promise->future(); - } + reader->Finish(response.get(), status.get(), tag); + }, + std::forward<Request>(request), + lambda::_1, + lambda::_2)); + + return future; } /** - * Asks the internal gRPC runtime instance to shut down the - * `CompletionQueue`, which would stop its looper thread, drain and - * fail all pending gRPC calls in the `CompletionQueue`, then - * asynchronously join the looper thread. 
+ * Asks the internal runtime process to shut down the `CompletionQueue`, which + * would asynchronously drain and fail all pending gRPC calls in the + * `CompletionQueue`, then join the looper thread. */ void terminate(); /** * @return A `Future` waiting for all pending gRPC calls in the - * `CompletionQueue` of the internal gRPC runtime instance to be - * drained and the looper thread to be joined. + * `CompletionQueue` of the internal runtime process to be drained and the + * looper thread to be joined. */ Future<Nothing> wait(); private: - struct Data + // Type of the callback functions that can get invoked when sending a request + // or receiving a response. + typedef lambda::CallableOnce< + void(bool, ::grpc::CompletionQueue*)> SendCallback; + typedef lambda::CallableOnce<void()> ReceiveCallback; + + class RuntimeProcess : public Process<RuntimeProcess> { - Data(); - ~Data(); + public: + RuntimeProcess(); + virtual ~RuntimeProcess(); - void loop(); + void send(SendCallback callback); + void receive(ReceiveCallback callback); void terminate(); + Future<Nothing> wait(); + + private: + void initialize() override; + void finalize() override; + + void loop(); - std::unique_ptr<std::thread> looper; ::grpc::CompletionQueue queue; - ProcessBase process; - std::atomic_flag lock = ATOMIC_FLAG_INIT; - bool terminating = false; + std::unique_ptr<std::thread> looper; + bool terminating; Promise<Nothing> terminated; }; + struct Data + { + Data(); + ~Data(); + + PID<RuntimeProcess> pid; + Future<Nothing> terminated; + }; + std::shared_ptr<Data> data; }; http://git-wip-us.apache.org/repos/asf/mesos/blob/db9b1738/3rdparty/libprocess/src/grpc.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/grpc.cpp b/3rdparty/libprocess/src/grpc.cpp index a80bcb6..4e4f989 100644 --- a/3rdparty/libprocess/src/grpc.cpp +++ b/3rdparty/libprocess/src/grpc.cpp @@ -10,85 +10,122 @@ // See the License for the specific language governing 
permissions and // limitations under the License -#include <process/dispatch.hpp> #include <process/grpc.hpp> + +#include <process/dispatch.hpp> #include <process/id.hpp> +#include <process/process.hpp> namespace process { namespace grpc { - namespace client { void Runtime::terminate() { - data->terminate(); + dispatch(data->pid, &RuntimeProcess::terminate); } Future<Nothing> Runtime::wait() { - return data->terminated.future(); + return data->terminated; } -Runtime::Data::Data() - : process(ID::generate("__grpc_client__")) +Runtime::RuntimeProcess::RuntimeProcess() + : ProcessBase(ID::generate("__grpc_client__")), terminating(false) {} + + +Runtime::RuntimeProcess::~RuntimeProcess() +{ + CHECK(!looper); +} + + +void Runtime::RuntimeProcess::send(SendCallback callback) +{ + std::move(callback)(terminating, &queue); +} + + +void Runtime::RuntimeProcess::receive(ReceiveCallback callback) +{ + std::move(callback)(); +} + + +void Runtime::RuntimeProcess::terminate() { - spawn(process); + if (!terminating) { + terminating = true; + queue.Shutdown(); + } +} + +Future<Nothing> Runtime::RuntimeProcess::wait() +{ + return terminated.future(); +} + + +void Runtime::RuntimeProcess::initialize() +{ // The looper thread can only be created here since it need to happen // after `queue` is initialized. - looper.reset(new std::thread(&Runtime::Data::loop, this)); + CHECK(!looper); + looper.reset(new std::thread(&RuntimeProcess::loop, this)); } -Runtime::Data::~Data() +void Runtime::RuntimeProcess::finalize() { - terminate(); - process::wait(process); + CHECK(terminating) << "Runtime has not yet been terminated"; + + // NOTE: This is a blocking call. However, the thread is guaranteed + // to be exiting, therefore the amount of blocking time should be + // short (just like other syscalls we invoke). 
+ looper->join(); + looper.reset(); + terminated.set(Nothing()); } -void Runtime::Data::loop() +void Runtime::RuntimeProcess::loop() { void* tag; bool ok; while (queue.Next(&tag, &ok)) { - // The returned callback object is managed by the `callback` shared - // pointer, so if we get a regular event from the `CompletionQueue`, - // then the object would be captured by the following lambda - // dispatched to `process`; otherwise it would be reclaimed here. - std::shared_ptr<lambda::function<void()>> callback( - reinterpret_cast<lambda::function<void()>*>(tag)); - if (ok) { - dispatch(process, [=] { (*callback)(); }); - } + // Currently only unary RPCs are supported, so `ok` should always be true. + // See: https://grpc.io/grpc/cpp/classgrpc_1_1_completion_queue.html#a86d9810ced694e50f7987ac90b9f8c1a // NOLINT + CHECK(ok); + + // Obtain the tag as a `ReceiveCallback` and dispatch it to the runtime + // process. The tag is then reclaimed here. + ReceiveCallback* callback = reinterpret_cast<ReceiveCallback*>(tag); + dispatch(self(), &RuntimeProcess::receive, std::move(*callback)); + delete callback; } - dispatch(process, [this] { - // NOTE: This is a blocking call. However, the thread is guaranteed - // to be exiting, therefore the amount of blocking time should be - // short (just like other syscalls we invoke). - looper->join(); - // Terminate `process` after all events are drained. - process::terminate(process, false); - terminated.set(Nothing()); - }); + // Terminate self after all events are drained. 
+ process::terminate(self(), false); } -void Runtime::Data::terminate() +Runtime::Data::Data() { - synchronized (lock) { - if (!terminating) { - terminating = true; - queue.Shutdown(); - } - } + RuntimeProcess* process = new RuntimeProcess(); + terminated = process->wait(); + pid = spawn(process, true); } -} // namespace client { +Runtime::Data::~Data() +{ + dispatch(pid, &RuntimeProcess::terminate); +} + +} // namespace client { } // namespace grpc { } // namespace process {
