[ 
https://issues.apache.org/jira/browse/BEAM-8382?focusedWorklogId=384158&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-384158
 ]

ASF GitHub Bot logged work on BEAM-8382:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Feb/20 23:21
            Start Date: 09/Feb/20 23:21
    Worklog Time Spent: 10m 
      Work Description: jfarr commented on issue #9765: [WIP][BEAM-8382] Add 
rate limit policy to KinesisIO.Read
URL: https://github.com/apache/beam/pull/9765#issuecomment-583906595
 
 
   Hi @aromanenko-dev, I finally got around to testing this AWS SDK 
RetryPolicy-based approach but with very little success. First of all, the SDK 
client by default uses the following retry policy for all throttling exceptions:
   
   - backoff strategy = PredefinedBackoffStrategies.EqualJitterBackoffStrategy
   - base delay = 500ms
   - max backoff = 20s
   
   EqualJitterBackoffStrategy is an exponential backoff with jitter: the random 
jitter portion never exceeds half of the computed ceiling for a given retry 
attempt, so the actual delay always lands between half the ceiling and the full 
ceiling. With a 500ms base delay, the first retry is always delayed by 250-500ms.
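   
   Roughly how an equal-jitter backoff arrives at that delay (a simplified 
sketch for illustration only, not the SDK source; the method name and constants 
are just mirroring the defaults listed above):
   
   ```
   // Simplified sketch of equal-jitter backoff (not the SDK source); the
   // constants match the defaults above: base delay 500ms, max backoff 20s.
   static long equalJitterDelayMs(int retriesAttempted) {
     long baseDelayMs = 500;
     long maxBackoffMs = 20_000;
     // Exponential ceiling for this attempt, capped at the max backoff.
     long ceiling = Math.min(baseDelayMs * (1L << retriesAttempted), maxBackoffMs);
     // Half the ceiling is fixed and the other half is random jitter, so
     // attempt 0 always lands somewhere between 250ms and 500ms.
     return ceiling / 2
         + java.util.concurrent.ThreadLocalRandom.current().nextLong(ceiling / 2 + 1);
   }
   ```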
   
   I added some logging and I can see the retry policy behaving as expected. 
For example:
   
   ```
   2020-02-09T21:48:00.413+00:00 com.godaddy..KinesisClientsProvider 
shouldRetry exception 
`com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException: 
Rate exceeded for shard shardId-000000000000 in stream beam-test ...`: true
   2020-02-09T21:48:00.413+00:00 com.godaddy..KinesisClientsProvider 
delayBeforeNextRetry: 333 retriesAttempted: 0
   2020-02-09T21:48:00.413+00:00 com.amazonaws.http.AmazonHttpClient Retriable 
error detected, will retry in 333ms, attempt number: 0
   ```
   
   Indeed, the initial backoff always seems to fall within 250-500ms. The 
exponential part of the backoff turns out to be largely irrelevant, though, 
because that initial delay appears to be enough for the first retry to succeed; 
I didn't see a single second retry attempt in my testing.
   
   However, the cumulative effect of all of these initial 
ProvisionedThroughputExceededExceptions is that approximately 30% of all 
getRecords() calls fail. You can see that in the attached charts, where 
GetRecords.Success is 0.7 and ReadProvisionedThroughputExceeded is 0.3. I don't 
think it matters whether the retry policy is implemented in Beam or in the AWS 
SDK; because the backoff only reacts after a call has already been throttled 
and does nothing to slow the steady-state polling rate, I don't think it can be 
tuned to perform much better than this.
   
   <img width="376" alt="Screen Shot 2020-02-09 at 1 10 18 PM" src="https://user-images.githubusercontent.com/1551631/74112146-f9e09a00-4b4e-11ea-8807-233922194aaf.png">
   <img width="368" alt="Screen Shot 2020-02-09 at 1 10 46 PM" src="https://user-images.githubusercontent.com/1551631/74112151-0664f280-4b4f-11ea-8c71-d3babb7b970c.png">
   
   
   Here is the code I used just for reference:
   ```
       // Wrap the SDK's default retry condition and backoff strategy so every
       // retry decision and computed delay gets logged.
       RetryCondition retryCondition = new RetryCondition() {
         final RetryCondition defaultCondition =
             PredefinedRetryPolicies.DEFAULT_RETRY_CONDITION;

         @Override
         public boolean shouldRetry(AmazonWebServiceRequest originalRequest,
             AmazonClientException exception, int retriesAttempted) {
           boolean retry =
               defaultCondition.shouldRetry(originalRequest, exception, retriesAttempted);
           LOG.info("shouldRetry exception `{}`: {}", exception, retry);
           return retry;
         }
       };

       BackoffStrategy backoffStrategy = new BackoffStrategy() {
         final BackoffStrategy defaultStrategy =
             PredefinedRetryPolicies.DEFAULT_BACKOFF_STRATEGY;

         @Override
         public long delayBeforeNextRetry(AmazonWebServiceRequest originalRequest,
             AmazonClientException exception, int retriesAttempted) {
           long delay =
               defaultStrategy.delayBeforeNextRetry(originalRequest, exception, retriesAttempted);
           LOG.info("delayBeforeNextRetry: {} retriesAttempted: {}", delay, retriesAttempted);
           return delay;
         }
       };

       // Default condition + default (equal jitter) backoff with the default max
       // retries; `false` means maxErrorRetry in the ClientConfiguration is ignored.
       RetryPolicy retryPolicy = new RetryPolicy(retryCondition, backoffStrategy,
           PredefinedRetryPolicies.DEFAULT_MAX_ERROR_RETRY, false);

       return AmazonKinesisClientBuilder.standard()
           .withClientConfiguration(PredefinedClientConfigurations.defaultConfig()
               .withRetryPolicy(retryPolicy))
           .build();
   ```
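   
   (Sketch for context, not part of the PR: the snippet above is the body of 
the method in the clients provider that builds the AmazonKinesis client. A 
minimal example of such a provider, assuming Beam's 
org.apache.beam.sdk.io.kinesis.AWSClientsProvider interface and using an 
illustrative class name, could look like this.)
   
   ```
   // Illustrative AWSClientsProvider that returns a Kinesis client configured
   // with a custom retry policy such as the one above.
   import com.amazonaws.PredefinedClientConfigurations;
   import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
   import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder;
   import com.amazonaws.services.kinesis.AmazonKinesis;
   import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
   import com.amazonaws.services.kinesis.producer.IKinesisProducer;
   import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
   import org.apache.beam.sdk.io.kinesis.AWSClientsProvider;

   public class LoggingRetryClientsProvider implements AWSClientsProvider {

     @Override
     public AmazonKinesis getKinesisClient() {
       // The retry policy from the snippet above would be plugged in here; this
       // sketch uses the SDK's default client configuration for brevity.
       return AmazonKinesisClientBuilder.standard()
           .withClientConfiguration(PredefinedClientConfigurations.defaultConfig())
           .build();
     }

     @Override
     public AmazonCloudWatch getCloudWatchClient() {
       return AmazonCloudWatchClientBuilder.defaultClient();
     }

     @Override
     public IKinesisProducer createKinesisProducer(KinesisProducerConfiguration config) {
       // Read-only provider; the producer side is not used here.
       throw new UnsupportedOperationException("write path not supported");
     }
   }

   // Pipeline usage, e.g.:
   //   p.apply(KinesisIO.read()
   //       .withStreamName("beam-test")
   //       .withAWSClientsProvider(new LoggingRetryClientsProvider()));
   ```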
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 384158)
    Time Spent: 12h  (was: 11h 50m)

> Add polling interval to KinesisIO.Read
> --------------------------------------
>
>                 Key: BEAM-8382
>                 URL: https://issues.apache.org/jira/browse/BEAM-8382
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-kinesis
>    Affects Versions: 2.13.0, 2.14.0, 2.15.0
>            Reporter: Jonothan Farr
>            Assignee: Jonothan Farr
>            Priority: Major
>          Time Spent: 12h
>  Remaining Estimate: 0h
>
> With the current implementation we are observing Kinesis throttling due to 
> ReadProvisionedThroughputExceeded on the order of hundreds of times per 
> second, regardless of the actual Kinesis throughput. This is because the 
> ShardReadersPool readLoop() method is polling getRecords() as fast as 
> possible.
> From the KDS documentation:
> {quote}Each shard can support up to five read transactions per second.
> {quote}
> and
> {quote}For best results, sleep for at least 1 second (1,000 milliseconds) 
> between calls to getRecords to avoid exceeding the limit on getRecords 
> frequency.
> {quote}
> [https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html]
> [https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-sdk.html]
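
As a rough illustration of the polling interval described above (a hypothetical 
sketch only, not the rate limit policy API added in the linked PR), the read 
loop could enforce a minimum spacing between getRecords() calls like this:

```
// Hypothetical sketch: enforce at least `minInterval` between successive
// getRecords() calls, per the KDS guidance quoted above (>= 1 second).
import java.time.Duration;
import java.time.Instant;

class FixedDelayPollingLimiter {
  private final Duration minInterval;
  private Instant lastCall = Instant.EPOCH;

  FixedDelayPollingLimiter(Duration minInterval) {
    this.minInterval = minInterval; // e.g. Duration.ofSeconds(1)
  }

  /** Blocks until at least {@code minInterval} has elapsed since the previous call. */
  synchronized void awaitNextPoll() throws InterruptedException {
    long waitMs = Duration.between(Instant.now(), lastCall.plus(minInterval)).toMillis();
    if (waitMs > 0) {
      Thread.sleep(waitMs);
    }
    lastCall = Instant.now();
  }
}
```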



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
