Repository: flink Updated Branches: refs/heads/master a273f645b -> 92e3db74b
[FLINK-9402] [kinesis] Kinesis consumer configuration requires either region or endpoint. Fix validation logic to allow either region or endpoint, but not both. This closes #6045. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9ad868c4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9ad868c4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9ad868c4 Branch: refs/heads/master Commit: 9ad868c457f5083f5cd96d0fbcaa97d06eede6fe Parents: a273f64 Author: Thomas Weise <[email protected]> Authored: Sat May 19 00:27:56 2018 -0700 Committer: Till Rohrmann <[email protected]> Committed: Wed May 23 00:58:30 2018 +0200 ---------------------------------------------------------------------- .../connectors/kinesis/util/AWSUtil.java | 5 +++-- .../kinesis/util/KinesisConfigUtil.java | 10 +++++++--- .../kinesis/util/KinesisConfigUtilTest.java | 20 +++++++++++++++++++- 3 files changed, 29 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9ad868c4/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java index cfddfa2..2678c90 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java @@ -79,14 +79,15 @@ public class AWSUtil { // utilize automatic refreshment of credentials by directly passing the AWSCredentialsProvider AmazonKinesisClientBuilder builder = AmazonKinesisClientBuilder.standard() .withCredentials(AWSUtil.getCredentialsProvider(configProps)) - .withClientConfiguration(awsClientConfig) - .withRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION))); + .withClientConfiguration(awsClientConfig); if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) { // Set signingRegion as null, to facilitate mocking Kinesis for local tests builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration( configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT), null)); + } else { + builder.withRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION))); } return builder.build(); } http://git-wip-us.apache.org/repos/asf/flink/blob/9ad868c4/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java index 9203136..dc8b79b 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java @@ -243,9 +243,13 @@ public class KinesisConfigUtil { } } - if (!config.containsKey(AWSConfigConstants.AWS_REGION)) { - throw new IllegalArgumentException("The AWS region ('" + AWSConfigConstants.AWS_REGION + "') must be set in the config."); - } else { + if (!(config.containsKey(AWSConfigConstants.AWS_REGION) ^ config.containsKey(ConsumerConfigConstants.AWS_ENDPOINT))) { + // per validation in AwsClientBuilder + throw new IllegalArgumentException(String.format("Either AWS region ('%s') or AWS endpoint ('%s') must be set in the config.", + AWSConfigConstants.AWS_REGION, AWSConfigConstants.AWS_REGION)); + } + + if (config.containsKey(AWSConfigConstants.AWS_REGION)) { // specified AWS Region name must be recognizable if (!AWSUtil.isValidRegion(config.getProperty(AWSConfigConstants.AWS_REGION))) { StringBuilder sb = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/flink/blob/9ad868c4/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java index 074b676..69ca58b 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java @@ -131,8 +131,10 @@ public class KinesisConfigUtilTest { @Test public void testMissingAwsRegionInConfig() { + String expectedMessage = String.format("Either AWS region ('%s') or AWS endpoint ('%s') must be set in the config.", + AWSConfigConstants.AWS_REGION, AWSConfigConstants.AWS_REGION); exception.expect(IllegalArgumentException.class); - exception.expectMessage("The AWS region ('" + AWSConfigConstants.AWS_REGION + "') must be set in the config."); + exception.expectMessage(expectedMessage); Properties testConfig = new Properties(); testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKey"); @@ -155,6 +157,22 @@ public class KinesisConfigUtilTest { } @Test + public void testAwsRegionOrEndpointInConfig() { + String expectedMessage = String.format("Either AWS region ('%s') or AWS endpoint ('%s') must be set in the config.", + AWSConfigConstants.AWS_REGION, AWSConfigConstants.AWS_REGION); + exception.expect(IllegalArgumentException.class); + exception.expectMessage(expectedMessage); + + Properties testConfig = new Properties(); + testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east"); + testConfig.setProperty(AWSConfigConstants.AWS_ENDPOINT, "fake"); + testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKey"); + testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey"); + + KinesisConfigUtil.validateAwsConfiguration(testConfig); + } + + @Test public void testCredentialProviderTypeSetToBasicButNoCredentialSetInConfig() { exception.expect(IllegalArgumentException.class); exception.expectMessage("Please set values for AWS Access Key ID ('" + AWSConfigConstants.AWS_ACCESS_KEY_ID + "') " +
