[ https://issues.apache.org/jira/browse/FLINK-34023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17850841#comment-17850841 ]
Brad Atcheson commented on FLINK-34023:
---------------------------------------
[Here's](https://github.com/brada/flink-connector-aws/commit/aa4fce309e94f979aa1f3ef48d5d27fa17f55759)
a possible solution that adds 3 new config properties:
- aws.dynamodb.client.retry-policy.num-retries
- aws.firehose.client.retry-policy.num-retries
- aws.kinesis.client.retry-policy.num-retries
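For illustration, here is a stdlib-only sketch of how a sink could read one of these keys from its client `Properties`. The class `RetryConfigSketch`, its `numRetries` helper, the caller-supplied default, and the validation rules are all hypothetical. They are not the code in the linked commit. In the AWS SDK for Java v2, the resulting number would typically be passed to `RetryPolicy.builder().numRetries(...)` on the client's `ClientOverrideConfiguration`. That wiring is not shown here.

```java
import java.util.Properties;

/**
 * Hypothetical helper that reads a per-client retry count from connector properties.
 * The key names mirror the ones proposed above; the parsing and validation are
 * illustrative assumptions, not the linked commit's implementation.
 */
class RetryConfigSketch {
    // Proposed keys (not part of a released connector).
    static final String KINESIS_NUM_RETRIES = "aws.kinesis.client.retry-policy.num-retries";
    static final String FIREHOSE_NUM_RETRIES = "aws.firehose.client.retry-policy.num-retries";
    static final String DYNAMODB_NUM_RETRIES = "aws.dynamodb.client.retry-policy.num-retries";

    /** Returns the configured retry count, or defaultRetries if the key is absent. */
    static int numRetries(Properties props, String key, int defaultRetries) {
        String raw = props.getProperty(key);
        if (raw == null) {
            return defaultRetries;
        }
        int value;
        try {
            value = Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(key + " must be an integer, got: " + raw, e);
        }
        if (value < 0) {
            throw new IllegalArgumentException(key + " must be >= 0, got: " + value);
        }
        return value;
    }

    public static void main(String[] args) {
        Properties sinkProps = new Properties();
        sinkProps.setProperty(KINESIS_NUM_RETRIES, "10");

        // The fallback of 3 is arbitrary for this sketch; it makes no claim about SDK defaults.
        int retries = numRetries(sinkProps, KINESIS_NUM_RETRIES, 3);
        System.out.println("Kinesis client retries: " + retries);
    }
}
```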
Adding only these retry counts seemed like the simplest possible approach. Exposing the complete AWS SDK
retry policy would require many more parameters, including [backoff
strategy](https://sdk.amazonaws.com/java/api/2.0.0/software/amazon/awssdk/core/retry/backoff/BackoffStrategy.html),
base delay, and max backoff time. Since num-retries is the most important
parameter, would it be acceptable to expose just that one for now?
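The following sketch shows why a full retry policy has more knobs than a retry count. It implements capped exponential backoff, where the delay before retry `n` (0-based) is `min(maxBackoff, baseDelay * 2^n)`. `BackoffSketch` and its parameter names are illustrative only. The SDK's own strategies, such as `FullJitterBackoffStrategy`, also randomize the delay. This deterministic sketch deliberately leaves that out.

```java
import java.time.Duration;

/**
 * Illustrative capped exponential backoff without jitter.
 * Not the AWS SDK's implementation; names are hypothetical.
 */
class BackoffSketch {
    /** Delay before retry number `attempt` (0-based): min(maxBackoff, baseDelay * 2^attempt). */
    static Duration delayForAttempt(int attempt, Duration baseDelay, Duration maxBackoff) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be >= 0");
        }
        long cap = maxBackoff.toMillis();
        long millis = baseDelay.toMillis();
        // Double once per attempt, stopping early once the cap is reached (avoids overflow).
        for (int i = 0; i < attempt && millis < cap; i++) {
            millis *= 2;
        }
        return Duration.ofMillis(Math.min(millis, cap));
    }

    public static void main(String[] args) {
        Duration base = Duration.ofMillis(100);
        Duration max = Duration.ofSeconds(2);
        for (int attempt = 0; attempt <= 5; attempt++) {
            System.out.println("retry " + attempt + ": "
                    + delayForAttempt(attempt, base, max).toMillis() + " ms");
        }
    }
}
```

With a 100 ms base delay and a 2 s cap, retries 0 through 5 wait 100, 200, 400, 800, 1600, and 2000 ms. That is three parameters before jitter or a choice of strategy even enters the picture.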
The `mvn clean verify` and `mvn clean package` commands fail on that branch,
but they also fail on the origin main branch, for what appear to be reasons
unrelated to my commit:
```
[ERROR] Failures:
[ERROR] RowDataToAttributeValueConverterTest.testFloat:208
Expecting map:
{"key"=AttributeValue(N=1.2345679E17)}
to contain entries:
["key"=AttributeValue(N=1.23456791E17)]
but the following map entries had different values:
["key"=AttributeValue(N=1.2345679E17) (expected: AttributeValue(N=1.23456791E17))]
```
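The two values in that failure appear to differ only in their decimal rendering. The quick stdlib-only check below assumes that the converter stores the float as a number string (DynamoDB's `N` attribute holds numbers as strings). One plausible cause, not verified here, is that JDK 19 changed `Float.toString` to emit the shortest decimal that uniquely identifies the value. Under that change, an expectation written against an older JDK's output could fail on a newer one. The class and method names are hypothetical.

```java
import java.math.BigDecimal;

/**
 * Checks whether two decimal strings denote the same 32-bit float even though
 * they are different exact decimals. Literals come from the test failure above.
 */
class FloatFormatCheck {
    /** True if both strings round to the same float value. */
    static boolean sameFloat(String a, String b) {
        return Float.parseFloat(a) == Float.parseFloat(b);
    }

    /** True if both strings are numerically equal as exact decimals. */
    static boolean sameDecimal(String a, String b) {
        return new BigDecimal(a).compareTo(new BigDecimal(b)) == 0;
    }

    public static void main(String[] args) {
        String actual = "1.2345679E17";    // what the converter produced
        String expected = "1.23456791E17"; // what the test expects

        System.out.println("same exact decimal: " + sameDecimal(actual, expected));
        System.out.println("same float value:   " + sameFloat(actual, expected));
        // The formatting printed here depends on the JDK running the code.
        System.out.println("Float.toString:     " + Float.toString(Float.parseFloat(expected)));
    }
}
```

Near 1.2e17, adjacent floats are 2^33 (about 8.6e9) apart. Both strings lie within half of that spacing of the same float, so the stored value is identical and only the string form differs.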
> Expose Kinesis client retry config in sink
> ------------------------------------------
>
> Key: FLINK-34023
> URL: https://issues.apache.org/jira/browse/FLINK-34023
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kinesis
> Reporter: Brad Atcheson
> Priority: Major
>
> The consumer side exposes client retry configuration like
> [flink.shard.getrecords.maxretries|https://nightlies.apache.org/flink/flink-docs-stable/api/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.html#SHARD_GETRECORDS_RETRIES]
> but the producer side lacks similar config for PutRecords.
> The KinesisStreamsSinkWriter constructor calls
> {code}
> this.httpClient =
> AWSGeneralUtil.createAsyncHttpClient(kinesisClientProperties);
> this.kinesisClient = buildClient(kinesisClientProperties, this.httpClient);
> {code}
> But, aside from endpoint/region/creds, those methods only read these values
> from kinesisClientProperties:
> * aws.http-client.max-concurrency
> * aws.http-client.read-timeout
> * aws.trust.all.certificates
> * aws.http.protocol.version
> Without control over retry, users can observe exceptions like {code}Request
> attempt 2 failure: Unable to execute HTTP request: connection timed out after
> 2000 ms: kinesis.us-west-2.amazonaws.com{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)