[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16473347#comment-16473347 ]
ASF GitHub Bot commented on FLINK-8944:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/5992#discussion_r187788338
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
@@ -353,19 +355,21 @@ protected static boolean isRecoverableException(AmazonServiceException ex) {
 	private List<StreamShardHandle> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException {
 		List<StreamShardHandle> shardsOfStream = new ArrayList<>();
-		DescribeStreamResult describeStreamResult;
+		// List Shards returns just the first 1000 shard entries. In order to read the entire stream,
+		// we need to use the returned nextToken to get additional shards.
+		ListShardsResult listShardsResult;
+		String startShardToken = null;
 		do {
-			describeStreamResult = describeStream(streamName, lastSeenShardId);
-
-			List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
+			listShardsResult = listShards(streamName, lastSeenShardId, startShardToken);
--- End diff --
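The comment in the diff describes token-based pagination: keep calling ListShards with the returned nextToken until none comes back. A minimal sketch of that loop follows. It deliberately avoids the AWS SDK so it runs without credentials; `Page`, `ShardLister` and `listAllShards` are hypothetical stand-ins for a ListShards response, a client, and the loop in `getShardsOfStream`, not the connector's actual code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Minimal sketch of nextToken pagination. Page and ShardLister are hypothetical
// stand-ins for a ListShards response and client; this is not the AWS SDK API.
class ShardPaginationSketch {

    /** One page of results: shard IDs plus a continuation token (null on the last page). */
    static final class Page {
        final List<String> shardIds;
        final String nextToken;

        Page(List<String> shardIds, String nextToken) {
            this.shardIds = shardIds;
            this.nextToken = nextToken;
        }
    }

    /** Hypothetical client call: a null token requests the first page. */
    interface ShardLister {
        Page listShards(String streamName, String nextToken);
    }

    /** Keeps requesting pages until the service stops returning a nextToken. */
    static List<String> listAllShards(ShardLister lister, String streamName) {
        List<String> shards = new ArrayList<>();
        String token = null;
        do {
            Page page = lister.listShards(streamName, token);
            shards.addAll(page.shardIds);
            token = page.nextToken;
        } while (token != null);
        return shards;
    }

    public static void main(String[] args) {
        // Fake two-page listing: the first page hands back token "t1" for the second.
        ShardLister fake = (stream, token) -> token == null
                ? new Page(Arrays.asList("shardId-000", "shardId-001"), "t1")
                : new Page(Arrays.asList("shardId-002"), null);
        System.out.println(listAllShards(fake, "example-stream"));
        // prints [shardId-000, shardId-001, shardId-002]
    }
}
```

The loop's only exit condition is a missing token, which is why any code path that yields a null result mid-listing (the case raised in the review comment) ends discovery early.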
If within `listShards(...)` we catch the `ExpiredNextTokenException`, then `null` will be returned as the result, correct?
If so, then the `shardsOfStream` built up so far will be returned immediately, regardless of whether or not more shards follow.
Although expired tokens are probably not common here, I wonder if we can handle this case more gracefully (e.g., by re-fetching a token to make sure that there really are no more shards).
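One way to handle the expired-token case more gracefully, sketched under assumptions: on an expired token, drop it and re-list starting after the last shard ID already collected (the role ListShards' ExclusiveStartShardId parameter plays), with a bounded number of restarts. This assumes shards come back ordered by shard ID, so "after the last collected shard" covers the remainder. `ExpiredNextTokenException` below is a local stand-in for the AWS SDK exception of the same name; `Page`, `ShardLister`, `listShardsAfter` and `maxRestarts` are hypothetical names, not the connector's API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of recovering from an expired pagination token. All names are
// hypothetical; ExpiredNextTokenException is a local stand-in for the AWS SDK
// exception of the same name.
class ExpiredTokenRecoverySketch {

    static final class ExpiredNextTokenException extends RuntimeException {
        ExpiredNextTokenException(String message) {
            super(message);
        }
    }

    static final class Page {
        final List<String> shardIds;
        final String nextToken; // null on the last page

        Page(List<String> shardIds, String nextToken) {
            this.shardIds = shardIds;
            this.nextToken = nextToken;
        }
    }

    interface ShardLister {
        // Either continues from nextToken or starts after exclusiveStartShardId.
        Page listShards(String streamName, String exclusiveStartShardId, String nextToken);
    }

    static List<String> listShardsAfter(ShardLister lister, String streamName,
                                        String lastSeenShardId, int maxRestarts) {
        List<String> shards = new ArrayList<>();
        String startAfter = lastSeenShardId;
        String token = null;
        int restarts = 0;
        while (true) {
            Page page;
            try {
                // Assumed to mirror ListShards: a start shard ID is only sent when no token is.
                page = lister.listShards(streamName, token == null ? startAfter : null, token);
            } catch (ExpiredNextTokenException e) {
                if (++restarts > maxRestarts) {
                    throw e; // give up loudly rather than return a partial list
                }
                // Drop the stale token and re-list from the last shard collected so far.
                token = null;
                if (!shards.isEmpty()) {
                    startAfter = shards.get(shards.size() - 1);
                }
                continue;
            }
            shards.addAll(page.shardIds);
            if (page.nextToken == null) {
                return shards; // no token: the listing really is complete
            }
            token = page.nextToken;
        }
    }

    public static void main(String[] args) {
        // Fake service whose token "t1" has already expired.
        ShardLister fake = (stream, startAfter, token) -> {
            if (token == null && startAfter == null) {
                return new Page(Arrays.asList("shardId-000", "shardId-001"), "t1");
            }
            if ("t1".equals(token)) {
                throw new ExpiredNextTokenException("nextToken t1 expired");
            }
            // Restarted call: everything after the last collected shard.
            return new Page(Arrays.asList("shardId-002"), null);
        };
        System.out.println(listShardsAfter(fake, "example-stream", null, 3));
        // prints [shardId-000, shardId-001, shardId-002]
    }
}
```

Bounding the restarts keeps discovery from looping forever if tokens keep expiring, and rethrowing surfaces the problem instead of silently treating a partial shard list as complete.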
> Use ListShards for shard discovery in the flink kinesis connector
> -----------------------------------------------------------------
>
> Key: FLINK-8944
> URL: https://issues.apache.org/jira/browse/FLINK-8944
> Project: Flink
> Issue Type: Improvement
> Reporter: Kailash Hassan Dayanand
> Priority: Minor
>
> Currently, the DescribeStream AWS API used to get the list of shards has a
> restrictive rate limit on AWS (5 requests per second per account). This is
> problematic when running multiple Flink jobs on the same account, since each
> subtask calls DescribeStream. Switching to ListShards will provide more
> flexibility with respect to rate limits, as ListShards has a limit of
> 100 requests per second per data stream.
> More details are on the mailing list: https://goo.gl/mRXjKh
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)