Repository: mesos
Updated Branches:
  refs/heads/1.5.x b0ef216b4 -> 19f65537f
Reduced likelihood of a stack overflow in libprocess socket recv path.

Currently, the socket recv path is implemented using an asynchronous
loop with callbacks. Without using `process::loop`, this pattern is
prone to a stack overflow in the case that all asynchronous calls
complete synchronously. This is possible with sockets if the socket
is always ready for reading. The crash has been reported in MESOS-9024,
so the stack overflow has been encountered in practice.

This patch updates the recv path to leverage `process::loop`, which
is supposed to prevent stack overflows in asynchronous loops. However,
it is still possible for `process::loop` to stack overflow due to
MESOS-8852. In practice, I expect that even without MESOS-8852 fixed,
users won't see any stack overflows in the recv path.

Review: https://reviews.apache.org/r/67824

Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e5229d53
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e5229d53
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e5229d53

Branch: refs/heads/1.5.x
Commit: e5229d531e2c62a50024c6e86445bd517779997a
Parents: b0ef216
Author: Benjamin Mahler <[email protected]>
Authored: Tue Jul 3 16:54:11 2018 -0700
Committer: Benjamin Mahler <[email protected]>
Committed: Tue Jul 3 17:36:40 2018 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/src/process.cpp | 106 ++++++++++++++-----------------
 1 file changed, 46 insertions(+), 60 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/mesos/blob/e5229d53/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 631119c..879f623 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@
-763,63 +763,60 @@ static Future<MessageEvent*> parse(const Request& request) namespace internal { -void decode_recv( - const Future<size_t>& length, - char* data, - size_t size, - Socket socket, - StreamingRequestDecoder* decoder) +void receive(Socket socket) { - if (length.isDiscarded() || length.isFailed()) { - if (length.isFailed()) { - VLOG(1) << "Decode failure: " << length.failure(); - } + StreamingRequestDecoder* decoder = new StreamingRequestDecoder(); - socket_manager->close(socket); - delete[] data; - delete decoder; - return; - } + const size_t size = 80 * 1024; + char* data = new char[size]; - if (length.get() == 0) { - socket_manager->close(socket); - delete[] data; - delete decoder; - return; - } + Future<Nothing> recv_loop = process::loop( + None(), + [=] { + return socket.recv(data, size); + }, + [=](size_t length) -> Future<ControlFlow<Nothing>> { + if (length == 0) { + return Break(); // EOF. + } - // Decode as much of the data as possible into HTTP requests. - const deque<Request*> requests = decoder->decode(data, length.get()); + // Decode as much of the data as possible into HTTP requests. + const deque<Request*> requests = decoder->decode(data, length); - if (requests.empty() && decoder->failed()) { - VLOG(1) << "Decoder error while receiving"; - socket_manager->close(socket); - delete[] data; - delete decoder; - return; - } + if (requests.empty() && decoder->failed()) { + return Failure("Decoder error"); + } - if (!requests.empty()) { - // Get the peer address to augment the requests. - Try<Address> address = socket.peer(); + if (!requests.empty()) { + // Get the peer address to augment the requests. 
+ Try<Address> address = socket.peer(); - if (address.isError()) { - VLOG(1) << "Failed to get peer address while receiving: " - << address.error(); - socket_manager->close(socket); - delete[] data; - delete decoder; - return; - } + if (address.isError()) { + return Failure("Failed to get peer address: " + address.error()); + } + + foreach (Request* request, requests) { + request->client = address.get(); + process_manager->handle(socket, request); + } + } - foreach (Request* request, requests) { - request->client = address.get(); - process_manager->handle(socket, request); + return Continue(); + }); + + recv_loop.onAny([=](const Future<Nothing> f) { + if (f.isFailed()) { + Try<Address> peer = socket.peer(); + + VLOG(1) << "Failure while receiving from peer '" + << (peer.isSome() ? stringify(peer.get()) : "unknown") + << "': " << f.failure(); } - } - socket.recv(data, size) - .onAny(lambda::bind(&decode_recv, lambda::_1, data, size, socket, decoder)); + socket_manager->close(socket); + delete[] data; + delete decoder; + }); } } // namespace internal { @@ -871,19 +868,8 @@ void on_accept(const Future<Socket>& socket) // Inform the socket manager for proper bookkeeping. socket_manager->accepted(socket.get()); - const size_t size = 80 * 1024; - char* data = new char[size]; - - StreamingRequestDecoder* decoder = new StreamingRequestDecoder(); - - socket.get().recv(data, size) - .onAny(lambda::bind( - &internal::decode_recv, - lambda::_1, - data, - size, - socket.get(), - decoder)); + // Start the receive loop for the socket. + receive(socket.get()); } else { LOG(INFO) << "Failed to accept socket: " << (socket.isFailed() ? socket.failure() : "future discarded");
