HADOOP-15583. Stabilize S3A Assumed Role support. Contributed by Steve Loughran.
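A minimal client configuration exercising the assumed-role support covered by this patch might look like the sketch below. This is illustrative only: the role ARN, account id and endpoint/region values are placeholders, and fs.s3a.assumed.role.arn and fs.s3a.aws.credentials.provider are assumed to be the standard S3A configuration keys; only fs.s3a.assumed.role.sts.endpoint and fs.s3a.assumed.role.sts.endpoint.region are taken directly from this change.

  <!-- Sketch only: values below are placeholders, not defaults from this patch. -->
  <property>
    <name>fs.s3a.aws.credentials.provider</name>
    <value>org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider</value>
  </property>
  <property>
    <name>fs.s3a.assumed.role.arn</name>
    <!-- placeholder role ARN -->
    <value>arn:aws:iam::123456789012:role/example-role</value>
  </property>
  <property>
    <name>fs.s3a.assumed.role.sts.endpoint</name>
    <!-- example regional STS endpoint; if unset, the default endpoint is used -->
    <value>sts.eu-west-1.amazonaws.com</value>
  </property>
  <property>
    <name>fs.s3a.assumed.role.sts.endpoint.region</name>
    <!-- region matching the endpoint above, needed for v4 signing -->
    <value>eu-west-1</value>
  </property>

With a non-default endpoint, the region property must name the region that endpoint lives in, since the v4 signature includes the region.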
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/da9a39ee Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/da9a39ee Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/da9a39ee Branch: refs/heads/HDFS-12090 Commit: da9a39eed138210de29b59b90c449b28da1c04f9 Parents: d81cd36 Author: Steve Loughran <[email protected]> Authored: Wed Aug 8 22:57:10 2018 -0700 Committer: Steve Loughran <[email protected]> Committed: Wed Aug 8 22:57:24 2018 -0700 ---------------------------------------------------------------------- .../src/main/resources/core-default.xml | 18 +- .../fs/s3a/AWSCredentialProviderList.java | 101 ++++++-- .../org/apache/hadoop/fs/s3a/Constants.java | 19 +- .../hadoop/fs/s3a/DefaultS3ClientFactory.java | 190 ++++---------- .../fs/s3a/InconsistentAmazonS3Client.java | 10 + .../fs/s3a/InconsistentS3ClientFactory.java | 11 + .../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 35 ++- .../apache/hadoop/fs/s3a/S3ARetryPolicy.java | 4 +- .../java/org/apache/hadoop/fs/s3a/S3AUtils.java | 245 +++++++++++++++++-- .../apache/hadoop/fs/s3a/S3ClientFactory.java | 7 +- .../s3a/auth/AssumedRoleCredentialProvider.java | 78 +++++- .../fs/s3a/auth/NoAuthWithAWSException.java | 37 +++ .../apache/hadoop/fs/s3a/auth/RoleModel.java | 8 + .../apache/hadoop/fs/s3a/auth/RolePolicies.java | 143 +++++++++-- .../hadoop/fs/s3a/auth/STSClientFactory.java | 78 ++++++ .../fs/s3a/s3guard/DynamoDBClientFactory.java | 18 +- .../fs/s3a/s3guard/DynamoDBMetadataStore.java | 62 ++++- .../markdown/tools/hadoop-aws/assumed_roles.md | 191 +++++++++++---- .../src/site/markdown/tools/hadoop-aws/index.md | 6 +- .../hadoop/fs/s3a/ITestS3AConfiguration.java | 117 ++++----- .../fs/s3a/ITestS3ATemporaryCredentials.java | 71 +++--- .../fs/s3a/ITestS3GuardListConsistency.java | 68 +++-- .../hadoop/fs/s3a/ITestS3GuardWriteBack.java | 57 +++-- .../hadoop/fs/s3a/MockS3ClientFactory.java | 6 +- .../fs/s3a/TestS3AAWSCredentialsProvider.java | 76 +++++- .../hadoop/fs/s3a/auth/ITestAssumeRole.java | 151 ++++++++++-- .../auth/ITestAssumedRoleCommitOperations.java | 5 +- .../hadoop/fs/s3a/auth/RoleTestUtils.java | 24 +- .../s3guard/AbstractS3GuardToolTestBase.java | 7 +- .../s3a/s3guard/ITestS3GuardConcurrentOps.java | 147 ++++++----- 30 files changed, 1461 insertions(+), 529 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index 75acf48..29c2bc2 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -1033,7 +1033,19 @@ <name>fs.s3a.assumed.role.sts.endpoint</name> <value/> <description> - AWS Simple Token Service Endpoint. If unset, uses the default endpoint. + AWS Security Token Service Endpoint. + If unset, uses the default endpoint. + Only used if AssumedRoleCredentialProvider is the AWS credential provider. 
+ </description> +</property> + +<property> + <name>fs.s3a.assumed.role.sts.endpoint.region</name> + <value>us-west-1</value> + <description> + AWS Security Token Service Endpoint's region; + Needed if fs.s3a.assumed.role.sts.endpoint points to an endpoint + other than the default one and the v4 signature is used. Only used if AssumedRoleCredentialProvider is the AWS credential provider. </description> </property> @@ -1058,7 +1070,9 @@ <property> <name>fs.s3a.connection.ssl.enabled</name> <value>true</value> - <description>Enables or disables SSL connections to S3.</description> + <description>Enables or disables SSL connections to AWS services. + Also sets the default port to use for the s3a proxy settings, + when not explicitly set in fs.s3a.proxy.port.</description> </property> <property> http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSCredentialProviderList.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSCredentialProviderList.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSCredentialProviderList.java index 10201f0..f9052fa 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSCredentialProviderList.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSCredentialProviderList.java @@ -18,25 +18,29 @@ package org.apache.hadoop.fs.s3a; +import java.io.Closeable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + import com.amazonaws.AmazonClientException; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AnonymousAWSCredentials; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException; import org.apache.hadoop.io.IOUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Closeable; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.stream.Collectors; - /** * A list of providers. * @@ -62,10 +66,18 @@ public class AWSCredentialProviderList implements AWSCredentialsProvider, public static final String NO_AWS_CREDENTIAL_PROVIDERS = "No AWS Credential Providers"; + static final String + CREDENTIALS_REQUESTED_WHEN_CLOSED + = "Credentials requested after provider list was closed"; + private final List<AWSCredentialsProvider> providers = new ArrayList<>(1); private boolean reuseLastProvider = true; private AWSCredentialsProvider lastProvider; + private final AtomicInteger refCount = new AtomicInteger(1); + + private final AtomicBoolean closed = new AtomicBoolean(false); + /** * Empty instance. This is not ready to be used. 
*/ @@ -94,6 +106,9 @@ public class AWSCredentialProviderList implements AWSCredentialsProvider, */ @Override public void refresh() { + if (isClosed()) { + return; + } for (AWSCredentialsProvider provider : providers) { provider.refresh(); } @@ -106,6 +121,11 @@ public class AWSCredentialProviderList implements AWSCredentialsProvider, */ @Override public AWSCredentials getCredentials() { + if (isClosed()) { + LOG.warn(CREDENTIALS_REQUESTED_WHEN_CLOSED); + throw new NoAuthWithAWSException( + CREDENTIALS_REQUESTED_WHEN_CLOSED); + } checkNotEmpty(); if (reuseLastProvider && lastProvider != null) { return lastProvider.getCredentials(); @@ -136,8 +156,7 @@ public class AWSCredentialProviderList implements AWSCredentialsProvider, if (lastException != null) { message += ": " + lastException; } - throw new AmazonClientException(message, lastException); - + throw new NoAuthWithAWSException(message, lastException); } /** @@ -156,7 +175,7 @@ public class AWSCredentialProviderList implements AWSCredentialsProvider, */ public void checkNotEmpty() { if (providers.isEmpty()) { - throw new AmazonClientException(NO_AWS_CREDENTIAL_PROVIDERS); + throw new NoAuthWithAWSException(NO_AWS_CREDENTIAL_PROVIDERS); } } @@ -178,8 +197,38 @@ public class AWSCredentialProviderList implements AWSCredentialsProvider, */ @Override public String toString() { - return "AWSCredentialProviderList: " + - StringUtils.join(providers, " "); + return "AWSCredentialProviderList[" + + "refcount= " + refCount.get() + ": [" + + StringUtils.join(providers, ", ") + ']'; + } + + /** + * Get a reference to this object with an updated reference count. + * + * @return a reference to this + */ + public synchronized AWSCredentialProviderList share() { + Preconditions.checkState(!closed.get(), "Provider list is closed"); + refCount.incrementAndGet(); + return this; + } + + /** + * Get the current reference count. + * @return the current ref count + */ + @VisibleForTesting + public int getRefCount() { + return refCount.get(); + } + + /** + * Get the closed flag. + * @return true iff the list is closed. + */ + @VisibleForTesting + public boolean isClosed() { + return closed.get(); } /** @@ -190,9 +239,29 @@ public class AWSCredentialProviderList implements AWSCredentialsProvider, */ @Override public void close() { - for(AWSCredentialsProvider p: providers) { + synchronized (this) { + if (closed.get()) { + // already closed: no-op + return; + } + int remainder = refCount.decrementAndGet(); + if (remainder != 0) { + // still actively used, or somehow things are + // now negative + LOG.debug("Not closing {}", this); + return; + } + // at this point, the closing is going to happen + LOG.debug("Closing {}", this); + closed.set(true); + } + + // do this outside the synchronized block. 
+ for (AWSCredentialsProvider p : providers) { if (p instanceof Closeable) { - IOUtils.closeStream((Closeable)p); + IOUtils.closeStream((Closeable) p); + } else if (p instanceof AutoCloseable) { + S3AUtils.closeAutocloseables(LOG, (AutoCloseable)p); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index c521936..a8da6ec 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -84,10 +84,27 @@ public final class Constants { public static final String ASSUMED_ROLE_SESSION_DURATION = "fs.s3a.assumed.role.session.duration"; - /** Simple Token Service Endpoint. If unset, uses the default endpoint. */ + /** Security Token Service Endpoint. If unset, uses the default endpoint. */ public static final String ASSUMED_ROLE_STS_ENDPOINT = "fs.s3a.assumed.role.sts.endpoint"; + /** + * Region for the STS endpoint; only relevant if the endpoint + * is set. + */ + public static final String ASSUMED_ROLE_STS_ENDPOINT_REGION = + "fs.s3a.assumed.role.sts.endpoint.region"; + + /** + * Default value for the STS endpoint region; needed for + * v4 signing. + */ + public static final String ASSUMED_ROLE_STS_ENDPOINT_REGION_DEFAULT = + "us-west-1"; + + /** + * Default duration of an assumed role. + */ public static final String ASSUMED_ROLE_SESSION_DURATION_DEFAULT = "30m"; /** list of providers to authenticate for the assumed role. 
*/ http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java index f33b25e..ade317f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java @@ -18,59 +18,45 @@ package org.apache.hadoop.fs.s3a; +import java.io.IOException; +import java.net.URI; + import com.amazonaws.ClientConfiguration; -import com.amazonaws.Protocol; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.S3ClientOptions; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.util.VersionInfo; import org.slf4j.Logger; -import java.io.IOException; -import java.net.URI; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; -import static org.apache.hadoop.fs.s3a.Constants.*; -import static org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet; -import static org.apache.hadoop.fs.s3a.S3AUtils.intOption; +import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT; +import static org.apache.hadoop.fs.s3a.Constants.PATH_STYLE_ACCESS; /** - * The default factory implementation, which calls the AWS SDK to configure - * and create an {@link AmazonS3Client} that communicates with the S3 service. + * The default {@link S3ClientFactory} implementation. + * This which calls the AWS SDK to configure and create an + * {@link AmazonS3Client} that communicates with the S3 service. */ -public class DefaultS3ClientFactory extends Configured implements - S3ClientFactory { +public class DefaultS3ClientFactory extends Configured + implements S3ClientFactory { protected static final Logger LOG = S3AFileSystem.LOG; @Override - public AmazonS3 createS3Client(URI name) throws IOException { + public AmazonS3 createS3Client(URI name, + final String bucket, + final AWSCredentialsProvider credentials) throws IOException { Configuration conf = getConf(); - AWSCredentialsProvider credentials = - createAWSCredentialProviderSet(name, conf); - final ClientConfiguration awsConf = createAwsConf(getConf()); - AmazonS3 s3 = newAmazonS3Client(credentials, awsConf); - return createAmazonS3Client(s3, conf, credentials, awsConf); + final ClientConfiguration awsConf = S3AUtils.createAwsConf(getConf(), bucket); + return configureAmazonS3Client( + newAmazonS3Client(credentials, awsConf), conf); } /** - * Create a new {@link ClientConfiguration}. - * @param conf The Hadoop configuration - * @return new AWS client configuration - */ - public static ClientConfiguration createAwsConf(Configuration conf) { - final ClientConfiguration awsConf = new ClientConfiguration(); - initConnectionSettings(conf, awsConf); - initProxySupport(conf, awsConf); - initUserAgent(conf, awsConf); - return awsConf; - } - - /** - * Wrapper around constructor for {@link AmazonS3} client. Override this to - * provide an extended version of the client + * Wrapper around constructor for {@link AmazonS3} client. 
+ * Override this to provide an extended version of the client * @param credentials credentials to use * @param awsConf AWS configuration * @return new AmazonS3 client @@ -81,120 +67,17 @@ public class DefaultS3ClientFactory extends Configured implements } /** - * Initializes all AWS SDK settings related to connection management. - * - * @param conf Hadoop configuration - * @param awsConf AWS SDK configuration - */ - private static void initConnectionSettings(Configuration conf, - ClientConfiguration awsConf) { - awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS, - DEFAULT_MAXIMUM_CONNECTIONS, 1)); - boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS, - DEFAULT_SECURE_CONNECTIONS); - awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP); - awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES, - DEFAULT_MAX_ERROR_RETRIES, 0)); - awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT, - DEFAULT_ESTABLISH_TIMEOUT, 0)); - awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT, - DEFAULT_SOCKET_TIMEOUT, 0)); - int sockSendBuffer = intOption(conf, SOCKET_SEND_BUFFER, - DEFAULT_SOCKET_SEND_BUFFER, 2048); - int sockRecvBuffer = intOption(conf, SOCKET_RECV_BUFFER, - DEFAULT_SOCKET_RECV_BUFFER, 2048); - awsConf.setSocketBufferSizeHints(sockSendBuffer, sockRecvBuffer); - String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, ""); - if (!signerOverride.isEmpty()) { - LOG.debug("Signer override = {}", signerOverride); - awsConf.setSignerOverride(signerOverride); - } - } - - /** - * Initializes AWS SDK proxy support if configured. - * - * @param conf Hadoop configuration - * @param awsConf AWS SDK configuration - * @throws IllegalArgumentException if misconfigured - */ - private static void initProxySupport(Configuration conf, - ClientConfiguration awsConf) throws IllegalArgumentException { - String proxyHost = conf.getTrimmed(PROXY_HOST, ""); - int proxyPort = conf.getInt(PROXY_PORT, -1); - if (!proxyHost.isEmpty()) { - awsConf.setProxyHost(proxyHost); - if (proxyPort >= 0) { - awsConf.setProxyPort(proxyPort); - } else { - if (conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS)) { - LOG.warn("Proxy host set without port. Using HTTPS default 443"); - awsConf.setProxyPort(443); - } else { - LOG.warn("Proxy host set without port. Using HTTP default 80"); - awsConf.setProxyPort(80); - } - } - String proxyUsername = conf.getTrimmed(PROXY_USERNAME); - String proxyPassword = conf.getTrimmed(PROXY_PASSWORD); - if ((proxyUsername == null) != (proxyPassword == null)) { - String msg = "Proxy error: " + PROXY_USERNAME + " or " + - PROXY_PASSWORD + " set without the other."; - LOG.error(msg); - throw new IllegalArgumentException(msg); - } - awsConf.setProxyUsername(proxyUsername); - awsConf.setProxyPassword(proxyPassword); - awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN)); - awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION)); - if (LOG.isDebugEnabled()) { - LOG.debug("Using proxy server {}:{} as user {} with password {} on " + - "domain {} as workstation {}", awsConf.getProxyHost(), - awsConf.getProxyPort(), - String.valueOf(awsConf.getProxyUsername()), - awsConf.getProxyPassword(), awsConf.getProxyDomain(), - awsConf.getProxyWorkstation()); - } - } else if (proxyPort >= 0) { - String msg = - "Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST; - LOG.error(msg); - throw new IllegalArgumentException(msg); - } - } - - /** - * Initializes the User-Agent header to send in HTTP requests to the S3 - * back-end. 
We always include the Hadoop version number. The user also - * may set an optional custom prefix to put in front of the Hadoop version - * number. The AWS SDK interally appends its own information, which seems - * to include the AWS SDK version, OS and JVM version. + * Configure S3 client from the Hadoop configuration. * - * @param conf Hadoop configuration - * @param awsConf AWS SDK configuration - */ - private static void initUserAgent(Configuration conf, - ClientConfiguration awsConf) { - String userAgent = "Hadoop " + VersionInfo.getVersion(); - String userAgentPrefix = conf.getTrimmed(USER_AGENT_PREFIX, ""); - if (!userAgentPrefix.isEmpty()) { - userAgent = userAgentPrefix + ", " + userAgent; - } - LOG.debug("Using User-Agent: {}", userAgent); - awsConf.setUserAgentPrefix(userAgent); - } - - /** - * Creates an {@link AmazonS3Client} from the established configuration. + * This includes: endpoint, Path Access and possibly other + * options. * * @param conf Hadoop configuration - * @param credentials AWS credentials - * @param awsConf AWS SDK configuration * @return S3 client * @throws IllegalArgumentException if misconfigured */ - private static AmazonS3 createAmazonS3Client(AmazonS3 s3, Configuration conf, - AWSCredentialsProvider credentials, ClientConfiguration awsConf) + private static AmazonS3 configureAmazonS3Client(AmazonS3 s3, + Configuration conf) throws IllegalArgumentException { String endPoint = conf.getTrimmed(ENDPOINT, ""); if (!endPoint.isEmpty()) { @@ -206,21 +89,29 @@ public class DefaultS3ClientFactory extends Configured implements throw new IllegalArgumentException(msg, e); } } - enablePathStyleAccessIfRequired(s3, conf); - return s3; + return applyS3ClientOptions(s3, conf); } /** - * Enables path-style access to S3 buckets if configured. By default, the + * Perform any tuning of the {@code S3ClientOptions} settings based on + * the Hadoop configuration. + * This is different from the general AWS configuration creation as + * it is unique to S3 connections. + * + * The {@link Constants#PATH_STYLE_ACCESS} option enables path-style access + * to S3 buckets if configured. By default, the * behavior is to use virtual hosted-style access with URIs of the form - * http://bucketname.s3.amazonaws.com. Enabling path-style access and a + * {@code http://bucketname.s3.amazonaws.com} + * Enabling path-style access and a * region-specific endpoint switches the behavior to use URIs of the form - * http://s3-eu-west-1.amazonaws.com/bucketname. - * + * {@code http://s3-eu-west-1.amazonaws.com/bucketname}. + * It is common to use this when connecting to private S3 servers, as it + * avoids the need to play with DNS entries. 
* @param s3 S3 client * @param conf Hadoop configuration + * @return the S3 client */ - private static void enablePathStyleAccessIfRequired(AmazonS3 s3, + private static AmazonS3 applyS3ClientOptions(AmazonS3 s3, Configuration conf) { final boolean pathStyleAccess = conf.getBoolean(PATH_STYLE_ACCESS, false); if (pathStyleAccess) { @@ -229,5 +120,6 @@ public class DefaultS3ClientFactory extends Configured implements .setPathStyleAccess(true) .build()); } + return s3; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java index 99ed87d..2cd1aae 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java @@ -114,6 +114,16 @@ public class InconsistentAmazonS3Client extends AmazonS3Client { /** Map of key to delay -> time it was created. */ private Map<String, Long> delayedPutKeys = new HashMap<>(); + /** + * Instantiate. + * This subclasses a deprecated constructor of the parent + * {@code AmazonS3Client} class; we can't use the builder API because, + * that only creates the consistent client. + * @param credentials credentials to auth. + * @param clientConfiguration connection settings + * @param conf hadoop configuration. + */ + @SuppressWarnings("deprecation") public InconsistentAmazonS3Client(AWSCredentialsProvider credentials, ClientConfiguration clientConfiguration, Configuration conf) { super(credentials, clientConfiguration); http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java index 17d268b..932c472 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java @@ -21,16 +21,27 @@ package org.apache.hadoop.fs.s3a; import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.s3.AmazonS3; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; /** * S3 Client factory used for testing with eventual consistency fault injection. + * This client is for testing <i>only</i>; it is in the production + * {@code hadoop-aws} module to enable integration tests to use this + * just by editing the Hadoop configuration used to bring up the client. */ @InterfaceAudience.Private @InterfaceStability.Unstable public class InconsistentS3ClientFactory extends DefaultS3ClientFactory { + /** + * Create the inconsistent client. + * Logs a warning that this is being done. + * @param credentials credentials to use + * @param awsConf AWS configuration + * @return an inconsistent client. 
+ */ @Override protected AmazonS3 newAmazonS3Client(AWSCredentialsProvider credentials, ClientConfiguration awsConf) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 737d7da..72a5fde 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -77,8 +77,9 @@ import com.amazonaws.event.ProgressListener; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListeningExecutorService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import org.apache.commons.io.IOUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; @@ -124,9 +125,6 @@ import static org.apache.hadoop.fs.s3a.Statistic.*; import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.commons.lang3.StringUtils.isNotEmpty; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * The core S3A Filesystem implementation. * @@ -205,6 +203,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { private boolean useListV1; private MagicCommitIntegration committerIntegration; + private AWSCredentialProviderList credentials; + /** Add any deprecated keys. */ @SuppressWarnings("deprecation") private static void addDeprecatedKeys() { @@ -252,8 +252,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { Class<? extends S3ClientFactory> s3ClientFactoryClass = conf.getClass( S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL, S3ClientFactory.class); + + credentials = createAWSCredentialProviderSet(name, conf); s3 = ReflectionUtils.newInstance(s3ClientFactoryClass, conf) - .createS3Client(name); + .createS3Client(name, bucket, credentials); invoker = new Invoker(new S3ARetryPolicy(getConf()), onRetry); s3guardInvoker = new Invoker(new S3GuardExistsRetryPolicy(getConf()), onRetry); @@ -2470,12 +2472,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { transfers.shutdownNow(true); transfers = null; } - if (metadataStore != null) { - metadataStore.close(); - metadataStore = null; - } - IOUtils.closeQuietly(instrumentation); + S3AUtils.closeAll(LOG, metadataStore, instrumentation); + metadataStore = null; instrumentation = null; + closeAutocloseables(LOG, credentials); + credentials = null; } } @@ -2885,6 +2886,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { } sb.append(", boundedExecutor=").append(boundedThreadPool); sb.append(", unboundedExecutor=").append(unboundedThreadPool); + sb.append(", credentials=").append(credentials); sb.append(", statistics {") .append(statistics) .append("}"); @@ -3319,4 +3321,17 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities { return false; } } + + /** + * Get a shared copy of the AWS credentials, with its reference + * counter updated. + * Caller is required to call {@code close()} on this after + * they have finished using it. 
+ * @param purpose what is this for? This is initially for logging + * @return a reference to shared credentials. + */ + public AWSCredentialProviderList shareCredentials(final String purpose) { + LOG.debug("Sharing credentials for: {}", purpose); + return credentials.share(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java index 2b361fd..e6e7895 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java @@ -37,6 +37,7 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.InvalidRequestException; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.net.ConnectTimeoutException; @@ -154,8 +155,9 @@ public class S3ARetryPolicy implements RetryPolicy { policyMap.put(InterruptedException.class, fail); // note this does not pick up subclasses (like socket timeout) policyMap.put(InterruptedIOException.class, fail); - // interesting question: should this be retried ever? + // Access denial and auth exceptions are not retried policyMap.put(AccessDeniedException.class, fail); + policyMap.put(NoAuthWithAWSException.class, fail); policyMap.put(FileNotFoundException.class, fail); policyMap.put(InvalidRequestException.class, fail); http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java index a5f7d75..9908fd1 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java @@ -21,6 +21,8 @@ package org.apache.hadoop.fs.s3a; import com.amazonaws.AbortedException; import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonServiceException; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.Protocol; import com.amazonaws.SdkBaseException; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.EnvironmentVariableCredentialsProvider; @@ -44,15 +46,18 @@ import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException; import org.apache.hadoop.fs.s3native.S3xLoginHelper; import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.security.ProviderUtils; +import org.apache.hadoop.util.VersionInfo; import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.io.Closeable; import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; @@ -174,11 +179,17 @@ 
public final class S3AUtils { // call considered an sign of connectivity failure return (EOFException)new EOFException(message).initCause(exception); } + if (exception instanceof NoAuthWithAWSException) { + // the exception raised by AWSCredentialProvider list if the + // credentials were not accepted. + return (AccessDeniedException)new AccessDeniedException(path, null, + exception.toString()).initCause(exception); + } return new AWSClientIOException(message, exception); } else { if (exception instanceof AmazonDynamoDBException) { // special handling for dynamo DB exceptions - return translateDynamoDBException(message, + return translateDynamoDBException(path, message, (AmazonDynamoDBException)exception); } IOException ioe; @@ -373,20 +384,45 @@ public final class S3AUtils { /** * Translate a DynamoDB exception into an IOException. + * + * @param path path in the DDB * @param message preformatted message for the exception - * @param ex exception + * @param ddbException exception * @return an exception to throw. */ - public static IOException translateDynamoDBException(String message, - AmazonDynamoDBException ex) { - if (isThrottleException(ex)) { - return new AWSServiceThrottledException(message, ex); + public static IOException translateDynamoDBException(final String path, + final String message, + final AmazonDynamoDBException ddbException) { + if (isThrottleException(ddbException)) { + return new AWSServiceThrottledException(message, ddbException); } - if (ex instanceof ResourceNotFoundException) { + if (ddbException instanceof ResourceNotFoundException) { return (FileNotFoundException) new FileNotFoundException(message) - .initCause(ex); + .initCause(ddbException); + } + final int statusCode = ddbException.getStatusCode(); + final String errorCode = ddbException.getErrorCode(); + IOException result = null; + // 400 gets used a lot by DDB + if (statusCode == 400) { + switch (errorCode) { + case "AccessDeniedException": + result = (IOException) new AccessDeniedException( + path, + null, + ddbException.toString()) + .initCause(ddbException); + break; + + default: + result = new AWSBadRequestException(message, ddbException); + } + } - return new AWSServiceIOException(message, ex); + if (result == null) { + result = new AWSServiceIOException(message, ddbException); + } + return result; } /** @@ -738,6 +774,29 @@ public final class S3AUtils { String baseKey, String overrideVal) throws IOException { + return lookupPassword(bucket, conf, baseKey, overrideVal, ""); + } + + /** + * Get a password from a configuration, including JCEKS files, handling both + * the absolute key and bucket override. + * @param bucket bucket or "" if none known + * @param conf configuration + * @param baseKey base key to look up, e.g "fs.s3a.secret.key" + * @param overrideVal override value: if non empty this is used instead of + * querying the configuration. + * @param defVal value to return if there is no password + * @return a password or the value of defVal. 
+ * @throws IOException on any IO problem + * @throws IllegalArgumentException bad arguments + */ + public static String lookupPassword( + String bucket, + Configuration conf, + String baseKey, + String overrideVal, + String defVal) + throws IOException { String initialVal; Preconditions.checkArgument(baseKey.startsWith(FS_S3A_PREFIX), "%s does not start with $%s", baseKey, FS_S3A_PREFIX); @@ -757,7 +816,7 @@ public final class S3AUtils { // no bucket, make the initial value the override value initialVal = overrideVal; } - return getPassword(conf, baseKey, initialVal); + return getPassword(conf, baseKey, initialVal, defVal); } /** @@ -1059,6 +1118,134 @@ public final class S3AUtils { } } + /** + * Create a new AWS {@code ClientConfiguration}. + * All clients to AWS services <i>MUST</i> use this for consistent setup + * of connectivity, UA, proxy settings. + * @param conf The Hadoop configuration + * @param bucket Optional bucket to use to look up per-bucket proxy secrets + * @return new AWS client configuration + */ + public static ClientConfiguration createAwsConf(Configuration conf, + String bucket) + throws IOException { + final ClientConfiguration awsConf = new ClientConfiguration(); + initConnectionSettings(conf, awsConf); + initProxySupport(conf, bucket, awsConf); + initUserAgent(conf, awsConf); + return awsConf; + } + + /** + * Initializes all AWS SDK settings related to connection management. + * + * @param conf Hadoop configuration + * @param awsConf AWS SDK configuration + */ + public static void initConnectionSettings(Configuration conf, + ClientConfiguration awsConf) { + awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS, + DEFAULT_MAXIMUM_CONNECTIONS, 1)); + boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS, + DEFAULT_SECURE_CONNECTIONS); + awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP); + awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES, + DEFAULT_MAX_ERROR_RETRIES, 0)); + awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT, + DEFAULT_ESTABLISH_TIMEOUT, 0)); + awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT, + DEFAULT_SOCKET_TIMEOUT, 0)); + int sockSendBuffer = intOption(conf, SOCKET_SEND_BUFFER, + DEFAULT_SOCKET_SEND_BUFFER, 2048); + int sockRecvBuffer = intOption(conf, SOCKET_RECV_BUFFER, + DEFAULT_SOCKET_RECV_BUFFER, 2048); + awsConf.setSocketBufferSizeHints(sockSendBuffer, sockRecvBuffer); + String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, ""); + if (!signerOverride.isEmpty()) { + LOG.debug("Signer override = {}", signerOverride); + awsConf.setSignerOverride(signerOverride); + } + } + + /** + * Initializes AWS SDK proxy support in the AWS client configuration + * if the S3A settings enable it. + * + * @param conf Hadoop configuration + * @param bucket Optional bucket to use to look up per-bucket proxy secrets + * @param awsConf AWS SDK configuration to update + * @throws IllegalArgumentException if misconfigured + * @throws IOException problem getting username/secret from password source. 
+ */ + public static void initProxySupport(Configuration conf, + String bucket, + ClientConfiguration awsConf) throws IllegalArgumentException, + IOException { + String proxyHost = conf.getTrimmed(PROXY_HOST, ""); + int proxyPort = conf.getInt(PROXY_PORT, -1); + if (!proxyHost.isEmpty()) { + awsConf.setProxyHost(proxyHost); + if (proxyPort >= 0) { + awsConf.setProxyPort(proxyPort); + } else { + if (conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS)) { + LOG.warn("Proxy host set without port. Using HTTPS default 443"); + awsConf.setProxyPort(443); + } else { + LOG.warn("Proxy host set without port. Using HTTP default 80"); + awsConf.setProxyPort(80); + } + } + final String proxyUsername = lookupPassword(bucket, conf, PROXY_USERNAME, + null, null); + final String proxyPassword = lookupPassword(bucket, conf, PROXY_PASSWORD, + null, null); + if ((proxyUsername == null) != (proxyPassword == null)) { + String msg = "Proxy error: " + PROXY_USERNAME + " or " + + PROXY_PASSWORD + " set without the other."; + LOG.error(msg); + throw new IllegalArgumentException(msg); + } + awsConf.setProxyUsername(proxyUsername); + awsConf.setProxyPassword(proxyPassword); + awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN)); + awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION)); + if (LOG.isDebugEnabled()) { + LOG.debug("Using proxy server {}:{} as user {} with password {} on " + + "domain {} as workstation {}", awsConf.getProxyHost(), + awsConf.getProxyPort(), + String.valueOf(awsConf.getProxyUsername()), + awsConf.getProxyPassword(), awsConf.getProxyDomain(), + awsConf.getProxyWorkstation()); + } + } else if (proxyPort >= 0) { + String msg = + "Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST; + LOG.error(msg); + throw new IllegalArgumentException(msg); + } + } + + /** + * Initializes the User-Agent header to send in HTTP requests to AWS + * services. We always include the Hadoop version number. The user also + * may set an optional custom prefix to put in front of the Hadoop version + * number. The AWS SDK internally appends its own information, which seems + * to include the AWS SDK version, OS and JVM version. + * + * @param conf Hadoop configuration + * @param awsConf AWS SDK configuration to update + */ + private static void initUserAgent(Configuration conf, + ClientConfiguration awsConf) { + String userAgent = "Hadoop " + VersionInfo.getVersion(); + String userAgentPrefix = conf.getTrimmed(USER_AGENT_PREFIX, ""); + if (!userAgentPrefix.isEmpty()) { + userAgent = userAgentPrefix + ", " + userAgent; + } + LOG.debug("Using User-Agent: {}", userAgent); + awsConf.setUserAgentPrefix(userAgent); + } /** * An interface for use in lambda-expressions working with @@ -1289,18 +1476,40 @@ public final class S3AUtils { * @param closeables the objects to close */ public static void closeAll(Logger log, - java.io.Closeable... closeables) { - for (java.io.Closeable c : closeables) { + Closeable... closeables) { + if (log == null) { + log = LOG; + } + for (Closeable c : closeables) { if (c != null) { try { - if (log != null) { - log.debug("Closing {}", c); - } + log.debug("Closing {}", c); c.close(); } catch (Exception e) { - if (log != null && log.isDebugEnabled()) { - log.debug("Exception in closing {}", c, e); - } + log.debug("Exception in closing {}", c, e); + } + } + } + } + /** + * Close the Closeable objects and <b>ignore</b> any Exception or + * null pointers. + * (This is the SLF4J equivalent of that in {@code IOUtils}). + * @param log the log to log at debug level. Can be null. 
+ * @param closeables the objects to close + */ + public static void closeAutocloseables(Logger log, + AutoCloseable... closeables) { + if (log == null) { + log = LOG; + } + for (AutoCloseable c : closeables) { + if (c != null) { + try { + log.debug("Closing {}", c); + c.close(); + } catch (Exception e) { + log.debug("Exception in closing {}", c, e); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java index 9abb362..b237e85 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java @@ -21,6 +21,7 @@ package org.apache.hadoop.fs.s3a; import java.io.IOException; import java.net.URI; +import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.s3.AmazonS3; import org.apache.hadoop.classification.InterfaceAudience; @@ -37,9 +38,13 @@ public interface S3ClientFactory { * Creates a new {@link AmazonS3} client. * * @param name raw input S3A file system URI + * @param bucket Optional bucket to use to look up per-bucket proxy secrets + * @param credentialSet credentials to use * @return S3 client * @throws IOException IO problem */ - AmazonS3 createS3Client(URI name) throws IOException; + AmazonS3 createS3Client(URI name, + final String bucket, + final AWSCredentialsProvider credentialSet) throws IOException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/AssumedRoleCredentialProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/AssumedRoleCredentialProvider.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/AssumedRoleCredentialProvider.java index fdaf9bd..e5a3639 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/AssumedRoleCredentialProvider.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/AssumedRoleCredentialProvider.java @@ -24,9 +24,11 @@ import java.net.URI; import java.util.Locale; import java.util.concurrent.TimeUnit; +import com.amazonaws.AmazonClientException; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; +import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder; import com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceException; import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; @@ -37,6 +39,9 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.s3a.AWSCredentialProviderList; +import org.apache.hadoop.fs.s3a.S3AUtils; +import org.apache.hadoop.fs.s3a.Invoker; +import org.apache.hadoop.fs.s3a.S3ARetryPolicy; import org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider; import org.apache.hadoop.security.UserGroupInformation; @@ -77,17 +82,21 @@ public class AssumedRoleCredentialProvider implements 
AWSCredentialsProvider, private final String arn; + private final AWSCredentialProviderList credentialsToSTS; + + private final Invoker invoker; + /** * Instantiate. * This calls {@link #getCredentials()} to fail fast on the inner * role credential retrieval. - * @param uri URI of endpoint. + * @param fsUri URI of the filesystem. * @param conf configuration * @throws IOException on IO problems and some parameter checking * @throws IllegalArgumentException invalid parameters * @throws AWSSecurityTokenServiceException problems getting credentials */ - public AssumedRoleCredentialProvider(URI uri, Configuration conf) + public AssumedRoleCredentialProvider(URI fsUri, Configuration conf) throws IOException { arn = conf.getTrimmed(ASSUMED_ROLE_ARN, ""); @@ -99,13 +108,14 @@ public class AssumedRoleCredentialProvider implements AWSCredentialsProvider, Class<?>[] awsClasses = loadAWSProviderClasses(conf, ASSUMED_ROLE_CREDENTIALS_PROVIDER, SimpleAWSCredentialsProvider.class); - AWSCredentialProviderList credentials = new AWSCredentialProviderList(); + credentialsToSTS = new AWSCredentialProviderList(); for (Class<?> aClass : awsClasses) { if (this.getClass().equals(aClass)) { throw new IOException(E_FORBIDDEN_PROVIDER); } - credentials.add(createAWSCredentialProvider(conf, aClass, uri)); + credentialsToSTS.add(createAWSCredentialProvider(conf, aClass, fsUri)); } + LOG.debug("Credentials to obtain role credentials: {}", credentialsToSTS); // then the STS binding sessionName = conf.getTrimmed(ASSUMED_ROLE_SESSION_NAME, @@ -122,14 +132,27 @@ public class AssumedRoleCredentialProvider implements AWSCredentialsProvider, LOG.debug("Scope down policy {}", policy); builder.withScopeDownPolicy(policy); } - String epr = conf.get(ASSUMED_ROLE_STS_ENDPOINT, ""); - if (StringUtils.isNotEmpty(epr)) { - LOG.debug("STS Endpoint: {}", epr); - builder.withServiceEndpoint(epr); - } - LOG.debug("Credentials to obtain role credentials: {}", credentials); - builder.withLongLivedCredentialsProvider(credentials); + String endpoint = conf.get(ASSUMED_ROLE_STS_ENDPOINT, ""); + String region = conf.get(ASSUMED_ROLE_STS_ENDPOINT_REGION, + ASSUMED_ROLE_STS_ENDPOINT_REGION_DEFAULT); + AWSSecurityTokenServiceClientBuilder stsbuilder = + STSClientFactory.builder( + conf, + fsUri.getHost(), + credentialsToSTS, + endpoint, + region); + // the STS client is not tracked for a shutdown in close(), because it + // (currently) throws an UnsupportedOperationException in shutdown(). + builder.withStsClient(stsbuilder.build()); + + //now build the provider stsProvider = builder.build(); + + // to handle STS throttling by the AWS account, we + // need to retry + invoker = new Invoker(new S3ARetryPolicy(conf), this::operationRetried); + // and force in a fail-fast check just to keep the stack traces less // convoluted getCredentials(); @@ -143,7 +166,17 @@ public class AssumedRoleCredentialProvider implements AWSCredentialsProvider, @Override public AWSCredentials getCredentials() { try { - return stsProvider.getCredentials(); + return invoker.retryUntranslated("getCredentials", + true, + stsProvider::getCredentials); + } catch (IOException e) { + // this is in the signature of retryUntranslated; + // its hard to see how this could be raised, but for + // completeness, it is wrapped as an Amazon Client Exception + // and rethrown. 
+ throw new AmazonClientException( + "getCredentials failed: " + e, + e); } catch (AWSSecurityTokenServiceException e) { LOG.error("Failed to get credentials for role {}", arn, e); @@ -161,7 +194,7 @@ public class AssumedRoleCredentialProvider implements AWSCredentialsProvider, */ @Override public void close() { - stsProvider.close(); + S3AUtils.closeAutocloseables(LOG, stsProvider, credentialsToSTS); } @Override @@ -205,4 +238,23 @@ public class AssumedRoleCredentialProvider implements AWSCredentialsProvider, return r.toString(); } + /** + * Callback from {@link Invoker} when an operation is retried. + * @param text text of the operation + * @param ex exception + * @param retries number of retries + * @param idempotent is the method idempotent + */ + public void operationRetried( + String text, + Exception ex, + int retries, + boolean idempotent) { + if (retries == 0) { + // log on the first retry attempt of the credential access. + // At worst, this means one log entry every intermittent renewal + // time. + LOG.info("Retried {}", text); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/NoAuthWithAWSException.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/NoAuthWithAWSException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/NoAuthWithAWSException.java new file mode 100644 index 0000000..f48e17a --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/NoAuthWithAWSException.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.auth; + +import com.amazonaws.AmazonClientException; + +/** + * A specific subclass of {@code AmazonClientException} which can + * be used in the retry logic to fail fast when there is any + * authentication problem. 
+ */ +public class NoAuthWithAWSException extends AmazonClientException { + + public NoAuthWithAWSException(final String message, final Throwable t) { + super(message, t); + } + + public NoAuthWithAWSException(final String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RoleModel.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RoleModel.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RoleModel.java index ca2c993..d4568b0 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RoleModel.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RoleModel.java @@ -205,6 +205,14 @@ public class RoleModel { return new Policy(statements); } + /** + * From a set of statements, create a policy. + * @param statements statements + * @return the policy + */ + public static Policy policy(final List<RoleModel.Statement> statements) { + return new Policy(statements); + } /** * Effect options. http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RolePolicies.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RolePolicies.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RolePolicies.java index 6711eee..34ed295 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RolePolicies.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/RolePolicies.java @@ -29,6 +29,55 @@ public final class RolePolicies { private RolePolicies() { } + /** All KMS operations: {@value}.*/ + public static final String KMS_ALL_OPERATIONS = "kms:*"; + + /** KMS encryption. This is <i>Not</i> used by SSE-KMS: {@value}. */ + public static final String KMS_ENCRYPT = "kms:Encrypt"; + + /** + * Decrypt data encrypted with SSE-KMS: {@value}. + */ + public static final String KMS_DECRYPT = "kms:Decrypt"; + + /** + * Arn for all KMS keys: {@value}. + */ + public static final String KMS_ALL_KEYS = "arn:aws:kms:*"; + + /** + * This is used by S3 to generate a per-object encryption key and + * the encrypted value of this, the latter being what it tags + * the object with for later decryption: {@value}. + */ + public static final String KMS_GENERATE_DATA_KEY = "kms:GenerateDataKey"; + + /** + * Actions needed to read and write SSE-KMS data. + */ + private static final String[] KMS_KEY_RW = + new String[]{KMS_DECRYPT, KMS_GENERATE_DATA_KEY}; + + /** + * Actions needed to read SSE-KMS data. + */ + private static final String[] KMS_KEY_READ = + new String[] {KMS_DECRYPT}; + + /** + * Statement to allow KMS R/W access access, so full use of + * SSE-KMS. + */ + public static final Statement STATEMENT_ALLOW_SSE_KMS_RW = + statement(true, KMS_ALL_KEYS, KMS_KEY_RW); + + /** + * Statement to allow read access to KMS keys, so the ability + * to read SSE-KMS data,, but not decrypt it. + */ + public static final Statement STATEMENT_ALLOW_SSE_KMS_READ = + statement(true, KMS_ALL_KEYS, KMS_KEY_READ); + /** * All S3 operations: {@value}. 
*/ @@ -52,7 +101,6 @@ public final class RolePolicies { public static final String S3_LIST_BUCKET_MULTPART_UPLOADS = "s3:ListBucketMultipartUploads"; - /** * List multipart upload is needed for the S3A Commit protocols. */ @@ -97,6 +145,8 @@ public final class RolePolicies { public static final String S3_GET_OBJECT_VERSION = "s3:GetObjectVersion"; + public static final String S3_GET_BUCKET_LOCATION = "s3:GetBucketLocation"; + public static final String S3_GET_OBJECT_VERSION_ACL = "s3:GetObjectVersionAcl"; @@ -128,7 +178,8 @@ public final class RolePolicies { public static final String S3_RESTORE_OBJECT = "s3:RestoreObject"; /** - * Actions needed to read data from S3 through S3A. + * Actions needed to read a file in S3 through S3A, excluding + * S3Guard and SSE-KMS. */ public static final String[] S3_PATH_READ_OPERATIONS = new String[]{ @@ -136,18 +187,20 @@ public final class RolePolicies { }; /** - * Actions needed to read data from S3 through S3A. + * Base actions needed to read data from S3 through S3A, + * excluding SSE-KMS data and S3Guard-ed buckets. */ public static final String[] S3_ROOT_READ_OPERATIONS = new String[]{ S3_LIST_BUCKET, S3_LIST_BUCKET_MULTPART_UPLOADS, - S3_GET_OBJECT, + S3_ALL_GET, }; /** * Actions needed to write data to an S3A Path. - * This includes the appropriate read operations. + * This includes the appropriate read operations, but + * not SSE-KMS or S3Guard support. */ public static final String[] S3_PATH_RW_OPERATIONS = new String[]{ @@ -163,6 +216,7 @@ public final class RolePolicies { * This is purely the extra operations needed for writing atop * of the read operation set. * Deny these and a path is still readable, but not writeable. + * Excludes: SSE-KMS and S3Guard permissions. */ public static final String[] S3_PATH_WRITE_OPERATIONS = new String[]{ @@ -173,6 +227,7 @@ public final class RolePolicies { /** * Actions needed for R/W IO from the root of a bucket. + * Excludes: SSE-KMS and S3Guard permissions. */ public static final String[] S3_ROOT_RW_OPERATIONS = new String[]{ @@ -190,26 +245,57 @@ public final class RolePolicies { */ public static final String DDB_ALL_OPERATIONS = "dynamodb:*"; - public static final String DDB_ADMIN = "dynamodb:*"; + /** + * Operations needed for DDB/S3Guard Admin. + * For now: make this {@link #DDB_ALL_OPERATIONS}. + */ + public static final String DDB_ADMIN = DDB_ALL_OPERATIONS; + /** + * Permission for DDB describeTable() operation: {@value}. + * This is used during initialization. + */ + public static final String DDB_DESCRIBE_TABLE = "dynamodb:DescribeTable"; - public static final String DDB_BATCH_WRITE = "dynamodb:BatchWriteItem"; + /** + * Permission to query the DDB table: {@value}. + */ + public static final String DDB_QUERY = "dynamodb:Query"; /** - * All DynamoDB tables: {@value}. + * Permission for DDB operation to get a record: {@value}. */ - public static final String ALL_DDB_TABLES = "arn:aws:dynamodb:::*"; + public static final String DDB_GET_ITEM = "dynamodb:GetItem"; + /** + * Permission for DDB write record operation: {@value}. + */ + public static final String DDB_PUT_ITEM = "dynamodb:PutItem"; + /** + * Permission for DDB update single item operation: {@value}. + */ + public static final String DDB_UPDATE_ITEM = "dynamodb:UpdateItem"; - public static final String WILDCARD = "*"; + /** + * Permission for DDB delete operation: {@value}. + */ + public static final String DDB_DELETE_ITEM = "dynamodb:DeleteItem"; /** - * Allow all S3 Operations. + * Permission for DDB operation: {@value}. 
    */
-  public static final Statement STATEMENT_ALL_S3 = statement(true,
-      S3_ALL_BUCKETS,
-      S3_ALL_OPERATIONS);
+  public static final String DDB_BATCH_GET_ITEM = "dynamodb:BatchGetItem";
+
+  /**
+   * Batch write permission for DDB: {@value}.
+   */
+  public static final String DDB_BATCH_WRITE_ITEM = "dynamodb:BatchWriteItem";
+
+  /**
+   * All DynamoDB tables: {@value}.
+   */
+  public static final String ALL_DDB_TABLES = "arn:aws:dynamodb:*";
 
   /**
    * Statement to allow all DDB access.
@@ -218,11 +304,36 @@ public final class RolePolicies {
       ALL_DDB_TABLES,
       DDB_ALL_OPERATIONS);
 
   /**
-   * Allow all S3 and S3Guard operations.
+   * Statement to allow all client operations needed for S3Guard,
+   * but none of the admin operations.
+   */
+  public static final Statement STATEMENT_S3GUARD_CLIENT = statement(true,
+      ALL_DDB_TABLES,
+      DDB_BATCH_GET_ITEM,
+      DDB_BATCH_WRITE_ITEM,
+      DDB_DELETE_ITEM,
+      DDB_DESCRIBE_TABLE,
+      DDB_GET_ITEM,
+      DDB_PUT_ITEM,
+      DDB_QUERY,
+      DDB_UPDATE_ITEM
+  );
+
+  /**
+   * Allow all S3 Operations.
+   * This does not cover DDB or SSE-KMS.
+   */
+  public static final Statement STATEMENT_ALL_S3 = statement(true,
+      S3_ALL_BUCKETS,
+      S3_ALL_OPERATIONS);
+
+  /**
+   * Policy for all S3 and S3Guard operations, and SSE-KMS.
    */
   public static final Policy ALLOW_S3_AND_SGUARD = policy(
       STATEMENT_ALL_S3,
-      STATEMENT_ALL_DDB
+      STATEMENT_ALL_DDB,
+      STATEMENT_ALLOW_SSE_KMS_RW
       );
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/STSClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/STSClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/STSClientFactory.java
new file mode 100644
index 0000000..10bf88c
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/auth/STSClientFactory.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.auth;
+
+import java.io.IOException;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.S3AUtils;
+
+/**
+ * Factory for creating STS Clients.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class STSClientFactory {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(STSClientFactory.class);
+
+  /**
+   * Create the builder ready for any final configuration options.
+   * Picks up connection settings from the Hadoop configuration, including
+   * proxy secrets.
+   * @param conf Configuration to act as source of options.
+   * @param bucket Optional bucket to use to look up per-bucket proxy secrets
+   * @param credentials AWS credential chain to use
+   * @param stsEndpoint optional endpoint "https://sts.us-west-1.amazonaws.com"
+   * @param stsRegion the region, e.g. "us-west-1"
+   * @return the builder to call {@code build()}
+   * @throws IOException problem reading proxy secrets
+   */
+  public static AWSSecurityTokenServiceClientBuilder builder(
+      final Configuration conf,
+      final String bucket,
+      final AWSCredentialsProvider credentials, final String stsEndpoint,
+      final String stsRegion) throws IOException {
+    Preconditions.checkArgument(credentials != null, "No credentials");
+    final AWSSecurityTokenServiceClientBuilder builder
+        = AWSSecurityTokenServiceClientBuilder.standard();
+    final ClientConfiguration awsConf = S3AUtils.createAwsConf(conf, bucket);
+    builder.withClientConfiguration(awsConf);
+    builder.withCredentials(credentials);
+    if (StringUtils.isNotEmpty(stsEndpoint)) {
+      LOG.debug("STS Endpoint ={}", stsEndpoint);
+      builder.withEndpointConfiguration(
+          new AwsClientBuilder.EndpointConfiguration(stsEndpoint, stsRegion));
+    }
+    return builder;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java
index 91e64cd..9e1d2f4 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java
@@ -34,10 +34,9 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.s3a.DefaultS3ClientFactory;
+import org.apache.hadoop.fs.s3a.S3AUtils;
 
 import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_REGION_KEY;
-import static org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet;
 
 /**
  * Interface to create a DynamoDB client.
@@ -58,10 +57,14 @@ public interface DynamoDBClientFactory extends Configurable {
    * it will indicate an error.
    *
    * @param defaultRegion the default region of the AmazonDynamoDB client
+   * @param bucket Optional bucket to use to look up per-bucket proxy secrets
+   * @param credentials credentials to use for authentication.
    * @return a new DynamoDB client
    * @throws IOException if any IO error happens
    */
-  AmazonDynamoDB createDynamoDBClient(String defaultRegion) throws IOException;
+  AmazonDynamoDB createDynamoDBClient(final String defaultRegion,
+      final String bucket,
+      final AWSCredentialsProvider credentials) throws IOException;
 
   /**
    * The default implementation for creating an AmazonDynamoDB.
@@ -69,16 +72,15 @@ public interface DynamoDBClientFactory extends Configurable {
   class DefaultDynamoDBClientFactory extends Configured
       implements DynamoDBClientFactory {
     @Override
-    public AmazonDynamoDB createDynamoDBClient(String defaultRegion)
+    public AmazonDynamoDB createDynamoDBClient(String defaultRegion,
+        final String bucket,
+        final AWSCredentialsProvider credentials)
         throws IOException {
       Preconditions.checkNotNull(getConf(),
           "Should have been configured before usage");
 
       final Configuration conf = getConf();
-      final AWSCredentialsProvider credentials =
-          createAWSCredentialProviderSet(null, conf);
-      final ClientConfiguration awsConf =
-          DefaultS3ClientFactory.createAwsConf(conf);
+      final ClientConfiguration awsConf = S3AUtils.createAwsConf(conf, bucket);
 
       final String region = getRegion(conf, defaultRegion);
       LOG.debug("Creating DynamoDB client in region {}", region);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/da9a39ee/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
index 43849b1..ba80b88 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
@@ -22,6 +22,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.net.URI;
+import java.nio.file.AccessDeniedException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -34,6 +35,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.amazonaws.AmazonClientException;
+import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
 import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
 import com.amazonaws.services.dynamodbv2.document.DynamoDB;
@@ -67,6 +69,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
 import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.fs.s3a.Invoker;
 import org.apache.hadoop.fs.s3a.Retries;
@@ -75,13 +78,14 @@ import org.apache.hadoop.fs.s3a.S3AInstrumentation;
 import org.apache.hadoop.fs.s3a.S3ARetryPolicy;
 import org.apache.hadoop.fs.s3a.S3AUtils;
 import org.apache.hadoop.fs.s3a.Tristate;
+import org.apache.hadoop.fs.s3a.auth.RolePolicies;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import static org.apache.hadoop.fs.s3a.Constants.*;
-import static org.apache.hadoop.fs.s3a.S3AUtils.translateException;
+import static org.apache.hadoop.fs.s3a.S3AUtils.*;
 import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.*;
 import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.*;
 
@@ -207,6 +211,7 @@ public class DynamoDBMetadataStore implements MetadataStore {
       new ValueMap().withBoolean(":false", false);
 
   private DynamoDB dynamoDB;
+  private AWSCredentialProviderList credentials;
   private String region;
   private Table table;
   private String tableName;
@@ -242,10 +247,16 @@ public class DynamoDBMetadataStore implements MetadataStore {
    * A utility function to create DynamoDB instance.
    * @param conf the file system configuration
    * @param s3Region region of the associated S3 bucket (if any).
+   * @param bucket Optional bucket to use to look up per-bucket proxy secrets
+   * @param credentials credentials.
    * @return DynamoDB instance.
    * @throws IOException I/O error.
    */
-  private static DynamoDB createDynamoDB(Configuration conf, String s3Region)
+  private static DynamoDB createDynamoDB(
+      final Configuration conf,
+      final String s3Region,
+      final String bucket,
+      final AWSCredentialsProvider credentials)
       throws IOException {
     Preconditions.checkNotNull(conf);
     final Class<? extends DynamoDBClientFactory> cls = conf.getClass(
@@ -254,10 +265,18 @@ public class DynamoDBMetadataStore implements MetadataStore {
         DynamoDBClientFactory.class);
     LOG.debug("Creating DynamoDB client {} with S3 region {}", cls, s3Region);
     final AmazonDynamoDB dynamoDBClient = ReflectionUtils.newInstance(cls, conf)
-        .createDynamoDBClient(s3Region);
+        .createDynamoDBClient(s3Region, bucket, credentials);
     return new DynamoDB(dynamoDBClient);
   }
 
+  /**
+   * {@inheritDoc}.
+   * The credentials for authenticating with S3 are requested from the
+   * FS via {@link S3AFileSystem#shareCredentials(String)}; this will
+   * increment the reference counter of these credentials.
+   * @param fs {@code S3AFileSystem} associated with the MetadataStore
+   * @throws IOException on a failure
+   */
   @Override
   @Retries.OnceRaw
   public void initialize(FileSystem fs) throws IOException {
@@ -274,11 +293,23 @@ public class DynamoDBMetadataStore implements MetadataStore {
       LOG.debug("Overriding S3 region with configured DynamoDB region: {}",
           region);
     } else {
-      region = owner.getBucketLocation();
+      try {
+        region = owner.getBucketLocation();
+      } catch (AccessDeniedException e) {
+        // access denied here == can't call getBucket. Report meaningfully
+        URI uri = owner.getUri();
+        LOG.error("Failed to get bucket location from S3 bucket {}",
+            uri);
+        throw (IOException)new AccessDeniedException(
+            "S3 client role lacks permission "
+                + RolePolicies.S3_GET_BUCKET_LOCATION + " for " + uri)
+            .initCause(e);
+      }
       LOG.debug("Inferring DynamoDB region from S3 bucket: {}", region);
     }
     username = owner.getUsername();
-    dynamoDB = createDynamoDB(conf, region);
+    credentials = owner.shareCredentials("s3guard");
+    dynamoDB = createDynamoDB(conf, region, bucket, credentials);
 
     // use the bucket as the DynamoDB table name if not specified in config
     tableName = conf.getTrimmed(S3GUARD_DDB_TABLE_NAME_KEY, bucket);
@@ -311,6 +342,9 @@ public class DynamoDBMetadataStore implements MetadataStore {
    * must declare the table name and region in the
    * {@link Constants#S3GUARD_DDB_TABLE_NAME_KEY} and
    * {@link Constants#S3GUARD_DDB_REGION_KEY} respectively.
+   * It also creates a new credential provider list from the configuration,
+   * using the base fs.s3a.* options, as there is no bucket to infer per-bucket
+   * settings from.
    *
    * @see #initialize(FileSystem)
    * @throws IOException if there is an error
@@ -327,7 +361,8 @@ public class DynamoDBMetadataStore implements MetadataStore {
     region = conf.getTrimmed(S3GUARD_DDB_REGION_KEY);
     Preconditions.checkArgument(!StringUtils.isEmpty(region),
         "No DynamoDB region configured");
-    dynamoDB = createDynamoDB(conf, region);
+    credentials = createAWSCredentialProviderSet(null, conf);
+    dynamoDB = createDynamoDB(conf, region, null, credentials);
 
     username = UserGroupInformation.getCurrentUser().getShortUserName();
     initDataAccessRetries(conf);
@@ -778,12 +813,17 @@ public class DynamoDBMetadataStore implements MetadataStore {
     if (instrumentation != null) {
       instrumentation.storeClosed();
     }
-    if (dynamoDB != null) {
-      LOG.debug("Shutting down {}", this);
-      dynamoDB.shutdown();
-      dynamoDB = null;
+    try {
+      if (dynamoDB != null) {
+        LOG.debug("Shutting down {}", this);
+        dynamoDB.shutdown();
+        dynamoDB = null;
+      }
+    } finally {
+      closeAutocloseables(LOG, credentials);
+      credentials = null;
     }
-  }
+}
 
   @Override
   @Retries.OnceTranslated

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
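
Illustrative sketch (not part of this patch): the class below shows one way the pieces added above could be combined, i.e. the new RolePolicies statements, the RoleModel.policy(List) overload, and STSClientFactory.builder(). The class name, bucket name/ARN, endpoint and region are placeholders; the statement()/policy() factories are assumed to be the RoleModel ones that RolePolicies itself uses, and the class is placed in the org.apache.hadoop.fs.s3a.auth package purely to keep visibility simple.

package org.apache.hadoop.fs.s3a.auth;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;

import org.apache.hadoop.conf.Configuration;

/**
 * Editorial sketch only: not part of the commit.
 */
public class AssumedRoleSetupSketch {

  /**
   * Build a policy restricted to a single (example) bucket: R/W object
   * access, the DynamoDB operations the S3Guard client needs, and SSE-KMS
   * use, without DDB admin rights. The JSON form of such a policy is what
   * fs.s3a.assumed.role.policy expects.
   */
  public static RoleModel.Policy restrictedPolicy() {
    List<RoleModel.Statement> statements = Arrays.asList(
        // a production policy would also need a statement scoped to the
        // bucket ARN itself for the list operations; see assumed_roles.md
        RoleModel.statement(true, "arn:aws:s3:::example-bucket/*",
            RolePolicies.S3_ROOT_RW_OPERATIONS),
        RolePolicies.STATEMENT_S3GUARD_CLIENT,
        RolePolicies.STATEMENT_ALLOW_SSE_KMS_RW);
    // uses the policy(List<Statement>) overload added by this patch
    return RoleModel.policy(statements);
  }

  /**
   * Create an STS client via the new factory; the endpoint and region
   * values are examples, and an empty endpoint falls back to the AWS
   * default endpoint.
   */
  public static AWSSecurityTokenService createStsClient(
      final Configuration conf,
      final AWSCredentialsProvider credentials) throws IOException {
    return STSClientFactory.builder(conf, "example-bucket", credentials,
        "https://sts.us-west-1.amazonaws.com", "us-west-1")
        .build();
  }
}

Note that STATEMENT_S3GUARD_CLIENT deliberately grants only the per-item and describe-table operations, so a role restricted this way can use an existing S3Guard table but cannot create or delete tables.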
