[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476246#comment-16476246 ]
ASF GitHub Bot commented on FLINK-8944:
---------------------------------------
Github user kailashhd commented on a diff in the pull request:
https://github.com/apache/flink/pull/5992#discussion_r188377415
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
@@ -353,19 +355,21 @@ protected static boolean isRecoverableException(AmazonServiceException ex) {
private List<StreamShardHandle> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException {
List<StreamShardHandle> shardsOfStream = new ArrayList<>();
- DescribeStreamResult describeStreamResult;
+ // List Shards returns just the first 1000 shard entries. In order to read the entire stream,
+ // we need to use the returned nextToken to get additional shards.
+ ListShardsResult listShardsResult;
+ String startShardToken = null;
do {
- describeStreamResult = describeStream(streamName, lastSeenShardId);
-
- List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
+ listShardsResult = listShards(streamName, lastSeenShardId, startShardToken);
--- End diff --
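The paginated discovery loop that this diff starts can be sketched as follows. This is a minimal, self-contained illustration, not the connector's code: `ShardLister`, `Page` and `ShardPagination` are hypothetical stand-ins (not the AWS SDK's `ListShardsRequest`/`ListShardsResult`), shard handles are reduced to plain shard-id strings, and the three-argument `listShards` call only mirrors the helper used in the diff.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical one-page response: shard ids plus the token for the next page.
final class Page {
    final List<String> shardIds;
    final String nextToken; // null when there are no more pages

    Page(List<String> shardIds, String nextToken) {
        this.shardIds = shardIds;
        this.nextToken = nextToken;
    }
}

// Hypothetical client interface mirroring listShards(streamName, lastSeenShardId, startShardToken).
// Note: the real ListShards API does not accept StreamName or ExclusiveStartShardId
// together with NextToken, so a real helper must send only one of them per request.
interface ShardLister {
    Page listShards(String streamName, String exclusiveStartShardId, String nextToken);
}

final class ShardPagination {
    // Keeps requesting pages, following nextToken, until the service reports no more pages.
    static List<String> getShardsOfStream(ShardLister lister, String streamName, String lastSeenShardId) {
        List<String> shardsOfStream = new ArrayList<>();
        String startShardToken = null; // null on the first request
        do {
            Page page = lister.listShards(streamName, lastSeenShardId, startShardToken);
            shardsOfStream.addAll(page.shardIds);
            startShardToken = page.nextToken;
        } while (startShardToken != null);
        return shardsOfStream;
    }
}
```

Shard ids are appended in page order, so the result is the concatenation of all pages; the loop stops at the first page whose `nextToken` is null.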
Thanks for catching this. I have fixed it by clearing shardsOfStream, so that an
empty shardsOfStream is returned in case of an ExpiredTokenException.
The intended behavior is: in the unlikely case that the next token has expired,
we simply return an empty shardsOfStream. This should be fine, since an empty
shardsOfStream is already what the method returns by default when no new shards
are discovered.
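The fallback described in this comment (discarding any partial result when the pagination token expires) could look roughly like the sketch below. `ExpiredTokenException`, `ShardPage`, `PageFetcher` and `ExpiredTokenFallback` are hypothetical local stand-ins, not the AWS SDK's types, so that the example compiles on its own; it also assumes discovery is retried periodically by the caller.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the service error raised when a pagination token has expired.
class ExpiredTokenException extends RuntimeException {
    ExpiredTokenException(String message) {
        super(message);
    }
}

// Hypothetical page of results; nextToken == null marks the last page.
final class ShardPage {
    final List<String> shardIds;
    final String nextToken;

    ShardPage(List<String> shardIds, String nextToken) {
        this.shardIds = shardIds;
        this.nextToken = nextToken;
    }
}

// Hypothetical callback that fetches one page; nextToken is null on the first call.
interface PageFetcher {
    ShardPage fetch(String nextToken);
}

final class ExpiredTokenFallback {
    static List<String> discoverShards(PageFetcher fetcher) {
        List<String> shardsOfStream = new ArrayList<>();
        String token = null;
        try {
            do {
                ShardPage page = fetcher.fetch(token);
                shardsOfStream.addAll(page.shardIds);
                token = page.nextToken;
            } while (token != null);
        } catch (ExpiredTokenException e) {
            // Drop the partial listing: an empty result is what callers already get when
            // no new shards are found (assumption: a later discovery round retries).
            shardsOfStream.clear();
        }
        return shardsOfStream;
    }
}
```

Returning an empty list rather than a partial one avoids handing the caller an incomplete view of the stream that could look like a complete one.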
> Use ListShards for shard discovery in the flink kinesis connector
> -----------------------------------------------------------------
>
> Key: FLINK-8944
> URL: https://issues.apache.org/jira/browse/FLINK-8944
> Project: Flink
> Issue Type: Improvement
> Reporter: Kailash Hassan Dayanand
> Priority: Minor
>
> Currently, the DescribeStream AWS API used to get the list of shards has
> restrictive rate limits on AWS (5 requests per second per account). This is
> problematic when running multiple Flink jobs on the same account, since each
> subtask calls DescribeStream. Changing this to ListShards will provide more
> flexibility on rate limits, as ListShards has a limit of 100 requests per
> second per data stream.
> More details on the mailing list: https://goo.gl/mRXjKh
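To make the rate-limit argument concrete, the following sketch estimates the aggregate discovery load. It assumes, for illustration only, that every subtask issues exactly one listing call per discovery interval (ignoring pagination and retries) and that all jobs read the same stream; the class name and the example job counts are hypothetical, while the 5/s and 100/s limits are the figures quoted in the issue.

```java
// Hypothetical back-of-the-envelope estimate of shard-discovery API load.
final class DiscoveryRateEstimate {
    // Aggregate listing calls per second, assuming one call per subtask per interval.
    static double requestsPerSecond(int jobs, int subtasksPerJob, double discoveryIntervalSeconds) {
        return (double) jobs * subtasksPerJob / discoveryIntervalSeconds;
    }

    // True if the estimated load stays within the given per-second limit.
    static boolean withinLimit(double requestsPerSecond, double limitPerSecond) {
        return requestsPerSecond <= limitPerSecond;
    }

    public static void main(String[] args) {
        // Illustrative numbers only: 4 jobs x 16 subtasks, each discovering every 10 s.
        double load = requestsPerSecond(4, 16, 10.0); // 6.4 requests/s
        System.out.println("DescribeStream (5/s per account): " + withinLimit(load, 5.0));
        System.out.println("ListShards (100/s per stream): " + withinLimit(load, 100.0));
    }
}
```

With these example numbers the load already exceeds the quoted DescribeStream budget but stays well below the ListShards one. The DescribeStream limit is shared by the whole account, whereas the ListShards limit applies to each stream separately.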
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)