[ https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711498#comment-15711498 ]
ASF GitHub Bot commented on FLINK-4523:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2916#discussion_r90410520

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
    @@ -230,13 +233,34 @@ public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSe
     	 */
     	@Override
     	public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException {
    +		GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest()
    +			.withStreamName(shard.getStreamName())
    +			.withShardId(shard.getShard().getShardId())
    +			.withShardIteratorType(shardIteratorType)
    +			.withStartingSequenceNumber(startingSeqNum);
    +		return getShardIterator(getShardIteratorRequest);
    +	}
    +
    +	/**
    +	 * {@inheritDoc}
    +	 */
    +	@Override
    +	public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nonnull final Date startingTimestamp) throws InterruptedException {
    +		GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest()
    +			.withStreamName(shard.getStreamName())
    +			.withShardId(shard.getShard().getShardId())
    +			.withShardIteratorType(shardIteratorType)
    +			.withTimestamp(startingTimestamp);
    --- End diff --

    How does the Kinesis API behave when you set both a non-matching shard iterator type and a timestamp? In other words, what happens when the shard iterator type is, say, `TRIM_HORIZON`, but a timestamp is also provided?

    If the API call fails because Kinesis doesn't allow such combinations, we should probably make this method implementation robust against such situations.

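One way to make the method robust against the mismatch raised above is a client-side check before the request is built. The sketch below is a hypothetical helper: the class `ShardIteratorArgs` and the method `checkTimestampRequest` are illustrative names, not part of the pull request. It assumes only that `AT_TIMESTAMP` is the iterator type meant to be paired with a timestamp, and it makes no claim about how Kinesis itself responds to other combinations.

```java
import java.util.Date;

/** Hypothetical helper for illustration; not part of the pull request. */
final class ShardIteratorArgs {

    // Iterator type that is intended to be combined with a starting timestamp.
    static final String AT_TIMESTAMP = "AT_TIMESTAMP";

    private ShardIteratorArgs() {}

    /**
     * Rejects a timestamp-based request whose iterator type is not AT_TIMESTAMP,
     * so the mismatch is caught before any call reaches Kinesis.
     */
    static void checkTimestampRequest(String shardIteratorType, Date startingTimestamp) {
        if (startingTimestamp == null) {
            throw new IllegalArgumentException("startingTimestamp must not be null");
        }
        if (!AT_TIMESTAMP.equals(shardIteratorType)) {
            throw new IllegalArgumentException(
                "A starting timestamp requires shard iterator type " + AT_TIMESTAMP
                    + ", but got " + shardIteratorType);
        }
    }
}
```

The timestamp overload of `getShardIterator` could call such a check as its first statement. Whether to throw or to silently ignore the timestamp is a design choice for the connector; throwing is shown here only because it surfaces the misconfiguration early.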
> Allow Kinesis Consumer to start from specific timestamp / Date
> --------------------------------------------------------------
>
>                 Key: FLINK-4523
>                 URL: https://issues.apache.org/jira/browse/FLINK-4523
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kinesis Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Wei-Che Wei
>             Fix For: 1.2.0
>
>
> We had a Kinesis user requesting this feature on an offline chat.
> To be specific, we let all initial Kinesis shards be iterated starting from
> records at the given timestamp.
> The AWS Java SDK we're using already provides API for this, so we can add
> this functionality with fairly low overhead:
> http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/GetShardIteratorRequest.html#setTimestamp-java.util.Date-

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)