Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/5171#discussion_r158166090
--- Diff:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
---
@@ -30,37 +30,44 @@
import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
-import com.amazonaws.regions.Region;
+import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import java.util.Properties;
/**
* Some utilities specific to Amazon Web Service.
*/
public class AWSUtil {
+ /** Used for formatting Flink-specific user agent string when creating
Kinesis client. */
+ private static final String USER_AGENT_FORMAT = "Apache Flink %s (%s)
Kinesis Connector";
/**
- * Creates an Amazon Kinesis Client.
+ * Creates an AmazonKinesis client.
* @param configProps configuration properties containing the access
key, secret key, and region
- * @return a new Amazon Kinesis Client
+ * @return a new AmazonKinesis client
*/
- public static AmazonKinesisClient createKinesisClient(Properties
configProps) {
+ public static AmazonKinesis createKinesisClient(Properties configProps)
{
// set a Flink-specific user agent
- ClientConfiguration awsClientConfig = new
ClientConfigurationFactory().getConfig();
- awsClientConfig.setUserAgent("Apache Flink " +
EnvironmentInformation.getVersion() +
- " (" +
EnvironmentInformation.getRevisionInformation().commitId + ") Kinesis
Connector");
+ ClientConfiguration awsClientConfig = new
ClientConfigurationFactory().getConfig()
+
.withUserAgentPrefix(String.format(USER_AGENT_FORMAT,
+
EnvironmentInformation.getVersion(),
+
EnvironmentInformation.getRevisionInformation().commitId));
// utilize automatic refreshment of credentials by directly
passing the AWSCredentialsProvider
- AmazonKinesisClient client = new AmazonKinesisClient(
- AWSUtil.getCredentialsProvider(configProps),
awsClientConfig);
+ AmazonKinesisClientBuilder builder =
AmazonKinesisClientBuilder.standard()
+
.withCredentials(AWSUtil.getCredentialsProvider(configProps))
+ .withClientConfiguration(awsClientConfig)
+
.withRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION)));
-
client.setRegion(Region.getRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION))));
if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
-
client.setEndpoint(configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT));
+ builder.withEndpointConfiguration(new
AwsClientBuilder.EndpointConfiguration(
+
configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT),
+
configProps.getProperty(AWSConfigConstants.AWS_REGION)));
--- End diff --
Why does the endpoint configuration have a region now?
For example, lets say a user wants to test the connector against a local
Kinesis mock service at "localhost:1111". The user also originally was issuing
against the regular AWS Kinesis service, at region "us-west-1". The users
properties would be like -
```
configProps.setProperty(AWSConfigConstants.AWS_REGION, "us-west-1");
configProps.setProperty(AWSConfigConstants.AWS_ENDPOINT, "localhost:1111");
```
In the past, this would correctly redirect requests to "localhost:1111".
With this change, is this also the case? Or do we actually need to call
`new
AwsClientBuilder.EndpointConfiguration(configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT),
null)` instead (do not provide region in endpoint)?
---