This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2724a23a01e185ed22ee901fb60e67424b8ba413
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Nov 15 20:49:04 2018 +0100

    [hotfix] [tests] Make S3 config key forwarding a proper unit test
    
    This avoids unnecessary and expensive connections to S3 just to validate
    whether config keys of various formats are forwarded.
---
 .../fs/s3hadoop/HadoopS3FileSystemITCase.java      | 35 ---------------
 .../flink/fs/s3hadoop/HadoopS3FileSystemTest.java  | 51 ++++++++++++++++++++++
 2 files changed, 51 insertions(+), 35 deletions(-)

diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java
index 2195bd0..1fd0afd 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java
@@ -126,41 +126,6 @@ public class HadoopS3FileSystemITCase extends TestLogger {
        }
 
        @Test
-       public void testConfigKeysForwarding() throws Exception {
-               final Path path = new Path(getBasePath());
-
-               // standard Hadoop-style credential keys
-               {
-                       Configuration conf = new Configuration();
-                       conf.setString("fs.s3a.access.key", S3TestCredentials.getS3AccessKey());
-                       conf.setString("fs.s3a.secret.key", S3TestCredentials.getS3SecretKey());
-
-                       FileSystem.initialize(conf);
-                       path.getFileSystem();
-               }
-
-               // shortened Hadoop-style credential keys
-               {
-                       Configuration conf = new Configuration();
-                       conf.setString("s3.access.key", S3TestCredentials.getS3AccessKey());
-                       conf.setString("s3.secret.key", S3TestCredentials.getS3SecretKey());
-
-                       FileSystem.initialize(conf);
-                       path.getFileSystem();
-               }
-
-               // shortened Presto-style credential keys
-               {
-                       Configuration conf = new Configuration();
-                       conf.setString("s3.access-key", S3TestCredentials.getS3AccessKey());
-                       conf.setString("s3.secret-key", S3TestCredentials.getS3SecretKey());
-
-                       FileSystem.initialize(conf);
-                       path.getFileSystem();
-               }
-       }
-
-       @Test
        public void testSimpleFileWriteAndRead() throws Exception {
                final long deadline = System.nanoTime() + 30_000_000_000L; // 30 secs
                final Configuration conf = new Configuration();
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java
index 6faf5b2..4471b38 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java
@@ -29,6 +29,7 @@ import static org.junit.Assert.assertEquals;
  * Unit tests for the S3 file system support via Hadoop's {@link org.apache.hadoop.fs.s3a.S3AFileSystem}.
  */
 public class HadoopS3FileSystemTest {
+
        @Test
        public void testShadingOfAwsCredProviderConfig() {
                final Configuration conf = new Configuration();
@@ -41,4 +42,54 @@ public class HadoopS3FileSystemTest {
                assertEquals("org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.auth.ContainerCredentialsProvider",
                        hadoopConfig.get("fs.s3a.aws.credentials.provider"));
        }
+
+       // ------------------------------------------------------------------------
+       //  These tests check that the S3FileSystemFactory properly forwards
+       //  various patterns of credential keys.
+       // ------------------------------------------------------------------------
+
+       /**
+        * Test forwarding of standard Hadoop-style credential keys.
+        */
+       @Test
+       public void testConfigKeysForwardingHadoopStyle() {
+               Configuration conf = new Configuration();
+               conf.setString("fs.s3a.access.key", "test_access_key");
+               conf.setString("fs.s3a.secret.key", "test_secret_key");
+
+               checkHadoopAccessKeys(conf, "test_access_key", "test_secret_key");
+       }
+
+       /**
+        * Test forwarding of shortened Hadoop-style credential keys.
+        */
+       @Test
+       public void testConfigKeysForwardingShortHadoopStyle() {
+               Configuration conf = new Configuration();
+               conf.setString("s3.access.key", "my_key_a");
+               conf.setString("s3.secret.key", "my_key_b");
+
+               checkHadoopAccessKeys(conf, "my_key_a", "my_key_b");
+       }
+
+       /**
+        * Test forwarding of shortened Presto-style credential keys.
+        */
+       @Test
+       public void testConfigKeysForwardingPrestoStyle() {
+               Configuration conf = new Configuration();
+               conf.setString("s3.access-key", "access_key");
+               conf.setString("s3.secret-key", "secret_key");
+
+               checkHadoopAccessKeys(conf, "access_key", "secret_key");
+       }
+
+       private static void checkHadoopAccessKeys(Configuration flinkConf, String accessKey, String secretKey) {
+               HadoopConfigLoader configLoader = S3FileSystemFactory.createHadoopConfigLoader();
+               configLoader.setFlinkConfig(flinkConf);
+
+               org.apache.hadoop.conf.Configuration hadoopConf = configLoader.getOrLoadHadoopConfig();
+
+               assertEquals(accessKey, hadoopConf.get("fs.s3a.access.key", null));
+               assertEquals(secretKey, hadoopConf.get("fs.s3a.secret.key", null));
+       }
 }
