aljoscha closed pull request #6405: [FLINK-8439] Add Flink shading to AWS credential provider s3 hadoop c… URL: https://github.com/apache/flink/pull/6405
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/AbstractFileSystemFactory.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/AbstractFileSystemFactory.java new file mode 100644 index 00000000000..201d58049bc --- /dev/null +++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/AbstractFileSystemFactory.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.fs.hdfs; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; + +/** Base class for Hadoop file system factories. 
*/ +public abstract class AbstractFileSystemFactory implements FileSystemFactory { + private static final Logger LOG = LoggerFactory.getLogger(AbstractFileSystemFactory.class); + + /** Name of this factory for logging. */ + private final String name; + + private final HadoopConfigLoader hadoopConfigLoader; + + protected AbstractFileSystemFactory(String name, HadoopConfigLoader hadoopConfigLoader) { + this.name = name; + this.hadoopConfigLoader = hadoopConfigLoader; + } + + @Override + public void configure(Configuration config) { + hadoopConfigLoader.setFlinkConfig(config); + } + + @Override + public FileSystem create(URI fsUri) throws IOException { + LOG.debug("Creating Hadoop file system (backed by " + name + ")"); + LOG.debug("Loading Hadoop configuration for " + name); + org.apache.hadoop.conf.Configuration hadoopConfig = hadoopConfigLoader.getOrLoadHadoopConfig(); + org.apache.hadoop.fs.FileSystem fs = createHadoopFileSystem(); + fs.initialize(getInitURI(fsUri, hadoopConfig), hadoopConfig); + return new HadoopFileSystem(fs); + } + + protected abstract org.apache.hadoop.fs.FileSystem createHadoopFileSystem(); + + protected abstract URI getInitURI( + URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig); +} + diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopConfigLoader.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopConfigLoader.java new file mode 100644 index 00000000000..a40e24fae9d --- /dev/null +++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopConfigLoader.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.fs.hdfs; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.util.HadoopUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.util.Set; + +/** This class lazily loads hadoop configuration from resettable Flink's configuration. */ +public class HadoopConfigLoader { + private static final Logger LOG = LoggerFactory.getLogger(HadoopConfigLoader.class); + + /** The prefixes that Flink adds to the Hadoop fs config. */ + private final String[] flinkConfigPrefixes; + + /** Keys that are replaced (after prefix replacement, to give a more uniform experience + * across different file system implementations. */ + private final String[][] mirroredConfigKeys; + + /** Hadoop config prefix to replace Flink prefix. */ + private final String hadoopConfigPrefix; + + private final Set<String> packagePrefixesToShade; + private final Set<String> configKeysToShade; + private final String flinkShadingPrefix; + + /** Flink's configuration object. */ + private Configuration flinkConfig; + + /** Hadoop's configuration for the file systems, lazily initialized. 
*/ + private org.apache.hadoop.conf.Configuration hadoopConfig; + + public HadoopConfigLoader( + @Nonnull String[] flinkConfigPrefixes, + @Nonnull String[][] mirroredConfigKeys, + @Nonnull String hadoopConfigPrefix, + Set<String> packagePrefixesToShade, + @Nonnull Set<String> configKeysToShade, + @Nonnull String flinkShadingPrefix) { + this.flinkConfigPrefixes = flinkConfigPrefixes; + this.mirroredConfigKeys = mirroredConfigKeys; + this.hadoopConfigPrefix = hadoopConfigPrefix; + this.packagePrefixesToShade = packagePrefixesToShade; + this.configKeysToShade = configKeysToShade; + this.flinkShadingPrefix = flinkShadingPrefix; + } + + public void setFlinkConfig(Configuration config) { + flinkConfig = config; + hadoopConfig = null; + } + + /** get the loaded Hadoop config (or fall back to one loaded from the classpath). */ + public org.apache.hadoop.conf.Configuration getOrLoadHadoopConfig() { + org.apache.hadoop.conf.Configuration hadoopConfig = this.hadoopConfig; + if (hadoopConfig == null) { + if (flinkConfig != null) { + hadoopConfig = mirrorCertainHadoopConfig(loadHadoopConfigFromFlink()); + } + else { + LOG.warn("Flink configuration is not set prior to loading this configuration." 
+ + " Using Hadoop configuration from the classpath."); + hadoopConfig = new org.apache.hadoop.conf.Configuration(); + } + } + this.hadoopConfig = hadoopConfig; + return hadoopConfig; + } + + // add additional config entries from the Flink config to the Hadoop config + private org.apache.hadoop.conf.Configuration loadHadoopConfigFromFlink() { + org.apache.hadoop.conf.Configuration hadoopConfig = HadoopUtils.getHadoopConfiguration(flinkConfig); + for (String key : flinkConfig.keySet()) { + for (String prefix : flinkConfigPrefixes) { + if (key.startsWith(prefix)) { + String value = flinkConfig.getString(key, null); + String newKey = hadoopConfigPrefix + key.substring(prefix.length()); + String newValue = fixHadoopConfig(key, flinkConfig.getString(key, null)); + hadoopConfig.set(newKey, newValue); + + LOG.debug("Adding Flink config entry for {} as {}={} to Hadoop config", key, newKey, value); + } + } + } + return hadoopConfig; + } + + // mirror certain keys to make use more uniform across implementations + // with different keys + private org.apache.hadoop.conf.Configuration mirrorCertainHadoopConfig( + org.apache.hadoop.conf.Configuration hadoopConfig) { + for (String[] mirrored : mirroredConfigKeys) { + String value = hadoopConfig.get(mirrored[0], null); + if (value != null) { + hadoopConfig.set(mirrored[1], value); + } + } + return hadoopConfig; + } + + private String fixHadoopConfig(String key, String value) { + return key != null && configKeysToShade.contains(key) ? + shadeClassConfig(value) : value; + } + + private String shadeClassConfig(String classConfig) { + return packagePrefixesToShade.stream().anyMatch(classConfig::startsWith) ? 
+ flinkShadingPrefix + classConfig : classConfig; + } +} diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java index bd272e5701b..4ba49bbc2ba 100644 --- a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java +++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java @@ -18,127 +18,77 @@ package org.apache.flink.fs.s3hadoop; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.core.fs.FileSystemFactory; -import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; -import org.apache.flink.runtime.util.HadoopUtils; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.fs.hdfs.AbstractFileSystemFactory; +import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.net.URI; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; /** * Simple factory for the S3 file system. */ -public class S3FileSystemFactory implements FileSystemFactory { - +public class S3FileSystemFactory extends AbstractFileSystemFactory { private static final Logger LOG = LoggerFactory.getLogger(S3FileSystemFactory.class); - /** The prefixes that Flink adds to the Hadoop config under 'fs.s3a.'. */ - private static final String[] CONFIG_PREFIXES = { "s3.", "s3a.", "fs.s3a." 
}; + private static final Set<String> PACKAGE_PREFIXES_TO_SHADE = + new HashSet<>(Collections.singletonList("com.amazonaws.")); + + private static final Set<String> CONFIG_KEYS_TO_SHADE = + Collections.unmodifiableSet(new HashSet<>(Collections.singleton("fs.s3a.aws.credentials.provider"))); + + private static final String FLINK_SHADING_PREFIX = "org.apache.flink.fs.s3hadoop.shaded."; + + private static final String[] FLINK_CONFIG_PREFIXES = { "s3.", "s3a.", "fs.s3a." }; - /** Keys that are replaced (after prefix replacement, to give a more uniform experience - * across different file system implementations. */ private static final String[][] MIRRORED_CONFIG_KEYS = { { "fs.s3a.access-key", "fs.s3a.access.key" }, { "fs.s3a.secret-key", "fs.s3a.secret.key" } }; - /** Flink's configuration object. */ - private Configuration flinkConfig; - - /** Hadoop's configuration for the file systems, lazily initialized. */ - private org.apache.hadoop.conf.Configuration hadoopConfig; + public S3FileSystemFactory() { + super("Hadoop s3a file system", createHadoopConfigLoader()); + } @Override public String getScheme() { return "s3"; } - @Override - public void configure(Configuration config) { - flinkConfig = config; - hadoopConfig = null; + @VisibleForTesting + static HadoopConfigLoader createHadoopConfigLoader() { + return new HadoopConfigLoader(FLINK_CONFIG_PREFIXES, MIRRORED_CONFIG_KEYS, + "fs.s3a.", PACKAGE_PREFIXES_TO_SHADE, CONFIG_KEYS_TO_SHADE, FLINK_SHADING_PREFIX); } @Override - public FileSystem create(URI fsUri) throws IOException { - LOG.debug("Creating S3 file system (backed by a Hadoop s3a file system)"); - - try { - // -- (1) get the loaded Hadoop config (or fall back to one loaded from the classpath) - - org.apache.hadoop.conf.Configuration hadoopConfig = this.hadoopConfig; - if (hadoopConfig == null) { - if (flinkConfig != null) { - LOG.debug("Loading Hadoop configuration for s3a file system"); - hadoopConfig = HadoopUtils.getHadoopConfiguration(flinkConfig); - - 
// add additional config entries from the Flink config to the Presto Hadoop config - for (String key : flinkConfig.keySet()) { - for (String prefix : CONFIG_PREFIXES) { - if (key.startsWith(prefix)) { - String value = flinkConfig.getString(key, null); - String newKey = "fs.s3a." + key.substring(prefix.length()); - hadoopConfig.set(newKey, flinkConfig.getString(key, null)); - - LOG.debug("Adding Flink config entry for {} as {}={} to Hadoop config for " + - "S3A File System", key, newKey, value); - } - } - } - - // mirror certain keys to make use more uniform across s3 implementations - // with different keys - for (String[] mirrored : MIRRORED_CONFIG_KEYS) { - String value = hadoopConfig.get(mirrored[0], null); - if (value != null) { - hadoopConfig.set(mirrored[1], value); - } - } - - this.hadoopConfig = hadoopConfig; - } - else { - LOG.warn("The factory has not been configured prior to loading the S3 file system." - + " Using Hadoop configuration from the classpath."); - - hadoopConfig = new org.apache.hadoop.conf.Configuration(); - this.hadoopConfig = hadoopConfig; - } - } - - // -- (2) Instantiate the Hadoop file system class for that scheme + protected org.apache.hadoop.fs.FileSystem createHadoopFileSystem() { + return new S3AFileSystem(); + } - final String scheme = fsUri.getScheme(); - final String authority = fsUri.getAuthority(); + @Override + protected URI getInitURI(URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig) { + final String scheme = fsUri.getScheme(); + final String authority = fsUri.getAuthority(); - if (scheme == null && authority == null) { - fsUri = org.apache.hadoop.fs.FileSystem.getDefaultUri(hadoopConfig); - } - else if (scheme != null && authority == null) { - URI defaultUri = org.apache.hadoop.fs.FileSystem.getDefaultUri(hadoopConfig); - if (scheme.equals(defaultUri.getScheme()) && defaultUri.getAuthority() != null) { - fsUri = defaultUri; - } + if (scheme == null && authority == null) { + fsUri = 
org.apache.hadoop.fs.FileSystem.getDefaultUri(hadoopConfig); + } + else if (scheme != null && authority == null) { + URI defaultUri = org.apache.hadoop.fs.FileSystem.getDefaultUri(hadoopConfig); + if (scheme.equals(defaultUri.getScheme()) && defaultUri.getAuthority() != null) { + fsUri = defaultUri; } + } - LOG.debug("Using scheme {} for s3a file system backing the S3 File System", fsUri); - - final S3AFileSystem fs = new S3AFileSystem(); - fs.initialize(fsUri, hadoopConfig); + LOG.debug("Using scheme {} for s3a file system backing the S3 File System", fsUri); - return new HadoopFileSystem(fs); - } - catch (IOException e) { - throw e; - } - catch (Exception e) { - throw new IOException(e.getMessage(), e); - } + return fsUri; } } diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java new file mode 100644 index 00000000000..647a93711b7 --- /dev/null +++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.fs.s3hadoop; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Unit tests for the S3 file system support via Hadoop's {@link org.apache.hadoop.fs.s3a.S3AFileSystem}. + */ +public class HadoopS3FileSystemTest { + @Test + public void testShadingOfAwsCredProviderConfig() { + final Configuration conf = new Configuration(); + conf.setString("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.ContainerCredentialsProvider"); + + HadoopConfigLoader configLoader = S3FileSystemFactory.createHadoopConfigLoader(); + configLoader.setFlinkConfig(conf); + + org.apache.hadoop.conf.Configuration hadoopConfig = configLoader.getOrLoadHadoopConfig(); + assertEquals("org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.auth.ContainerCredentialsProvider", + hadoopConfig.get("fs.s3a.aws.credentials.provider")); + } +} diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java index 8847dc9cc2d..a04f9c94a62 100644 --- a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java +++ b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java @@ -18,126 +18,81 @@ package org.apache.flink.fs.s3presto; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.core.fs.FileSystemFactory; -import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; -import org.apache.flink.runtime.util.HadoopUtils; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.fs.hdfs.AbstractFileSystemFactory; +import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader; +import 
org.apache.flink.util.FlinkRuntimeException; import com.facebook.presto.hive.PrestoS3FileSystem; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.io.IOException; import java.net.URI; +import java.net.URISyntaxException; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; /** * Simple factory for the S3 file system. */ -public class S3FileSystemFactory implements FileSystemFactory { +public class S3FileSystemFactory extends AbstractFileSystemFactory { + private static final Set<String> PACKAGE_PREFIXES_TO_SHADE = + new HashSet<>(Collections.singletonList("com.amazonaws.")); - private static final Logger LOG = LoggerFactory.getLogger(S3FileSystemFactory.class); + private static final Set<String> CONFIG_KEYS_TO_SHADE = + Collections.unmodifiableSet(new HashSet<>(Collections.singleton("presto.s3.credentials-provider"))); - /** The prefixes that Flink adds to the Hadoop config under 'presto.s3.'. */ - private static final String[] CONFIG_PREFIXES = { "s3.", "presto.s3." }; + private static final String FLINK_SHADING_PREFIX = "org.apache.flink.fs.s3presto.shaded."; + + private static final String[] FLINK_CONFIG_PREFIXES = { "s3.", "presto.s3." }; - /** Keys that are replaced (after prefix replacement, to give a more uniform experience - * across different file system implementations. */ private static final String[][] MIRRORED_CONFIG_KEYS = { { "presto.s3.access.key", "presto.s3.access-key" }, { "presto.s3.secret.key", "presto.s3.secret-key" } }; - /** Flink's configuration object. */ - private Configuration flinkConfig; - - /** Hadoop's configuration for the file systems, lazily initialized. 
*/ - private org.apache.hadoop.conf.Configuration hadoopConfig; + public S3FileSystemFactory() { + super("Presto S3 File System", createHadoopConfigLoader()); + } @Override public String getScheme() { return "s3"; } + @VisibleForTesting + static HadoopConfigLoader createHadoopConfigLoader() { + return new HadoopConfigLoader(FLINK_CONFIG_PREFIXES, MIRRORED_CONFIG_KEYS, + "presto.s3.", PACKAGE_PREFIXES_TO_SHADE, CONFIG_KEYS_TO_SHADE, FLINK_SHADING_PREFIX); + } + @Override - public void configure(Configuration config) { - flinkConfig = config; - hadoopConfig = null; + protected org.apache.hadoop.fs.FileSystem createHadoopFileSystem() { + return new PrestoS3FileSystem(); } @Override - public FileSystem create(URI fsUri) throws IOException { - LOG.debug("Creating S3 file system (backed by a Hadoop s3a file system"); + protected URI getInitURI(URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig) { + final String scheme = fsUri.getScheme(); + final String authority = fsUri.getAuthority(); + final URI initUri; - try { - // -- (1) get the loaded Hadoop config (or fall back to one loaded from the classpath) - - org.apache.hadoop.conf.Configuration hadoopConfig = this.hadoopConfig; - if (hadoopConfig == null) { - if (flinkConfig != null) { - LOG.debug("Loading Hadoop configuration for Presto S3 file system"); - hadoopConfig = HadoopUtils.getHadoopConfiguration(flinkConfig); - - // add additional config entries from the Flink config to the Presto Hadoop config - for (String key : flinkConfig.keySet()) { - for (String prefix : CONFIG_PREFIXES) { - if (key.startsWith(prefix)) { - String value = flinkConfig.getString(key, null); - String newKey = "presto.s3." 
+ key.substring(prefix.length()); - hadoopConfig.set(newKey, flinkConfig.getString(key, null)); - - LOG.debug("Adding Flink config entry for {} as {}={} to Hadoop config for " + - "Presto S3 File System", key, newKey, value); - } - } - } - - // mirror certain keys to make use more uniform across s3 implementations - // with different keys - for (String[] mirrored : MIRRORED_CONFIG_KEYS) { - String value = hadoopConfig.get(mirrored[0], null); - if (value != null) { - hadoopConfig.set(mirrored[1], value); - } - } - - this.hadoopConfig = hadoopConfig; - } - else { - LOG.warn("The factory has not been configured prior to loading the S3 file system." - + " Using Hadoop configuration from the classpath."); - - hadoopConfig = new org.apache.hadoop.conf.Configuration(); - this.hadoopConfig = hadoopConfig; - } - } - - // -- (2) Instantiate the Presto file system class for that scheme - - final String scheme = fsUri.getScheme(); - final String authority = fsUri.getAuthority(); - final URI initUri; - - if (scheme == null && authority == null) { - initUri = new URI("s3://s3.amazonaws.com"); - } - else if (scheme != null && authority == null) { - initUri = new URI(scheme + "://s3.amazonaws.com"); - } - else { - initUri = fsUri; - } - - final PrestoS3FileSystem fs = new PrestoS3FileSystem(); - fs.initialize(initUri, hadoopConfig); - - return new HadoopFileSystem(fs); + if (scheme == null && authority == null) { + initUri = createURI("s3://s3.amazonaws.com"); + } + else if (scheme != null && authority == null) { + initUri = createURI(scheme + "://s3.amazonaws.com"); } - catch (IOException e) { - throw e; + else { + initUri = fsUri; } - catch (Exception e) { - throw new IOException(e.getMessage(), e); + return initUri; + } + + private URI createURI(String str) { + try { + return new URI(str); + } catch (URISyntaxException e) { + throw new FlinkRuntimeException("Error in s3 aws URI - " + str, e); } } } diff --git 
a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java index 7e2d12aebb0..4eeb2d4512a 100644 --- a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java +++ b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java @@ -20,6 +20,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader; import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; import com.amazonaws.auth.AWSCredentialsProvider; @@ -31,6 +32,7 @@ import java.lang.reflect.Field; import java.net.URI; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; /** @@ -75,6 +77,19 @@ public void testConfigPropagationAlternateStyle() throws Exception{ validateBasicCredentials(fs); } + @Test + public void testShadingOfAwsCredProviderConfig() { + final Configuration conf = new Configuration(); + conf.setString("presto.s3.credentials-provider", "com.amazonaws.auth.ContainerCredentialsProvider"); + + HadoopConfigLoader configLoader = S3FileSystemFactory.createHadoopConfigLoader(); + configLoader.setFlinkConfig(conf); + + org.apache.hadoop.conf.Configuration hadoopConfig = configLoader.getOrLoadHadoopConfig(); + assertEquals("org.apache.flink.fs.s3presto.shaded.com.amazonaws.auth.ContainerCredentialsProvider", + hadoopConfig.get("presto.s3.credentials-provider")); + } + // ------------------------------------------------------------------------ // utilities // ------------------------------------------------------------------------ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services
