[
https://issues.apache.org/jira/browse/FLINK-28060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17577675#comment-17577675
]
Mason Chen commented on FLINK-28060:
------------------------------------
[~showuon] Earlier, I attempted to write an integration test with
testcontainers to reproduce the issue, following the steps in the ticket
description and comparing kafka-clients 2.8.1 and 3.1.1.
My test is as follows:
# Start Kafka container and create topic
# Create reader and assign the topics to underlying consumer
# Fetch and invoke poll()
# Call commitAsync()
# Restart Kafka container
# Call commitAsync()
# Call commitAsync()
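For reference, the steps above look roughly like the following sketch. This is not the actual Flink reader internals; it uses a plain KafkaConsumer against a Testcontainers KafkaContainer, and the topic name, image tag, group id and timeouts are illustrative placeholders. It also requires Docker plus the testcontainers and kafka-clients dependencies.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

public class CommitAfterRestartRepro {
    public static void main(String[] args) {
        // Step 1: start the Kafka container. Topic creation is omitted here
        // (it would use an AdminClient or rely on auto-create).
        try (KafkaContainer kafka =
                 new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))) {
            kafka.start();

            Properties props = new Properties();
            props.put("bootstrap.servers", kafka.getBootstrapServers());
            props.put("group.id", "repro-group");
            props.put("key.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                      "org.apache.kafka.common.serialization.StringDeserializer");

            TopicPartition tp = new TopicPartition("repro-topic", 0);
            Map<TopicPartition, OffsetAndMetadata> offsets =
                Collections.singletonMap(tp, new OffsetAndMetadata(0L));

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // Step 2: assign the topic partition directly to the consumer.
                consumer.assign(Collections.singletonList(tp));
                // Step 3: fetch / invoke poll().
                consumer.poll(Duration.ofSeconds(5));
                // Step 4: commit before the restart.
                consumer.commitAsync(offsets, (o, e) -> log("before restart", e));

                // Step 5: restart the broker. NB: a real test must keep the
                // advertised listener address/port stable across the restart
                // (e.g. a fixed host port), otherwise the consumer cannot
                // reconnect; that plumbing is omitted here.
                kafka.stop();
                kafka.start();

                // Steps 6 and 7: commit again after the restart. With 2.8.1
                // both callbacks reported failures; with 3.1.1 only the first.
                consumer.commitAsync(offsets, (o, e) -> log("after restart #1", e));
                consumer.commitAsync(offsets, (o, e) -> log("after restart #2", e));
                // commitAsync callbacks fire on a subsequent poll/commit,
                // so drive the network client once more.
                consumer.poll(Duration.ofSeconds(5));
            }
        }
    }

    private static void log(String label, Exception e) {
        System.out.println(label + ": " + (e == null ? "committed" : e));
    }
}
```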
For 2.8.1, the commitAsync calls in both step 6 and step 7 fail. For 3.1.1,
only the call in step 6 fails, which seems to be the desired behavior. Some
concerns:
# However, unusually, the commit in step 4 fails if the test does not invoke
poll() first. I think it is possible for Flink to hit a race condition where
commitAsync is executed before poll (a short checkpoint interval can trigger
commitAsync before poll if topic partition assignment takes long). Is this
behavior intended?
# Otherwise, do you have any recommendations for reproducing the issue in a CI
environment where we cannot assume poll() has been invoked?
> Kafka Commit on checkpointing fails repeatedly after a broker restart
> ---------------------------------------------------------------------
>
> Key: FLINK-28060
> URL: https://issues.apache.org/jira/browse/FLINK-28060
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream, Connectors / Kafka
> Affects Versions: 1.15.0
> Environment: Reproduced on MacOS and Linux.
> Using java 8, Flink 1.15.0, Kafka 2.8.1.
> Reporter: Christian Lorenz
> Priority: Major
> Labels: pull-request-available
> Attachments: flink-kafka-testjob.zip
>
>
> When Kafka offset committing is enabled and done on Flink's checkpointing, an
> error might occur if a Kafka broker is shut down that is the leader of the
> consumer's partition in Kafka's internal __consumer_offsets topic.
> This is expected behaviour. But once the broker is started up again, the next
> checkpoint issued by Flink should commit the offsets processed in the
> meantime back to Kafka. In Flink 1.15.0 this no longer happens reliably, and
> offset committing stays broken. A warning like the following is logged on
> each checkpoint:
> {code}
> [info] 14:33:13.684 WARN [Source Data Fetcher for Source: input-kafka-source
> -> Sink: output-stdout-sink (1/1)#1] o.a.f.c.k.s.reader.KafkaSourceReader -
> Failed to commit consumer offsets for checkpoint 35
> [info] org.apache.kafka.clients.consumer.RetriableCommitFailedException:
> Offset commit failed with a retriable exception. You should retry committing
> the latest consumed offsets.
> [info] Caused by:
> org.apache.kafka.common.errors.CoordinatorNotAvailableException: The
> coordinator is not available.
> {code}
> To reproduce this I've attached a small Flink job. To execute it, Java 8,
> Scala sbt, and docker / docker-compose are required. See readme.md for more
> details.
> The job can be run with `sbt run`; the Kafka cluster is started with
> `docker-compose up`. If the Kafka brokers are then restarted gracefully,
> e.g. `docker-compose stop kafka1` followed by `docker-compose start kafka1`,
> and likewise for kafka2 and kafka3 afterwards, this warning will occur and no
> offsets will be committed to Kafka.
> This is not reproducible in Flink 1.14.4.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)