[ https://issues.apache.org/jira/browse/FLINK-8271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16309184#comment-16309184 ]

ASF GitHub Bot commented on FLINK-8271:
---------------------------------------

Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5171#discussion_r159363827
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java ---
    @@ -30,37 +30,44 @@
     import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
     import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
     import com.amazonaws.auth.profile.ProfileCredentialsProvider;
    -import com.amazonaws.regions.Region;
    +import com.amazonaws.client.builder.AwsClientBuilder;
     import com.amazonaws.regions.Regions;
    -import com.amazonaws.services.kinesis.AmazonKinesisClient;
    +import com.amazonaws.services.kinesis.AmazonKinesis;
    +import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
     
     import java.util.Properties;
     
     /**
      * Some utilities specific to Amazon Web Service.
      */
     public class AWSUtil {
    +   /** Used for formatting Flink-specific user agent string when creating Kinesis client. */
    +   private static final String USER_AGENT_FORMAT = "Apache Flink %s (%s) Kinesis Connector";
     
        /**
    -    * Creates an Amazon Kinesis Client.
    +    * Creates an AmazonKinesis client.
         * @param configProps configuration properties containing the access key, secret key, and region
    -    * @return a new Amazon Kinesis Client
    +    * @return a new AmazonKinesis client
         */
    -   public static AmazonKinesisClient createKinesisClient(Properties configProps) {
    +   public static AmazonKinesis createKinesisClient(Properties configProps) {
                // set a Flink-specific user agent
    -           ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig();
    -           awsClientConfig.setUserAgent("Apache Flink " + EnvironmentInformation.getVersion() +
    -                   " (" + EnvironmentInformation.getRevisionInformation().commitId + ") Kinesis Connector");
    +           ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig()
    +                           .withUserAgentPrefix(String.format(USER_AGENT_FORMAT,
    +                                   EnvironmentInformation.getVersion(),
    +                                   EnvironmentInformation.getRevisionInformation().commitId));
     
                // utilize automatic refreshment of credentials by directly passing the AWSCredentialsProvider
    -           AmazonKinesisClient client = new AmazonKinesisClient(
    -                   AWSUtil.getCredentialsProvider(configProps), awsClientConfig);
    +           AmazonKinesisClientBuilder builder = AmazonKinesisClientBuilder.standard()
    +                           .withCredentials(AWSUtil.getCredentialsProvider(configProps))
    +                           .withClientConfiguration(awsClientConfig)
    +                           .withRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION)));
     
    -           client.setRegion(Region.getRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION))));
                if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
    -                   client.setEndpoint(configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT));
    +                   builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
    +                           configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT),
    +                           configProps.getProperty(AWSConfigConstants.AWS_REGION)));
    --- End diff --
    
    @tzulitai You are right. After some research, I found that the `region` field in `EndpointConfiguration` ([here](https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-core/src/main/java/com/amazonaws/client/builder/AwsClientBuilder.java#L363)) is used the same way as in the [old code here](https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-core/src/main/java/com/amazonaws/AmazonWebServiceClient.java#L368). I will set it to null.
    
    Taking this a step further: if what you described is a frequent use case, we should add a unit test for it so that it stays validated in the future. Would you like to create a ticket and a PR for it?
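
For illustration only, here is a minimal, SDK-free sketch of the endpoint handling discussed above, written so that it can be unit tested without an AWS client. The class `EndpointOverrideSketch`, the holder `EndpointSettings`, the method `endpointSettings`, and the literal key `"aws.endpoint"` are all hypothetical stand-ins (the key is assumed to correspond to `AWSConfigConstants.AWS_ENDPOINT`); none of them is part of the pull request. The sketch only models which two arguments would be handed to `AwsClientBuilder.EndpointConfiguration` when the signing region is left as null.

```java
import java.util.Optional;
import java.util.Properties;

public class EndpointOverrideSketch {

    // Assumption: stands in for AWSConfigConstants.AWS_ENDPOINT; the literal key is illustrative.
    static final String AWS_ENDPOINT = "aws.endpoint";

    /** Hypothetical holder for the two arguments of AwsClientBuilder.EndpointConfiguration. */
    static final class EndpointSettings {
        final String serviceEndpoint;
        final String signingRegion; // left null, as proposed in the comment above

        EndpointSettings(String serviceEndpoint, String signingRegion) {
            this.serviceEndpoint = serviceEndpoint;
            this.signingRegion = signingRegion;
        }
    }

    /** Returns endpoint settings only when a custom endpoint is configured. */
    static Optional<EndpointSettings> endpointSettings(Properties configProps) {
        if (!configProps.containsKey(AWS_ENDPOINT)) {
            return Optional.empty();
        }
        // The signing region is deliberately null rather than the configured region.
        return Optional.of(new EndpointSettings(configProps.getProperty(AWS_ENDPOINT), null));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(AWS_ENDPOINT, "http://localhost:4567");

        EndpointSettings settings = endpointSettings(props).get();
        System.out.println(settings.serviceEndpoint + " / signingRegion=" + settings.signingRegion);
    }
}
```

A unit test along the lines suggested above could assert that no settings are produced when the endpoint key is absent, and that the configured endpoint is passed through with a null signing region when it is present.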


> upgrade from deprecated classes to AmazonKinesis
> ------------------------------------------------
>
>                 Key: FLINK-8271
>                 URL: https://issues.apache.org/jira/browse/FLINK-8271
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kinesis Connector
>    Affects Versions: 1.4.0
>            Reporter: Bowen Li
>            Assignee: Bowen Li
>             Fix For: 1.5.0, 1.4.1
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)