HADOOP-15660. ABFS: Add support for OAuth. Contributed by Da Zhou, Rajeev Bansal, and Junhua Gu.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9149b970 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9149b970 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9149b970 Branch: refs/heads/HADOOP-15407 Commit: 9149b9703e3ab09abdc087db129e82ad3f4cefa1 Parents: d6a4f39 Author: Thomas Marquardt <tm...@microsoft.com> Authored: Sat Aug 18 18:53:32 2018 +0000 Committer: Thomas Marquardt <tm...@microsoft.com> Committed: Mon Sep 17 19:54:01 2018 +0000 ---------------------------------------------------------------------- .../hadoop/fs/azurebfs/AbfsConfiguration.java | 149 ++++++-- .../fs/azurebfs/AzureBlobFileSystemStore.java | 26 +- .../azurebfs/constants/ConfigurationKeys.java | 19 + .../TokenAccessProviderException.java | 36 ++ .../services/AzureServiceErrorCode.java | 1 + .../services/ListResultEntrySchema.java | 89 ++++- .../fs/azurebfs/oauth2/AccessTokenProvider.java | 98 ++++++ .../azurebfs/oauth2/AzureADAuthenticator.java | 344 +++++++++++++++++++ .../hadoop/fs/azurebfs/oauth2/AzureADToken.java | 47 +++ .../oauth2/ClientCredsTokenProvider.java | 62 ++++ .../oauth2/CustomTokenProviderAdaptee.java | 75 ++++ .../oauth2/CustomTokenProviderAdapter.java | 57 +++ .../fs/azurebfs/oauth2/MsiTokenProvider.java | 48 +++ .../hadoop/fs/azurebfs/oauth2/QueryParams.java | 69 ++++ .../oauth2/RefreshTokenBasedTokenProvider.java | 57 +++ .../oauth2/UserPasswordTokenProvider.java | 66 ++++ .../hadoop/fs/azurebfs/oauth2/package-info.java | 18 + .../hadoop/fs/azurebfs/services/AbfsClient.java | 18 +- .../fs/azurebfs/services/AbfsHttpHeader.java | 2 +- .../fs/azurebfs/services/AbfsRestOperation.java | 19 +- .../hadoop/fs/azurebfs/services/AuthType.java | 27 ++ .../azurebfs/AbstractAbfsIntegrationTest.java | 35 +- .../hadoop/fs/azurebfs/ITestAbfsClient.java | 2 +- .../ITestAzureBlobFileSystemBackCompat.java | 4 + .../ITestAzureBlobFileSystemFileStatus.java | 3 - 
.../ITestAzureBlobFileSystemFinalize.java | 8 +- .../azurebfs/ITestAzureBlobFileSystemFlush.java | 8 +- .../azurebfs/ITestAzureBlobFileSystemOauth.java | 176 ++++++++++ .../ITestAzureBlobFileSystemRandomRead.java | 3 + .../azurebfs/ITestFileSystemInitialization.java | 5 +- .../azurebfs/ITestFileSystemRegistration.java | 11 +- .../fs/azurebfs/ITestWasbAbfsCompatibility.java | 2 + .../constants/TestConfigurationKeys.java | 6 + .../contract/ABFSContractTestBinding.java | 14 +- .../ITestAbfsFileSystemContractAppend.java | 19 +- .../ITestAbfsFileSystemContractConcat.java | 17 +- .../ITestAbfsFileSystemContractCreate.java | 17 +- .../ITestAbfsFileSystemContractDelete.java | 17 +- .../ITestAbfsFileSystemContractDistCp.java | 2 +- ...TestAbfsFileSystemContractGetFileStatus.java | 17 +- .../ITestAbfsFileSystemContractMkdir.java | 17 +- .../ITestAbfsFileSystemContractOpen.java | 17 +- .../ITestAbfsFileSystemContractRename.java | 17 +- ...TestAbfsFileSystemContractRootDirectory.java | 16 +- ...ITestAbfsFileSystemContractSecureDistCp.java | 2 +- .../ITestAbfsFileSystemContractSeek.java | 17 +- .../ITestAbfsFileSystemContractSetTimes.java | 17 +- .../ITestAzureBlobFileSystemBasics.java | 2 +- .../fs/azurebfs/services/TestAbfsClient.java | 6 +- .../fs/azurebfs/services/TestQueryParams.java | 72 ++++ .../utils/CleanUpAbfsTestContainer.java | 13 +- .../src/test/resources/azure-bfs-test.xml | 128 ++++++- 52 files changed, 1768 insertions(+), 249 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index e647ae8..f26f562 100644 --- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.azurebfs; +import java.io.IOException; import java.lang.reflect.Field; import java.util.Map; @@ -26,7 +27,6 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; import org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations; import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.IntegerConfigurationValidatorAnnotation; import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.LongConfigurationValidatorAnnotation; @@ -37,16 +37,26 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemExc import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConfigurationPropertyNotFoundException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.KeyProviderException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TokenAccessProviderException; import org.apache.hadoop.fs.azurebfs.diagnostics.Base64StringConfigurationBasicValidator; import org.apache.hadoop.fs.azurebfs.diagnostics.BooleanConfigurationBasicValidator; import org.apache.hadoop.fs.azurebfs.diagnostics.IntegerConfigurationBasicValidator; import org.apache.hadoop.fs.azurebfs.diagnostics.LongConfigurationBasicValidator; import org.apache.hadoop.fs.azurebfs.diagnostics.StringConfigurationBasicValidator; +import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider; +import 
org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider; +import org.apache.hadoop.fs.azurebfs.oauth2.CustomTokenProviderAdaptee; +import org.apache.hadoop.fs.azurebfs.oauth2.CustomTokenProviderAdapter; +import org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider; +import org.apache.hadoop.fs.azurebfs.oauth2.RefreshTokenBasedTokenProvider; +import org.apache.hadoop.fs.azurebfs.oauth2.UserPasswordTokenProvider; +import org.apache.hadoop.fs.azurebfs.services.AuthType; import org.apache.hadoop.fs.azurebfs.services.KeyProvider; import org.apache.hadoop.fs.azurebfs.services.SimpleKeyProvider; import org.apache.hadoop.fs.azurebfs.utils.SSLSocketFactoryEx; +import org.apache.hadoop.util.ReflectionUtils; -import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SSL_CHANNEL_MODE_KEY; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.*; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_FS_AZURE_SSL_CHANNEL_MODE; /** @@ -58,81 +68,81 @@ public class AbfsConfiguration{ private final Configuration configuration; private final boolean isSecure; - @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_WRITE_BUFFER_SIZE, + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_WRITE_BUFFER_SIZE, MinValue = FileSystemConfigurations.MIN_BUFFER_SIZE, MaxValue = FileSystemConfigurations.MAX_BUFFER_SIZE, DefaultValue = FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE) private int writeBufferSize; - @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_READ_BUFFER_SIZE, + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_READ_BUFFER_SIZE, MinValue = FileSystemConfigurations.MIN_BUFFER_SIZE, MaxValue = FileSystemConfigurations.MAX_BUFFER_SIZE, DefaultValue = FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE) private int readBufferSize; - @IntegerConfigurationValidatorAnnotation(ConfigurationKey = 
ConfigurationKeys.AZURE_MIN_BACKOFF_INTERVAL, + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_MIN_BACKOFF_INTERVAL, DefaultValue = FileSystemConfigurations.DEFAULT_MIN_BACKOFF_INTERVAL) private int minBackoffInterval; - @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_MAX_BACKOFF_INTERVAL, + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_MAX_BACKOFF_INTERVAL, DefaultValue = FileSystemConfigurations.DEFAULT_MAX_BACKOFF_INTERVAL) private int maxBackoffInterval; - @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BACKOFF_INTERVAL, + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_BACKOFF_INTERVAL, DefaultValue = FileSystemConfigurations.DEFAULT_BACKOFF_INTERVAL) private int backoffInterval; - @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_MAX_IO_RETRIES, + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_MAX_IO_RETRIES, MinValue = 0, DefaultValue = FileSystemConfigurations.DEFAULT_MAX_RETRY_ATTEMPTS) private int maxIoRetries; - @LongConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BLOCK_SIZE_PROPERTY_NAME, + @LongConfigurationValidatorAnnotation(ConfigurationKey = AZURE_BLOCK_SIZE_PROPERTY_NAME, MinValue = 0, MaxValue = FileSystemConfigurations.MAX_AZURE_BLOCK_SIZE, DefaultValue = FileSystemConfigurations.MAX_AZURE_BLOCK_SIZE) private long azureBlockSize; - @StringConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME, + @StringConfigurationValidatorAnnotation(ConfigurationKey = AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME, DefaultValue = FileSystemConfigurations.AZURE_BLOCK_LOCATION_HOST_DEFAULT) private String azureBlockLocationHost; - @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CONCURRENT_CONNECTION_VALUE_OUT, + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = 
AZURE_CONCURRENT_CONNECTION_VALUE_OUT, MinValue = 1, DefaultValue = FileSystemConfigurations.MAX_CONCURRENT_WRITE_THREADS) private int maxConcurrentWriteThreads; - @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CONCURRENT_CONNECTION_VALUE_IN, + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CONCURRENT_CONNECTION_VALUE_IN, MinValue = 1, DefaultValue = FileSystemConfigurations.MAX_CONCURRENT_READ_THREADS) private int maxConcurrentReadThreads; - @BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_TOLERATE_CONCURRENT_APPEND, + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_TOLERATE_CONCURRENT_APPEND, DefaultValue = FileSystemConfigurations.DEFAULT_READ_TOLERATE_CONCURRENT_APPEND) private boolean tolerateOobAppends; - @StringConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_ATOMIC_RENAME_KEY, + @StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ATOMIC_RENAME_KEY, DefaultValue = FileSystemConfigurations.DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES) private String azureAtomicDirs; - @BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, DefaultValue = FileSystemConfigurations.DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION) private boolean createRemoteFileSystemDuringInitialization; - @BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION, + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION, DefaultValue = FileSystemConfigurations.DEFAULT_AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION) private boolean skipUserGroupMetadataDuringInitialization; - 
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH, + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_READ_AHEAD_QUEUE_DEPTH, DefaultValue = FileSystemConfigurations.DEFAULT_READ_AHEAD_QUEUE_DEPTH) private int readAheadQueueDepth; - @BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_ENABLE_FLUSH, + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_FLUSH, DefaultValue = FileSystemConfigurations.DEFAULT_ENABLE_FLUSH) private boolean enableFlush; - @StringConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_USER_AGENT_PREFIX_KEY, + @StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_USER_AGENT_PREFIX_KEY, DefaultValue = "") private String userAgentId; @@ -140,7 +150,7 @@ public class AbfsConfiguration{ public AbfsConfiguration(final Configuration configuration) throws IllegalAccessException, InvalidConfigurationValueException { this.configuration = configuration; - this.isSecure = this.configuration.getBoolean(ConfigurationKeys.FS_AZURE_SECURE_MODE, false); + this.isSecure = this.configuration.getBoolean(FS_AZURE_SECURE_MODE, false); validateStorageAccountKeys(); Field[] fields = this.getClass().getDeclaredFields(); @@ -161,17 +171,17 @@ public class AbfsConfiguration{ } public boolean isEmulator() { - return this.getConfiguration().getBoolean(ConfigurationKeys.FS_AZURE_EMULATOR_ENABLED, false); + return this.getConfiguration().getBoolean(FS_AZURE_EMULATOR_ENABLED, false); } public boolean isSecureMode() { - return this.isSecure; + return isSecure; } public String getStorageAccountKey(final String accountName) throws AzureBlobFileSystemException { String key; String keyProviderClass = - configuration.get(ConfigurationKeys.AZURE_KEY_ACCOUNT_KEYPROVIDER_PREFIX + accountName); + configuration.get(AZURE_KEY_ACCOUNT_KEYPROVIDER_PREFIX + accountName); KeyProvider keyProvider; if 
(keyProviderClass == null) { @@ -278,19 +288,88 @@ public class AbfsConfiguration{ return configuration.getEnum(FS_AZURE_SSL_CHANNEL_MODE_KEY, DEFAULT_FS_AZURE_SSL_CHANNEL_MODE); } + public AuthType getAuthType(final String accountName) { + return configuration.getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME + accountName, AuthType.SharedKey); + } + + public AccessTokenProvider getTokenProvider(final String accountName) throws TokenAccessProviderException { + AuthType authType = configuration.getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME + accountName, AuthType.SharedKey); + if (authType == AuthType.OAuth) { + try { + Class<? extends AccessTokenProvider> tokenProviderClass = + configuration.getClass(FS_AZURE_ACCOUNT_TOKEN_PROVIDER_TYPE_PROPERTY_NAME + accountName, null, + AccessTokenProvider.class); + AccessTokenProvider tokenProvider = null; + if (tokenProviderClass == ClientCredsTokenProvider.class) { + String authEndpoint = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ENDPOINT + accountName); + String clientId = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID + accountName); + String clientSecret = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_SECRET + accountName); + tokenProvider = new ClientCredsTokenProvider(authEndpoint, clientId, clientSecret); + } else if (tokenProviderClass == UserPasswordTokenProvider.class) { + String authEndpoint = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ENDPOINT + accountName); + String username = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_USER_NAME + accountName); + String password = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_USER_PASSWORD + accountName); + tokenProvider = new UserPasswordTokenProvider(authEndpoint, username, password); + } else if (tokenProviderClass == MsiTokenProvider.class) { + String tenantGuid = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_MSI_TENANT + accountName); + String clientId = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID + accountName); + tokenProvider = new 
MsiTokenProvider(tenantGuid, clientId); + } else if (tokenProviderClass == RefreshTokenBasedTokenProvider.class) { + String refreshToken = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN + accountName); + String clientId = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID + accountName); + tokenProvider = new RefreshTokenBasedTokenProvider(clientId, refreshToken); + } else { + throw new IllegalArgumentException("Failed to initialize " + tokenProviderClass); + } + return tokenProvider; + } catch(IllegalArgumentException e) { + throw e; + } catch (Exception e) { + throw new TokenAccessProviderException("Unable to load key provider class.", e); + } + + } else if (authType == AuthType.Custom) { + try { + String configKey = FS_AZURE_ACCOUNT_TOKEN_PROVIDER_TYPE_PROPERTY_NAME + accountName; + Class<? extends CustomTokenProviderAdaptee> customTokenProviderClass = + configuration.getClass(configKey, null, + CustomTokenProviderAdaptee.class); + if (customTokenProviderClass == null) { + throw new IllegalArgumentException( + String.format("The configuration value for \"%s\" is invalid.", configKey)); + } + CustomTokenProviderAdaptee azureTokenProvider = ReflectionUtils + .newInstance(customTokenProviderClass, configuration); + if (azureTokenProvider == null) { + throw new IllegalArgumentException("Failed to initialize " + customTokenProviderClass); + } + azureTokenProvider.initialize(configuration, accountName); + return new CustomTokenProviderAdapter(azureTokenProvider); + } catch(IllegalArgumentException e) { + throw e; + } catch (Exception e) { + throw new TokenAccessProviderException("Unable to load custom token provider class.", e); + } + + } else { + throw new TokenAccessProviderException(String.format( + "Invalid auth type: %s is being used, expecting OAuth", authType)); + } + } + void validateStorageAccountKeys() throws InvalidConfigurationValueException { Base64StringConfigurationBasicValidator validator = new Base64StringConfigurationBasicValidator( - 
ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME, "", true); - this.storageAccountKeys = this.configuration.getValByRegex(ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX); + FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME, "", true); + this.storageAccountKeys = configuration.getValByRegex(FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX); - for (Map.Entry<String, String> account : this.storageAccountKeys.entrySet()) { + for (Map.Entry<String, String> account : storageAccountKeys.entrySet()) { validator.validate(account.getValue()); } } int validateInt(Field field) throws IllegalAccessException, InvalidConfigurationValueException { IntegerConfigurationValidatorAnnotation validator = field.getAnnotation(IntegerConfigurationValidatorAnnotation.class); - String value = this.configuration.get(validator.ConfigurationKey()); + String value = configuration.get(validator.ConfigurationKey()); // validate return new IntegerConfigurationBasicValidator( @@ -303,7 +382,7 @@ public class AbfsConfiguration{ long validateLong(Field field) throws IllegalAccessException, InvalidConfigurationValueException { LongConfigurationValidatorAnnotation validator = field.getAnnotation(LongConfigurationValidatorAnnotation.class); - String value = this.configuration.get(validator.ConfigurationKey()); + String value = configuration.get(validator.ConfigurationKey()); // validate return new LongConfigurationBasicValidator( @@ -316,7 +395,7 @@ public class AbfsConfiguration{ String validateString(Field field) throws IllegalAccessException, InvalidConfigurationValueException { StringConfigurationValidatorAnnotation validator = field.getAnnotation(StringConfigurationValidatorAnnotation.class); - String value = this.configuration.get(validator.ConfigurationKey()); + String value = configuration.get(validator.ConfigurationKey()); // validate return new StringConfigurationBasicValidator( @@ -327,7 +406,7 @@ public class AbfsConfiguration{ String validateBase64String(Field field) throws IllegalAccessException, 
InvalidConfigurationValueException { Base64StringConfigurationValidatorAnnotation validator = field.getAnnotation((Base64StringConfigurationValidatorAnnotation.class)); - String value = this.configuration.get(validator.ConfigurationKey()); + String value = configuration.get(validator.ConfigurationKey()); // validate return new Base64StringConfigurationBasicValidator( @@ -338,7 +417,7 @@ public class AbfsConfiguration{ boolean validateBoolean(Field field) throws IllegalAccessException, InvalidConfigurationValueException { BooleanConfigurationValidatorAnnotation validator = field.getAnnotation(BooleanConfigurationValidatorAnnotation.class); - String value = this.configuration.get(validator.ConfigurationKey()); + String value = configuration.get(validator.ConfigurationKey()); // validate return new BooleanConfigurationBasicValidator( @@ -347,6 +426,14 @@ public class AbfsConfiguration{ validator.ThrowIfInvalid()).validate(value); } + String getPasswordString(String key) throws IOException { + char[] passchars = configuration.getPassword(key); + if (passchars != null) { + return new String(passchars); + } + return null; + } + @VisibleForTesting void setReadBufferSize(int bufferSize) { this.readBufferSize = bufferSize; http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index ba72149..b8da35b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -67,10 +67,12 @@ import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException; import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema; import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema; +import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider; import org.apache.hadoop.fs.azurebfs.services.AbfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; +import org.apache.hadoop.fs.azurebfs.services.AuthType; import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy; import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials; import org.apache.hadoop.fs.permission.FsAction; @@ -487,16 +489,22 @@ public class AzureBlobFileSystemStore { throw new InvalidUriException(uri.toString()); } - int dotIndex = accountName.indexOf(AbfsHttpConstants.DOT); - if (dotIndex <= 0) { - throw new InvalidUriException( - uri.toString() + " - account name is not fully qualified."); + SharedKeyCredentials creds = null; + AccessTokenProvider tokenProvider = null; + + if (abfsConfiguration.getAuthType(accountName) == AuthType.SharedKey) { + int dotIndex = accountName.indexOf(AbfsHttpConstants.DOT); + if (dotIndex <= 0) { + throw new InvalidUriException( + uri.toString() + " - account name is not fully qualified."); + } + creds = new SharedKeyCredentials(accountName.substring(0, dotIndex), + abfsConfiguration.getStorageAccountKey(accountName)); + } else { + tokenProvider = abfsConfiguration.getTokenProvider(accountName); } - SharedKeyCredentials creds = - new SharedKeyCredentials(accountName.substring(0, dotIndex), - this.abfsConfiguration.getStorageAccountKey(accountName)); - this.client = new AbfsClient(baseUrl, creds, abfsConfiguration, new ExponentialRetryPolicy()); + this.client = new AbfsClient(baseUrl, creds, 
abfsConfiguration, new ExponentialRetryPolicy(), tokenProvider); } private String getRelativePath(final Path path) { @@ -537,7 +545,7 @@ public class AzureBlobFileSystemStore { Date utcDate = new SimpleDateFormat(DATE_TIME_PATTERN).parse(lastModifiedTime); parsedTime = utcDate.getTime(); } catch (ParseException e) { - LOG.error("Failed to parse the date {0}", lastModifiedTime); + LOG.error("Failed to parse the date {}", lastModifiedTime); } finally { return parsedTime; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index 16ddd90..ffdf700 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -60,5 +60,24 @@ public final class ConfigurationKeys { public static final String AZURE_KEY_ACCOUNT_KEYPROVIDER_PREFIX = "fs.azure.account.keyprovider."; public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script"; + /** Prefix for auth type properties: {@value}. */ + public static final String FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME = "fs.azure.account.auth.type."; + /** Prefix for oauth token provider type: {@value}. */ + public static final String FS_AZURE_ACCOUNT_TOKEN_PROVIDER_TYPE_PROPERTY_NAME = "fs.azure.account.oauth.provider.type."; + /** Prefix for oauth AAD client id: {@value}. */ + public static final String FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID = "fs.azure.account.oauth2.client.id."; + /** Prefix for oauth AAD client secret: {@value}. 
*/ + public static final String FS_AZURE_ACCOUNT_OAUTH_CLIENT_SECRET = "fs.azure.account.oauth2.client.secret."; + /** Prefix for oauth AAD client endpoint: {@value}. */ + public static final String FS_AZURE_ACCOUNT_OAUTH_CLIENT_ENDPOINT = "fs.azure.account.oauth2.client.endpoint."; + /** Prefix for oauth msi tenant id: {@value}. */ + public static final String FS_AZURE_ACCOUNT_OAUTH_MSI_TENANT = "fs.azure.account.oauth2.msi.tenant."; + /** Prefix for oauth user name: {@value}. */ + public static final String FS_AZURE_ACCOUNT_OAUTH_USER_NAME = "fs.azure.account.oauth2.user.name."; + /** Prefix for oauth user password: {@value}. */ + public static final String FS_AZURE_ACCOUNT_OAUTH_USER_PASSWORD = "fs.azure.account.oauth2.user.password."; + /** Prefix for oauth refresh token: {@value}. */ + public static final String FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN = "fs.azure.account.oauth2.refresh.token."; + private ConfigurationKeys() {} } http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/TokenAccessProviderException.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/TokenAccessProviderException.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/TokenAccessProviderException.java new file mode 100644 index 0000000..b40b34a --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/TokenAccessProviderException.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.azurebfs.contracts.exceptions; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Thrown if there is a problem instantiating a TokenAccessProvider or retrieving a configuration + * using a TokenAccessProvider object. + */ +@InterfaceAudience.Private +public class TokenAccessProviderException extends AzureBlobFileSystemException { + + public TokenAccessProviderException(String message) { + super(message); + } + + public TokenAccessProviderException(String message, Throwable cause) { + super(message); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java index a89f339..63bf8d0 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java @@ -44,6 +44,7 @@ public enum 
AzureServiceErrorCode { INGRESS_OVER_ACCOUNT_LIMIT(null, HttpURLConnection.HTTP_UNAVAILABLE, "Ingress is over the account limit."), EGRESS_OVER_ACCOUNT_LIMIT(null, HttpURLConnection.HTTP_UNAVAILABLE, "Egress is over the account limit."), INVALID_QUERY_PARAMETER_VALUE("InvalidQueryParameterValue", HttpURLConnection.HTTP_BAD_REQUEST, null), + AUTHORIZATION_PERMISSION_MISS_MATCH("AuthorizationPermissionMismatch", HttpURLConnection.HTTP_FORBIDDEN, null), UNKNOWN(null, -1, null); private final String errorCode; http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultEntrySchema.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultEntrySchema.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultEntrySchema.java index 903ff69..1de9dfa 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultEntrySchema.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultEntrySchema.java @@ -58,12 +58,30 @@ public class ListResultEntrySchema { private Long contentLength; /** + * The owner property. + */ + @JsonProperty(value = "owner") + private String owner; + + /** + * The group property. + */ + @JsonProperty(value = "group") + private String group; + + /** + * The permissions property. + */ + @JsonProperty(value = "permissions") + private String permissions; + + /** * Get the name value. 
* * @return the name value */ public String name() { - return this.name; + return name; } /** @@ -83,7 +101,7 @@ public class ListResultEntrySchema { * @return the isDirectory value */ public Boolean isDirectory() { - return this.isDirectory; + return isDirectory; } /** @@ -103,7 +121,7 @@ public class ListResultEntrySchema { * @return the lastModified value */ public String lastModified() { - return this.lastModified; + return lastModified; } /** @@ -123,7 +141,7 @@ public class ListResultEntrySchema { * @return the etag value */ public String eTag() { - return this.eTag; + return eTag; } /** @@ -143,7 +161,7 @@ public class ListResultEntrySchema { * @return the contentLength value */ public Long contentLength() { - return this.contentLength; + return contentLength; } /** @@ -157,4 +175,65 @@ public class ListResultEntrySchema { return this; } + /** + * + Get the owner value. + * + * @return the owner value + */ + public String owner() { + return owner; + } + + /** + * Set the owner value. + * + * @param owner the owner value to set + * @return the ListEntrySchema object itself. + */ + public ListResultEntrySchema withOwner(final String owner) { + this.owner = owner; + return this; + } + + /** + * Get the group value. + * + * @return the group value + */ + public String group() { + return group; + } + + /** + * Set the group value. + * + * @param group the group value to set + * @return the ListEntrySchema object itself. + */ + public ListResultEntrySchema withGroup(final String group) { + this.group = group; + return this; + } + + /** + * Get the permissions value. + * + * @return the permissions value + */ + public String permissions() { + return permissions; + } + + /** + * Set the permissions value. + * + * @param permissions the permissions value to set + * @return the ListEntrySchema object itself. 
+ */ + public ListResultEntrySchema withPermissions(final String permissions) { + this.permissions = permissions; + return this; + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AccessTokenProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AccessTokenProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AccessTokenProvider.java new file mode 100644 index 0000000..72f37a1 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AccessTokenProvider.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.azurebfs.oauth2; + +import java.io.IOException; +import java.util.Date; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Returns an Azure Active Directory token when requested. The provider can + * cache the token if it has already retrieved one. 
If it does, then the + * provider is responsible for checking expiry and refreshing as needed. + * + * In other words, this is is a token cache that fetches tokens when + * requested, if the cached token has expired. + * + */ +public abstract class AccessTokenProvider { + + private AzureADToken token; + private static final Logger LOG = LoggerFactory.getLogger(AccessTokenProvider.class); + + /** + * returns the {@link AzureADToken} cached (or retrieved) by this instance. + * + * @return {@link AzureADToken} containing the access token + * @throws IOException if there is an error fetching the token + */ + public synchronized AzureADToken getToken() throws IOException { + if (isTokenAboutToExpire()) { + LOG.debug("AAD Token is missing or expired:" + + " Calling refresh-token from abstract base class"); + token = refreshToken(); + } + return token; + } + + /** + * the method to fetch the access token. Derived classes should override + * this method to actually get the token from Azure Active Directory. + * + * This method will be called initially, and then once when the token + * is about to expire. + * + * + * @return {@link AzureADToken} containing the access token + * @throws IOException if there is an error fetching the token + */ + protected abstract AzureADToken refreshToken() throws IOException; + + /** + * Checks if the token is about to expire in the next 5 minutes. + * The 5 minute allowance is to allow for clock skew and also to + * allow for token to be refreshed in that much time. + * + * @return true if the token is expiring in next 5 minutes + */ + private boolean isTokenAboutToExpire() { + if (token == null) { + LOG.debug("AADToken: no token. 
Returning expiring=true"); + return true; // no token should have same response as expired token + } + boolean expiring = false; + // allow 5 minutes for clock skew + long approximatelyNow = System.currentTimeMillis() + FIVE_MINUTES; + if (token.getExpiry().getTime() < approximatelyNow) { + expiring = true; + } + if (expiring) { + LOG.debug("AADToken: token expiring: " + + token.getExpiry().toString() + + " : Five-minute window: " + + new Date(approximatelyNow).toString()); + } + + return expiring; + } + + // 5 minutes in milliseconds + private static final long FIVE_MINUTES = 300 * 1000; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java new file mode 100644 index 0000000..e82dc95 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java @@ -0,0 +1,344 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.oauth2; + +import java.io.IOException; +import java.io.InputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.Date; +import java.util.Hashtable; +import java.util.Map; + +import com.google.common.base.Preconditions; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonParser; +import org.codehaus.jackson.JsonToken; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy; + +/** + * This class provides convenience methods to obtain AAD tokens. + * While convenient, it is not necessary to use these methods to + * obtain the tokens. Customers can use any other method + * (e.g., using the adal4j client) to obtain tokens. + */ + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public final class AzureADAuthenticator { + + private static final Logger LOG = LoggerFactory.getLogger(AzureADAuthenticator.class); + private static final String RESOURCE_NAME = "https://storage.azure.com/"; + private static final int CONNECT_TIMEOUT = 30 * 1000; + private static final int READ_TIMEOUT = 30 * 1000; + + private AzureADAuthenticator() { + // no operation + } + + /** + * gets Azure Active Directory token using the user ID and password of + * a service principal (that is, Web App in Azure Active Directory). + * + * Azure Active Directory allows users to set up a web app as a + * service principal. Users can optionally obtain service principal keys + * from AAD. This method gets a token using a service principal's client ID + * and keys. In addition, it needs the token endpoint associated with the + * user's directory. 
+ * + * + * @param authEndpoint the OAuth 2.0 token endpoint associated + * with the user's directory (obtain from + * Active Directory configuration) + * @param clientId the client ID (GUID) of the client web app + * btained from Azure Active Directory configuration + * @param clientSecret the secret key of the client web app + * @return {@link AzureADToken} obtained using the creds + * @throws IOException throws IOException if there is a failure in connecting to Azure AD + */ + public static AzureADToken getTokenUsingClientCreds(String authEndpoint, + String clientId, String clientSecret) + throws IOException { + Preconditions.checkNotNull(authEndpoint, "authEndpoint"); + Preconditions.checkNotNull(clientId, "clientId"); + Preconditions.checkNotNull(clientSecret, "clientSecret"); + + QueryParams qp = new QueryParams(); + qp.add("resource", RESOURCE_NAME); + qp.add("grant_type", "client_credentials"); + qp.add("client_id", clientId); + qp.add("client_secret", clientSecret); + LOG.debug("AADToken: starting to fetch token using client creds for client ID " + clientId); + + return getTokenCall(authEndpoint, qp.serialize(), null, null); + } + + /** + * Gets AAD token from the local virtual machine's VM extension. This only works on + * an Azure VM with MSI extension + * enabled. + * + * @param tenantGuid (optional) The guid of the AAD tenant. Can be {@code null}. + * @param clientId (optional) The clientId guid of the MSI service + * principal to use. Can be {@code null}. 
+ * @param bypassCache {@code boolean} specifying whether a cached token is acceptable or a fresh token + * request should me made to AAD + * @return {@link AzureADToken} obtained using the creds + * @throws IOException throws IOException if there is a failure in obtaining the token + */ + public static AzureADToken getTokenFromMsi(String tenantGuid, String clientId, + boolean bypassCache) throws IOException { + Preconditions.checkNotNull(tenantGuid, "tenantGuid"); + Preconditions.checkNotNull(clientId, "clientId"); + + String authEndpoint = "http://169.254.169.254/metadata/identity/oauth2/token"; + + QueryParams qp = new QueryParams(); + qp.add("api-version", "2018-02-01"); + qp.add("resource", RESOURCE_NAME); + + + if (tenantGuid.length() > 0) { + String authority = "https://login.microsoftonline.com/" + tenantGuid; + qp.add("authority", authority); + } + + if (clientId.length() > 0) { + qp.add("client_id", clientId); + } + + if (bypassCache) { + qp.add("bypass_cache", "true"); + } + + Hashtable<String, String> headers = new Hashtable<>(); + headers.put("Metadata", "true"); + + LOG.debug("AADToken: starting to fetch token using MSI"); + return getTokenCall(authEndpoint, qp.serialize(), headers, "GET"); + } + + /** + * Gets Azure Active Directory token using refresh token. 
+ * + * @param clientId the client ID (GUID) of the client web app obtained from Azure Active Directory configuration + * @param refreshToken the refresh token + * @return {@link AzureADToken} obtained using the refresh token + * @throws IOException throws IOException if there is a failure in connecting to Azure AD + */ + public static AzureADToken getTokenUsingRefreshToken(String clientId, + String refreshToken) throws IOException { + String authEndpoint = "https://login.microsoftonline.com/Common/oauth2/token"; + QueryParams qp = new QueryParams(); + qp.add("grant_type", "refresh_token"); + qp.add("refresh_token", refreshToken); + if (clientId != null) { + qp.add("client_id", clientId); + } + LOG.debug("AADToken: starting to fetch token using refresh token for client ID " + clientId); + return getTokenCall(authEndpoint, qp.serialize(), null, null); + } + + private static class HttpException extends IOException { + private int httpErrorCode; + private String requestId; + + public int getHttpErrorCode() { + return this.httpErrorCode; + } + + public String getRequestId() { + return this.requestId; + } + + HttpException(int httpErrorCode, String requestId, String message) { + super(message); + this.httpErrorCode = httpErrorCode; + this.requestId = requestId; + } + } + + private static AzureADToken getTokenCall(String authEndpoint, String body, + Hashtable<String, String> headers, String httpMethod) + throws IOException { + AzureADToken token = null; + ExponentialRetryPolicy retryPolicy + = new ExponentialRetryPolicy(3, 0, 1000, 2); + + int httperror = 0; + String requestId; + String httpExceptionMessage = null; + IOException ex = null; + boolean succeeded = false; + int retryCount = 0; + do { + httperror = 0; + requestId = ""; + ex = null; + try { + token = getTokenSingleCall(authEndpoint, body, headers, httpMethod); + } catch (HttpException e) { + httperror = e.httpErrorCode; + requestId = e.requestId; + httpExceptionMessage = e.getMessage(); + } catch (IOException 
e) { + ex = e; + } + succeeded = ((httperror == 0) && (ex == null)); + retryCount++; + } while (!succeeded && retryPolicy.shouldRetry(retryCount, httperror)); + if (!succeeded) { + if (ex != null) { + throw ex; + } + if (httperror != 0) { + throw new IOException(httpExceptionMessage); + } + } + return token; + } + + private static AzureADToken getTokenSingleCall( + String authEndpoint, String payload, Hashtable<String, String> headers, String httpMethod) + throws IOException { + + AzureADToken token = null; + HttpURLConnection conn = null; + String urlString = authEndpoint; + + httpMethod = (httpMethod == null) ? "POST" : httpMethod; + if (httpMethod.equals("GET")) { + urlString = urlString + "?" + payload; + } + + try { + URL url = new URL(urlString); + conn = (HttpURLConnection) url.openConnection(); + conn.setRequestMethod(httpMethod); + conn.setReadTimeout(READ_TIMEOUT); + conn.setConnectTimeout(CONNECT_TIMEOUT); + + if (headers != null && headers.size() > 0) { + for (Map.Entry<String, String> entry : headers.entrySet()) { + conn.setRequestProperty(entry.getKey(), entry.getValue()); + } + } + conn.setRequestProperty("Connection", "close"); + + if (httpMethod.equals("POST")) { + conn.setDoOutput(true); + conn.getOutputStream().write(payload.getBytes("UTF-8")); + } + + int httpResponseCode = conn.getResponseCode(); + String requestId = conn.getHeaderField("x-ms-request-id"); + String responseContentType = conn.getHeaderField("Content-Type"); + long responseContentLength = conn.getHeaderFieldLong("Content-Length", 0); + + requestId = requestId == null ? 
"" : requestId; + if (httpResponseCode == HttpURLConnection.HTTP_OK + && responseContentType.startsWith("application/json") && responseContentLength > 0) { + InputStream httpResponseStream = conn.getInputStream(); + token = parseTokenFromStream(httpResponseStream); + } else { + String responseBody = consumeInputStream(conn.getInputStream(), 1024); + String proxies = "none"; + String httpProxy = System.getProperty("http.proxy"); + String httpsProxy = System.getProperty("https.proxy"); + if (httpProxy != null || httpsProxy != null) { + proxies = "http:" + httpProxy + "; https:" + httpsProxy; + } + String logMessage = + "AADToken: HTTP connection failed for getting token from AzureAD. Http response: " + + httpResponseCode + " " + conn.getResponseMessage() + + " Content-Type: " + responseContentType + + " Content-Length: " + responseContentLength + + " Request ID: " + requestId.toString() + + " Proxies: " + proxies + + " First 1K of Body: " + responseBody; + LOG.debug(logMessage); + throw new HttpException(httpResponseCode, requestId, logMessage); + } + } finally { + if (conn != null) { + conn.disconnect(); + } + } + return token; + } + + private static AzureADToken parseTokenFromStream(InputStream httpResponseStream) throws IOException { + AzureADToken token = new AzureADToken(); + try { + int expiryPeriod = 0; + + JsonFactory jf = new JsonFactory(); + JsonParser jp = jf.createJsonParser(httpResponseStream); + String fieldName, fieldValue; + jp.nextToken(); + while (jp.hasCurrentToken()) { + if (jp.getCurrentToken() == JsonToken.FIELD_NAME) { + fieldName = jp.getCurrentName(); + jp.nextToken(); // field value + fieldValue = jp.getText(); + + if (fieldName.equals("access_token")) { + token.setAccessToken(fieldValue); + } + if (fieldName.equals("expires_in")) { + expiryPeriod = Integer.parseInt(fieldValue); + } + } + jp.nextToken(); + } + jp.close(); + long expiry = System.currentTimeMillis(); + expiry = expiry + expiryPeriod * 1000L; // convert expiryPeriod to 
milliseconds and add + token.setExpiry(new Date(expiry)); + LOG.debug("AADToken: fetched token with expiry " + token.getExpiry().toString()); + } catch (Exception ex) { + LOG.debug("AADToken: got exception when parsing json token " + ex.toString()); + throw ex; + } finally { + httpResponseStream.close(); + } + return token; + } + + private static String consumeInputStream(InputStream inStream, int length) throws IOException { + byte[] b = new byte[length]; + int totalBytesRead = 0; + int bytesRead = 0; + + do { + bytesRead = inStream.read(b, totalBytesRead, length - totalBytesRead); + if (bytesRead > 0) { + totalBytesRead += bytesRead; + } + } while (bytesRead >= 0 && totalBytesRead < length); + + return new String(b, 0, totalBytesRead, StandardCharsets.UTF_8); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADToken.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADToken.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADToken.java new file mode 100644 index 0000000..daa5a93 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADToken.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * Object representing the AAD access token to use when making HTTP requests
 * to Azure Data Lake Storage.
 *
 * The expiry is defensively copied on the way in and on the way out, so
 * callers can never mutate the instant stored in the token.
 */
public class AzureADToken {
  private String accessToken;
  private Date expiry;

  /**
   * @return the access token string, or {@code null} if none has been set
   */
  public String getAccessToken() {
    return this.accessToken;
  }

  /**
   * @param accessToken the access token string to store
   */
  public void setAccessToken(String accessToken) {
    this.accessToken = accessToken;
  }

  /**
   * @return a copy of the expiry instant, or {@code null} if no expiry has
   *         been set yet (previously this case threw a NullPointerException)
   */
  public Date getExpiry() {
    return this.expiry == null ? null : new Date(this.expiry.getTime());
  }

  /**
   * Stores a copy of the given expiry instant.
   *
   * @param expiry the expiry instant; must not be {@code null}
   * @throws NullPointerException if {@code expiry} is {@code null}
   */
  public void setExpiry(Date expiry) {
    if (expiry == null) {
      // same exception type as before, but now it names the argument
      throw new NullPointerException("expiry");
    }
    this.expiry = new Date(expiry.getTime());
  }

}
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.oauth2; + +import java.io.IOException; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Provides tokens based on client credentials. + */ +public class ClientCredsTokenProvider extends AccessTokenProvider { + + private final String authEndpoint; + + private final String clientId; + + private final String clientSecret; + + private static final Logger LOG = LoggerFactory.getLogger(AccessTokenProvider.class); + + + public ClientCredsTokenProvider(final String authEndpoint, + final String clientId, final String clientSecret) { + + Preconditions.checkNotNull(authEndpoint, "authEndpoint"); + Preconditions.checkNotNull(clientId, "clientId"); + Preconditions.checkNotNull(clientSecret, "clientSecret"); + + this.authEndpoint = authEndpoint; + this.clientId = clientId; + this.clientSecret = clientSecret; + } + + + @Override + protected AzureADToken refreshToken() throws IOException { + LOG.debug("AADToken: refreshing client-credential based token"); + return AzureADAuthenticator.getTokenUsingClientCreds(authEndpoint, clientId, clientSecret); + } + + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/CustomTokenProviderAdaptee.java 
---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/CustomTokenProviderAdaptee.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/CustomTokenProviderAdaptee.java new file mode 100644 index 0000000..7366a8d --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/CustomTokenProviderAdaptee.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.oauth2; + +import java.io.IOException; +import java.util.Date; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; + + +/** + * This interface provides an extensibility model for customizing the acquisition + * of Azure Active Directory Access Tokens. When "fs.azure.account.auth.type" is + * set to "Custom", implementors may use the + * "fs.azure.account.oauth.provider.type.{accountName}" configuration property + * to specify a class with a custom implementation of CustomTokenProviderAdaptee. 
+ * This class will be dynamically loaded, initialized, and invoked to provide + * AAD Access Tokens and their Expiry. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface CustomTokenProviderAdaptee { + + /** + * Initialize with supported configuration. This method is invoked when the + * (URI, Configuration)} method is invoked. + * + * @param configuration Configuration object + * @param accountName Account Name + * @throws IOException if instance can not be configured. + */ + void initialize(Configuration configuration, String accountName) + throws IOException; + + /** + * Obtain the access token that should be added to https connection's header. + * Will be called depending upon {@link #getExpiryTime()} expiry time is set, + * so implementations should be performant. Implementations are responsible + * for any refreshing of the token. + * + * @return String containing the access token + * @throws IOException if there is an error fetching the token + */ + String getAccessToken() throws IOException; + + /** + * Obtain expiry time of the token. If implementation is performant enough to + * maintain expiry and expect {@link #getAccessToken()} call for every + * connection then safe to return current or past time. + * + * However recommended to use the token expiry time received from Azure Active + * Directory. + * + * @return Date to expire access token retrieved from AAD. 
+ */ + Date getExpiryTime(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/CustomTokenProviderAdapter.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/CustomTokenProviderAdapter.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/CustomTokenProviderAdapter.java new file mode 100644 index 0000000..7bae415 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/CustomTokenProviderAdapter.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.oauth2; + + +import java.io.IOException; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Provides tokens based on custom implementation, following the Adapter Design + * Pattern. 
+ */ +public final class CustomTokenProviderAdapter extends AccessTokenProvider { + + private CustomTokenProviderAdaptee adaptee; + private static final Logger LOG = LoggerFactory.getLogger(AccessTokenProvider.class); + + /** + * Constructs a token provider based on the custom token provider. + * + * @param adaptee the custom token provider + */ + public CustomTokenProviderAdapter(CustomTokenProviderAdaptee adaptee) { + Preconditions.checkNotNull(adaptee, "adaptee"); + this.adaptee = adaptee; + } + + protected AzureADToken refreshToken() throws IOException { + LOG.debug("AADToken: refreshing custom based token"); + + AzureADToken azureADToken = new AzureADToken(); + azureADToken.setAccessToken(adaptee.getAccessToken()); + azureADToken.setExpiry(adaptee.getExpiryTime()); + + return azureADToken; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/MsiTokenProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/MsiTokenProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/MsiTokenProvider.java new file mode 100644 index 0000000..2deb9d2 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/MsiTokenProvider.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.oauth2; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Provides tokens based on Azure VM's Managed Service Identity. + */ +public class MsiTokenProvider extends AccessTokenProvider { + + private final String tenantGuid; + + private final String clientId; + + private static final Logger LOG = LoggerFactory.getLogger(AccessTokenProvider.class); + + public MsiTokenProvider(final String tenantGuid, final String clientId) { + this.tenantGuid = tenantGuid; + this.clientId = clientId; + } + + @Override + protected AzureADToken refreshToken() throws IOException { + LOG.debug("AADToken: refreshing token from MSI"); + AzureADToken token = AzureADAuthenticator.getTokenFromMsi(tenantGuid, clientId, false); + return token; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/QueryParams.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/QueryParams.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/QueryParams.java new file mode 100644 index 0000000..ff6e06f --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/QueryParams.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
/**
 * Utility class for building HTTP query strings. Collects name/value pairs
 * (plus an optional api-version) and serializes them into a URL-encoded
 * {@code name=value&name=value} string.
 *
 * The serialized form is cached until the next modification. Parameters are
 * kept in a {@link HashMap}, so their order in the output is unspecified.
 */
public class QueryParams {
  private final Map<String, String> params = new HashMap<>();
  private String apiVersion = null;
  private String serializedString = null;

  /**
   * Adds a parameter, replacing any earlier value stored under the same name.
   *
   * @param name  parameter name
   * @param value parameter value
   */
  public void add(String name, String value) {
    params.put(name, value);
    serializedString = null; // invalidate the cached form
  }

  /**
   * Sets the api-version that is appended after all other parameters.
   *
   * @param apiVersion the version string, or {@code null} for none
   */
  public void setApiVersion(String apiVersion) {
    this.apiVersion = apiVersion;
    serializedString = null; // invalidate the cached form
  }

  /**
   * Serializes the parameters. Names and values are URL-encoded as UTF-8;
   * the api-version, if set, is appended last and verbatim.
   *
   * @return the query string without a leading {@code ?} or {@code &};
   *         empty if nothing has been added
   */
  public String serialize() {
    if (serializedString == null) {
      StringBuilder sb = new StringBuilder();
      // The separator must be local: as an instance field it stayed "&"
      // after the first call, so serializing again after a modification
      // produced a string with a leading "&".
      String separator = "";
      for (Map.Entry<String, String> entry : params.entrySet()) {
        sb.append(separator);
        sb.append(encode(entry.getKey()));
        sb.append('=');
        sb.append(encode(entry.getValue()));
        separator = "&";
      }

      if (apiVersion != null) {
        sb.append(separator);
        sb.append("api-version=");
        sb.append(apiVersion);
      }
      serializedString = sb.toString();
    }
    return serializedString;
  }

  /**
   * URL-encodes the text as UTF-8.
   */
  private static String encode(String text) {
    try {
      return URLEncoder.encode(text, "UTF-8");
    } catch (UnsupportedEncodingException ex) {
      // UTF-8 support is mandatory on every Java platform; report instead of
      // silently emitting a half-written parameter.
      throw new IllegalStateException("UTF-8 encoding is not available", ex);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/RefreshTokenBasedTokenProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/RefreshTokenBasedTokenProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/RefreshTokenBasedTokenProvider.java new file mode 100644 index 0000000..949d5bf --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/RefreshTokenBasedTokenProvider.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.oauth2; + +import java.io.IOException; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Provides tokens based on refresh token. 
+ */ +public class RefreshTokenBasedTokenProvider extends AccessTokenProvider { + private static final Logger LOG = LoggerFactory.getLogger(AccessTokenProvider.class); + + private final String clientId; + + private final String refreshToken; + + /** + * Constructs a token provider based on the refresh token provided. + * + * @param clientId the client ID (GUID) of the client web app obtained from Azure Active Directory configuration + * @param refreshToken the refresh token + */ + public RefreshTokenBasedTokenProvider(String clientId, String refreshToken) { + Preconditions.checkNotNull(clientId, "clientId"); + Preconditions.checkNotNull(refreshToken, "refreshToken"); + this.clientId = clientId; + this.refreshToken = refreshToken; + } + + + @Override + protected AzureADToken refreshToken() throws IOException { + LOG.debug("AADToken: refreshing refresh-token based token"); + return AzureADAuthenticator.getTokenUsingRefreshToken(clientId, refreshToken); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/UserPasswordTokenProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/UserPasswordTokenProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/UserPasswordTokenProvider.java new file mode 100644 index 0000000..7504e9d --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/UserPasswordTokenProvider.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.azurebfs.oauth2; + +import java.io.IOException; + +import com.google.common.base.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +/** + * Provides tokens based on username and password. + */ +public class UserPasswordTokenProvider extends AccessTokenProvider { + + private final String authEndpoint; + + private final String username; + + private final String password; + + private static final Logger LOG = LoggerFactory.getLogger(AccessTokenProvider.class); + + public UserPasswordTokenProvider(final String authEndpoint, + final String username, final String password) { + Preconditions.checkNotNull(authEndpoint, "authEndpoint"); + Preconditions.checkNotNull(username, "username"); + Preconditions.checkNotNull(password, "password"); + + this.authEndpoint = authEndpoint; + this.username = username; + this.password = password; + } + + @Override + protected AzureADToken refreshToken() throws IOException { + LOG.debug("AADToken: refreshing user-password based token"); + return AzureADAuthenticator.getTokenUsingClientCreds(authEndpoint, username, password); + } + + private static String getPasswordString(Configuration conf, String key) + throws IOException { + char[] passchars = conf.getPassword(key); + if (passchars == null) { + throw new IOException("Password " + key + " not found"); + } 
+ return new String(passchars); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/package-info.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/package-info.java new file mode 100644 index 0000000..bad1a85 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.fs.azurebfs.oauth2; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index e003ffd..f5c9f18 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException; import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; +import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME; @@ -42,7 +43,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.* import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*; /** - * AbfsClient + * AbfsClient. 
*/ public class AbfsClient { public static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class); @@ -54,9 +55,13 @@ public class AbfsClient { private final AbfsConfiguration abfsConfiguration; private final String userAgent; + private final AccessTokenProvider tokenProvider; + + public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials, final AbfsConfiguration abfsConfiguration, - final ExponentialRetryPolicy exponentialRetryPolicy) { + final ExponentialRetryPolicy exponentialRetryPolicy, + final AccessTokenProvider tokenProvider) { this.baseUrl = baseUrl; this.sharedKeyCredentials = sharedKeyCredentials; String baseUrlString = baseUrl.toString(); @@ -76,6 +81,7 @@ public class AbfsClient { } this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName); + this.tokenProvider = tokenProvider; } public String getFileSystem() { @@ -409,6 +415,14 @@ public class AbfsClient { return encodedString; } + public synchronized String getAccessToken() throws IOException { + if (tokenProvider != null) { + return "Bearer " + tokenProvider.getToken().getAccessToken(); + } else { + return null; + } + } + @VisibleForTesting String initializeUserAgent(final AbfsConfiguration abfsConfiguration, final String sslProviderName) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpHeader.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpHeader.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpHeader.java index 46b4c6d..0067b75 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpHeader.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpHeader.java @@ -19,7 +19,7 @@ package 
org.apache.hadoop.fs.azurebfs.services; /** - * The Http Request / Response Headers for Rest AbfsClient + * The Http Request / Response Headers for Rest AbfsClient. */ public class AbfsHttpHeader { private final String name; http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java index 6dd32fa..c0407f5 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java @@ -30,6 +30,7 @@ import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException; +import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; /** * The AbfsRestOperation for Rest AbfsClient. @@ -48,7 +49,7 @@ public class AbfsRestOperation { // request body and all the download methods have a response body. private final boolean hasRequestBody; - private final Logger LOG = LoggerFactory.getLogger(AbfsClient.class); + private static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class); // For uploads, this is the request entity body. For downloads, // this will hold the response entity body. 
@@ -139,9 +140,15 @@ public class AbfsRestOperation { httpOperation = new AbfsHttpOperation(url, method, requestHeaders); // sign the HTTP request - client.getSharedKeyCredentials().signRequest( - httpOperation.getConnection(), - hasRequestBody ? bufferLength : 0); + if (client.getAccessToken() == null) { + // sign the HTTP request + client.getSharedKeyCredentials().signRequest( + httpOperation.getConnection(), + hasRequestBody ? bufferLength : 0); + } else { + httpOperation.getConnection().setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION, + client.getAccessToken()); + } if (hasRequestBody) { // HttpUrlConnection requires @@ -163,9 +170,7 @@ public class AbfsRestOperation { return false; } - if (LOG.isDebugEnabled()) { - LOG.debug("HttpRequest: " + httpOperation.toString()); - } + LOG.debug("HttpRequest: " + httpOperation.toString()); if (client.getRetryPolicy().shouldRetry(retryCount, httpOperation.getStatusCode())) { return false; http://git-wip-us.apache.org/repos/asf/hadoop/blob/9149b970/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AuthType.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AuthType.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AuthType.java new file mode 100644 index 0000000..c95b92c --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AuthType.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * Auth Type Enum.
 *
 * <p>Enumerates the authentication mechanisms that can be selected for the
 * ABFS client.
 */
public enum AuthType {
  // NOTE(review): presumably requests signed with the storage account's shared key - confirm.
  SharedKey,
  // NOTE(review): presumably OAuth bearer tokens from an access token provider - confirm.
  OAuth,
  // NOTE(review): presumably a user-supplied custom token provider - confirm.
  Custom
}