CrynetLogistics commented on a change in pull request #17345:
URL: https://github.com/apache/flink/pull/17345#discussion_r755373684
##########
File path:
flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesaliteContainer.java
##########
@@ -93,45 +94,75 @@ public Properties getHostProperties() {
}
/** Returns the client to access the container from outside the docker
network. */
- public AmazonKinesis getContainerClient() {
+ public KinesisAsyncClient getContainerClient() throws URISyntaxException {
return getClient(getContainerEndpointUrl());
}
/** Returns the client to access the host from inside the docker network.
*/
- public AmazonKinesis getHostClient() {
+ public KinesisAsyncClient getHostClient() throws URISyntaxException {
return getClient(getHostEndpointUrl());
}
- private AmazonKinesis getClient(String endPoint) {
- return AmazonKinesisClientBuilder.standard()
- .withCredentials(
- new AWSStaticCredentialsProvider(
- new BasicAWSCredentials(getAccessKey(),
getSecretKey())))
- .withEndpointConfiguration(
- new AwsClientBuilder.EndpointConfiguration(endPoint,
"us-east-1"))
+ public KinesisAsyncClient getClient(String endPoint) throws
URISyntaxException {
+ return KinesisAsyncClient.builder()
+ .endpointOverride(new URI(endPoint))
+ .region(REGION)
+ .credentialsProvider(
+ () -> AwsBasicCredentials.create(getAccessKey(),
getSecretKey()))
+ .httpClient(buildSdkAsyncHttpClient())
.build();
}
+ private void startContainer() {
+ withCreateContainerCmdModifier(
+ cmd ->
+ cmd.withEntrypoint(
+ "/tini",
+ "--",
+ "/usr/src/app/node_modules/kinesalite/cli.js",
+ "--path",
+ "/var/lib/kinesalite",
+ "--ssl"));
+ }
+
private Properties getProperties(String endpointUrl) {
Properties config = new Properties();
- config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
+ config.setProperty(AWSConfigConstants.AWS_REGION, REGION.toString());
config.setProperty(AWSConfigConstants.AWS_ENDPOINT, endpointUrl);
config.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID,
getAccessKey());
config.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
getSecretKey());
return config;
}
private class ListStreamsWaitStrategy extends AbstractWaitStrategy {
+ private static final int TIMEOUT_PER_RETRY = 15;
+
+ private final RateLimiter rateLimiter =
+ RateLimiterBuilder.newBuilder()
+ .withRate(TIMEOUT_PER_RETRY, TimeUnit.SECONDS)
+ .withConstantThroughput()
+ .build();
+
@Override
protected void waitUntilReady() {
Unreliables.retryUntilSuccess(
- (int) this.startupTimeout.getSeconds(),
+ (int) this.startupTimeout.getSeconds() * TIMEOUT_PER_RETRY,
TimeUnit.SECONDS,
- () -> this.getRateLimiter().getWhenReady(() -> tryList()));
+ () -> rateLimiter.getWhenReady(() -> tryList()));
}
- private ListStreamsResult tryList() {
- return getContainerClient().listStreams();
+ private ListStreamsResponse tryList()
Review comment:
I guess I wanted to reflect that a single call to this method may fail
and will then be retried by the rate-limited retry logic in `retryUntilSuccess`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]