Repository: flink
Updated Branches:
  refs/heads/master a273f645b -> 92e3db74b


[FLINK-9402] [kinesis] Kinesis consumer configuration requires either region or endpoint.

Fix the validation logic to require exactly one of region or endpoint: a configuration that sets neither, or sets both, is rejected.

This closes #6045.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9ad868c4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9ad868c4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9ad868c4

Branch: refs/heads/master
Commit: 9ad868c457f5083f5cd96d0fbcaa97d06eede6fe
Parents: a273f64
Author: Thomas Weise <[email protected]>
Authored: Sat May 19 00:27:56 2018 -0700
Committer: Till Rohrmann <[email protected]>
Committed: Wed May 23 00:58:30 2018 +0200

----------------------------------------------------------------------
 .../connectors/kinesis/util/AWSUtil.java        |  5 +++--
 .../kinesis/util/KinesisConfigUtil.java         | 10 +++++++---
 .../kinesis/util/KinesisConfigUtilTest.java     | 20 +++++++++++++++++++-
 3 files changed, 29 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9ad868c4/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
index cfddfa2..2678c90 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
@@ -79,14 +79,15 @@ public class AWSUtil {
                // utilize automatic refreshment of credentials by directly 
passing the AWSCredentialsProvider
                AmazonKinesisClientBuilder builder = 
AmazonKinesisClientBuilder.standard()
                                
.withCredentials(AWSUtil.getCredentialsProvider(configProps))
-                               .withClientConfiguration(awsClientConfig)
-                               
.withRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION)));
+                               .withClientConfiguration(awsClientConfig);
 
                if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
                        // Set signingRegion as null, to facilitate mocking 
Kinesis for local tests
                        builder.withEndpointConfiguration(new 
AwsClientBuilder.EndpointConfiguration(
                                                                                
                        
configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT),
                                                                                
                        null));
+               } else {
+                       
builder.withRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION)));
                }
                return builder.build();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/9ad868c4/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index 9203136..dc8b79b 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -243,9 +243,13 @@ public class KinesisConfigUtil {
                        }
                }
 
-               if (!config.containsKey(AWSConfigConstants.AWS_REGION)) {
-                       throw new IllegalArgumentException("The AWS region ('" 
+ AWSConfigConstants.AWS_REGION + "') must be set in the config.");
-               } else {
+               if (!(config.containsKey(AWSConfigConstants.AWS_REGION) ^ 
config.containsKey(ConsumerConfigConstants.AWS_ENDPOINT))) {
+                       // per validation in AwsClientBuilder
+                       throw new 
IllegalArgumentException(String.format("Either AWS region ('%s') or AWS 
endpoint ('%s') must be set in the config.",
+                               AWSConfigConstants.AWS_REGION, 
AWSConfigConstants.AWS_REGION));
+               }
+
+               if (config.containsKey(AWSConfigConstants.AWS_REGION)) {
                        // specified AWS Region name must be recognizable
                        if 
(!AWSUtil.isValidRegion(config.getProperty(AWSConfigConstants.AWS_REGION))) {
                                StringBuilder sb = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/flink/blob/9ad868c4/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
index 074b676..69ca58b 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtilTest.java
@@ -131,8 +131,10 @@ public class KinesisConfigUtilTest {
 
        @Test
        public void testMissingAwsRegionInConfig() {
+               String expectedMessage = String.format("Either AWS region 
('%s') or AWS endpoint ('%s') must be set in the config.",
+                       AWSConfigConstants.AWS_REGION, 
AWSConfigConstants.AWS_REGION);
                exception.expect(IllegalArgumentException.class);
-               exception.expectMessage("The AWS region ('" + 
AWSConfigConstants.AWS_REGION + "') must be set in the config.");
+               exception.expectMessage(expectedMessage);
 
                Properties testConfig = new Properties();
                testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, 
"accessKey");
@@ -155,6 +157,22 @@ public class KinesisConfigUtilTest {
        }
 
        @Test
+       public void testAwsRegionOrEndpointInConfig() {
+               String expectedMessage = String.format("Either AWS region 
('%s') or AWS endpoint ('%s') must be set in the config.",
+                       AWSConfigConstants.AWS_REGION, 
AWSConfigConstants.AWS_REGION);
+               exception.expect(IllegalArgumentException.class);
+               exception.expectMessage(expectedMessage);
+
+               Properties testConfig = new Properties();
+               testConfig.setProperty(AWSConfigConstants.AWS_REGION, 
"us-east");
+               testConfig.setProperty(AWSConfigConstants.AWS_ENDPOINT, "fake");
+               testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, 
"accessKey");
+               
testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");
+
+               KinesisConfigUtil.validateAwsConfiguration(testConfig);
+       }
+
+       @Test
        public void 
testCredentialProviderTypeSetToBasicButNoCredentialSetInConfig() {
                exception.expect(IllegalArgumentException.class);
                exception.expectMessage("Please set values for AWS Access Key 
ID ('" + AWSConfigConstants.AWS_ACCESS_KEY_ID + "') " +

Reply via email to