[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16473347#comment-16473347 ]

ASF GitHub Bot commented on FLINK-8944:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5992#discussion_r187788338
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ---
    @@ -353,19 +355,21 @@ protected static boolean isRecoverableException(AmazonServiceException ex) {
        private List<StreamShardHandle> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException {
                List<StreamShardHandle> shardsOfStream = new ArrayList<>();
     
    -           DescribeStreamResult describeStreamResult;
    +           // List Shards returns just the first 1000 shard entries. In order to read the entire stream,
    +           // we need to use the returned nextToken to get additional shards.
    +           ListShardsResult listShardsResult;
    +           String startShardToken = null;
                do {
    -                   describeStreamResult = describeStream(streamName, lastSeenShardId);
    -
    -                   List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
    +                   listShardsResult = listShards(streamName, lastSeenShardId, startShardToken);
    --- End diff --
    
    If `listShards(...)` catches the `ExpiredNextTokenException` internally, then `null` will be returned as the result, correct?
    If so, the partially built-up `shardsOfStream` will be returned immediately, regardless of whether more shards follow.
    
    Although expired tokens are probably uncommon here, I wonder if we can handle this case more gracefully (e.g., by re-fetching a token to make sure that there really are no more shards).


> Use ListShards for shard discovery in the flink kinesis connector
> -----------------------------------------------------------------
>
>                 Key: FLINK-8944
>                 URL: https://issues.apache.org/jira/browse/FLINK-8944
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Kailash Hassan Dayanand
>            Priority: Minor
>
> Currently, the DescribeStream AWS API used to get the list of shards has a
> restrictive rate limit on AWS (5 requests per second per account). This is
> problematic when running multiple Flink jobs on the same account, since each
> subtask calls DescribeStream. Switching to ListShards will provide more
> flexibility on rate limits, as ListShards has a limit of 100 requests per
> second per data stream.
> More details on the mailing list: https://goo.gl/mRXjKh



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
