[ 
https://issues.apache.org/jira/browse/FLINK-33293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17782609#comment-17782609
 ] 

Jasmin Redzepovic commented on FLINK-33293:
-------------------------------------------

I hadn't tried that before, but I have run that test now.

*flinkVersion: 1.17.0*

connectorVersion: 1.17.0 and 3.0.0-1.17 (I ran two tests, one per connector 
version, just to be sure :D)

{*}Results for connectorVersion 1.17.0{*}: A single job run was enough to 
process all messages from the Kafka topic. There were some Kafka 
TimeoutExceptions, but no restart was required: the job recovered from them 
and kept writing checkpoints successfully. After all messages were processed, 
consumer lag on the topic was 0, but when I examined the processed data in the 
database I detected data loss.

{*}Results for connectorVersion 3.0.0-1.17{*}: I ran the same test as on 
19/Oct, with 2 job runs. All messages were processed and consumer lag on the 
topic was 0, but when I examined the processed data in the database I detected 
data loss.

> Data loss with Kafka Sink
> -------------------------
>
>                 Key: FLINK-33293
>                 URL: https://issues.apache.org/jira/browse/FLINK-33293
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.16.1, 1.16.2, 1.17.1
>            Reporter: Jasmin Redzepovic
>            Priority: Major
>         Attachments: job.log, job_1_16_2_run1.log, job_1_16_2_run2.log, 
> job_1_17_1_run1.log, job_1_17_1_run2.log
>
>
> More info in Slack discussion: 
> [https://www.linen.dev/s/apache-flink/t/15851877/hi-all-it-s-me-again-based-on-https-apache-flink-slack-com-a#e102fa98-dcd7-40e8-a2c4-f7b4a83234e1]
>  
> *TLDR:*
> (in Flink version 1.15 I was unable to reproduce the issue, but in 1.16 and 
> 1.17 I can reproduce it)
> I have created a sink topic with 8 partitions, a replication factor of 3, and 
> a minimum in-sync replicas of 2. The consumer properties are set to their 
> default values.
> For the producer, I changed the delivery.timeout.ms and 
> request.timeout.ms properties, setting them to *5000ms* and *4000ms* 
> respectively. (acks is set to -1 by default, which is equivalent to _all_, I 
> guess.)
> KafkaSink is configured with an AT_LEAST_ONCE delivery guarantee. The job 
> parallelism is set to 1 and the checkpointing interval is set to 2000ms. I 
> started a Flink Job and monitored its logs. Additionally, I was consuming the 
> __consumer_offsets topic in parallel to track when offsets are committed for 
> my consumer group.
> The problematic part occurs during checkpoint 5. Its duration was 5009ms, 
> which exceeds the delivery timeout for Kafka (5000ms). Although it was marked 
> as completed, I believe that the output buffer of KafkaSink was not fully 
> acknowledged by Kafka. As a result, Flink proceeded to trigger checkpoint 6 
> but immediately encountered a Kafka {_}TimeoutException: Expiring N 
> records{_}.
> I suspect that this exception originated from checkpoint 5 and that 
> checkpoint 5 should not have been considered successful. The job then failed 
> but recovered from checkpoint 5. Some time after checkpoint 7, consumer 
> offsets were committed to Kafka, and this process repeated once more at 
> checkpoint 9.
> Since the offsets of checkpoint 5 were committed to Kafka, but the output 
> buffer was only partially delivered, there has been data loss. I confirmed 
> this when sinking the topic to the database.
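
For reference, the producer overrides and the checkpoint 5 timing from the description can be written down as a small self-contained sketch. It uses only java.util.Properties rather than the Flink or Kafka APIs. The class name ProducerTimeouts and the helper exceedsDeliveryTimeout are hypothetical names made up for illustration, and acks=all is the default assumed in the description, not a value read from the actual job configuration.

```java
import java.util.Properties;

// Hypothetical illustration class; not part of Flink or Kafka.
class ProducerTimeouts {

    // Producer overrides as described in the report (values in milliseconds).
    static Properties reportedProducerProps() {
        Properties props = new Properties();
        props.setProperty("delivery.timeout.ms", "5000");
        props.setProperty("request.timeout.ms", "4000");
        // Assumed default from the description: acks=-1 is the same as acks=all.
        props.setProperty("acks", "all");
        return props;
    }

    // Hypothetical helper: true if a checkpoint took longer than the
    // producer's delivery timeout configured in props.
    static boolean exceedsDeliveryTimeout(Properties props, long checkpointDurationMs) {
        long deliveryTimeoutMs = Long.parseLong(props.getProperty("delivery.timeout.ms"));
        return checkpointDurationMs > deliveryTimeoutMs;
    }

    public static void main(String[] args) {
        Properties props = reportedProducerProps();
        // Checkpoint 5 in the report took 5009 ms.
        System.out.println(exceedsDeliveryTimeout(props, 5009L)); // prints "true"
    }
}
```

The comparison only restates the observation from the description (5009ms > 5000ms). It does not by itself show that records expired during checkpoint 5.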



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
