Kyle R Stehbens created KAFKA-13840:
---------------------------------------
Summary: KafkaConsumer is unable to recover connection to group
coordinator after commitOffsetsAsync exception
Key: KAFKA-13840
URL: https://issues.apache.org/jira/browse/KAFKA-13840
Project: Kafka
Issue Type: Bug
Components: clients, consumer
Affects Versions: 3.0.0, 2.8.1, 2.7.2, 3.1.0, 2.6.1
Reporter: Kyle R Stehbens
Hi, I've discovered an issue with the java Kafka client (consumer) whereby a
timeout or any other retry-able exception triggered during an async offset
commit, renders the client unable to recover its group co-coordinator and
leaves the client in a broken state.
I first encountered this using v2.8.1 of the java client, and after going
through the code base for all versions of the client, have found it affects all
versions of the client from 2.6.1 onward.
I also confirmed that by rolling back to 2.5.1, the issue is not present.
The issue stems from changes to how the FindCoordinatorResponseHandler in 2.5.1
used to call clearFindCoordinatorFuture(); on both success and failure here:
[https://github.com/apache/kafka/blob/0efa8fb0f4c73d92b6e55a112fa45417a67a7dc2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L783]
In all future version of the client this call is not made:
[https://github.com/apache/kafka/blob/839b886f9b732b151e1faeace7303c80641c08c4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L838]
What this results in, is when the KafkaConsumer makes a call to
coordinator.commitOffsetsAsync(...), if an error occures such that the
co-ordinator is unavaialabe here:
[https://github.com/apache/kafka/blob/c5077c679c372589215a1b58ca84360c683aa6e8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1007]
then the client will try call:
[https://github.com/apache/kafka/blob/c5077c679c372589215a1b58ca84360c683aa6e8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1017]
However this will never be able to success as it perpetually returns a
reference to a failed future: findCoordinatorFuture that is never cleared out.
This manifests in all future calls to commitOffsetsAsync() throws a
"coordinator unavailable" exception forever going forward after any re-tryable
exception causes the coordinator to close.
Note we discovered this when we upgraded the kafka client in our Flink
consumers from 2.4.1 to 2.8.1 and subsequently needed to downgrade the client.
We noticed this occurring in our non-flink java consumers too running 3.x
client versions.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)