Graham Campbell created KAFKA-6529:
--------------------------------------

             Summary: Broker leaks memory and file descriptors after sudden 
client disconnects
                 Key: KAFKA-6529
                 URL: https://issues.apache.org/jira/browse/KAFKA-6529
             Project: Kafka
          Issue Type: Bug
          Components: network
    Affects Versions: 0.11.0.2, 1.0.0
            Reporter: Graham Campbell


If a producer forcefully disconnects from a broker while it has staged 
receives, that connection enters a limbo state where it is no longer processed 
by the SocketServer.Processor, leaking the file descriptor for the socket and 
the memory used for the staged receive queue for that connection.

We noticed this during an upgrade from 0.9.0.2 to 0.11.0.2. Immediately after 
the rolling restart to upgrade, open file descriptors on the brokers started 
climbing uncontrollably. In a few cases brokers reached our configured max open 
files limit of 100k and crashed before we rolled back.

We tracked this down to a buildup of muted connections in the 
Selector.closingChannels list. If a client disconnects from the broker with 
multiple pending produce requests, the broker receives an IOException when it 
attempts to send an ack to the client, because the TCP socket has been closed. 
This triggers the Selector to close the channel, but because the channel still 
has pending requests, the Selector adds it to Selector.closingChannels so that 
those requests can be processed first. However, because the exception was 
triggered by trying to send a response, the SocketServer.Processor has marked 
the channel as muted and will no longer process it at all. The pending requests 
are therefore never drained and the channel is never actually closed.
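
The limbo state described above can be illustrated with a small toy model. This is only a sketch under stated assumptions, not Kafka's Selector code: the class ClosingChannelLeakSketch, its Channel type, and the methods onSendFailure, poll and leaksAfterPolls are hypothetical names, and the socketOpen flag merely stands in for the file descriptor.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Toy model only: names echo the report, but this is NOT Kafka's Selector.
public class ClosingChannelLeakSketch {

    static final class Channel {
        final String id;
        final Deque<String> stagedReceives = new ArrayDeque<>();
        boolean muted;             // set while a response is outstanding
        boolean socketOpen = true; // stands in for the file descriptor

        Channel(String id) { this.id = id; }
    }

    final Map<String, Channel> closingChannels = new HashMap<>();

    // A failed write closes the channel, but the real close is deferred
    // while staged receives are still queued.
    void onSendFailure(Channel ch) {
        if (ch.stagedReceives.isEmpty()) {
            ch.socketOpen = false;
        } else {
            closingChannels.put(ch.id, ch);
        }
    }

    // One poll pass: take at most one staged receive from each unmuted
    // channel, then really close any closing channel whose queue is empty.
    void poll() {
        Iterator<Channel> it = closingChannels.values().iterator();
        while (it.hasNext()) {
            Channel ch = it.next();
            if (!ch.muted) {
                ch.stagedReceives.poll();
            }
            if (ch.stagedReceives.isEmpty()) {
                ch.socketOpen = false;
                it.remove();
            }
        }
    }

    // Returns true if the socket is still open after the given number of polls.
    static boolean leaksAfterPolls(boolean muted, int staged, int polls) {
        ClosingChannelLeakSketch selector = new ClosingChannelLeakSketch();
        Channel ch = new Channel("client-1");
        ch.muted = muted;
        for (int i = 0; i < staged; i++) {
            ch.stagedReceives.add("produce-" + i);
        }
        selector.onSendFailure(ch);
        for (int i = 0; i < polls; i++) {
            selector.poll();
        }
        return ch.socketOpen;
    }

    public static void main(String[] args) {
        System.out.println("muted channel still open:   " + leaksAfterPolls(true, 3, 1000));
        System.out.println("unmuted channel still open: " + leaksAfterPolls(false, 3, 1000));
    }
}
```

In this model the unmuted channel drains its queue and is closed after a few polls, while the muted channel stays in closingChannels no matter how many times poll() runs, which is the leak pattern reported here.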

*Reproduced by:*
1. Start a Kafka broker/cluster.
2. Have a client produce several messages and then disconnect abruptly (e.g. 
_./rdkafka_performance -P -x 100 -b broker:9092 -t test_topic_).
3. The broker then leaks the file descriptor previously used for the TCP socket 
and the memory for the unprocessed messages.

*Proposed solution (which we've implemented internally)*
Whenever an exception is encountered while writing to a socket in 
Selector.pollSelectionKeys(...), record that the connection failed a send by 
adding the KafkaChannel ID to Selector.failedSends. Then re-raise the exception 
so that the socket disconnection logic is still triggered. Since every exception 
raised in this function triggers a disconnect, we also treat any exception while 
writing to the socket as a failed send.
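
A minimal sketch of the record-and-re-raise pattern described above, again as a toy model rather than Kafka's actual code. The class FailedSendSketch, its Channel type, and the methods attemptWrite, handleWritable and close are hypothetical. The report does not spell out how failedSends is consumed afterwards, so the sketch assumes that a channel recorded there is closed immediately and its staged receives are discarded.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model only: field names follow the report, the rest is hypothetical.
public class FailedSendSketch {

    static final class Channel {
        final String id;
        final Deque<String> stagedReceives = new ArrayDeque<>();
        boolean peerGone;          // simulates a client that closed the TCP socket
        boolean socketOpen = true; // stands in for the file descriptor

        Channel(String id) { this.id = id; }

        void write(String response) throws IOException {
            if (peerGone) {
                throw new IOException("Broken pipe");
            }
        }
    }

    final Map<String, Channel> closingChannels = new HashMap<>();
    final Set<String> failedSends = new HashSet<>();

    // Write step: record the failed send, then re-raise so that the
    // caller's disconnect logic still runs.
    private void attemptWrite(Channel ch, String response) throws IOException {
        try {
            ch.write(response);
        } catch (IOException e) {
            failedSends.add(ch.id);
            throw e;
        }
    }

    // Stand-in for the per-key handling in pollSelectionKeys(...):
    // any exception leads to a disconnect.
    void handleWritable(Channel ch, String response) {
        try {
            attemptWrite(ch, response);
        } catch (IOException e) {
            close(ch);
        }
    }

    // ASSUMPTION: a channel with a failed send is closed at once and its
    // staged receives are dropped, instead of being parked in closingChannels.
    private void close(Channel ch) {
        if (failedSends.contains(ch.id) || ch.stagedReceives.isEmpty()) {
            ch.stagedReceives.clear();
            ch.socketOpen = false;
        } else {
            closingChannels.put(ch.id, ch);
        }
    }

    public static void main(String[] args) {
        FailedSendSketch selector = new FailedSendSketch();
        Channel ch = new Channel("client-1");
        ch.stagedReceives.add("produce-2");
        ch.stagedReceives.add("produce-3");
        ch.peerGone = true;

        selector.handleWritable(ch, "ack-1");
        System.out.println("socketOpen=" + ch.socketOpen
                + " closing=" + selector.closingChannels.size()
                + " failedSends=" + selector.failedSends);
    }
}
```

With the failed send recorded before the exception propagates, the close path in this model can tell a broken socket apart from an orderly shutdown and release the descriptor right away, so nothing is left behind in closingChannels.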



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
