[ https://issues.apache.org/jira/browse/FLINK-3037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15570751#comment-15570751 ]

Tzu-Li (Gordon) Tai commented on FLINK-3037:
--------------------------------------------

I'm a little bit confused here.
On checkpoint / savepoint restores, I think we should simply overwrite any 
committed offsets fetched from Kafka with our Flink checkpointed offsets, 
using them as the correct read positions for the partitions.

Any partitions that weren't in the checkpointed state are new partitions 
created after the last run, so we should start reading them from the earliest 
record possible.

In any case, on checkpoint / savepoint restores the "auto.offset.reset" 
behaviour should not be used at all, so I'm not quite sure why we'd want to 
make the behaviour configurable.
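The restore logic described above can be sketched as follows. This is a minimal illustration, not the actual FlinkKafkaConsumer code; the class and method names, and the EARLIEST sentinel value, are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

public class RestoreOffsets {

    // Hypothetical sentinel meaning "start from the earliest available record".
    static final long EARLIEST = -1L;

    // For every partition currently discovered in Kafka, prefer the Flink
    // checkpointed offset over anything committed in Kafka. Partitions absent
    // from the checkpointed state are treated as new and start from EARLIEST,
    // so "auto.offset.reset" never comes into play on restore.
    static Map<String, Long> resolveStartOffsets(
            Map<String, Long> checkpointed, Iterable<String> discovered) {
        Map<String, Long> start = new HashMap<>();
        for (String partition : discovered) {
            start.put(partition, checkpointed.getOrDefault(partition, EARLIEST));
        }
        return start;
    }
}
```

With this approach there is no configurable fallback: the checkpointed offset always wins, and only genuinely new partitions read from the earliest record.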

> Make the behavior of the Kafka consumer configurable if the offsets to 
> restore from are not available
> -----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-3037
>                 URL: https://issues.apache.org/jira/browse/FLINK-3037
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Robert Metzger
>
> Currently, if the {{FlinkKafkaConsumer}} is restoring a checkpoint and the 
> offset is no longer available in Kafka, it restores according to 
> {{auto.offset.reset}}.
> This leads to inconsistent behavior (no longer exactly-once) because the 
> operators will not receive data in sync with the checkpoint.
> With this pull request, I would like to make the behavior controllable, using 
> a flag. The simplest approach would be to let the consumer fail in that case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
