Repository: mesos
Updated Branches:
  refs/heads/1.6.x d565c179b -> f86c9f8ac


Reduced likelihood of a stack overflow in libprocess socket recv path.

Currently, the socket recv path is implemented as an asynchronous
loop built from callbacks. Without `process::loop`, this pattern is
prone to stack overflow when every asynchronous call completes
synchronously. In that case each completion callback runs on the
stack of the call that just completed, so the stack grows with every
iteration. This happens with sockets that are always ready for
reading. The crash was reported in MESOS-9024, so this stack overflow
has been encountered in practice.

This patch updates the recv path to use `process::loop`, which is
designed to prevent stack overflows in asynchronous loops. However,
`process::loop` itself can still overflow the stack due to
MESOS-8852. In practice, I expect users will not see stack overflows
in the recv path even before MESOS-8852 is fixed.

Review: https://reviews.apache.org/r/67824


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b01c0a8a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b01c0a8a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b01c0a8a

Branch: refs/heads/1.6.x
Commit: b01c0a8a8be41b5851d0dd51f4595081abd00bd3
Parents: d565c17
Author: Benjamin Mahler <[email protected]>
Authored: Tue Jul 3 16:54:11 2018 -0700
Committer: Benjamin Mahler <[email protected]>
Committed: Tue Jul 3 17:34:15 2018 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/src/process.cpp | 106 ++++++++++++++-----------------
 1 file changed, 46 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b01c0a8a/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index ef85966..90eb0c5 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -790,63 +790,60 @@ static Future<MessageEvent*> parse(const Request& request)
 
 namespace internal {
 
-void decode_recv(
-    const Future<size_t>& length,
-    char* data,
-    size_t size,
-    Socket socket,
-    StreamingRequestDecoder* decoder)
+void receive(Socket socket)
 {
-  if (length.isDiscarded() || length.isFailed()) {
-    if (length.isFailed()) {
-      VLOG(1) << "Decode failure: " << length.failure();
-    }
+  StreamingRequestDecoder* decoder = new StreamingRequestDecoder();
 
-    socket_manager->close(socket);
-    delete[] data;
-    delete decoder;
-    return;
-  }
+  const size_t size = 80 * 1024;
+  char* data = new char[size];
 
-  if (length.get() == 0) {
-    socket_manager->close(socket);
-    delete[] data;
-    delete decoder;
-    return;
-  }
+  Future<Nothing> recv_loop = process::loop(
+      None(),
+      [=] {
+        return socket.recv(data, size);
+      },
+      [=](size_t length) -> Future<ControlFlow<Nothing>> {
+        if (length == 0) {
+          return Break(); // EOF.
+        }
 
-  // Decode as much of the data as possible into HTTP requests.
-  const deque<Request*> requests = decoder->decode(data, length.get());
+        // Decode as much of the data as possible into HTTP requests.
+        const deque<Request*> requests = decoder->decode(data, length);
 
-  if (requests.empty() && decoder->failed()) {
-     VLOG(1) << "Decoder error while receiving";
-     socket_manager->close(socket);
-     delete[] data;
-     delete decoder;
-     return;
-  }
+        if (requests.empty() && decoder->failed()) {
+          return Failure("Decoder error");
+        }
 
-  if (!requests.empty()) {
-    // Get the peer address to augment the requests.
-    Try<Address> address = socket.peer();
+        if (!requests.empty()) {
+          // Get the peer address to augment the requests.
+          Try<Address> address = socket.peer();
 
-    if (address.isError()) {
-      VLOG(1) << "Failed to get peer address while receiving: "
-              << address.error();
-      socket_manager->close(socket);
-      delete[] data;
-      delete decoder;
-      return;
-    }
+          if (address.isError()) {
+            return Failure("Failed to get peer address: " + address.error());
+          }
+
+          foreach (Request* request, requests) {
+            request->client = address.get();
+            process_manager->handle(socket, request);
+          }
+        }
 
-    foreach (Request* request, requests) {
-      request->client = address.get();
-      process_manager->handle(socket, request);
+        return Continue();
+      });
+
+  recv_loop.onAny([=](const Future<Nothing> f) {
+    if (f.isFailed()) {
+      Try<Address> peer = socket.peer();
+
+      VLOG(1) << "Failure while receiving from peer '"
+              << (peer.isSome() ? stringify(peer.get()) : "unknown")
+              << "': " << f.failure();
     }
-  }
 
-  socket.recv(data, size)
-    .onAny(lambda::bind(&decode_recv, lambda::_1, data, size, socket, 
decoder));
+    socket_manager->close(socket);
+    delete[] data;
+    delete decoder;
+  });
 }
 
 } // namespace internal {
@@ -909,19 +906,8 @@ void on_accept(const Future<Socket>& socket)
     // Inform the socket manager for proper bookkeeping.
     socket_manager->accepted(socket.get());
 
-    const size_t size = 80 * 1024;
-    char* data = new char[size];
-
-    StreamingRequestDecoder* decoder = new StreamingRequestDecoder();
-
-    socket->recv(data, size)
-      .onAny(lambda::bind(
-          &internal::decode_recv,
-          lambda::_1,
-          data,
-          size,
-          socket.get(),
-          decoder));
+    // Start the receive loop for the socket.
+    receive(socket.get());
   }
 
   // NOTE: `__s__` may be cleaned up during `process::finalize`.

Reply via email to