[ 
https://issues.apache.org/jira/browse/FLINK-33293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17776361#comment-17776361
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-33293 at 10/17/23 7:30 PM:
-----------------------------------------------------------------------

[~jredzepovic] was this actually reproduced with versions 1.16.2 and 1.17.1? I 
see the ticket is tagged with those versions, but in the linked Slack thread it 
seems it was only reproduced with version 1.16.1.

I'm asking because, at first glance, checkpoints incorrectly succeeding in 
the case of flush failures looks like FLINK-31305, which is fixed in 
Flink Kafka connector versions 1.16.2, 1.17.1, and 3.0.0-1.17 (the 
externalized Kafka connector).

Prior to the FLINK-31305 fix, the KafkaSink did not correctly check that the 
flush was fully successful before acknowledging the checkpoint as complete.
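For illustration only, here is a minimal sketch (not the actual KafkaWriter code; 
class and field names are hypothetical) of the invariant that the FLINK-31305 fix 
enforces: the writer must wait for every in-flight record to be acknowledged, and 
surface any asynchronous send failure, before the checkpoint is allowed to complete.

{code:java}
import java.io.IOException;

import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical sketch -- not the actual connector code.
class SketchKafkaWriter implements SinkWriter<String> {

    private final KafkaProducer<byte[], byte[]> producer;
    private volatile Exception asyncSendError; // set from send() callbacks

    SketchKafkaWriter(KafkaProducer<byte[], byte[]> producer) {
        this.producer = producer;
    }

    @Override
    public void write(String element, Context context) {
        producer.send(new ProducerRecord<>("sink-topic", element.getBytes()),
                (metadata, exception) -> {
                    if (exception != null) {
                        asyncSendError = exception; // remember async failures
                    }
                });
    }

    @Override
    public void flush(boolean endOfInput) throws IOException {
        // Wait for all outstanding sends to be acknowledged by the broker ...
        producer.flush();
        // ... and fail the checkpoint if any of them did not make it.
        if (asyncSendError != null) {
            throw new IOException("Flush was not fully successful", asyncSendError);
        }
    }

    @Override
    public void close() {
        producer.close();
    }
}
{code}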

If you did not test against those versions, could you try that and report back?



> Data loss with Kafka Sink
> -------------------------
>
>                 Key: FLINK-33293
>                 URL: https://issues.apache.org/jira/browse/FLINK-33293
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.16.1, 1.16.2, 1.17.1
>            Reporter: Jasmin Redzepovic
>            Priority: Major
>         Attachments: job.log
>
>
> More info in Slack discussion: 
> [https://www.linen.dev/s/apache-flink/t/15851877/hi-all-it-s-me-again-based-on-https-apache-flink-slack-com-a#e102fa98-dcd7-40e8-a2c4-f7b4a83234e1]
>  
> *TLDR:*
> (I was unable to reproduce the issue on Flink 1.15, but I can reproduce it on 
> 1.16 and 1.17.)
> I have created a sink topic with 8 partitions, a replication factor of 3, and 
> a minimum in-sync replicas of 2. The consumer properties are set to their 
> default values.
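> For reference, the topic was created roughly like this (a sketch only; the 
> topic name and bootstrap address are placeholders):
> {code:java}
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
>
> import org.apache.kafka.clients.admin.AdminClient;
> import org.apache.kafka.clients.admin.AdminClientConfig;
> import org.apache.kafka.clients.admin.NewTopic;
>
> public class CreateSinkTopic {
>     public static void main(String[] args) throws Exception {
>         Properties props = new Properties();
>         props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         try (AdminClient admin = AdminClient.create(props)) {
>             NewTopic topic = new NewTopic("sink-topic", 8, (short) 3)  // 8 partitions, RF 3
>                     .configs(Map.of("min.insync.replicas", "2"));      // min in-sync replicas 2
>             admin.createTopics(List.of(topic)).all().get();
>         }
>     }
> }
> {code}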
> For the producer, I changed the delivery.timeout.ms and request.timeout.ms 
> properties to *5000ms* and *4000ms* respectively. (acks is left at its default 
> of -1, which I believe is equivalent to _all_.)
> KafkaSink is configured with an AT_LEAST_ONCE delivery guarantee. The job 
> parallelism is set to 1 and the checkpointing interval is set to 2000ms. I 
> started a Flink Job and monitored its logs. Additionally, I was consuming the 
> __consumer_offsets topic in parallel to track when offsets are committed for 
> my consumer group.
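> The job setup looks roughly like this (a sketch; the topic name, bootstrap 
> address, and source are placeholders, not the actual job):
> {code:java}
> import java.util.Properties;
>
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.connector.base.DeliveryGuarantee;
> import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
> import org.apache.flink.connector.kafka.sink.KafkaSink;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> public class KafkaSinkRepro {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(1);
>         env.enableCheckpointing(2000); // 2000ms checkpoint interval
>
>         Properties producerProps = new Properties();
>         producerProps.setProperty("delivery.timeout.ms", "5000");
>         producerProps.setProperty("request.timeout.ms", "4000");
>         // acks is left at its default of -1 (all)
>
>         KafkaSink<String> sink = KafkaSink.<String>builder()
>                 .setBootstrapServers("localhost:9092")
>                 .setKafkaProducerConfig(producerProps)
>                 .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>                 .setRecordSerializer(KafkaRecordSerializationSchema.builder()
>                         .setTopic("sink-topic")
>                         .setValueSerializationSchema(new SimpleStringSchema())
>                         .build())
>                 .build();
>
>         env.fromElements("a", "b", "c") // placeholder source
>                 .sinkTo(sink)
>                 .name("kafka-sink");
>
>         env.execute("kafka-sink-repro");
>     }
> }
> {code}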
> The problematic part occurs during checkpoint 5. Its duration was 5009ms, 
> which exceeds the delivery timeout for Kafka (5000ms). Although it was marked 
> as completed, I believe that the output buffer of KafkaSink was not fully 
> acknowledged by Kafka. As a result, Flink proceeded to trigger checkpoint 6 
> but immediately encountered a Kafka {_}TimeoutException: Expiring N 
> records{_}.
> I suspect that this exception originated from checkpoint 5 and that 
> checkpoint 5 should not have been considered successful. The job then failed 
> but recovered from checkpoint 5. Some time after checkpoint 7, consumer 
> offsets were committed to Kafka, and this process repeated once more at 
> checkpoint 9.
> Since the offsets of checkpoint 5 were committed to Kafka but its output 
> buffer was only partially delivered, data was lost. I confirmed this by 
> sinking the topic to a database.


