[ 
https://issues.apache.org/jira/browse/FLINK-28060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17577679#comment-17577679
 ] 

Luke Chen edited comment on FLINK-28060 at 8/10/22 12:17 AM:
-------------------------------------------------------------

[~mason6345], yes, Kafka v3.1.1 assumes that the consumer's poll has run before 
commitAsync is called. In Kafka v3.2.1 this assumption is removed, so in 
v3.2.1 commitAsync works even if it is called before poll.

So, for your question:

1. However, unusually, step 4 will fail if the test doesn't invoke poll 
regardless of the kafka clients version. I think it is possible for Flink to 
have a race condition where commitAsync is executed before poll (short 
checkpoint interval causing commitAsync before poll if topic partitions take 
long to assign). Is this behavior intended?

--> Yes, Kafka should not assume that either poll or commitAsync is called 
first. That was a bug, and it is fixed in Kafka v3.2.1.

2. Otherwise, do you have any recommendations for reproducing the issue in a CI 
environment where we do not assume poll() is invoked?

--> I think it's good. I also ran the reproducer from this ticket with Kafka 
v3.2.1 and investigated the Kafka client logs, and I confirmed that it works 
even if commitAsync is called before poll.
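
The ordering dependency discussed above can be sketched with a toy model. This is not Kafka client code: `ToyConsumer`, `lookup_only_in_poll`, and `commit_before_poll` are hypothetical names. The sketch assumes, for illustration only, that the dependency comes from where the consumer discovers its group coordinator. With the flag set, only poll() performs the lookup (the behavior described for v3.1.1); with the flag cleared, commit_async() may perform it as well (the behavior described for v3.2.1).

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, Optional


class CoordinatorNotAvailable(Exception):
    """Toy stand-in for a retriable 'coordinator is not available' error."""


@dataclass
class ToyConsumer:
    # Hypothetical flag: True models "poll must run first" (as described for
    # v3.1.1), False models the behavior described for v3.2.1.
    lookup_only_in_poll: bool
    coordinator_known: bool = False
    committed: Dict[str, int] = field(default_factory=dict)

    def poll(self) -> None:
        # In this toy model, poll() always discovers the coordinator.
        self.coordinator_known = True

    def commit_async(
        self,
        offsets: Dict[str, int],
        callback: Callable[[Dict[str, int], Optional[Exception]], None],
    ) -> None:
        # Assumption: the newer behavior lets the commit path look up the
        # coordinator itself instead of relying on an earlier poll().
        if not self.coordinator_known and not self.lookup_only_in_poll:
            self.coordinator_known = True
        if not self.coordinator_known:
            callback(offsets, CoordinatorNotAvailable("coordinator unknown"))
            return
        self.committed.update(offsets)
        callback(offsets, None)


def commit_before_poll(lookup_only_in_poll: bool) -> Optional[Exception]:
    """Call commit_async() without any prior poll() and return the error, if any."""
    consumer = ToyConsumer(lookup_only_in_poll)
    errors = []
    consumer.commit_async({"topic-0": 42}, lambda _offsets, err: errors.append(err))
    return errors[0]
```

Under these assumptions, `commit_before_poll(True)` returns a `CoordinatorNotAvailable` error, while `commit_before_poll(False)` returns `None`. That mirrors the race described in question 1, where a short checkpoint interval triggers the commit before the first poll.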

 

Thanks.



> Kafka Commit on checkpointing fails repeatedly after a broker restart
> ---------------------------------------------------------------------
>
>                 Key: FLINK-28060
>                 URL: https://issues.apache.org/jira/browse/FLINK-28060
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream, Connectors / Kafka
>    Affects Versions: 1.15.0
>         Environment: Reproduced on MacOS and Linux.
> Using java 8, Flink 1.15.0, Kafka 2.8.1.
>            Reporter: Christian Lorenz
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: flink-kafka-testjob.zip
>
>
> When Kafka offset committing is enabled and done on Flink's checkpointing, an 
> error might occur if a Kafka broker is shut down that might be the leader 
> of that partition in Kafka's internal __consumer_offsets topic.
> This is expected behaviour. But once the broker is started up again, the 
> next checkpoint issued by Flink should commit the offsets processed in the 
> meantime back to Kafka. Somehow this does not always seem to happen in Flink 
> 1.15.0 anymore, and the offset committing is broken. A warning like the 
> following will be logged on each checkpoint:
> {code}
> [info] 14:33:13.684 WARN  [Source Data Fetcher for Source: input-kafka-source 
> -> Sink: output-stdout-sink (1/1)#1] o.a.f.c.k.s.reader.KafkaSourceReader - 
> Failed to commit consumer offsets for checkpoint 35
> [info] org.apache.kafka.clients.consumer.RetriableCommitFailedException: 
> Offset commit failed with a retriable exception. You should retry committing 
> the latest consumed offsets.
> [info] Caused by: 
> org.apache.kafka.common.errors.CoordinatorNotAvailableException: The 
> coordinator is not available.
> {code}
> To reproduce this I've attached a small Flink job.  To execute it, Java 8, 
> Scala sbt and docker / docker-compose are required.  Also see readme.md 
> for more details.
> The job can be run with `sbt run`; the Kafka cluster is started with 
> `docker-compose up`. If the Kafka brokers are then restarted gracefully, 
> e.g. with `docker-compose stop kafka1` and `docker-compose start kafka1`, 
> followed by kafka2 and kafka3, this warning will occur and no offsets will 
> be committed to Kafka.
> This is not reproducible in Flink 1.14.4.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
