[ https://issues.apache.org/jira/browse/MESOS-7748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16099281#comment-16099281 ]

Alexander Rukletsov commented on MESOS-7748:
--------------------------------------------

I've played a bit with the issue and possible solutions. The main question is 
which queue's overflow we track: the TCP buffer, the outgoing socket queue in 
libprocess, or the {{http::Pipe}} queue used for streaming responses.

I will be referring to the {{socket_manager->queue_size()}} helper below, which 
is defined as
{code}
size_t SocketManager::queue_size(const Socket& socket)
{
  synchronized (mutex) {
    return outgoing.count(socket) ? outgoing[socket].size() : 0;
  }
}
{code}
h4. Close connection if {{socket.send()}} does not succeed after a timeout.
This means the TCP buffer is full, hence {{select()}} on the socket [gates the 
send 
operation|https://github.com/apache/mesos/blob/9a45267c51d7fbc68947d6721623b8004b912ddc/3rdparty/libprocess/src/poll_socket.cpp#L271].
 While this solution is not specific to the streaming API, it protects against 
unresponsive consumers only: a slow consumer can read just enough from the TCP 
buffer that the timeout is never triggered, yet data still piles up on the 
Mesos side.
{code:title=process.cpp, ~L2100}
namespace internal {
...
void send(Encoder* encoder, Socket socket)
{
  // If send() does not complete in time, discard it and fail the future;
  // the `_send` continuation below then closes the connection.
  auto timedout = [](Future<size_t> future) -> Future<size_t> {
    future.discard();
    return Failure("Socket buffer is full");
  };

  switch (encoder->kind()) {
    case Encoder::DATA: {
      size_t size;
      const char* data = static_cast<DataEncoder*>(encoder)->next(&size);
      socket.send(data, size)
        .after(Seconds(5), timedout) // <--
        .onAny(lambda::bind(
            &internal::_send,
            lambda::_1,
            socket,
            encoder,
            size));
      break;
    }
  ...
  }
}
...
} // namespace internal
{code}
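For context on why returning a {{Failure}} is enough to close the connection: 
the failure lands in the {{internal::_send()}} continuation chained via 
{{onAny}} above, which treats a failed or discarded send as a reason to drop 
the socket. Roughly (only the relevant branch is sketched here; the actual body 
differs):
{code:title=process.cpp, relevant branch of internal::_send (sketched)}
namespace internal {

void _send(
    const Future<size_t>& length,
    Socket socket,
    Encoder* encoder,
    size_t size)
{
  // A failed (e.g. timed out above) or discarded send means we give up on
  // this peer: close the socket and drop the pending encoder.
  if (length.isDiscarded() || length.isFailed()) {
    socket_manager->close(socket);
    delete encoder;
    return;
  }

  ...
}

} // namespace internal
{code}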
h4. Close connection if the socket queue overflows.
If {{queue_size(some_socket)}} grows beyond a certain threshold, close the 
socket. To be nicer to slow readers, we might send a {{599}} (?) HTTP status 
code and _schedule_ the socket close after a certain timeout; a rough sketch of 
this variant follows the snippet below. For streaming connections there is 
probably not much value in keeping the connection open, because the client will 
have to resubscribe anyway in order to catch up.
{code:title=process.cpp, ~L1670}
void HttpProxy::stream(
    const Owned<Request>& request,
    const Future<string>& chunk)
{
  CHECK_SOME(pipe);
  CHECK_NOTNULL(request.get());

  http::Pipe::Reader reader = pipe.get();

  bool finished = false; // Whether we're done streaming.

  // If the socket queue overflows, close the connection.
  if (socket_manager->queue_size(socket) > 1000) {
    socket_manager->close(socket);
    finished = true;
  } else if (chunk.isReady()) {
    std::ostringstream out;
  ...
{code}
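A rough sketch of the "599 + scheduled close" variant, reusing the existing 
response path in {{SocketManager}}. The threshold, the 5-second grace period, 
and the {{forceClose()}} helper are made up for illustration:
{code:title=process.cpp, sketch of the "599 + delayed close" variant}
  // Instead of closing right away: enqueue a 599 response so the client
  // learns why it is being cut off, and schedule the actual close.
  if (socket_manager->queue_size(socket) > 1000) {
    http::Response response;
    response.type = http::Response::BODY;
    // 599 is non-standard (as noted above); the reason phrase is arbitrary.
    response.status = "599 Consumer Too Slow";
    response.body = "Too many queued events; please re-subscribe to catch up";

    // Reuses the existing response path in `SocketManager`.
    socket_manager->send(response, *request, socket);

    // `forceClose()` would be a new helper on `HttpProxy` that simply calls
    // `socket_manager->close(socket)`.
    delay(Seconds(5), self(), &Self::forceClose);

    finished = true;
  } else if (chunk.isReady()) {
  ...
{code}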
h4. Stop reading data from the pipe if the socket queue is non-empty.
If the socket queue becomes non-empty (or grows beyond a certain threshold), 
stop reading data from the {{http::Pipe}}. This way, we delegate the overflow 
problem to {{HttpConnection}} and {{Master}}, which, for example, allows 
exposing the queue size as a metric. This approach requires introducing a 
signalling mechanism that fires when a socket queue becomes empty; one possible 
shape is sketched after the first snippet below.
{code:title=process.cpp, ~L1670}
void HttpProxy::stream(
    const Owned<Request>& request,
    const Future<string>& chunk)
{
  CHECK_SOME(pipe);
  CHECK_NOTNULL(request.get());

  http::Pipe::Reader reader = pipe.get();

  bool finished = false; // Whether we're done streaming.

  // NOTE: Instead of `delay`, hook on queue empty event.
  if (socket_manager->queue_size(socket) > 1) {
    delay(Seconds(1), self(), &Self::stream, request, chunk);
    return;
  }

  if (chunk.isReady()) {
    std::ostringstream out;
  ...
{code}
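One possible shape of that signalling mechanism: {{SocketManager}} hands out a 
future that is satisfied once a socket's outgoing queue drains, so 
{{HttpProxy::stream()}} can resume without polling. The names ({{drained()}}, 
{{drainPromises}}) and the exact wiring are made up for illustration:
{code:title=process.cpp, sketch of a queue-drained signal in SocketManager}
Future<Nothing> SocketManager::drained(const Socket& socket)
{
  synchronized (mutex) {
    // Nothing queued: the caller can resume right away.
    if (outgoing.count(socket) == 0 || outgoing[socket].empty()) {
      return Nothing();
    }

    // Otherwise remember a promise; it is satisfied by whatever pops the
    // last encoder off `outgoing[socket]`, e.g. `SocketManager::next()`.
    Owned<Promise<Nothing>> promise(new Promise<Nothing>());
    drainPromises[socket].push_back(promise);
    return promise->future();
  }
}
{code}
The polling {{delay}} above then turns into:
{code:title=process.cpp, HttpProxy::stream using the signal instead of delay}
  // Resume streaming once the socket's outgoing queue has drained.
  if (socket_manager->queue_size(socket) > 1) {
    socket_manager->drained(socket)
      .onAny(defer(self(), &Self::stream, request, chunk));
    return;
  }
{code}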
{code:title=process/http.hpp, ~L300}
class Pipe {
...
  class Writer
  {
    ...
    std::size_t size() const
    {
      return data->writes.size();
    }
    ...
  };
};
{code}
{code:title=master.hpp, ~L280}
struct HttpConnection
{
...
  template <typename Message, typename Event = v1::scheduler::Event>
  bool send(const Message& message)
  {
    // Does it make sense to skip messages?
    // How is the subscriber supposed to catch up?
    if (writer.size() > 1000) {
      return false;
    }

    ::recordio::Encoder<Event> encoder(lambda::bind(
        serialize, contentType, lambda::_1));

    return writer.write(encoder.encode(evolve(message)));
  }
...
};
{code}
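On the caller side the boolean return value then needs a policy. One option, in 
line with the ticket's "disconnect the subscriber" idea; the shape of 
{{Master::Subscribers::send()}} and the member names are only approximated 
here:
{code:title=master.cpp, sketch of a caller-side policy}
void Master::Subscribers::send(const mesos::master::Event& event)
{
  foreachvalue (const Owned<Subscriber>& subscriber, subscribed) {
    if (!subscriber->http.send<mesos::master::Event, v1::master::Event>(
            event)) {
      // The subscriber's queue overflowed: rather than silently dropping
      // events (which the subscriber cannot detect), close the connection
      // so that it re-subscribes and catches up from a fresh snapshot.
      subscriber->http.close();
    }
  }
}
{code}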

> Slow subscribers of streaming APIs can lead to Mesos OOMing.
> ------------------------------------------------------------
>
>                 Key: MESOS-7748
>                 URL: https://issues.apache.org/jira/browse/MESOS-7748
>             Project: Mesos
>          Issue Type: Bug
>            Reporter: Alexander Rukletsov
>            Assignee: Alexander Rukletsov
>            Priority: Critical
>              Labels: mesosphere, reliability
>
> For each active subscriber, Mesos master / slave maintains an event queue, 
> which grows over time if the subscriber does not read fast enough. As the 
> number of such "slow" subscribers grows, so does Mesos master / slave memory 
> consumption, which might lead to an OOM event.
> Ideas to consider:
> * Restrict the number of subscribers for the streaming APIs
> * Check (ping) for inactive or "slow" subscribers
> * Disconnect the subscriber when there are too many queued events in memory


