[ 
https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711498#comment-15711498
 ] 

ASF GitHub Bot commented on FLINK-4523:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2916#discussion_r90410520
  
    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
    @@ -230,13 +233,34 @@ public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSe
         */
        @Override
        public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException {
    +           GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest()
    +                   .withStreamName(shard.getStreamName())
    +                   .withShardId(shard.getShard().getShardId())
    +                   .withShardIteratorType(shardIteratorType)
    +                   .withStartingSequenceNumber(startingSeqNum);
    +           return getShardIterator(getShardIteratorRequest);
    +   }
    +
    +   /**
    +    * {@inheritDoc}
    +    */
    +   @Override
    +   public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nonnull final Date startingTimestamp) throws InterruptedException {
    +           GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest()
    +                   .withStreamName(shard.getStreamName())
    +                   .withShardId(shard.getShard().getShardId())
    +                   .withShardIteratorType(shardIteratorType)
    +                   .withTimestamp(startingTimestamp);
    --- End diff --
    
    How does the Kinesis API behave when you set both a timestamp and a shard iterator type that doesn't match it?
    In other words, what happens when the shard iterator type is, say, `TRIM_HORIZON`, but a timestamp is also provided?
    
    If the API call fails because Kinesis doesn't allow such combinations, we should probably make this method implementation guard against them.
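    
    A minimal sketch of the kind of guard I have in mind, so that a mismatch fails fast on our side regardless of how Kinesis itself treats it. The class and method names (`ShardIteratorArgs`, `checkTimestampArgs`) are hypothetical and not part of this PR; the only thing taken from the Kinesis API is the `AT_TIMESTAMP` iterator type name. It would be called at the top of the timestamp-based `getShardIterator` overload, before the request is built.

```java
import java.util.Date;

// Hypothetical helper for illustration only; not part of the pull request.
final class ShardIteratorArgs {

    // Name of the Kinesis shard iterator type that is paired with a timestamp.
    static final String AT_TIMESTAMP = "AT_TIMESTAMP";

    private ShardIteratorArgs() {}

    // Rejects any iterator type other than AT_TIMESTAMP, and a missing timestamp,
    // before a request would be sent to Kinesis.
    static void checkTimestampArgs(String shardIteratorType, Date startingTimestamp) {
        if (!AT_TIMESTAMP.equals(shardIteratorType)) {
            throw new IllegalArgumentException(
                "A starting timestamp was given, but the shard iterator type is "
                    + shardIteratorType + "; expected " + AT_TIMESTAMP);
        }
        if (startingTimestamp == null) {
            throw new IllegalArgumentException(
                "Shard iterator type " + AT_TIMESTAMP + " requires a starting timestamp");
        }
    }
}
```

    With this in place, `checkTimestampArgs("TRIM_HORIZON", someDate)` throws an `IllegalArgumentException`, while `checkTimestampArgs("AT_TIMESTAMP", someDate)` returns normally.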


> Allow Kinesis Consumer to start from specific timestamp / Date
> --------------------------------------------------------------
>
>                 Key: FLINK-4523
>                 URL: https://issues.apache.org/jira/browse/FLINK-4523
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kinesis Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Wei-Che Wei
>             Fix For: 1.2.0
>
>
> A Kinesis user requested this feature in an offline chat.
> Specifically, all initial Kinesis shards would be iterated starting from 
> records at the given timestamp.
> The AWS Java SDK we're using already provides an API for this, so we can add 
> this functionality with fairly low overhead: 
> http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/GetShardIteratorRequest.html#setTimestamp-java.util.Date-



