Repository: flink
Updated Branches:
  refs/heads/master d84599ea0 -> 4666e65ef


[FLINK-4611] [kinesis] Make "AUTO" credential provider as default for Kinesis 
Connector

This closes #2914.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4666e65e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4666e65e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4666e65e

Branch: refs/heads/master
Commit: 4666e65ef7b1e42a2bf0bba1d7d08e8d68e1af01
Parents: d84599e
Author: 魏偉哲 <[email protected]>
Authored: Wed Nov 30 18:17:24 2016 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Thu Dec 15 22:04:33 2016 +0800

----------------------------------------------------------------------
 .../connectors/kinesis/util/AWSUtil.java        | 24 +++++++++++++++-----
 .../kinesis/util/KinesisConfigUtil.java         | 12 ++--------
 .../kinesis/FlinkKinesisConsumerTest.java       | 12 ----------
 3 files changed, 20 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4666e65e/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
index cff69e5..a6aad02 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
@@ -69,8 +69,20 @@ public class AWSUtil {
         * @return The corresponding AWS Credentials Provider instance
         */
        public static AWSCredentialsProvider getCredentialsProvider(final 
Properties configProps) {
-               CredentialProvider credentialProviderType = 
CredentialProvider.valueOf(configProps.getProperty(
-                       AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, 
CredentialProvider.BASIC.toString()));
+               CredentialProvider credentialProviderType;
+               if 
(!configProps.containsKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER)) {
+                       if 
(configProps.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID)
+                               && 
configProps.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY)) {
+                               // if the credential provider type is not 
specified, but the Access Key ID and Secret Key are given, it will default to 
BASIC
+                               credentialProviderType = 
CredentialProvider.BASIC;
+                       } else {
+                               // if the credential provider type is not 
specified, it will default to AUTO
+                               credentialProviderType = 
CredentialProvider.AUTO;
+                       }
+               } else {
+                       credentialProviderType = 
CredentialProvider.valueOf(configProps.getProperty(
+                               AWSConfigConstants.AWS_CREDENTIALS_PROVIDER));
+               }
 
                AWSCredentialsProvider credentialsProvider;
 
@@ -90,10 +102,6 @@ public class AWSUtil {
                                        ? new 
ProfileCredentialsProvider(profileName)
                                        : new 
ProfileCredentialsProvider(profileConfigPath, profileName);
                                break;
-                       case AUTO:
-                               credentialsProvider = new 
DefaultAWSCredentialsProviderChain();
-                               break;
-                       default:
                        case BASIC:
                                credentialsProvider = new 
AWSCredentialsProvider() {
                                        @Override
@@ -108,6 +116,10 @@ public class AWSUtil {
                                                // do nothing
                                        }
                                };
+                               break;
+                       default:
+                       case AUTO:
+                               credentialsProvider = new 
DefaultAWSCredentialsProviderChain();
                }
 
                return credentialsProvider;

http://git-wip-us.apache.org/repos/asf/flink/blob/4666e65e/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index 9aa14ad..d8ea0a2 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -130,15 +130,7 @@ public class KinesisConfigUtil {
         * Validate configuration properties related to Amazon AWS service
         */
        public static void validateAwsConfiguration(Properties config) {
-               if 
(!config.containsKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER)) {
-                       // if the credential provider type is not specified, it 
will default to BASIC later on,
-                       // so the Access Key ID and Secret Key must be given
-                       if 
(!config.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID)
-                               || 
!config.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY)) {
-                               throw new IllegalArgumentException("Please set 
values for AWS Access Key ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " +
-                                       "and Secret Key ('" + 
AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS 
credential provider type.");
-                       }
-               } else {
+               if 
(config.containsKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER)) {
                        String credentialsProviderType = 
config.getProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER);
 
                        // value specified for 
AWSConfigConstants.AWS_CREDENTIALS_PROVIDER needs to be recognizable
@@ -157,7 +149,7 @@ public class KinesisConfigUtil {
                        if (providerType == CredentialProvider.BASIC) {
                                if 
(!config.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID)
                                        || 
!config.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY)) {
-                                       throw new 
IllegalArgumentException("Please set values for AWS Access Key ID ('"+ 
AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " +
+                                       throw new 
IllegalArgumentException("Please set values for AWS Access Key ID ('" + 
AWSConfigConstants.AWS_ACCESS_KEY_ID + "') " +
                                                "and Secret Key ('" + 
AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS 
credential provider type.");
                                }
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/4666e65e/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index dbf95f9..a72d8df 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -87,18 +87,6 @@ public class FlinkKinesisConsumerTest {
        }
 
        @Test
-       public void 
testCredentialProviderTypeDefaultToBasicButNoCredentialsSetInConfig() {
-               exception.expect(IllegalArgumentException.class);
-               exception.expectMessage("Please set values for AWS Access Key 
ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " +
-                               "and Secret Key ('" + 
AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS 
credential provider type.");
-
-               Properties testConfig = new Properties();
-               testConfig.setProperty(AWSConfigConstants.AWS_REGION, 
"us-east-1");
-
-               KinesisConfigUtil.validateAwsConfiguration(testConfig);
-       }
-
-       @Test
        public void 
testCredentialProviderTypeSetToBasicButNoCredentialSetInConfig() {
                exception.expect(IllegalArgumentException.class);
                exception.expectMessage("Please set values for AWS Access Key 
ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " +

Reply via email to