Repository: mesos
Updated Branches:
  refs/heads/master 3052d05e9 -> ab10f8310


Reduced likelihood of a stack overflow in libprocess socket recv path.

Currently, the socket recv path is implemented as an asynchronous
loop built from chained callbacks. Without `process::loop`, this
pattern is prone to stack overflow when every asynchronous call
completes synchronously. In that case each completed `recv` invokes
the next callback directly on the current stack, so the stack grows
with every iteration. This can happen with sockets when the socket is
always ready for reading. The crash was reported in MESOS-9024, so
this stack overflow has been encountered in practice.

This patch updates the recv path to use `process::loop`, which is
designed to prevent stack overflows in asynchronous loops. However,
`process::loop` itself can still overflow the stack due to
MESOS-8852. In practice, I expect users won't see any stack overflows
in the recv path, even before MESOS-8852 is fixed.

Review: https://reviews.apache.org/r/67824


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ab10f831
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ab10f831
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ab10f831

Branch: refs/heads/master
Commit: ab10f8310a735c3119f22dd3d9e636dc9cc38562
Parents: 3052d05
Author: Benjamin Mahler <[email protected]>
Authored: Tue Jul 3 16:54:11 2018 -0700
Committer: Benjamin Mahler <[email protected]>
Committed: Tue Jul 3 17:33:39 2018 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/src/process.cpp | 106 ++++++++++++++-----------------
 1 file changed, 46 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ab10f831/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index eb9613f..b3bd028 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -790,63 +790,60 @@ static Future<MessageEvent*> parse(const Request& request)
 
 namespace internal {
 
-void decode_recv(
-    const Future<size_t>& length,
-    char* data,
-    size_t size,
-    Socket socket,
-    StreamingRequestDecoder* decoder)
+void receive(Socket socket)
 {
-  if (length.isDiscarded() || length.isFailed()) {
-    if (length.isFailed()) {
-      VLOG(1) << "Decode failure: " << length.failure();
-    }
+  StreamingRequestDecoder* decoder = new StreamingRequestDecoder();
 
-    socket_manager->close(socket);
-    delete[] data;
-    delete decoder;
-    return;
-  }
+  const size_t size = 80 * 1024;
+  char* data = new char[size];
 
-  if (length.get() == 0) {
-    socket_manager->close(socket);
-    delete[] data;
-    delete decoder;
-    return;
-  }
+  Future<Nothing> recv_loop = process::loop(
+      None(),
+      [=] {
+        return socket.recv(data, size);
+      },
+      [=](size_t length) -> Future<ControlFlow<Nothing>> {
+        if (length == 0) {
+          return Break(); // EOF.
+        }
 
-  // Decode as much of the data as possible into HTTP requests.
-  const deque<Request*> requests = decoder->decode(data, length.get());
+        // Decode as much of the data as possible into HTTP requests.
+        const deque<Request*> requests = decoder->decode(data, length);
 
-  if (requests.empty() && decoder->failed()) {
-     VLOG(1) << "Decoder error while receiving";
-     socket_manager->close(socket);
-     delete[] data;
-     delete decoder;
-     return;
-  }
+        if (requests.empty() && decoder->failed()) {
+          return Failure("Decoder error");
+        }
 
-  if (!requests.empty()) {
-    // Get the peer address to augment the requests.
-    Try<Address> address = socket.peer();
+        if (!requests.empty()) {
+          // Get the peer address to augment the requests.
+          Try<Address> address = socket.peer();
 
-    if (address.isError()) {
-      VLOG(1) << "Failed to get peer address while receiving: "
-              << address.error();
-      socket_manager->close(socket);
-      delete[] data;
-      delete decoder;
-      return;
-    }
+          if (address.isError()) {
+            return Failure("Failed to get peer address: " + address.error());
+          }
+
+          foreach (Request* request, requests) {
+            request->client = address.get();
+            process_manager->handle(socket, request);
+          }
+        }
 
-    foreach (Request* request, requests) {
-      request->client = address.get();
-      process_manager->handle(socket, request);
+        return Continue();
+      });
+
+  recv_loop.onAny([=](const Future<Nothing> f) {
+    if (f.isFailed()) {
+      Try<Address> peer = socket.peer();
+
+      VLOG(1) << "Failure while receiving from peer '"
+              << (peer.isSome() ? stringify(peer.get()) : "unknown")
+              << "': " << f.failure();
     }
-  }
 
-  socket.recv(data, size)
-    .onAny(lambda::bind(&decode_recv, lambda::_1, data, size, socket, 
decoder));
+    socket_manager->close(socket);
+    delete[] data;
+    delete decoder;
+  });
 }
 
 } // namespace internal {
@@ -909,19 +906,8 @@ void on_accept(const Future<Socket>& socket)
     // Inform the socket manager for proper bookkeeping.
     socket_manager->accepted(socket.get());
 
-    const size_t size = 80 * 1024;
-    char* data = new char[size];
-
-    StreamingRequestDecoder* decoder = new StreamingRequestDecoder();
-
-    socket->recv(data, size)
-      .onAny(lambda::bind(
-          &internal::decode_recv,
-          lambda::_1,
-          data,
-          size,
-          socket.get(),
-          decoder));
+    // Start the receive loop for the socket.
+    receive(socket.get());
   }
 
   // NOTE: `__s__` may be cleaned up during `process::finalize`.

Reply via email to