Repository: mesos Updated Branches: refs/heads/1.6.x d565c179b -> f86c9f8ac
Reduced likelihood of a stack overflow in libprocess socket recv path. Currently, the socket recv path is implemented as an asynchronous loop of chained callbacks. Without `process::loop`, this pattern is prone to a stack overflow when every asynchronous call completes synchronously, because each callback is then invoked from within the previous one. This can happen with sockets when the socket is always ready for reading. This crash was reported in MESOS-9024, so the stack overflow does occur in practice. This patch updates the recv path to use `process::loop`, which is designed to prevent stack overflows in asynchronous loops. However, `process::loop` itself can still overflow the stack due to MESOS-8852. In practice, I expect users won't see any stack overflows in the recv path, even before MESOS-8852 is fixed. Review: https://reviews.apache.org/r/67824 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b01c0a8a Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b01c0a8a Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b01c0a8a Branch: refs/heads/1.6.x Commit: b01c0a8a8be41b5851d0dd51f4595081abd00bd3 Parents: d565c17 Author: Benjamin Mahler <[email protected]> Authored: Tue Jul 3 16:54:11 2018 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Tue Jul 3 17:34:15 2018 -0700 ---------------------------------------------------------------------- 3rdparty/libprocess/src/process.cpp | 106 ++++++++++++++----------------- 1 file changed, 46 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/b01c0a8a/3rdparty/libprocess/src/process.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp index ef85966..90eb0c5 100644 --- a/3rdparty/libprocess/src/process.cpp +++ b/3rdparty/libprocess/src/process.cpp @@ 
-790,63 +790,60 @@ static Future<MessageEvent*> parse(const Request& request) namespace internal { -void decode_recv( - const Future<size_t>& length, - char* data, - size_t size, - Socket socket, - StreamingRequestDecoder* decoder) +void receive(Socket socket) { - if (length.isDiscarded() || length.isFailed()) { - if (length.isFailed()) { - VLOG(1) << "Decode failure: " << length.failure(); - } + StreamingRequestDecoder* decoder = new StreamingRequestDecoder(); - socket_manager->close(socket); - delete[] data; - delete decoder; - return; - } + const size_t size = 80 * 1024; + char* data = new char[size]; - if (length.get() == 0) { - socket_manager->close(socket); - delete[] data; - delete decoder; - return; - } + Future<Nothing> recv_loop = process::loop( + None(), + [=] { + return socket.recv(data, size); + }, + [=](size_t length) -> Future<ControlFlow<Nothing>> { + if (length == 0) { + return Break(); // EOF. + } - // Decode as much of the data as possible into HTTP requests. - const deque<Request*> requests = decoder->decode(data, length.get()); + // Decode as much of the data as possible into HTTP requests. + const deque<Request*> requests = decoder->decode(data, length); - if (requests.empty() && decoder->failed()) { - VLOG(1) << "Decoder error while receiving"; - socket_manager->close(socket); - delete[] data; - delete decoder; - return; - } + if (requests.empty() && decoder->failed()) { + return Failure("Decoder error"); + } - if (!requests.empty()) { - // Get the peer address to augment the requests. - Try<Address> address = socket.peer(); + if (!requests.empty()) { + // Get the peer address to augment the requests. 
+ Try<Address> address = socket.peer(); - if (address.isError()) { - VLOG(1) << "Failed to get peer address while receiving: " - << address.error(); - socket_manager->close(socket); - delete[] data; - delete decoder; - return; - } + if (address.isError()) { + return Failure("Failed to get peer address: " + address.error()); + } + + foreach (Request* request, requests) { + request->client = address.get(); + process_manager->handle(socket, request); + } + } - foreach (Request* request, requests) { - request->client = address.get(); - process_manager->handle(socket, request); + return Continue(); + }); + + recv_loop.onAny([=](const Future<Nothing> f) { + if (f.isFailed()) { + Try<Address> peer = socket.peer(); + + VLOG(1) << "Failure while receiving from peer '" + << (peer.isSome() ? stringify(peer.get()) : "unknown") + << "': " << f.failure(); } - } - socket.recv(data, size) - .onAny(lambda::bind(&decode_recv, lambda::_1, data, size, socket, decoder)); + socket_manager->close(socket); + delete[] data; + delete decoder; + }); } } // namespace internal { @@ -909,19 +906,8 @@ void on_accept(const Future<Socket>& socket) // Inform the socket manager for proper bookkeeping. socket_manager->accepted(socket.get()); - const size_t size = 80 * 1024; - char* data = new char[size]; - - StreamingRequestDecoder* decoder = new StreamingRequestDecoder(); - - socket->recv(data, size) - .onAny(lambda::bind( - &internal::decode_recv, - lambda::_1, - data, - size, - socket.get(), - decoder)); + // Start the receive loop for the socket. + receive(socket.get()); } // NOTE: `__s__` may be cleaned up during `process::finalize`.
