[ 
https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711503#comment-15711503
 ] 

ASF GitHub Bot commented on FLINK-4523:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2916#discussion_r90411420
  
    --- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
 ---
    @@ -145,6 +145,39 @@ public void 
testUnrecognizableStreamInitPositionTypeInConfig() {
        }
     
        @Test
    +   public void 
testStreamInitPositionTypeSetToAtTimestampButNoInitTimestampSetInConfig() {
    +           exception.expect(IllegalArgumentException.class);
    +           exception.expectMessage("Please set value for initial timestamp 
('"
    +                   + ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP + 
"') when using AT_TIMESTAMP initial position.");
    +
    +           Properties testConfig = new Properties();
    +           testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, 
"us-east-1");
    +           
testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, 
"BASIC");
    +           
testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, 
"accessKeyId");
    +           
testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, 
"secretKey");
    +           
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
"AT_TIMESTAMP");
    +
    +           KinesisConfigUtil.validateConsumerConfiguration(testConfig);
    +   }
    +
    +   @Test
    +   public void testUnparsableDateForInitialTimestampInConfig() {
    +           exception.expect(IllegalArgumentException.class);
    +           exception.expectMessage("Invalid value given for initial 
timestamp for AT_TIMESTAMP initial position in stream. "
    --- End diff --
    
    I would suggest to not let the expect message be this verbose in tests.
    This increases the likeliness that the tests will need to be altered, 
whenever we want to tweak the messages a bit.
    I think `Invalid value given for initial timestamp for AT_TIMESTAMP initial 
position in stream` is enough.


> Allow Kinesis Consumer to start from specific timestamp / Date
> --------------------------------------------------------------
>
>                 Key: FLINK-4523
>                 URL: https://issues.apache.org/jira/browse/FLINK-4523
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kinesis Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Wei-Che Wei
>             Fix For: 1.2.0
>
>
> We had a Kinesis user requesting this feature on an offline chat.
> To be specific, we let all initial Kinesis shards be iterated starting from 
> records at the given timestamp.
> The AWS Java SDK we're using already provides API for this, so we can add 
> this functionality with fairly low overhead: 
> http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/GetShardIteratorRequest.html#setTimestamp-java.util.Date-



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to