[ https://issues.apache.org/jira/browse/FLINK-28060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17554457#comment-17554457 ]
Qingsheng Ren commented on FLINK-28060:
---------------------------------------

I did some investigation and strongly suspect that this is caused by KAFKA-13563. We upgraded the Kafka client from 2.4.1 to 2.8.1 in Flink 1.15, and the bug was introduced in 2.6.2 by KAFKA-10793 :( The patch for KAFKA-13563 was applied only to Kafka 3.1 and 3.2, so we either have to bump the client version as well or ask the Kafka folks to cherry-pick the fix back to 2.x. WDYT? [~martijnvisser]

> Kafka Commit on checkpointing fails repeatedly after a broker restart
> ---------------------------------------------------------------------
>
>                 Key: FLINK-28060
>                 URL: https://issues.apache.org/jira/browse/FLINK-28060
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream, Connectors / Kafka
>    Affects Versions: 1.15.0
>        Environment: Reproduced on MacOS and Linux.
> Using java 8, Flink 1.15.0, Kafka 2.8.1.
>            Reporter: Christian Lorenz
>            Priority: Major
>        Attachments: flink-kafka-testjob.zip
>
>
> When Kafka offset committing is enabled and performed on Flink's checkpoints, an error can occur if the Kafka broker that is shut down is the leader of the relevant partition in Kafka's internal __consumer_offsets topic.
> This is expected behaviour. But once the broker is started up again, the next checkpoint issued by Flink should commit the offsets processed in the meantime back to Kafka. In Flink 1.15.0 this no longer always happens, and offset committing stays broken. A warning like the following is logged on each checkpoint:
> {code}
> [info] 14:33:13.684 WARN [Source Data Fetcher for Source: input-kafka-source -> Sink: output-stdout-sink (1/1)#1] o.a.f.c.k.s.reader.KafkaSourceReader - Failed to commit consumer offsets for checkpoint 35
> [info] org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
> [info] Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
> {code}
> To reproduce this I've attached a small Flink job. Java 8, Scala sbt, and docker / docker-compose are required to run it; see readme.md for more details.
> The job can be run with `sbt run`; the Kafka cluster is started with `docker-compose up`. If the Kafka brokers are then restarted gracefully, e.g. `docker-compose stop kafka1` followed by `docker-compose start kafka1`, and likewise for kafka2 and kafka3 afterwards, this warning occurs and no offsets are committed to Kafka.
> This is not reproducible in Flink 1.14.4.

--
This message was sent by Atlassian Jira
(v8.20.7#820007)
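As an illustrative aside: the logged exception says "You should retry committing the latest consumed offsets," and that retry pattern can be sketched generically. This is a minimal Python sketch of retrying a commit on a retriable failure, not Flink's actual commit path; the names `RetriableCommitError`, `commit_with_retry`, and `flaky_commit` are all hypothetical stand-ins.

```python
import time

class RetriableCommitError(Exception):
    """Hypothetical stand-in for Kafka's RetriableCommitFailedException."""

def commit_with_retry(commit_offsets, max_attempts=5, backoff_s=0.01):
    """Retry a commit callable on retriable failures, with linear backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return commit_offsets()
        except RetriableCommitError:
            if attempt == max_attempts:
                raise  # give up after the last attempt
            time.sleep(backoff_s * attempt)

# Simulated commit that fails twice (e.g. "coordinator not available"),
# then succeeds on the third attempt.
attempts = {"n": 0}

def flaky_commit():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RetriableCommitError("The coordinator is not available.")
    return "committed"

print(commit_with_retry(flaky_commit))  # prints "committed"
```

Note that in the bug described above, retrying does not help: the coordinator error persists across checkpoints even after the broker is back, which is what points to a client-side defect (KAFKA-13563) rather than a transient failure.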