[ 
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16015323#comment-16015323
 ] 

ASF GitHub Bot commented on FLINK-6352:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3915#discussion_r117175199
  
    --- Diff: flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java ---
    @@ -187,31 +191,65 @@ public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> d
                validateAutoOffsetResetValue(props);
        }
     
    +   /**
    +    * Search offset from timestamp for each topic in kafka. If no offset exist, use the latest offset.
    +    *
    +    * @param partitionTimesMap Kafka topic partition and timestamp
    +    * @return Kafka topic partition and the earliest offset after the timestamp. If no offset exist, use the latest offset in kafka
    +    */
    +   private Map<KafkaTopicPartition, Long> convertTimestampToOffset(Map<KafkaTopicPartition, Long> partitionTimesMap) {
    --- End diff --
    
    Actually, I think let's just disable the timestamp option for 0.8.
    
    I just think it's a bit strange that the functionality is there for 0.8 and 
0.10, but skipped for 0.9.
    
    Sorry for jumping back and forth here; I'm trying to figure out what would be 
most natural.
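
For context, the contract described in the Javadoc under review (return the earliest offset for the timestamp, otherwise fall back to the latest offset) can be sketched roughly as below. This is only an illustration, not the code in the pull request: the class `TimestampLookupSketch`, the in-memory `timeIndex` (record timestamp to offset) and the `latestOffset` argument are all hypothetical, and the sketch assumes that "after the timestamp" means "at or after" and that timestamps increase with offsets.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch; not the FlinkKafkaConsumer08 implementation.
class TimestampLookupSketch {

    /**
     * Returns the offset of the earliest record whose timestamp is at or
     * after the requested timestamp, or latestOffset if no such record exists.
     *
     * @param timeIndex    assumed in-memory index: record timestamp -> offset
     * @param timestamp    requested start timestamp in epoch milliseconds
     * @param latestOffset fallback offset when nothing matches
     */
    static long offsetForTimestamp(TreeMap<Long, Long> timeIndex, long timestamp, long latestOffset) {
        // ceilingEntry finds the smallest key >= timestamp, or null if none.
        Map.Entry<Long, Long> entry = timeIndex.ceilingEntry(timestamp);
        return entry != null ? entry.getValue() : latestOffset;
    }

    public static void main(String[] args) {
        TreeMap<Long, Long> timeIndex = new TreeMap<>();
        timeIndex.put(1_000L, 5L);
        timeIndex.put(2_000L, 9L);
        System.out.println(offsetForTimestamp(timeIndex, 1_500L, 12L));
        System.out.println(offsetForTimestamp(timeIndex, 3_000L, 12L));
    }
}
```

A real connector would have to ask the broker for this mapping per partition; the in-memory map only stands in for that lookup.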


> FlinkKafkaConsumer should support to use timestamp to set up start offset
> -------------------------------------------------------------------------
>
>                 Key: FLINK-6352
>                 URL: https://issues.apache.org/jira/browse/FLINK-6352
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Fang Yong
>            Assignee: Fang Yong
>             Fix For: 1.3.0
>
>
>     Currently "auto.offset.reset" is used to initialize the start offset of 
> FlinkKafkaConsumer, and its value must be earliest/latest/none. With this 
> setting a job can only start consuming from the beginning or from the most 
> recent data; it cannot start from a specific offset in Kafka. 
>     So, there should be a configuration item (such as 
> "flink.source.start.time", with the format "yyyy-MM-dd HH:mm:ss") that 
> allows the user to configure the initial offset in Kafka. The behavior of 
> "flink.source.start.time" is as follows:
> 1) The job starts from a checkpoint / savepoint
>   a> If the offset of a partition can be restored from the checkpoint/savepoint, 
> "flink.source.start.time" is ignored.
>   b> If there is no checkpoint/savepoint state for the partition (for example, 
> the partition was newly added), "flink.source.start.time" is used to 
> initialize the offset of the partition.
> 2) The job has no checkpoint / savepoint, so "flink.source.start.time" is used 
> to initialize the Kafka offsets
>   a> If "flink.source.start.time" is valid, use it to set the Kafka offset.
>   b> If "flink.source.start.time" is out of range, behave as the consumer does 
> currently with no initial offset: get Kafka's current offset and start reading.
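
The cases listed in the description can be sketched as a small decision function. This is a minimal illustration under stated assumptions, not the proposed implementation: the class `StartOffsetSketch` and both method names are hypothetical, the configured time is interpreted as UTC (the description does not name a time zone), and an out-of-range start time for a newly added partition (case 1b) is assumed to fall back to the current offset the same way case 2b does.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.OptionalLong;

// Hypothetical sketch of the start-offset rules in the issue description.
class StartOffsetSketch {

    // Format proposed in the issue for "flink.source.start.time".
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Parses the configured start time to epoch millis; UTC is an assumption. */
    static long parseStartTime(String value) {
        return LocalDateTime.parse(value, FORMAT)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
    }

    /**
     * @param restoredOffset     offset restored from a checkpoint/savepoint, if any
     * @param offsetForStartTime offset found for the start time; empty if out of range
     * @param currentOffset      Kafka's current offset for the partition
     */
    static long resolveStartOffset(OptionalLong restoredOffset,
                                   OptionalLong offsetForStartTime,
                                   long currentOffset) {
        if (restoredOffset.isPresent()) {
            return restoredOffset.getAsLong();      // case 1a: start time ignored
        }
        if (offsetForStartTime.isPresent()) {
            return offsetForStartTime.getAsLong();  // cases 1b and 2a
        }
        return currentOffset;                       // case 2b: out of range
    }

    public static void main(String[] args) {
        System.out.println(parseStartTime("2017-05-18 08:00:00"));
        System.out.println(resolveStartOffset(OptionalLong.empty(), OptionalLong.of(42L), 100L));
    }
}
```

Keeping the restored offset as the first check means a restore always wins over the configured start time, which matches case 1a.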



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
