[ https://issues.apache.org/jira/browse/FLINK-33293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17776361#comment-17776361 ]

Tzu-Li (Gordon) Tai edited comment on FLINK-33293 at 10/17/23 7:29 PM:
-----------------------------------------------------------------------

[~jredzepovic] was this actually reproduced with versions 1.16.2 and 1.17.1? I 
see the ticket tagged with these versions, but in the linked Slack thread it 
seems like it was only reproduced with version 1.16.1.

I'm asking because, at a first glance, checkpoints incorrectly succeeding in 
the case of flush failures seems to be caused by FLINK-31305, which is fixed 
via Flink Kafka connector versions 1.16.2, 1.17.1, and 3.0.0-1.17 (the 
externalized Kafka connector).

If you did not test against those versions, could you try that and report back?


was (Author: tzulitai):
[~jredzepovic] was this actually reproduced with versions 1.16.2 and 1.17.1?

I'm asking because, at a first glance, checkpoints incorrectly succeeding in 
the case of flush failures seems to be caused by FLINK-31305, which is fixed 
via Flink Kafka connector versions 1.16.2, 1.17.1, and 3.0.0-1.17 (the 
externalized Kafka connector).

If you did not test against those versions, could you try that and report back?

> Data loss with Kafka Sink
> -------------------------
>
>                 Key: FLINK-33293
>                 URL: https://issues.apache.org/jira/browse/FLINK-33293
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.16.1, 1.16.2, 1.17.1
>            Reporter: Jasmin Redzepovic
>            Priority: Major
>         Attachments: job.log
>
>
> More info in Slack discussion: 
> [https://www.linen.dev/s/apache-flink/t/15851877/hi-all-it-s-me-again-based-on-https-apache-flink-slack-com-a#e102fa98-dcd7-40e8-a2c4-f7b4a83234e1]
>  
> *TLDR:*
> (I was unable to reproduce the issue in Flink 1.15, but I can reproduce it 
> in 1.16 and 1.17.)
> I created a sink topic with 8 partitions, a replication factor of 3, and 
> min.insync.replicas set to 2. The consumer properties are left at their 
> default values.
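> As a minimal sketch, the topic setup roughly corresponds to the following 
> AdminClient call (the topic name and bootstrap servers are placeholders, 
> not the actual values from my environment; creating it via the kafka-topics 
> CLI is equivalent):
> {code:java}
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
> import org.apache.kafka.clients.admin.Admin;
> import org.apache.kafka.clients.admin.AdminClientConfig;
> import org.apache.kafka.clients.admin.NewTopic;
> 
> public class CreateSinkTopic {
>     public static void main(String[] args) throws Exception {
>         Properties props = new Properties();
>         props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
>         try (Admin admin = Admin.create(props)) {
>             // 8 partitions, replication factor 3, min.insync.replicas 2
>             NewTopic sinkTopic = new NewTopic("sink-topic", 8, (short) 3)
>                     .configs(Map.of("min.insync.replicas", "2"));
>             admin.createTopics(List.of(sinkTopic)).all().get();
>         }
>     }
> }
> {code}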
> For the producer, I changed the delivery.timeout.ms and request.timeout.ms 
> properties to *5000ms* and *4000ms* respectively. (acks is left at its 
> default of -1, which is equivalent to _all_.)
> KafkaSink is configured with an AT_LEAST_ONCE delivery guarantee. The job 
> parallelism is set to 1 and the checkpointing interval is set to 2000ms. I 
> started the Flink job and monitored its logs. In parallel, I consumed the 
> __consumer_offsets topic to track when offsets were committed for my 
> consumer group.
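> As a rough, self-contained sketch, the job setup looks roughly like this 
> (the bootstrap servers, topic name, String records, and the trivial source 
> below are placeholders, not my actual values; the real job's Kafka source 
> is omitted):
> {code:java}
> import java.util.Properties;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.connector.base.DeliveryGuarantee;
> import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
> import org.apache.flink.connector.kafka.sink.KafkaSink;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> 
> public class KafkaSinkReproJob {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(1);
>         env.enableCheckpointing(2000); // checkpoint every 2000ms
> 
>         // Producer overrides described above; acks stays at its default (-1 / all)
>         Properties producerProps = new Properties();
>         producerProps.setProperty("delivery.timeout.ms", "5000");
>         producerProps.setProperty("request.timeout.ms", "4000");
> 
>         KafkaSink<String> sink = KafkaSink.<String>builder()
>                 .setBootstrapServers("localhost:9092")   // placeholder
>                 .setKafkaProducerConfig(producerProps)
>                 .setRecordSerializer(KafkaRecordSerializationSchema.builder()
>                         .setTopic("sink-topic")          // placeholder
>                         .setValueSerializationSchema(new SimpleStringSchema())
>                         .build())
>                 .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>                 .build();
> 
>         // Placeholder source; the real job consumes from a Kafka source topic.
>         env.fromElements("record-1", "record-2", "record-3").sinkTo(sink);
>         env.execute("kafka-sink-at-least-once-sketch");
>     }
> }
> {code}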
> The problematic part occurred during checkpoint 5. Its duration was 5009ms, 
> which exceeds the Kafka delivery timeout (5000ms). Although it was marked 
> as completed, I believe the output buffer of KafkaSink was not fully 
> acknowledged by Kafka. As a result, Flink proceeded to trigger checkpoint 6 
> but immediately encountered a Kafka {_}TimeoutException: Expiring N 
> records{_}.
> I suspect that this exception originated from checkpoint 5 and that 
> checkpoint 5 should not have been considered successful. The job then failed 
> but recovered from checkpoint 5. Some time after checkpoint 7, consumer 
> offsets were committed to Kafka, and this process repeated once more at 
> checkpoint 9.
> Since the offsets of checkpoint 5 were committed to Kafka but its output 
> buffer was only partially delivered, data was lost. I confirmed this by 
> sinking the topic to a database.


