[ 
https://issues.apache.org/jira/browse/FLINK-5625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15948783#comment-15948783
 ] 

ASF GitHub Bot commented on FLINK-5625:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3651#discussion_r108886633
  
    --- Diff: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
 ---
    @@ -216,14 +217,73 @@ public void 
testUnixTimestampInValidateOptionDatePropertyForInitialTimestampInCo
                
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
"AT_TIMESTAMP");
                
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, 
unixTimestamp);
     
    -           KinesisConfigUtil.validateConsumerConfiguration(testConfig);
    +           try {
    +                   
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
     
    -           try{
                        double value = Double.parseDouble(unixTimestamp);
                        if (value < 0) {
                                throw new NumberFormatException();
                        }
    -           } catch (Exception e){
    +           } catch (Exception e) {
    +                   e.printStackTrace();
    +                   fail();
    +           }
    +   }
    +
    +   @Test
    +   public void testInvalidPatternForInitialTimestampInConfig() {
    +           exception.expect(IllegalArgumentException.class);
    +           exception.expectMessage("Invalid value given for initial 
timestamp for AT_TIMESTAMP initial position in stream.");
    +
    +           Properties testConfig = new Properties();
    +           testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, 
"us-east-1");
    +           
testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, 
"BASIC");
    +           
testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, 
"accessKeyId");
    +           
testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, 
"secretKey");
    +           
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
"AT_TIMESTAMP");
    +           
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, 
"2016-03-14");
    +           
testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, 
"InvalidPattern");
    +
    +           KinesisConfigUtil.validateConsumerConfiguration(testConfig);
    +   }
    +
    +   @Test
    +   public void 
testUnparsableDateForUserDefinedDateFormatForInitialTimestampInConfig() {
    +           exception.expect(IllegalArgumentException.class);
    +           exception.expectMessage("Invalid value given for initial 
timestamp for AT_TIMESTAMP initial position in stream.");
    +
    +           Properties testConfig = new Properties();
    +           testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, 
"us-east-1");
    +           
testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, 
"BASIC");
    +           
testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, 
"accessKeyId");
    +           
testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, 
"secretKey");
    +           
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
"AT_TIMESTAMP");
    +           
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, 
"stillUnparsable");
    +           
testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, 
"yyyy-MM-dd");
    +
    +           KinesisConfigUtil.validateConsumerConfiguration(testConfig);
    +   }
    +
    +   @Test
    +   public void 
testDateStringForUserDefinedDateFormatForValidateOptionDateProperty() {
    +           String unixTimestamp = "2016-04-04";
    +           String pattern = "yyyy-MM-dd";
    +
    +           Properties testConfig = new Properties();
    +           testConfig.setProperty(ConsumerConfigConstants.AWS_REGION, 
"us-east-1");
    +           
testConfig.setProperty(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, 
"BASIC");
    +           
testConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, 
"accessKeyId");
    +           
testConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, 
"secretKey");
    +           
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
"AT_TIMESTAMP");
    +           
testConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, 
unixTimestamp);
    +           
testConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, 
pattern);
    +
    +           try {
    +                   
KinesisConfigUtil.validateConsumerConfiguration(testConfig);
    +
    +                   SimpleDateFormat customDateFormat = new 
SimpleDateFormat(pattern);
    +                   customDateFormat.parse(unixTimestamp);
    --- End diff --
    
    Are these 2 lines necessary?
    ```
    SimpleDateFormat customDateFormat = new SimpleDateFormat(pattern);
    customDateFormat.parse(unixTimestamp);
    ```


> Let Date format for timestamp-based start position in Kinesis consumer be 
> configurable.
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-5625
>                 URL: https://issues.apache.org/jira/browse/FLINK-5625
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kinesis Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Wei-Che Wei
>
> Currently, the Kinesis consumer's Date format for timestamp-based start 
> positions is fixed. It would be nice to make this format configurable.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to