[
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807401#comment-17807401
]
Yang LI commented on FLINK-33545:
---------------------------------
Hello [~aeolus811tw] [~mason6345], I just found this ticket, do you think there
will be a fix for this?
I need to enable the KafkaProducer properties "batch.size", "linger.ms" and
"compression.type" for batching and compression, and we use the at_least_once
semantic with Kafka acks set to "1". I imagine I am in the scenario described by
[~aeolus811tw]
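For context, this is roughly the configuration I mean, a minimal sketch only: the broker address, topic name and concrete values are placeholders for our setup, assuming the KafkaSink builder API of the Kafka connector:
{code:java}
Properties props = new Properties();
props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "65536");     // batch up to 64 KiB per partition
props.setProperty(ProducerConfig.LINGER_MS_CONFIG, "5");          // wait up to 5 ms to fill a batch
props.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // compress batches
props.setProperty(ProducerConfig.ACKS_CONFIG, "1");               // leader-only acknowledgement

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("broker:9092")
        .setKafkaProducerConfig(props)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("my-topic")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();{code}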
> KafkaSink implementation can cause dataloss during broker issue when not
> using EXACTLY_ONCE if there's any batching
> -------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.18.0
> Reporter: Kevin Tseng
> Assignee: Kevin Tseng
> Priority: Major
> Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink, some assumptions
> were made:
> # KafkaSource relies completely on checkpoints to manage and track its offsets
> in the *KafkaSourceReader<T>* class
> # KafkaSink, in the *KafkaWriter<IN>* class, only performs a catch-up flush when
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource assumes that the checkpoint is properly fenced and that everything
> it has read up until the checkpoint was initiated will be processed or recorded
> by the downstream operators, including two-phase committers such as
> *KafkaSink*.
> *KafkaSink* goes by the model of:
>
> {code:java}
> flush -> prepareCommit -> commit{code}
>
> Consider a scenario where:
> * KafkaSource ingested records #1 to #100
> * KafkaSink only had the chance to send records #1 to #96
> * the batching interval is 5ms
> When the checkpoint is initiated, flush will only confirm the sending of
> records #1 to #96.
> This allows the checkpoint to proceed as there is no error, and records #97 to
> #100 will be batched after the first flush.
> Now, if the broker goes down or has an issue that causes the internal
> KafkaProducer to be unable to send out the records after a batch, and it is in
> a constant retry cycle (the default value of KafkaProducer retries is
> Integer.MAX_VALUE), *WriterCallback* error handling will never be triggered
> until the next checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and running the following
> code:
> {code:java}
> // Imports needed by this snippet; BOOTSTRAP_SERVER, TOPIC and the CB callback
> // class are assumed to be defined elsewhere.
> import java.util.Properties;
> import java.util.UUID;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.StringSerializer;
>
> Properties props = new Properties();
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);             // retry effectively forever
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); // never give up on a batch
> props.put(ProducerConfig.ACKS_CONFIG, "all");
>
> final KafkaProducer<String, String> producer = new KafkaProducer<>(props);
> try {
>     for (int i = 0; i < 10; i++) {
>         System.out.printf("sending record #%d\n", i);
>         String data = UUID.randomUUID().toString();
>         final ProducerRecord<String, String> record =
>                 new ProducerRecord<>(TOPIC, Integer.toString(i), data);
>         producer.send(record, new CB(Integer.toString(i), data));
>         Thread.sleep(10000); // sleep for 10 seconds
>     }
> } catch (Exception e) {
>     e.printStackTrace();
> } finally {
>     System.out.println("flushing");
>     producer.flush();
>     System.out.println("closing");
>     producer.close();
> }{code}
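> The *CB* callback class is not included above; a minimal stand-in, assuming it
> only logs the outcome of each send, could look like this:
> {code:java}
> import org.apache.kafka.clients.producer.Callback;
> import org.apache.kafka.clients.producer.RecordMetadata;
>
> // Hypothetical minimal version of the CB callback used above: it only logs
> // whether each record was acknowledged or failed.
> class CB implements Callback {
>     private final String key;
>     private final String value;
>
>     CB(String key, String value) {
>         this.key = key;
>         this.value = value;
>     }
>
>     @Override
>     public void onCompletion(RecordMetadata metadata, Exception exception) {
>         if (exception != null) {
>             System.out.printf("record #%s (value=%s) failed: %s%n", key, value, exception);
>         } else {
>             System.out.printf("record #%s acked at %s-%d@%d%n",
>                     key, metadata.topic(), metadata.partition(), metadata.offset());
>         }
>     }
> }{code}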
> Once the callback returns due to a network timeout, it will cause Flink to
> restart from the previously saved checkpoint (which recorded reading up to
> record #100), but KafkaWriter never sent records #97 to #100.
> This results in data loss of records #97 to #100.
> Because KafkaWriter only catches errors *after* the callback, if the callback
> is never invoked (due to a broker issue) right after the first flush has taken
> place, those records are effectively gone unless someone decides to go back and
> look for them.
> This behavior may be acceptable if the user has set
> {*}DeliveryGuarantee.NONE{*}, but it is not expected for
> {*}DeliveryGuarantee.AT_LEAST_ONCE{*}.
> The process diverges in the case of {*}EXACTLY_ONCE{*}.
> prepareCommit will produce a list of KafkaCommittable that corresponds to the
> transactional KafkaProducer to be committed, and a catch-up flush will take
> place during the *commit* step. Whether this was intentional or not, because
> flush is a blocking call, this second flush at the end of the EXACTLY_ONCE path
> actually ensures that everything fenced in the current checkpoint is sent to
> Kafka, or fails the checkpoint if not successful.
>
> Due to the above findings, I'm recommending one of the following fixes:
> # perform a second flush for AT_LEAST_ONCE, or
> # move the flush to the end of the KafkaSink process.
> I'm leaning towards the 2nd option, as it does not make sense to flush and then
> checkpoint; we should flush right before the checkpoint completes, given that
> is what commit is meant to do.
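> Conceptually, what the change amounts to for AT_LEAST_ONCE is a blocking flush
> in the commit step, right before the checkpoint completes, mirroring the
> EXACTLY_ONCE catch-up flush. A rough sketch with hypothetical names (not the
> actual connector code):
> {code:java}
> import java.util.Collection;
> import java.util.Collections;
>
> // Hypothetical view of a batching writer; flush() blocks until every pending
> // record is acknowledged or has failed.
> interface BatchingKafkaWriter<CommT> {
>     void flush();
> }
>
> class AtLeastOnceCommitStep<CommT> {
>     private final BatchingKafkaWriter<CommT> writer;
>
>     AtLeastOnceCommitStep(BatchingKafkaWriter<CommT> writer) {
>         this.writer = writer;
>     }
>
>     // Invoked right before the checkpoint completes; a failure here fails the
>     // checkpoint instead of silently losing records batched after the first flush.
>     Collection<CommT> commit() {
>         writer.flush();
>         return Collections.emptyList(); // no transactional committables for at-least-once
>     }
> }{code}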
--
This message was sent by Atlassian Jira
(v8.20.10#820010)