[ https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711497#comment-15711497 ]

ASF GitHub Bot commented on FLINK-4523:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2916#discussion_r90410957
  
    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java ---
    @@ -57,6 +60,17 @@ public static void validateConsumerConfiguration(Properties config) {
                                }
                                throw new IllegalArgumentException("Invalid initial position in stream set in config. Valid values are: " + sb.toString());
                        }
    +
    +                   // specified initial timestamp in stream when using AT_TIMESTAMP
    +                   if (InitialPosition.valueOf(initPosType) == InitialPosition.AT_TIMESTAMP) {
    +                           if (!config.containsKey(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP)) {
    +                                   throw new IllegalArgumentException("Please set value for initial timestamp ('"
    +                                           + ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + "') when using AT_TIMESTAMP initial position.");
    +                           }
    +                           validateOptionalDateProperty(config, ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP,
    +                                   "Invalid value given for initial timestamp for AT_TIMESTAMP initial position in stream. "
    +                                           + "Must be a valid format: yyyy-MM-dd'T'HH:mm:ss.SSSXXX or non-negative double value. For example, 2016-04-04T19:58:46.480-00:00 or 1459799926.480 .");
    --- End diff --
    
    That's really good error messaging for the user :) As I mentioned above, we need to describe this in the documentation as well.
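
For readers without the full PR at hand, the check in the diff can be sketched roughly as follows. This is a minimal, standalone approximation and not the connector's code: the class name, the two property keys, and the helper methods are hypothetical stand-ins for `KinesisConfigUtil`, `ConsumerConfigConstants`, and `validateOptionalDateProperty`. Only the date pattern and the two accepted forms (formatted date or non-negative double) come from the error message above.

```java
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.util.Properties;

/** Hypothetical stand-in for the validation in the diff; all names here are illustrative. */
final class InitialTimestampCheck {

    // Assumed keys for illustration only; the real keys live in ConsumerConfigConstants.
    static final String INITIAL_POSITION_KEY = "example.initial.position";
    static final String INITIAL_TIMESTAMP_KEY = "example.initial.timestamp";

    // Pattern quoted in the error message of the diff.
    static final String DATE_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";

    /** Throws IllegalArgumentException if AT_TIMESTAMP is set without a usable timestamp. */
    static void validate(Properties config) {
        if (!"AT_TIMESTAMP".equals(config.getProperty(INITIAL_POSITION_KEY))) {
            return; // the timestamp is only required for AT_TIMESTAMP
        }
        String value = config.getProperty(INITIAL_TIMESTAMP_KEY);
        if (value == null) {
            throw new IllegalArgumentException("Please set value for initial timestamp ('"
                    + INITIAL_TIMESTAMP_KEY + "') when using AT_TIMESTAMP initial position.");
        }
        if (!isFormattedDate(value) && !isNonNegativeDouble(value)) {
            throw new IllegalArgumentException("Invalid initial timestamp '" + value
                    + "'. Must match " + DATE_PATTERN + " or be a non-negative double value.");
        }
    }

    /** True if the whole string matches DATE_PATTERN (no trailing characters allowed). */
    static boolean isFormattedDate(String value) {
        SimpleDateFormat format = new SimpleDateFormat(DATE_PATTERN);
        format.setLenient(false);
        ParsePosition position = new ParsePosition(0);
        return format.parse(value, position) != null && position.getIndex() == value.length();
    }

    /** True if the string parses as a finite double that is zero or greater. */
    static boolean isNonNegativeDouble(String value) {
        try {
            double parsed = Double.parseDouble(value);
            return parsed >= 0 && !Double.isInfinite(parsed);
        } catch (NumberFormatException e) {
            return false;
        }
    }
}
```

Checking the "required" condition before the format keeps the two error messages separate, which matches the structure of the diff: a missing value and a malformed value are reported differently.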


> Allow Kinesis Consumer to start from specific timestamp / Date
> --------------------------------------------------------------
>
>                 Key: FLINK-4523
>                 URL: https://issues.apache.org/jira/browse/FLINK-4523
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kinesis Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Wei-Che Wei
>             Fix For: 1.2.0
>
>
> A Kinesis user requested this feature in an offline chat.
> Specifically, all initial Kinesis shards should be iterated starting from 
> the records at the given timestamp.
> The AWS Java SDK we're using already provides an API for this, so we can add 
> this functionality with fairly low overhead: 
> http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/GetShardIteratorRequest.html#setTimestamp-java.util.Date-
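
Since the linked `setTimestamp` method takes a `java.util.Date`, the configured string has to be turned into a `Date` at some point. The following is a rough, standalone sketch of that conversion, not the connector's implementation: the class and method names are hypothetical, and it assumes the double form means seconds since the Unix epoch. That assumption is inferred from the example pair in the PR's error message, where `1459799926.480` and `2016-04-04T19:58:46.480-00:00` denote the same instant.

```java
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.util.Date;

/** Hypothetical helper: converts the configured initial timestamp into a java.util.Date. */
final class InitialTimestampParser {

    // Date pattern quoted in the PR's error message.
    static final String DATE_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";

    static Date toDate(String value) {
        // First try the formatted-date form, requiring the whole string to match.
        SimpleDateFormat format = new SimpleDateFormat(DATE_PATTERN);
        format.setLenient(false);
        ParsePosition position = new ParsePosition(0);
        Date parsed = format.parse(value, position);
        if (parsed != null && position.getIndex() == value.length()) {
            return parsed;
        }
        // Otherwise treat it as seconds since the Unix epoch (assumption, see lead-in).
        try {
            double seconds = Double.parseDouble(value);
            if (seconds >= 0 && !Double.isInfinite(seconds)) {
                // Round to the nearest millisecond to avoid floating-point truncation.
                return new Date(Math.round(seconds * 1000.0));
            }
        } catch (NumberFormatException e) {
            // fall through to the error below
        }
        throw new IllegalArgumentException("Cannot interpret '" + value
                + "' as " + DATE_PATTERN + " or as non-negative epoch seconds.");
    }
}
```

The resulting `Date` is what would be handed to `GetShardIteratorRequest#setTimestamp`; that SDK call is left out here so the sketch has no dependency beyond the JDK.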



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
