Tzu-Li (Gordon) Tai created FLINK-5701:
------------------------------------------
Summary: FlinkKafkaProducer violates at-least-once by not handling
failed records
Key: FLINK-5701
URL: https://issues.apache.org/jira/browse/FLINK-5701
Project: Flink
Issue Type: Bug
Components: Kafka Connector, Streaming Connectors
Reporter: Tzu-Li (Gordon) Tai
Reported on the user mailing list:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html
The problem:
The producer keeps a {{pendingRecords}} counter that is incremented on each
{{invoke()}} and decremented in each send callback. On checkpoints, it is used
to decide whether the producer must wait for outstanding callbacks.
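To make the failure mode concrete, here is a minimal sketch of that bookkeeping under a simplified model, not Flink's actual code: {{PendingRecordsTracker}}, its methods, and the {{lostRecords}} field are hypothetical. Because the callback decrements the counter whether or not the send failed, the checkpoint wait completes even though a record was dropped.

```java
// Hypothetical, simplified model of the current bookkeeping described above;
// not Flink's actual FlinkKafkaProducer code.
class PendingRecordsTracker {
    private final Object lock = new Object();
    private long pendingRecords = 0;
    private int lostRecords = 0; // demo only; the real producer does not track this

    // invoke(): count the record before handing it to the Kafka client.
    void onSend() {
        synchronized (lock) {
            pendingRecords++;
        }
    }

    // Send callback: decrements on success AND on failure; a failure is merely logged.
    void onCompletion(Exception exception) {
        synchronized (lock) {
            if (exception != null) {
                System.err.println("Error while sending record: " + exception.getMessage());
                lostRecords++;
            }
            pendingRecords--;
            if (pendingRecords == 0) {
                lock.notifyAll(); // wake a checkpoint blocked in waitUntilDrained()
            }
        }
    }

    // Checkpoint: after flushing, block until every outstanding callback has fired.
    void waitUntilDrained() throws InterruptedException {
        synchronized (lock) {
            while (pendingRecords != 0) {
                lock.wait();
            }
        }
    }

    long pending() { synchronized (lock) { return pendingRecords; } }

    int lost() { synchronized (lock) { return lostRecords; } }
}

class PendingRecordsDemo {
    public static void main(String[] args) throws InterruptedException {
        PendingRecordsTracker tracker = new PendingRecordsTracker();
        tracker.onSend();
        tracker.onSend();
        // Simulate the Kafka client's I/O thread completing both sends, one of them failing.
        Thread ioThread = new Thread(() -> {
            tracker.onCompletion(null);
            tracker.onCompletion(new RuntimeException("simulated broker error"));
        });
        ioThread.start();
        tracker.waitUntilDrained();
        // The counter reaches 0, so the checkpoint would be acknowledged despite the lost record.
        System.out.println("pending=" + tracker.pending() + " lost=" + tracker.lost());
    }
}
```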
On each checkpoint, we should consider the checkpoint successful only if, after
flushing, {{pendingRecords == 0}} and {{asyncException == null}}. Currently, we
only check {{pendingRecords}}.
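A minimal sketch of the proposed checkpoint condition, under the same simplifying assumptions: {{CheckpointingProducerSketch}} and its methods are hypothetical names, and the Kafka client's {{flush()}} call is only indicated by a comment. Even once the counter has drained, the checkpoint fails if any send reported an error.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the proposed check; names mirror the issue
// (pendingRecords, asyncException) but this is not Flink's actual code.
class CheckpointingProducerSketch {
    private final Object lock = new Object();
    private long pendingRecords = 0;
    private final AtomicReference<Exception> asyncException = new AtomicReference<>();

    void recordSent() {
        synchronized (lock) {
            pendingRecords++;
        }
    }

    // Simulated send callback: remember the first failure, then count the record as done.
    void recordCompleted(Exception exception) {
        if (exception != null) {
            asyncException.compareAndSet(null, exception);
        }
        synchronized (lock) {
            pendingRecords--;
            if (pendingRecords == 0) {
                lock.notifyAll();
            }
        }
    }

    // The checkpoint succeeds only if, after flushing, nothing is pending AND no send failed.
    void snapshotState() throws Exception {
        // A real producer would call flush() on the Kafka client here.
        synchronized (lock) {
            while (pendingRecords != 0) {
                lock.wait();
            }
        }
        Exception e = asyncException.get();
        if (e != null) {
            throw new Exception("Failing checkpoint: an asynchronous send failed", e);
        }
    }
}

class CheckpointDemo {
    public static void main(String[] args) {
        CheckpointingProducerSketch producer = new CheckpointingProducerSketch();
        producer.recordSent();
        producer.recordSent();
        producer.recordCompleted(null);
        producer.recordCompleted(new RuntimeException("simulated broker error"));
        try {
            producer.snapshotState();
            System.out.println("checkpoint succeeded");
        } catch (Exception e) {
            System.out.println("checkpoint failed: " + e.getCause().getMessage());
        }
    }
}
```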
Generally, to fix this, we need to handle exceptions in the callback and re-add
the original record to the producer. I think the {{onCompletion}} method is
called only after the KafkaProducer has internally exhausted all retry attempts
and removed the record from its buffer, so if we do nothing with the exception
other than log it, the message is lost.
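The re-add idea can be sketched with an in-memory stand-in for the Kafka client. {{FlakyClient}} and {{ResendOnFailureSketch}} are hypothetical; unlike Kafka's real {{Callback}}, which receives only the {{RecordMetadata}} and the exception, the fake client hands the original record back to the callback to keep the sketch short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical in-memory stand-in for KafkaProducer#send(record, callback):
// it fails the first attempt of each record, then succeeds. Not a Kafka API.
class FlakyClient {
    private final List<String> attempted = new ArrayList<>();
    final List<String> delivered = new ArrayList<>();

    void send(String record, BiConsumer<String, Exception> callback) {
        boolean firstAttempt = !attempted.contains(record);
        attempted.add(record);
        if (firstAttempt) {
            callback.accept(record, new RuntimeException("simulated transient failure"));
        } else {
            delivered.add(record);
            callback.accept(record, null);
        }
    }
}

class ResendOnFailureSketch {
    private final FlakyClient client;
    private int pendingRecords = 0;

    ResendOnFailureSketch(FlakyClient client) { this.client = client; }

    void invoke(String record) {
        pendingRecords++;
        sendWithResend(record);
    }

    private void sendWithResend(String record) {
        client.send(record, (original, exception) -> {
            if (exception != null) {
                // Instead of only logging (which drops the record), hand it back to the client.
                // pendingRecords stays incremented because the record is still in flight.
                sendWithResend(original);
            } else {
                pendingRecords--;
            }
        });
    }

    int pending() { return pendingRecords; }
}

class ResendDemo {
    public static void main(String[] args) {
        FlakyClient client = new FlakyClient();
        ResendOnFailureSketch producer = new ResendOnFailureSketch(client);
        producer.invoke("a");
        producer.invoke("b");
        System.out.println(client.delivered + " pending=" + producer.pending());
    }
}
```

This sketch re-sends unconditionally; with an error that never clears, it would retry forever, which is the hang described in point 2.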
Two additional things we need to address in order to solve this:
1. {{FlinkKafkaProducer}} needs to keep a map from each callback to its
corresponding original record.
2. We need to determine for which async exceptions the record should actually
be re-added to the {{FlinkKafkaProducer}}. We cannot simply re-add on every
exception; otherwise, errors that cannot be resolved by retrying will hang the
checkpoint flush forever, and it will be unclear to the user why the checkpoint
is taking so long. The ElasticsearchSink has similar issues (FLINK-5353 and
FLINK-5122). Instead of deciding case by case which async exceptions to retry,
the proposed approach is to let the user provide async failure handlers that
implement the logic for which exceptions to handle or re-add.
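Both points can be sketched together, assuming a simplified callback type and a pluggable handler. Every name here ({{AsyncFailureHandler}}, {{RetryTransientHandler}}, {{TransientException}}, {{SendCallback}}, {{FailureAwareProducerSketch}}) is hypothetical and does not correspond to an existing Flink or Kafka API. The producer keeps an identity map from each in-flight callback to its original record (point 1) and lets a user-supplied handler decide whether a failed record is re-sent, fails the next checkpoint, or is dropped (point 2).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical user-facing hook (point 2); not an existing Flink or Kafka API.
interface AsyncFailureHandler {
    enum Action { RETRY, FAIL, IGNORE }

    Action onFailure(String record, Exception failure, int attempt);
}

// Hypothetical marker for errors that retrying can plausibly fix.
class TransientException extends RuntimeException {
    TransientException(String message) { super(message); }
}

// Example user policy: retry transient errors up to maxAttempts, fail on everything else.
class RetryTransientHandler implements AsyncFailureHandler {
    private final int maxAttempts;

    RetryTransientHandler(int maxAttempts) { this.maxAttempts = maxAttempts; }

    @Override
    public Action onFailure(String record, Exception failure, int attempt) {
        return (failure instanceof TransientException && attempt < maxAttempts) ? Action.RETRY : Action.FAIL;
    }
}

// Hypothetical callback shape: like Kafka's Callback, it is not told which record it belongs to.
interface SendCallback {
    void onCompletion(Exception exception);
}

class FailureAwareProducerSketch {
    // Point 1: map each in-flight callback (by identity) to its original record.
    private final Map<SendCallback, String> inFlight = Collections.synchronizedMap(new IdentityHashMap<>());
    private final AsyncFailureHandler handler;
    final List<String> resent = Collections.synchronizedList(new ArrayList<>());
    volatile Exception asyncException; // would be rethrown at the next checkpoint

    FailureAwareProducerSketch(AsyncFailureHandler handler) { this.handler = handler; }

    // Returns the callback that a real producer would pass to the Kafka client's send().
    SendCallback send(String record, int attempt) {
        SendCallback callback = new SendCallback() {
            @Override
            public void onCompletion(Exception exception) {
                // An anonymous class (not a lambda) so that 'this' is the callback itself.
                String original = inFlight.remove(this);
                if (exception == null) {
                    return; // acknowledged
                }
                switch (handler.onFailure(original, exception, attempt)) {
                    case RETRY:
                        resent.add(original);
                        send(original, attempt + 1); // re-add the original record
                        break;
                    case FAIL:
                        if (asyncException == null) {
                            asyncException = exception;
                        }
                        break;
                    case IGNORE:
                        break; // the user explicitly accepts losing this record
                }
            }
        };
        inFlight.put(callback, record);
        return callback;
    }

    int inFlightCount() { return inFlight.size(); }
}

class FailureHandlerDemo {
    public static void main(String[] args) {
        FailureAwareProducerSketch producer = new FailureAwareProducerSketch(new RetryTransientHandler(3));
        SendCallback cb1 = producer.send("r1", 1);
        SendCallback cb2 = producer.send("r2", 1);
        cb1.onCompletion(new TransientException("leader not available")); // retried
        cb2.onCompletion(new IllegalStateException("simulated permanent error")); // fails checkpoint
        System.out.println("resent=" + producer.resent + " inFlight=" + producer.inFlightCount()
                + " failed=" + (producer.asyncException != null));
    }
}
```

Bounding retries in the handler (here via {{maxAttempts}}) is one way to keep a persistent error from hanging the checkpoint flush; in this design, the policy is left to the user.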
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)