[ https://issues.apache.org/jira/browse/FLINK-4727?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15570678#comment-15570678 ]

ASF GitHub Bot commented on FLINK-4727:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2585#discussion_r83138654
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
    @@ -204,7 +204,22 @@ public void run() {
                        // seek the consumer to the initial offsets
                        for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
                                if (partition.isOffsetDefined()) {
    +                                   LOG.info("Partition {} has restored initial offsets {} from checkpoint / savepoint; seeking the consumer " +
    +                                           "to position {}", partition.getKafkaPartitionHandle(), partition.getOffset(), partition.getOffset() + 1);
    +
                                        consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1);
    +                           } else {
    +                                   // for partitions that do not have offsets restored from a checkpoint/savepoint,
    +                                   // we need to define our internal offset state for them using the initial offsets retrieved from Kafka
    +                                   // by the KafkaConsumer, so that they are correctly checkpointed and committed on the next checkpoint
    +
    +                                   long fetchedOffset = consumer.position(partition.getKafkaPartitionHandle());
    --- End diff --
    
    I just confirmed this with a simple test.
    Yes, when no committed offset is available for the group id, the
    "auto.offset.reset" behavior is used.
    The "position" is the offset from which the KafkaConsumer will start
    reading. Once the consumer is assigned partitions, it determines the
    position automatically: from the committed offsets if they exist, otherwise
    from "auto.offset.reset".
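
    To make the rule above concrete, here is a minimal sketch of how the
    start position is resolved. It is a plain-Java model, not the Kafka
    client API: the class StartPositionSketch, the enum ResetPolicy, and
    both helper methods are hypothetical names used only for illustration.
    The second helper assumes that the internal offset state holds the last
    processed record, which is what the seek to getOffset() + 1 in the diff
    suggests; the actual value stored by the fetcher is not shown in the
    quoted diff.

```java
import java.util.OptionalLong;

// Hypothetical model, not the Kafka client API: it only mirrors the rule
// described in the review comment.
class StartPositionSketch {

    /** Stand-in for the two common "auto.offset.reset" values. */
    enum ResetPolicy { EARLIEST, LATEST }

    /**
     * Offset the consumer would start reading from for one partition.
     * A committed offset wins; otherwise the reset policy picks the
     * beginning or the end of the partition log.
     */
    static long resolvePosition(OptionalLong committed, ResetPolicy policy,
                                long logStartOffset, long logEndOffset) {
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        return policy == ResetPolicy.EARLIEST ? logStartOffset : logEndOffset;
    }

    /**
     * Assumption: the internal offset state is the last processed record,
     * so a fetched position maps back to state as position - 1
     * (the inverse of seeking to getOffset() + 1).
     */
    static long positionToStateOffset(long position) {
        return position - 1;
    }

    public static void main(String[] args) {
        // No committed offset for the group id, so the reset policy decides.
        long position = resolvePosition(OptionalLong.empty(), ResetPolicy.LATEST, 0L, 120L);
        System.out.println("position=" + position
                + " stateOffset=" + positionToStateOffset(position));
    }
}
```

    Under these assumptions, a partition with no restored offset and no data
    to read still ends up with a defined offset state that the next
    checkpoint can capture.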


> Kafka 0.9 Consumer should also checkpoint auto retrieved offsets even when no 
> data is read
> ------------------------------------------------------------------------------------------
>
>                 Key: FLINK-4727
>                 URL: https://issues.apache.org/jira/browse/FLINK-4727
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.2.0, 1.1.3
>
>
> This is basically the 0.9 version counterpart for FLINK-3440.
> When the 0.9 consumer fetches initial offsets from Kafka on startup, but does 
> not have any data to read, it should also checkpoint & commit these initial 
> offsets.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
