[ https://issues.apache.org/jira/browse/FLINK-8439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16559627#comment-16559627 ]
ASF GitHub Bot commented on FLINK-8439:
---------------------------------------
aljoscha closed pull request #6405: [FLINK-8439] Add Flink shading to AWS credential provider s3 hadoop c…
URL: https://github.com/apache/flink/pull/6405
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/AbstractFileSystemFactory.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/AbstractFileSystemFactory.java
new file mode 100644
index 00000000000..201d58049bc
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/AbstractFileSystemFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+
+/** Base class for Hadoop file system factories. */
+public abstract class AbstractFileSystemFactory implements FileSystemFactory {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractFileSystemFactory.class);
+
+    /** Name of this factory for logging. */
+    private final String name;
+
+    private final HadoopConfigLoader hadoopConfigLoader;
+
+    protected AbstractFileSystemFactory(String name, HadoopConfigLoader hadoopConfigLoader) {
+        this.name = name;
+        this.hadoopConfigLoader = hadoopConfigLoader;
+    }
+
+    @Override
+    public void configure(Configuration config) {
+        hadoopConfigLoader.setFlinkConfig(config);
+    }
+
+    @Override
+    public FileSystem create(URI fsUri) throws IOException {
+        LOG.debug("Creating Hadoop file system (backed by " + name + ")");
+        LOG.debug("Loading Hadoop configuration for " + name);
+        org.apache.hadoop.conf.Configuration hadoopConfig = hadoopConfigLoader.getOrLoadHadoopConfig();
+        org.apache.hadoop.fs.FileSystem fs = createHadoopFileSystem();
+        fs.initialize(getInitURI(fsUri, hadoopConfig), hadoopConfig);
+        return new HadoopFileSystem(fs);
+    }
+
+    protected abstract org.apache.hadoop.fs.FileSystem createHadoopFileSystem();
+
+    protected abstract URI getInitURI(
+        URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig);
+}
+
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopConfigLoader.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopConfigLoader.java
new file mode 100644
index 00000000000..a40e24fae9d
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopConfigLoader.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.HadoopUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.Set;
+
+/** This class lazily loads the Hadoop configuration from Flink's configuration; it can be reset with a new Flink configuration. */
+public class HadoopConfigLoader {
+    private static final Logger LOG = LoggerFactory.getLogger(HadoopConfigLoader.class);
+
+    /** The prefixes that Flink adds to the Hadoop fs config. */
+    private final String[] flinkConfigPrefixes;
+
+    /** Keys that are replaced (after prefix replacement) to give a more uniform experience
+     * across different file system implementations. */
+    private final String[][] mirroredConfigKeys;
+
+    /** Hadoop config prefix that replaces the Flink prefix. */
+    private final String hadoopConfigPrefix;
+
+    private final Set<String> packagePrefixesToShade;
+    private final Set<String> configKeysToShade;
+    private final String flinkShadingPrefix;
+
+    /** Flink's configuration object. */
+    private Configuration flinkConfig;
+
+    /** Hadoop's configuration for the file systems, lazily initialized. */
+    private org.apache.hadoop.conf.Configuration hadoopConfig;
+
+    public HadoopConfigLoader(
+        @Nonnull String[] flinkConfigPrefixes,
+        @Nonnull String[][] mirroredConfigKeys,
+        @Nonnull String hadoopConfigPrefix,
+        @Nonnull Set<String> packagePrefixesToShade,
+        @Nonnull Set<String> configKeysToShade,
+        @Nonnull String flinkShadingPrefix) {
+        this.flinkConfigPrefixes = flinkConfigPrefixes;
+        this.mirroredConfigKeys = mirroredConfigKeys;
+        this.hadoopConfigPrefix = hadoopConfigPrefix;
+        this.packagePrefixesToShade = packagePrefixesToShade;
+        this.configKeysToShade = configKeysToShade;
+        this.flinkShadingPrefix = flinkShadingPrefix;
+    }
+
+    public void setFlinkConfig(Configuration config) {
+        flinkConfig = config;
+        hadoopConfig = null;
+    }
+
+    /** Get the loaded Hadoop config (or fall back to one loaded from the classpath). */
+    public org.apache.hadoop.conf.Configuration getOrLoadHadoopConfig() {
+        org.apache.hadoop.conf.Configuration hadoopConfig = this.hadoopConfig;
+        if (hadoopConfig == null) {
+            if (flinkConfig != null) {
+                hadoopConfig = mirrorCertainHadoopConfig(loadHadoopConfigFromFlink());
+            }
+            else {
+                LOG.warn("Flink configuration is not set prior to loading this configuration."
+                    + " Using Hadoop configuration from the classpath.");
+                hadoopConfig = new org.apache.hadoop.conf.Configuration();
+            }
+        }
+        this.hadoopConfig = hadoopConfig;
+        return hadoopConfig;
+    }
+
+    // add additional config entries from the Flink config to the Hadoop config
+    private org.apache.hadoop.conf.Configuration loadHadoopConfigFromFlink() {
+        org.apache.hadoop.conf.Configuration hadoopConfig = HadoopUtils.getHadoopConfiguration(flinkConfig);
+        for (String key : flinkConfig.keySet()) {
+            for (String prefix : flinkConfigPrefixes) {
+                if (key.startsWith(prefix)) {
+                    String value = flinkConfig.getString(key, null);
+                    String newKey = hadoopConfigPrefix + key.substring(prefix.length());
+                    String newValue = fixHadoopConfig(key, flinkConfig.getString(key, null));
+                    hadoopConfig.set(newKey, newValue);
+
+                    LOG.debug("Adding Flink config entry for {} as {}={} to Hadoop config", key, newKey, value);
+                }
+            }
+        }
+        return hadoopConfig;
+    }
+
+    // mirror certain keys to make usage more uniform across implementations
+    // with different keys
+    private org.apache.hadoop.conf.Configuration mirrorCertainHadoopConfig(
+        org.apache.hadoop.conf.Configuration hadoopConfig) {
+        for (String[] mirrored : mirroredConfigKeys) {
+            String value = hadoopConfig.get(mirrored[0], null);
+            if (value != null) {
+                hadoopConfig.set(mirrored[1], value);
+            }
+        }
+        return hadoopConfig;
+    }
+
+    private String fixHadoopConfig(String key, String value) {
+        return key != null && configKeysToShade.contains(key) ?
+            shadeClassConfig(value) : value;
+    }
+
+    private String shadeClassConfig(String classConfig) {
+        return packagePrefixesToShade.stream().anyMatch(classConfig::startsWith) ?
+            flinkShadingPrefix + classConfig : classConfig;
+    }
+}
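
For readers skimming the diff, the loader's contract is small: construct it with the prefix and shading tables, hand it Flink's configuration via setFlinkConfig, then ask for the derived Hadoop configuration. Below is a minimal, illustrative sketch using only the API introduced above; the concrete table values mirror what the s3a factory later in this diff passes in, and the standalone main wrapper is not part of the patch:

{code:java}
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader;

public class HadoopConfigLoaderExample {
    public static void main(String[] args) {
        // Prefix and shading tables, mirroring the s3a factory in the next file of this diff.
        String[] flinkConfigPrefixes = { "s3.", "s3a.", "fs.s3a." };
        String[][] mirroredConfigKeys = {
            { "fs.s3a.access-key", "fs.s3a.access.key" },
            { "fs.s3a.secret-key", "fs.s3a.secret.key" }
        };
        Set<String> packagePrefixesToShade = new HashSet<>(Collections.singletonList("com.amazonaws."));
        Set<String> configKeysToShade = Collections.singleton("fs.s3a.aws.credentials.provider");

        HadoopConfigLoader loader = new HadoopConfigLoader(
            flinkConfigPrefixes, mirroredConfigKeys, "fs.s3a.",
            packagePrefixesToShade, configKeysToShade, "org.apache.flink.fs.s3hadoop.shaded.");

        // The Flink configuration as a user would write it in flink-conf.yaml.
        Configuration flinkConfig = new Configuration();
        flinkConfig.setString("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.ContainerCredentialsProvider");
        loader.setFlinkConfig(flinkConfig);

        // The loader copies the entry into the Hadoop config and shades the provider class; prints:
        // org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.auth.ContainerCredentialsProvider
        org.apache.hadoop.conf.Configuration hadoopConfig = loader.getOrLoadHadoopConfig();
        System.out.println(hadoopConfig.get("fs.s3a.aws.credentials.provider"));
    }
}
{code}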
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
index bd272e5701b..4ba49bbc2ba 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
@@ -18,127 +18,77 @@
 package org.apache.flink.fs.s3hadoop;
 
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.FileSystemFactory;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
-import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.fs.hdfs.AbstractFileSystemFactory;
+import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader;
 
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.net.URI;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 
 /**
  * Simple factory for the S3 file system.
  */
-public class S3FileSystemFactory implements FileSystemFactory {
-
+public class S3FileSystemFactory extends AbstractFileSystemFactory {
     private static final Logger LOG = LoggerFactory.getLogger(S3FileSystemFactory.class);
 
-    /** The prefixes that Flink adds to the Hadoop config under 'fs.s3a.'. */
-    private static final String[] CONFIG_PREFIXES = { "s3.", "s3a.", "fs.s3a." };
+    private static final Set<String> PACKAGE_PREFIXES_TO_SHADE =
+        new HashSet<>(Collections.singletonList("com.amazonaws."));
+
+    private static final Set<String> CONFIG_KEYS_TO_SHADE =
+        Collections.unmodifiableSet(new HashSet<>(Collections.singleton("fs.s3a.aws.credentials.provider")));
+
+    private static final String FLINK_SHADING_PREFIX = "org.apache.flink.fs.s3hadoop.shaded.";
+
+    private static final String[] FLINK_CONFIG_PREFIXES = { "s3.", "s3a.", "fs.s3a." };
 
-    /** Keys that are replaced (after prefix replacement, to give a more uniform experience
-     * across different file system implementations. */
     private static final String[][] MIRRORED_CONFIG_KEYS = {
             { "fs.s3a.access-key", "fs.s3a.access.key" },
             { "fs.s3a.secret-key", "fs.s3a.secret.key" }
     };
 
-    /** Flink's configuration object. */
-    private Configuration flinkConfig;
-
-    /** Hadoop's configuration for the file systems, lazily initialized. */
-    private org.apache.hadoop.conf.Configuration hadoopConfig;
+    public S3FileSystemFactory() {
+        super("Hadoop s3a file system", createHadoopConfigLoader());
+    }
 
     @Override
     public String getScheme() {
         return "s3";
     }
 
-    @Override
-    public void configure(Configuration config) {
-        flinkConfig = config;
-        hadoopConfig = null;
+    @VisibleForTesting
+    static HadoopConfigLoader createHadoopConfigLoader() {
+        return new HadoopConfigLoader(FLINK_CONFIG_PREFIXES, MIRRORED_CONFIG_KEYS,
+            "fs.s3a.", PACKAGE_PREFIXES_TO_SHADE, CONFIG_KEYS_TO_SHADE, FLINK_SHADING_PREFIX);
     }
 
     @Override
-    public FileSystem create(URI fsUri) throws IOException {
-        LOG.debug("Creating S3 file system (backed by a Hadoop s3a file system)");
-
-        try {
-            // -- (1) get the loaded Hadoop config (or fall back to one loaded from the classpath)
-
-            org.apache.hadoop.conf.Configuration hadoopConfig = this.hadoopConfig;
-            if (hadoopConfig == null) {
-                if (flinkConfig != null) {
-                    LOG.debug("Loading Hadoop configuration for s3a file system");
-                    hadoopConfig = HadoopUtils.getHadoopConfiguration(flinkConfig);
-
-                    // add additional config entries from the Flink config to the Presto Hadoop config
-                    for (String key : flinkConfig.keySet()) {
-                        for (String prefix : CONFIG_PREFIXES) {
-                            if (key.startsWith(prefix)) {
-                                String value = flinkConfig.getString(key, null);
-                                String newKey = "fs.s3a." + key.substring(prefix.length());
-                                hadoopConfig.set(newKey, flinkConfig.getString(key, null));
-
-                                LOG.debug("Adding Flink config entry for {} as {}={} to Hadoop config for " +
-                                    "S3A File System", key, newKey, value);
-                            }
-                        }
-                    }
-
-                    // mirror certain keys to make use more uniform across s3 implementations
-                    // with different keys
-                    for (String[] mirrored : MIRRORED_CONFIG_KEYS) {
-                        String value = hadoopConfig.get(mirrored[0], null);
-                        if (value != null) {
-                            hadoopConfig.set(mirrored[1], value);
-                        }
-                    }
-
-                    this.hadoopConfig = hadoopConfig;
-                }
-                else {
-                    LOG.warn("The factory has not been configured prior to loading the S3 file system."
-                        + " Using Hadoop configuration from the classpath.");
-
-                    hadoopConfig = new org.apache.hadoop.conf.Configuration();
-                    this.hadoopConfig = hadoopConfig;
-                }
-            }
-
-            // -- (2) Instantiate the Hadoop file system class for that scheme
+    protected org.apache.hadoop.fs.FileSystem createHadoopFileSystem() {
+        return new S3AFileSystem();
+    }
 
-            final String scheme = fsUri.getScheme();
-            final String authority = fsUri.getAuthority();
+    @Override
+    protected URI getInitURI(URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig) {
+        final String scheme = fsUri.getScheme();
+        final String authority = fsUri.getAuthority();
 
-            if (scheme == null && authority == null) {
-                fsUri = org.apache.hadoop.fs.FileSystem.getDefaultUri(hadoopConfig);
-            }
-            else if (scheme != null && authority == null) {
-                URI defaultUri = org.apache.hadoop.fs.FileSystem.getDefaultUri(hadoopConfig);
-                if (scheme.equals(defaultUri.getScheme()) && defaultUri.getAuthority() != null) {
-                    fsUri = defaultUri;
-                }
+        if (scheme == null && authority == null) {
+            fsUri = org.apache.hadoop.fs.FileSystem.getDefaultUri(hadoopConfig);
+        }
+        else if (scheme != null && authority == null) {
+            URI defaultUri = org.apache.hadoop.fs.FileSystem.getDefaultUri(hadoopConfig);
+            if (scheme.equals(defaultUri.getScheme()) && defaultUri.getAuthority() != null) {
+                fsUri = defaultUri;
             }
+        }
 
-        LOG.debug("Using scheme {} for s3a file system backing the S3 File System", fsUri);
-
-        final S3AFileSystem fs = new S3AFileSystem();
-        fs.initialize(fsUri, hadoopConfig);
+        LOG.debug("Using scheme {} for s3a file system backing the S3 File System", fsUri);
 
-        return new HadoopFileSystem(fs);
-    }
-    catch (IOException e) {
-        throw e;
-    }
-    catch (Exception e) {
-        throw new IOException(e.getMessage(), e);
-    }
+        return fsUri;
     }
 }
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java
new file mode 100644
index 00000000000..647a93711b7
--- /dev/null
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3hadoop;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for the S3 file system support via Hadoop's {@link org.apache.hadoop.fs.s3a.S3AFileSystem}.
+ */
+public class HadoopS3FileSystemTest {
+    @Test
+    public void testShadingOfAwsCredProviderConfig() {
+        final Configuration conf = new Configuration();
+        conf.setString("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.ContainerCredentialsProvider");
+
+        HadoopConfigLoader configLoader = S3FileSystemFactory.createHadoopConfigLoader();
+        configLoader.setFlinkConfig(conf);
+
+        org.apache.hadoop.conf.Configuration hadoopConfig = configLoader.getOrLoadHadoopConfig();
+        assertEquals("org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.auth.ContainerCredentialsProvider",
+            hadoopConfig.get("fs.s3a.aws.credentials.provider"));
+    }
+}
diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
index 8847dc9cc2d..a04f9c94a62 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
@@ -18,126 +18,81 @@
 package org.apache.flink.fs.s3presto;
 
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.FileSystemFactory;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
-import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.fs.hdfs.AbstractFileSystemFactory;
+import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader;
+import org.apache.flink.util.FlinkRuntimeException;
 
 import com.facebook.presto.hive.PrestoS3FileSystem;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 
 /**
  * Simple factory for the S3 file system.
  */
-public class S3FileSystemFactory implements FileSystemFactory {
+public class S3FileSystemFactory extends AbstractFileSystemFactory {
+    private static final Set<String> PACKAGE_PREFIXES_TO_SHADE =
+        new HashSet<>(Collections.singletonList("com.amazonaws."));
 
-    private static final Logger LOG = LoggerFactory.getLogger(S3FileSystemFactory.class);
+    private static final Set<String> CONFIG_KEYS_TO_SHADE =
+        Collections.unmodifiableSet(new HashSet<>(Collections.singleton("presto.s3.credentials-provider")));
 
-    /** The prefixes that Flink adds to the Hadoop config under 'presto.s3.'. */
-    private static final String[] CONFIG_PREFIXES = { "s3.", "presto.s3." };
+    private static final String FLINK_SHADING_PREFIX = "org.apache.flink.fs.s3presto.shaded.";
+
+    private static final String[] FLINK_CONFIG_PREFIXES = { "s3.", "presto.s3." };
 
-    /** Keys that are replaced (after prefix replacement, to give a more uniform experience
-     * across different file system implementations. */
     private static final String[][] MIRRORED_CONFIG_KEYS = {
             { "presto.s3.access.key", "presto.s3.access-key" },
             { "presto.s3.secret.key", "presto.s3.secret-key" }
     };
 
-    /** Flink's configuration object. */
-    private Configuration flinkConfig;
-
-    /** Hadoop's configuration for the file systems, lazily initialized. */
-    private org.apache.hadoop.conf.Configuration hadoopConfig;
+    public S3FileSystemFactory() {
+        super("Presto S3 File System", createHadoopConfigLoader());
+    }
 
     @Override
     public String getScheme() {
         return "s3";
     }
 
+    @VisibleForTesting
+    static HadoopConfigLoader createHadoopConfigLoader() {
+        return new HadoopConfigLoader(FLINK_CONFIG_PREFIXES, MIRRORED_CONFIG_KEYS,
+            "presto.s3.", PACKAGE_PREFIXES_TO_SHADE, CONFIG_KEYS_TO_SHADE, FLINK_SHADING_PREFIX);
+    }
+
     @Override
-    public void configure(Configuration config) {
-        flinkConfig = config;
-        hadoopConfig = null;
+    protected org.apache.hadoop.fs.FileSystem createHadoopFileSystem() {
+        return new PrestoS3FileSystem();
     }
 
     @Override
-    public FileSystem create(URI fsUri) throws IOException {
-        LOG.debug("Creating S3 file system (backed by a Hadoop s3a file system");
+    protected URI getInitURI(URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig) {
+        final String scheme = fsUri.getScheme();
+        final String authority = fsUri.getAuthority();
+        final URI initUri;
 
-        try {
-            // -- (1) get the loaded Hadoop config (or fall back to one loaded from the classpath)
-
-            org.apache.hadoop.conf.Configuration hadoopConfig = this.hadoopConfig;
-            if (hadoopConfig == null) {
-                if (flinkConfig != null) {
-                    LOG.debug("Loading Hadoop configuration for Presto S3 file system");
-                    hadoopConfig = HadoopUtils.getHadoopConfiguration(flinkConfig);
-
-                    // add additional config entries from the Flink config to the Presto Hadoop config
-                    for (String key : flinkConfig.keySet()) {
-                        for (String prefix : CONFIG_PREFIXES) {
-                            if (key.startsWith(prefix)) {
-                                String value = flinkConfig.getString(key, null);
-                                String newKey = "presto.s3." + key.substring(prefix.length());
-                                hadoopConfig.set(newKey, flinkConfig.getString(key, null));
-
-                                LOG.debug("Adding Flink config entry for {} as {}={} to Hadoop config for " +
-                                    "Presto S3 File System", key, newKey, value);
-                            }
-                        }
-                    }
-
-                    // mirror certain keys to make use more uniform across s3 implementations
-                    // with different keys
-                    for (String[] mirrored : MIRRORED_CONFIG_KEYS) {
-                        String value = hadoopConfig.get(mirrored[0], null);
-                        if (value != null) {
-                            hadoopConfig.set(mirrored[1], value);
-                        }
-                    }
-
-                    this.hadoopConfig = hadoopConfig;
-                }
-                else {
-                    LOG.warn("The factory has not been configured prior to loading the S3 file system."
-                        + " Using Hadoop configuration from the classpath.");
-
-                    hadoopConfig = new org.apache.hadoop.conf.Configuration();
-                    this.hadoopConfig = hadoopConfig;
-                }
-            }
-
-            // -- (2) Instantiate the Presto file system class for that scheme
-
-            final String scheme = fsUri.getScheme();
-            final String authority = fsUri.getAuthority();
-            final URI initUri;
-
-            if (scheme == null && authority == null) {
-                initUri = new URI("s3://s3.amazonaws.com");
-            }
-            else if (scheme != null && authority == null) {
-                initUri = new URI(scheme + "://s3.amazonaws.com");
-            }
-            else {
-                initUri = fsUri;
-            }
-
-            final PrestoS3FileSystem fs = new PrestoS3FileSystem();
-            fs.initialize(initUri, hadoopConfig);
-
-            return new HadoopFileSystem(fs);
+        if (scheme == null && authority == null) {
+            initUri = createURI("s3://s3.amazonaws.com");
+        }
+        else if (scheme != null && authority == null) {
+            initUri = createURI(scheme + "://s3.amazonaws.com");
         }
-        catch (IOException e) {
-            throw e;
+        else {
+            initUri = fsUri;
         }
-        catch (Exception e) {
-            throw new IOException(e.getMessage(), e);
+        return initUri;
+    }
+
+    private URI createURI(String str) {
+        try {
+            return new URI(str);
+        } catch (URISyntaxException e) {
+            throw new FlinkRuntimeException("Error in s3 aws URI - " + str, e);
        }
     }
 }
diff --git a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
index 7e2d12aebb0..4eeb2d4512a 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader;
 import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 
 import com.amazonaws.auth.AWSCredentialsProvider;
@@ -31,6 +32,7 @@
 import java.lang.reflect.Field;
 import java.net.URI;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -75,6 +77,19 @@ public void testConfigPropagationAlternateStyle() throws Exception{
         validateBasicCredentials(fs);
     }
 
+    @Test
+    public void testShadingOfAwsCredProviderConfig() {
+        final Configuration conf = new Configuration();
+        conf.setString("presto.s3.credentials-provider", "com.amazonaws.auth.ContainerCredentialsProvider");
+
+        HadoopConfigLoader configLoader = S3FileSystemFactory.createHadoopConfigLoader();
+        configLoader.setFlinkConfig(conf);
+
+        org.apache.hadoop.conf.Configuration hadoopConfig = configLoader.getOrLoadHadoopConfig();
+        assertEquals("org.apache.flink.fs.s3presto.shaded.com.amazonaws.auth.ContainerCredentialsProvider",
+            hadoopConfig.get("presto.s3.credentials-provider"));
+    }
+
     // ------------------------------------------------------------------------
     //  utilities
     // ------------------------------------------------------------------------
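
Taken together, the user-facing effect of this diff (exercised by the two new tests above) is that the shading prefix no longer has to be spelled out by hand in flink-conf.yaml. An illustrative before/after for the s3a-backed file system, based on the issue description below and the tests above:

{code:java}
# Before this change, the shaded class name had to be written out in full:
fs.s3a.aws.credentials.provider: org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.auth.ContainerCredentialsProvider

# With this change, the plain class name is rewritten to the shaded one automatically:
fs.s3a.aws.credentials.provider: com.amazonaws.auth.ContainerCredentialsProvider
{code}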
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Add Flink shading to AWS credential provider s3 hadoop config
> -------------------------------------------------------------
>
> Key: FLINK-8439
> URL: https://issues.apache.org/jira/browse/FLINK-8439
> Project: Flink
> Issue Type: Improvement
> Components: Documentation
> Reporter: Dyana Rose
> Assignee: Andrey Zagrebin
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.6.0
>
>
> This came up when using s3 as the file system backend and running under ECS.
> With no credentials in the container, hadoop-aws will default to EC2 instance-level credentials when accessing S3. However, when running under ECS, you will generally want to default to the task definition's IAM role.
> In this case you need to set the Hadoop property
> {code:java}
> fs.s3a.aws.credentials.provider{code}
> to one or more fully qualified class names; see the [hadoop-aws docs|https://github.com/apache/hadoop/blob/1ba491ff907fc5d2618add980734a3534e2be098/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md].
> This works as expected when you add this setting to flink-conf.yaml, but there is a further 'gotcha.' Because the AWS SDK is shaded, the actual full class name for, in this case, the ContainerCredentialsProvider is
> {code:java}
> org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.auth.ContainerCredentialsProvider{code}
>
> meaning the full setting is:
> {code:java}
> fs.s3a.aws.credentials.provider: org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.auth.ContainerCredentialsProvider{code}
> If you instead set it to the unshaded class name, you will see a very confusing error stating that the ContainerCredentialsProvider doesn't implement AWSCredentialsProvider (which it most certainly does).
> Adding this information (how to specify alternate credential providers, and the namespace gotcha) to the [AWS deployment docs|https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/aws.html] would be useful to anyone else using S3.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)