Repository: flink Updated Branches: refs/heads/master d84599ea0 -> 4666e65ef
[FLINK-4611] [kinesis] Make "AUTO" credential provider as default for Kinesis Connector This closes #2914. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4666e65e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4666e65e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4666e65e Branch: refs/heads/master Commit: 4666e65ef7b1e42a2bf0bba1d7d08e8d68e1af01 Parents: d84599e Author: 魏偉哲 <[email protected]> Authored: Wed Nov 30 18:17:24 2016 +0800 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Thu Dec 15 22:04:33 2016 +0800 ---------------------------------------------------------------------- .../connectors/kinesis/util/AWSUtil.java | 24 +++++++++++++++----- .../kinesis/util/KinesisConfigUtil.java | 12 ++-------- .../kinesis/FlinkKinesisConsumerTest.java | 12 ---------- 3 files changed, 20 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4666e65e/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java index cff69e5..a6aad02 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java @@ -69,8 +69,20 @@ public class AWSUtil { * @return The corresponding AWS Credentials Provider instance */ public static AWSCredentialsProvider getCredentialsProvider(final Properties configProps) { - CredentialProvider 
credentialProviderType = CredentialProvider.valueOf(configProps.getProperty( - AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, CredentialProvider.BASIC.toString())); + CredentialProvider credentialProviderType; + if (!configProps.containsKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER)) { + if (configProps.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID) + && configProps.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY)) { + // if the credential provider type is not specified, but the Access Key ID and Secret Key are given, it will default to BASIC + credentialProviderType = CredentialProvider.BASIC; + } else { + // if the credential provider type is not specified, it will default to AUTO + credentialProviderType = CredentialProvider.AUTO; + } + } else { + credentialProviderType = CredentialProvider.valueOf(configProps.getProperty( + AWSConfigConstants.AWS_CREDENTIALS_PROVIDER)); + } AWSCredentialsProvider credentialsProvider; @@ -90,10 +102,6 @@ public class AWSUtil { ? new ProfileCredentialsProvider(profileName) : new ProfileCredentialsProvider(profileConfigPath, profileName); break; - case AUTO: - credentialsProvider = new DefaultAWSCredentialsProviderChain(); - break; - default: case BASIC: credentialsProvider = new AWSCredentialsProvider() { @Override @@ -108,6 +116,10 @@ public class AWSUtil { // do nothing } }; + break; + default: + case AUTO: + credentialsProvider = new DefaultAWSCredentialsProviderChain(); } return credentialsProvider; http://git-wip-us.apache.org/repos/asf/flink/blob/4666e65e/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java index 9aa14ad..d8ea0a2 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java @@ -130,15 +130,7 @@ public class KinesisConfigUtil { * Validate configuration properties related to Amazon AWS service */ public static void validateAwsConfiguration(Properties config) { - if (!config.containsKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER)) { - // if the credential provider type is not specified, it will default to BASIC later on, - // so the Access Key ID and Secret Key must be given - if (!config.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID) - || !config.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY)) { - throw new IllegalArgumentException("Please set values for AWS Access Key ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " + - "and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type."); - } - } else { + if (config.containsKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER)) { String credentialsProviderType = config.getProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER); // value specified for AWSConfigConstants.AWS_CREDENTIALS_PROVIDER needs to be recognizable @@ -157,7 +149,7 @@ public class KinesisConfigUtil { if (providerType == CredentialProvider.BASIC) { if (!config.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID) || !config.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY)) { - throw new IllegalArgumentException("Please set values for AWS Access Key ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " + + throw new IllegalArgumentException("Please set values for AWS Access Key ID ('" + AWSConfigConstants.AWS_ACCESS_KEY_ID + "') " + "and 
Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type."); } } http://git-wip-us.apache.org/repos/asf/flink/blob/4666e65e/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java index dbf95f9..a72d8df 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java @@ -87,18 +87,6 @@ public class FlinkKinesisConsumerTest { } @Test - public void testCredentialProviderTypeDefaultToBasicButNoCredentialsSetInConfig() { - exception.expect(IllegalArgumentException.class); - exception.expectMessage("Please set values for AWS Access Key ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " + - "and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type."); - - Properties testConfig = new Properties(); - testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); - - KinesisConfigUtil.validateAwsConfiguration(testConfig); - } - - @Test public void testCredentialProviderTypeSetToBasicButNoCredentialSetInConfig() { exception.expect(IllegalArgumentException.class); exception.expectMessage("Please set values for AWS Access Key ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " +
