Itamar Benjamin created KAFKA-10114: ---------------------------------------
Summary: Kafka producer stuck after broker crash
Key: KAFKA-10114
URL: https://issues.apache.org/jira/browse/KAFKA-10114
Project: Kafka
Issue Type: Bug
Components: producer
Affects Versions: 2.4.1, 2.3.1
Reporter: Itamar Benjamin

Today two of our Kafka brokers crashed (in a cluster of 3 brokers), and producers were not able to send new messages. After the brokers started again, all producers resumed sending data except for a single one.

At first, that producer rejected all new messages with a TimeoutException:
{code:java}
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for incoming-mutable-RuntimeIIL-1:120000 ms has passed since batch creation
{code}
After some time, the exception changed to:
{code:java}
org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.
{code}
jstack shows that kafka-producer-network-thread is waiting to get a producer id:
{code:java}
"kafka-producer-network-thread | producer-1" #767 daemon prio=5 os_prio=0 cpu=63594017.16ms elapsed=1511219.38s tid=0x00007fffd8353000 nid=0x4fa4 sleeping [0x00007ff55c177000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(java.base@11.0.1/Native Method)
        at org.apache.kafka.common.utils.Utils.sleep(Utils.java:296)
        at org.apache.kafka.common.utils.SystemTime.sleep(SystemTime.java:41)
        at org.apache.kafka.clients.producer.internals.Sender.maybeWaitForProducerId(Sender.java:565)
        at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:306)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
        at java.lang.Thread.run(java.base@11.0.1/Thread.java:834)

   Locked ownable synchronizers:
        - None
{code}
Digging into maybeWaitForProducerId(): it waits until some broker is ready (the awaitNodeReady function), which in turn calls leastLoadedNode() on NetworkClient. That method iterates over all brokers and checks whether a request can be sent to each one using canSendRequest().
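To make the selection step concrete, here is a simplified, self-contained model of how leastLoadedNode() uses canSendRequest(). It is a sketch under stated assumptions, not the actual NetworkClient code: nodes are plain strings, canSendRequest is passed in as a Predicate, and in-flight counts come from a Map. The helper awaitAnyNode() is hypothetical and stands in for the awaitNodeReady() retry loop. The real leastLoadedNode() has more cases than this (for example, for nodes that are still connecting), and the stack trace shows Sender.maybeWaitForProducerId() sleeping between attempts rather than giving up.

{code:java}
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class LeastLoadedNodeSketch {

    // Simplified model of leastLoadedNode(): among nodes that pass canSendRequest,
    // return one with zero in-flight requests immediately, otherwise the one with
    // the fewest. Returns null when no node passes canSendRequest.
    static String leastLoadedNode(List<String> nodes,
                                  Predicate<String> canSendRequest,
                                  Map<String, Integer> inFlightCounts) {
        String best = null;
        int bestInFlight = Integer.MAX_VALUE;
        for (String node : nodes) {
            if (!canSendRequest.test(node)) {
                continue; // not ready, channel not ready, or in-flight queue full
            }
            int inFlight = inFlightCounts.getOrDefault(node, 0);
            if (inFlight == 0) {
                return node;
            }
            if (inFlight < bestInFlight) {
                bestInFlight = inFlight;
                best = node;
            }
        }
        return best;
    }

    // Hypothetical stand-in for the awaitNodeReady() retry loop. Unlike the real
    // Sender, which keeps retrying, this sketch stops after maxAttempts.
    static String awaitAnyNode(List<String> nodes,
                               Predicate<String> canSendRequest,
                               Map<String, Integer> inFlightCounts,
                               int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            String node = leastLoadedNode(nodes, canSendRequest, inFlightCounts);
            if (node != null) {
                return node;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<String> nodes = List.of("0", "1", "2");
        // Node "0" cannot send; "2" has fewer in-flight requests than "1".
        System.out.println(leastLoadedNode(nodes, n -> !n.equals("0"), Map.of("1", 3, "2", 1))); // prints 2
        // canSendRequest is false for every node, as observed in this report: no node is found.
        System.out.println(awaitAnyNode(nodes, n -> false, Map.of(), 10)); // prints null
    }
}
{code}

If canSendRequest() is false for every broker, the loop can never pick a node, no matter how many times it retries.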
This is the code for canSendRequest():
{code:java}
return connectionStates.isReady(node, now)
        && selector.isChannelReady(node)
        && inFlightRequests.canSendMore(node);
{code}
Using some debugging tools, I saw that this expression always evaluates to false because the last term (canSendMore) is false. This is the code for canSendMore():
{code:java}
public boolean canSendMore(String node) {
    Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
    return queue == null || queue.isEmpty() ||
           (queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
}
{code}
I verified that
{code:java}
queue.peekFirst().send.completed()
{code}
is true, so canSendMore() returns false only because each queue has reached maxInFlightRequestsPerConnection. That leads to the live lock: since the in-flight request queues are full for all nodes, a new request to check broker availability and reconnect cannot be submitted.
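The live lock can be reproduced in isolation with a minimal model of the per-node in-flight queues. This is a sketch, not Kafka's InFlightRequests class: InFlightRequest here is a hypothetical stand-in that only records whether its send completed, and the scenario assumes, for illustration, that every broker's queue holds five requests whose sends completed but whose responses were never processed (5 is the client's default for max.in.flight.requests.per.connection). The canSendMore() condition is the one quoted in this report.

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class InFlightLiveLockDemo {

    // Hypothetical stand-in for NetworkClient.InFlightRequest; it only records
    // whether the request's send has completed.
    static final class InFlightRequest {
        final boolean sendCompleted;

        InFlightRequest(boolean sendCompleted) {
            this.sendCompleted = sendCompleted;
        }
    }

    // Simplified per-node in-flight queues.
    static final class InFlightRequests {
        private final int maxInFlightRequestsPerConnection;
        private final Map<String, Deque<InFlightRequest>> requests = new HashMap<>();

        InFlightRequests(int maxInFlightRequestsPerConnection) {
            this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection;
        }

        // In this sketch the newest request goes to the head of the queue.
        void add(String node, InFlightRequest request) {
            requests.computeIfAbsent(node, k -> new ArrayDeque<>()).addFirst(request);
        }

        // Same condition as the canSendMore() quoted in the report.
        boolean canSendMore(String node) {
            Deque<InFlightRequest> queue = requests.get(node);
            return queue == null || queue.isEmpty()
                    || (queue.peekFirst().sendCompleted
                        && queue.size() < maxInFlightRequestsPerConnection);
        }
    }

    public static void main(String[] args) {
        InFlightRequests inFlight = new InFlightRequests(5);
        for (String node : new String[] {"0", "1", "2"}) {
            // Five requests per node whose sends completed but were never removed.
            for (int i = 0; i < 5; i++) {
                inFlight.add(node, new InFlightRequest(true));
            }
            System.out.println(node + " canSendMore=" + inFlight.canSendMore(node)); // prints false
        }
    }
}
{code}

Even though the head request's send has completed, the size check fails on every node, so canSendRequest() can never be true and the producer cannot issue the request that would let it recover.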