[
https://issues.apache.org/jira/browse/BEAM-8382?focusedWorklogId=354299&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-354299
]
ASF GitHub Bot logged work on BEAM-8382:
----------------------------------------
Author: ASF GitHub Bot
Created on: 05/Dec/19 13:49
Start Date: 05/Dec/19 13:49
Worklog Time Spent: 10m
Work Description: aromanenko-dev commented on issue #9765:
[WIP][BEAM-8382] Add rate limit policy to KinesisIO.Read
URL: https://github.com/apache/beam/pull/9765#issuecomment-562136257
I'd like to come back to the idea of using the `RetryPolicy` and
`BackoffStrategy` classes from the AWS SDK, proposed by @cmachgodaddy above. We
recently had a similar issue: a pipeline was failing with
`TransientKinesisException` because two pipelines were reading from the same
shard. We solved it by creating a new `AWSClientsProvider`, based on the current
`BasicKinesisProvider` (the one used by default), and overriding just the
`getKinesisClient()` method, where we configured our custom `RetryPolicy` and
`BackoffStrategy`.
A simplified example could look like this:
```
// Assumes DEFAULT_RETRY_CONDITION and DEFAULT_MAX_ERROR_RETRY are statically
// imported from com.amazonaws.retry.PredefinedRetryPolicies.
@Override
public AmazonKinesis getKinesisClient() {
  AmazonKinesisClientBuilder clientBuilder =
      AmazonKinesisClientBuilder.standard()
          .withCredentials(getCredentialsProvider());
  clientBuilder.withRegion(region);

  // Back off exponentially between retries, from baseDelay up to maxBackoffTime.
  RetryPolicy.BackoffStrategy backoffStrategy =
      new PredefinedBackoffStrategies.ExponentialBackoffStrategy(baseDelay, maxBackoffTime);
  RetryPolicy retryPolicy =
      new RetryPolicy(DEFAULT_RETRY_CONDITION, backoffStrategy, DEFAULT_MAX_ERROR_RETRY, true);

  // Attach the retry policy to the client through its ClientConfiguration.
  ClientConfiguration clientConfiguration = new ClientConfiguration();
  clientBuilder.withClientConfiguration(clientConfiguration.withRetryPolicy(retryPolicy));
  return clientBuilder.build();
}
```
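To make the effect of the exponential backoff strategy in the example above
more concrete, here is a minimal, SDK-independent sketch of a capped
exponential delay. It is an illustration only, not the AWS SDK implementation:
the class `BackoffSketch`, the method `delayMillis`, and the values 100 and
10000 are hypothetical, and the real `ExponentialBackoffStrategy` may differ in
details such as jitter.

```java
/**
 * Illustration only: capped exponential backoff.
 * This is NOT the AWS SDK implementation; all names are hypothetical.
 */
final class BackoffSketch {

  /**
   * Doubles the base delay once per retry already attempted and caps the
   * result at maxBackoffMs.
   */
  static long delayMillis(int baseDelayMs, int maxBackoffMs, int retriesAttempted) {
    if (baseDelayMs <= 0 || maxBackoffMs <= 0 || retriesAttempted < 0) {
      throw new IllegalArgumentException(
          "delays must be positive and retriesAttempted must be non-negative");
    }
    long delay = baseDelayMs;
    // Stop doubling as soon as the cap is reached, so the value cannot overflow.
    for (int i = 0; i < retriesAttempted && delay < maxBackoffMs; i++) {
      delay *= 2;
    }
    return Math.min(delay, maxBackoffMs);
  }

  public static void main(String[] args) {
    // Hypothetical values: 100 ms base delay, 10 s maximum backoff.
    for (int retry = 0; retry <= 8; retry++) {
      System.out.println("retry " + retry + ": wait " + delayMillis(100, 10_000, retry) + " ms");
    }
  }
}
```

With a growing delay like this, two pipelines retrying against the same shard
back off progressively instead of repeatedly hitting the read limit.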
So, instead of adding a new API that would duplicate what the AWS SDK already
implements, I think it would be enough to update the KinesisIO javadoc and add
an example showing how to configure and use a custom `AWSClientsProvider` in
such a case.
WDYT?
PS: @jfarr, I'm sorry that I initially asked you to add this functionality to
KinesisIO using the Beam SDK; I was not aware of this configuration option in
`AmazonKinesisClient`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 354299)
Time Spent: 10h 50m (was: 10h 40m)
> Add polling interval to KinesisIO.Read
> --------------------------------------
>
> Key: BEAM-8382
> URL: https://issues.apache.org/jira/browse/BEAM-8382
> Project: Beam
> Issue Type: Improvement
> Components: io-java-kinesis
> Affects Versions: 2.13.0, 2.14.0, 2.15.0
> Reporter: Jonothan Farr
> Assignee: Jonothan Farr
> Priority: Major
> Time Spent: 10h 50m
> Remaining Estimate: 0h
>
> With the current implementation we are observing Kinesis throttling due to
> ReadProvisionedThroughputExceeded on the order of hundreds of times per
> second, regardless of the actual Kinesis throughput. This is because the
> ShardReadersPool readLoop() method is polling getRecords() as fast as
> possible.
> From the KDS documentation:
> {quote}Each shard can support up to five read transactions per second.
> {quote}
> and
> {quote}For best results, sleep for at least 1 second (1,000 milliseconds)
> between calls to getRecords to avoid exceeding the limit on getRecords
> frequency.
> {quote}
> [https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html]
> [https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-sdk.html]
--
This message was sent by Atlassian Jira
(v8.3.4#803005)