Github user tony810430 commented on a diff in the pull request:
https://github.com/apache/flink/pull/2916#discussion_r90420632
--- Diff:
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
---
@@ -230,13 +233,34 @@ public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSe
  */
 @Override
 public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nullable String startingSeqNum) throws InterruptedException {
+    GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest()
+        .withStreamName(shard.getStreamName())
+        .withShardId(shard.getShard().getShardId())
+        .withShardIteratorType(shardIteratorType)
+        .withStartingSequenceNumber(startingSeqNum);
+    return getShardIterator(getShardIteratorRequest);
+ }
+
+ /**
+  * {@inheritDoc}
+  */
+ @Override
+ public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, @Nonnull final Date startingTimestamp) throws InterruptedException {
+    GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest()
+        .withStreamName(shard.getStreamName())
+        .withShardId(shard.getShard().getShardId())
+        .withShardIteratorType(shardIteratorType)
+        .withTimestamp(startingTimestamp);
--- End diff --
I will merge these two 'getShardIterator' methods into one and check the
shard iterator type inside the merged method.
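
A rough sketch of how a single merged method could validate the starting position against the iterator type; this is not the final patch. `IteratorRequest` and `buildRequest` are hypothetical stand-ins so the snippet compiles without the AWS SDK; in KinesisProxy the assignments would presumably become the `with...` calls on `GetShardIteratorRequest` shown in the diff. Passing the starting position as a plain `Object` is also an assumption.

```java
import java.util.Date;

public class MergedShardIteratorSketch {

    /** Hypothetical stand-in for the SDK request object; not a real AWS class. */
    public static final class IteratorRequest {
        public String streamName;
        public String shardId;
        public String shardIteratorType;
        public String startingSequenceNumber; // set only for sequence-number types
        public Date timestamp;                // set only for AT_TIMESTAMP
    }

    /**
     * Builds one request for every iterator type. The starting marker is
     * checked against the type, so a Date is accepted only for AT_TIMESTAMP
     * and a String only for the sequence-number based types.
     */
    public static IteratorRequest buildRequest(
            String streamName, String shardId, String shardIteratorType, Object startingMarker) {
        IteratorRequest request = new IteratorRequest();
        request.streamName = streamName;
        request.shardId = shardId;
        request.shardIteratorType = shardIteratorType;

        switch (shardIteratorType) {
            case "AT_TIMESTAMP":
                if (!(startingMarker instanceof Date)) {
                    throw new IllegalArgumentException("AT_TIMESTAMP requires a Date starting marker");
                }
                request.timestamp = (Date) startingMarker;
                break;
            case "AT_SEQUENCE_NUMBER":
            case "AFTER_SEQUENCE_NUMBER":
                if (!(startingMarker instanceof String)) {
                    throw new IllegalArgumentException(shardIteratorType + " requires a String sequence number");
                }
                request.startingSequenceNumber = (String) startingMarker;
                break;
            case "TRIM_HORIZON":
            case "LATEST":
                // These types take no starting position; any marker is ignored.
                break;
            default:
                throw new IllegalArgumentException("Unknown shard iterator type: " + shardIteratorType);
        }
        return request;
    }

    public static void main(String[] args) {
        IteratorRequest r = buildRequest("my-stream", "shardId-000000000000", "AT_TIMESTAMP", new Date(0L));
        System.out.println(r.shardIteratorType + " starting at " + r.timestamp);
    }
}
```

With this shape, a mismatch such as a sequence-number string passed with `AT_TIMESTAMP` fails early with an `IllegalArgumentException` instead of reaching Kinesis.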