[
https://issues.apache.org/jira/browse/MESOS-7748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16099281#comment-16099281
]
Alexander Rukletsov commented on MESOS-7748:
--------------------------------------------
I've played a bit with the issue and possible solutions. The main question is
which queue's overflow we track: the TCP send buffer, the outgoing socket queue
in libprocess, or the {{http::Pipe}} queue used for streaming responses.
I will be referring to the {{socket_manager->queue_size()}} function below, which
is defined as
{code}
size_t SocketManager::queue_size(const Socket& socket)
{
  synchronized (mutex) {
    return outgoing.count(socket) ? outgoing[socket].size() : 0;
  }
}
{code}
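For context, {{outgoing}} here is (roughly, from memory of {{process.cpp}}; the
exact declaration may differ) the {{SocketManager}} member that maps a socket fd
to its queue of pending encoders:
{code:title=process.cpp, SocketManager (roughly, for context)}
class SocketManager
{
  ...
private:
  // Collection of outgoing encoders, keyed by socket fd; this is the
  // "outgoing socket queue in libprocess" referred to above.
  hashmap<int, std::queue<Encoder*>> outgoing;
  ...
};
{code}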
h4. Close connection if {{socket.send()}} does not succeed after a timeout.
This means the TCP send buffer is full, hence {{select()}} on the socket [gates the
send
operation|https://github.com/apache/mesos/blob/9a45267c51d7fbc68947d6721623b8004b912ddc/3rdparty/libprocess/src/poll_socket.cpp#L271].
While this solution is not specific to the streaming API, it only protects against
completely unresponsive consumers: a slow consumer can drain the buffer just fast
enough that the timeout never fires, yet data still piles up on our side.
{code:title=process.cpp, ~L2100}
namespace internal {
...
void send(Encoder* encoder, Socket socket)
{
  // If send() hangs, close the connection.
  auto timedout = [](Future<size_t> future) -> Future<size_t> {
    future.discard();
    return Failure("Socket buffer is full");
  };

  switch (encoder->kind()) {
    case Encoder::DATA: {
      size_t size;
      const char* data = static_cast<DataEncoder*>(encoder)->next(&size);

      socket.send(data, size)
        .after(Seconds(5), timedout)  // <--
        .onAny(lambda::bind(
            &internal::_send,
            lambda::_1,
            socket,
            encoder,
            size));
      break;
    }
    ...
  }
}
...
} // namespace internal
{code}
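For context: returning a {{Failure}} from the {{after()}} callback is enough to
tear the connection down because {{internal::_send}} (the continuation bound
above) already treats a failed or discarded send as fatal for the socket. From
memory, it looks roughly like this (the exact code may differ):
{code:title=process.cpp, internal::_send (roughly, for context)}
void _send(
    const Future<size_t>& length,
    Socket socket,
    Encoder* encoder,
    size_t size)
{
  // A failed or discarded send closes the socket, which is exactly what the
  // timeout above relies on.
  if (length.isDiscarded() || length.isFailed()) {
    socket_manager->close(socket);
    delete encoder;
  } else {
    ...
  }
}
{code}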
h4. Close connection if the socket queue overflows.
If {{queue_size(some_socket)}} grows beyond a certain threshold, close the
socket. To be nicer to slow readers, we might first send a {{599}} HTTP status
code (or similar) and _schedule_ the socket close after a certain timeout; a
rough sketch of that variant follows the code below. For streaming connections
there is probably not much value in keeping the connection open, because the
client will have to resubscribe anyway in order to catch up.
{code:title=process.cpp, ~L1670}
void HttpProxy::stream(
    const Owned<Request>& request,
    const Future<string>& chunk)
{
  CHECK_SOME(pipe);
  CHECK_NOTNULL(request.get());

  http::Pipe::Reader reader = pipe.get();

  bool finished = false; // Whether we're done streaming.

  // If the socket queue overflows, close the connection.
  if (socket_manager->queue_size(socket) > 1000) {
    socket_manager->close(socket);
    finished = true;
  } else if (chunk.isReady()) {
    std::ostringstream out;
    ...
{code}
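The "send {{599}} and close later" variant mentioned above could look roughly
like the following; {{enqueue_goaway()}} and {{Self::close_connection()}} are
hypothetical helpers, not existing libprocess API:
{code:title=process.cpp, ~L1670 (hypothetical "nicer" variant)}
  // If the socket queue overflows, notify the client and schedule the close
  // instead of cutting the connection immediately.
  if (socket_manager->queue_size(socket) > 1000) {
    // NOTE: Both helpers are hypothetical. `enqueue_goaway()` would put one
    // final chunk (e.g. a "599"-style notice) into the socket queue, and
    // `Self::close_connection()` would call `socket_manager->close(socket)`.
    enqueue_goaway(socket);
    delay(Seconds(5), self(), &Self::close_connection);
    finished = true;
  } else if (chunk.isReady()) {
    ...
{code}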
h4. Stop reading data from the pipe if the socket queue is non-empty.
If the socket queue becomes non-empty (or grows beyond a certain threshold),
stop reading data from the {{http::Pipe}}. This way we delegate the overflow
problem to {{HttpConnection}} and {{Master}}, which, for example, allows exposing
the queue size as a metric (see the sketch after the {{master.hpp}} snippet
below). This approach requires introducing a signalling mechanism that tells us
when a socket queue becomes empty again; one possible shape of that mechanism is
sketched right after the next snippet.
{code:title=process.cpp, ~L1670}
void HttpProxy::stream(
    const Owned<Request>& request,
    const Future<string>& chunk)
{
  CHECK_SOME(pipe);
  CHECK_NOTNULL(request.get());

  http::Pipe::Reader reader = pipe.get();

  bool finished = false; // Whether we're done streaming.

  // NOTE: Instead of `delay`, hook on queue empty event.
  if (socket_manager->queue_size(socket) > 1) {
    delay(Seconds(1), self(), &Self::stream, request, chunk);
    return;
  }

  if (chunk.isReady()) {
    std::ostringstream out;
    ...
{code}
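One way the queue-empty signalling could look (names are invented, nothing below
exists in libprocess today): {{SocketManager}} keeps a promise per socket that is
satisfied once the outgoing queue drains, and {{HttpProxy::stream}} chains on the
resulting future instead of polling with {{delay}}:
{code:title=process.cpp, hypothetical queue-empty signal}
// Hypothetical addition to SocketManager: a future that becomes ready once
// the outgoing queue for `socket` is empty (again).
Future<Nothing> SocketManager::queue_drained(const Socket& socket)
{
  synchronized (mutex) {
    if (outgoing.count(socket) == 0 || outgoing[socket].empty()) {
      return Nothing();
    }

    // `drained` would be a new member, e.g.
    // hashmap<int, Owned<Promise<Nothing>>>, satisfied by the send path once
    // it pops the last encoder for this socket off the queue.
    Owned<Promise<Nothing>>& promise = drained[socket];
    if (promise.get() == nullptr) {
      promise.reset(new Promise<Nothing>());
    }

    return promise->future();
  }
}

// HttpProxy::stream would then replace the delay()-based polling above with
// something like:
//
//   socket_manager->queue_drained(socket)
//     .onReady(defer(self(), &Self::stream, request, chunk));
{code}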
{code:title=process/http.hpp, ~L300}
class Pipe
{
  ...
  class Writer
  {
    ...
    // Number of pending (not yet read) writes buffered in the pipe.
    std::size_t size() const
    {
      return data->writes.size();
    }
    ...
  };
  ...
};
{code}
{code:title=master.hpp, ~L280}
struct HttpConnection
{
  ...
  template <typename Message, typename Event = v1::scheduler::Event>
  bool send(const Message& message)
  {
    // Does it make sense to skip messages?
    // How is the subscriber supposed to catch up?
    if (writer.size() > 1000) {
      return false;
    }

    ::recordio::Encoder<Event> encoder(lambda::bind(
        serialize, contentType, lambda::_1));

    return writer.write(encoder.encode(evolve(message)));
  }
  ...
};
{code}
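Since {{HttpConnection}} would then know its own queue size, the master could
additionally surface it to operators. A minimal sketch, assuming the stock
libprocess metrics interface ({{process::metrics::Gauge}} plus
{{metrics::add()/remove()}}); the metric name and the way the writer is captured
are placeholders:
{code:title=master.cpp, hypothetical per-subscriber queue gauge}
#include <process/defer.hpp>

#include <process/metrics/gauge.hpp>
#include <process/metrics/metrics.hpp>

// When a subscriber is added (`http` is its HttpConnection):
process::metrics::Gauge queueSize(
    "master/subscribers/" + http.streamId.toString() + "/queue_size",
    process::defer(self(), [http]() -> process::Future<double> {
      // Uses the `Pipe::Writer::size()` helper sketched above.
      return static_cast<double>(http.writer.size());
    }));

process::metrics::add(queueSize);

// ...and on disconnect:
//   process::metrics::remove(queueSize);
{code}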
> Slow subscribers of streaming APIs can lead to Mesos OOMing.
> ------------------------------------------------------------
>
> Key: MESOS-7748
> URL: https://issues.apache.org/jira/browse/MESOS-7748
> Project: Mesos
> Issue Type: Bug
> Reporter: Alexander Rukletsov
> Assignee: Alexander Rukletsov
> Priority: Critical
> Labels: mesosphere, reliability
>
> For each active subscriber, Mesos master / slave maintains an event queue,
> which grows over time if the subscriber does not read fast enough. As the
> number of such "slow" subscribers grows, so does Mesos master / slave memory
> consumption, which might lead to an OOM event.
> Ideas to consider:
> * Restrict the number of subscribers for the streaming APIs
> * Check (ping) for inactive or "slow" subscribers
> * Disconnect the subscriber when there are too many queued events in memory