[ 
https://issues.apache.org/jira/browse/FLINK-28060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17554949#comment-17554949
 ] 

Qingsheng Ren commented on FLINK-28060:
---------------------------------------

Personally I prefer to upgrade the Kafka client. Downgrading the Kafka client 
could introduce new bugs (like the one fixed in KAFKA-10793). I think the 
backward compatibility of Kafka is trustable according to our previous 
experience. On consumer side we don't use any hacks in Kafka source, purely 
depend on APIs so I think it should be fine, but for the producer we use 
[reflection|https://github.com/apache/flink/blob/c9a706b8388b324a37da43298e37074d0a452a34/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java#L331-L343]
 which is concerning. I can have a try to bump the version and see if it works. 

Any ideas from [~becket_qin] ?

> Kafka Commit on checkpointing fails repeatedly after a broker restart
> ---------------------------------------------------------------------
>
>                 Key: FLINK-28060
>                 URL: https://issues.apache.org/jira/browse/FLINK-28060
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream, Connectors / Kafka
>    Affects Versions: 1.15.0
>         Environment: Reproduced on MacOS and Linux.
> Using java 8, Flink 1.15.0, Kafka 2.8.1.
>            Reporter: Christian Lorenz
>            Priority: Major
>         Attachments: flink-kafka-testjob.zip
>
>
> When Kafka Offset committing is enabled and done on Flinks checkpointing, an 
> error might occur if one Kafka broker is shutdown which might be the leader 
> of that partition in Kafkas internal __consumer_offsets topic.
> This is an expected behaviour. But once the broker is started up again, the 
> next checkpoint issued by flink should commit the meanwhile processed offsets 
> back to kafka. Somehow this does not seem to happen always in Flink 1.15.0 
> anymore and the offset committing is broken. An warning like the following 
> will be logged on each checkpoint:
> {code}
> [info] 14:33:13.684 WARN  [Source Data Fetcher for Source: input-kafka-source 
> -> Sink: output-stdout-sink (1/1)#1] o.a.f.c.k.s.reader.KafkaSourceReader - 
> Failed to commit consumer offsets for checkpoint 35
> [info] org.apache.kafka.clients.consumer.RetriableCommitFailedException: 
> Offset commit failed with a retriable exception. You should retry committing 
> the latest consumed offsets.
> [info] Caused by: 
> org.apache.kafka.common.errors.CoordinatorNotAvailableException: The 
> coordinator is not available.
> {code}
> To reproduce this I've attached a small flink job program.  To execute this 
> java8, scala sbt and docker / docker-compose is required.  Also see readme.md 
> for more details.
> The job can be run with `sbt run`, kafka cluster is started by 
> `docker-compose up`. If then the kafka brokers are restarted gracefully by 
> e.g. `docker-compose stop kafka1` and `docker-compose start kafka1` with 
> kafka2 and kafka3 afterwards, this warning will occur and no offsets will be 
> committed into kafka.
> This is not reproducible in flink 1.14.4.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to