ASF GitHub Bot commented on FLINK-4523:

Github user tzulitai commented on a diff in the pull request:

    --- Diff: 
    @@ -107,6 +112,17 @@ protected ShardConsumer(KinesisDataFetcher<T> 
                this.fetchIntervalMillis = 
    +           if 
    --- End diff --
    I think it might be cleaner to extract / parse the `initTimestamp` where 
the sentinel sequence number case is determined at the beginning of run().
    As for checking whether the timestamp is parseable, we should do that when 
validating the user-provided property configs locally at the job client. You 
can take a look at the constructor code of `FlinkKinesisConsumer` - that's 
where we validate the properties. Because this validation happens at the job 
client, erroneous / unparseable properties can be handled _before_ a Flink job 
is actually launched.
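
    To illustrate the client-side check, here is a rough sketch only, not the 
connector's actual code. The class name, the property key 
`example.initpos.timestamp` and the date pattern are all hypothetical, made up 
for this example. The idea is just to fail with a clear exception while the 
consumer is being constructed, instead of later inside run():

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

// Hypothetical helper; not part of the Flink Kinesis connector.
final class InitTimestampValidation {

    // Assumed property key and date pattern, chosen only for this sketch.
    static final String INIT_TIMESTAMP_KEY = "example.initpos.timestamp";
    static final String DATE_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";

    // Parses the configured timestamp, or throws IllegalArgumentException
    // if the property is missing or does not match DATE_PATTERN.
    static Date parseInitTimestamp(Properties config) {
        String raw = config.getProperty(INIT_TIMESTAMP_KEY);
        if (raw == null) {
            throw new IllegalArgumentException(
                "Missing required property: " + INIT_TIMESTAMP_KEY);
        }
        SimpleDateFormat format = new SimpleDateFormat(DATE_PATTERN);
        format.setLenient(false); // reject out-of-range fields such as month 13
        try {
            return format.parse(raw);
        } catch (ParseException e) {
            throw new IllegalArgumentException(
                "Unparseable timestamp for " + INIT_TIMESTAMP_KEY + ": " + raw, e);
        }
    }
}
```

    Calling something like `parseInitTimestamp(configProps)` from the consumer's 
constructor would surface a bad value at the job client, and run() could then 
call the same helper when it resolves the sentinel sequence number.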

> Allow Kinesis Consumer to start from specific timestamp / Date
> --------------------------------------------------------------
>                 Key: FLINK-4523
>                 URL: https://issues.apache.org/jira/browse/FLINK-4523
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kinesis Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Wei-Che Wei
>             Fix For: 1.2.0
> We had a Kinesis user requesting this feature on an offline chat.
> To be specific, we let all initial Kinesis shards be iterated starting from 
> records at the given timestamp.
> The AWS Java SDK we're using already provides an API for this, so we can add 
> this functionality with fairly low overhead: 
> http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/GetShardIteratorRequest.html#setTimestamp-java.util.Date-

This message was sent by Atlassian JIRA