HADOOP-15659. Code changes for bug fix and new tests. Contributed by Da Zhou.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ee6866de Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ee6866de Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ee6866de Branch: refs/heads/HADOOP-15407 Commit: ee6866de626b898c9e2085afc71cbe90df946841 Parents: 75b184c Author: Thomas Marquardt <tm...@microsoft.com> Authored: Sat Aug 11 00:10:26 2018 +0000 Committer: Thomas Marquardt <tm...@microsoft.com> Committed: Sat Aug 11 03:42:27 2018 +0000 ---------------------------------------------------------------------- hadoop-tools/hadoop-azure/pom.xml | 26 +- .../hadoop/fs/azurebfs/AbfsConfiguration.java | 356 +++++++++++++++++++ .../hadoop/fs/azurebfs/AzureBlobFileSystem.java | 55 ++- .../fs/azurebfs/AzureBlobFileSystemStore.java | 39 +- .../azurebfs/constants/ConfigurationKeys.java | 6 + .../constants/FileSystemConfigurations.java | 4 +- .../exceptions/KeyProviderException.java | 42 +++ .../services/AzureServiceErrorCode.java | 1 + .../services/ListResultEntrySchema.java | 2 +- .../contracts/services/ListResultSchema.java | 2 +- .../hadoop/fs/azurebfs/services/AbfsClient.java | 26 +- .../fs/azurebfs/services/AbfsConfiguration.java | 297 ---------------- .../fs/azurebfs/services/AbfsHttpOperation.java | 19 +- .../fs/azurebfs/services/AbfsInputStream.java | 2 +- .../fs/azurebfs/services/AbfsOutputStream.java | 25 +- .../fs/azurebfs/services/AbfsRestOperation.java | 2 +- .../azurebfs/services/AbfsUriQueryBuilder.java | 8 +- .../fs/azurebfs/services/KeyProvider.java | 42 +++ .../services/ShellDecryptionKeyProvider.java | 63 ++++ .../fs/azurebfs/services/SimpleKeyProvider.java | 54 +++ .../azurebfs/AbstractAbfsIntegrationTest.java | 17 +- .../hadoop/fs/azurebfs/ITestAbfsClient.java | 45 +++ .../fs/azurebfs/ITestAbfsReadWriteAndSeek.java | 89 +++++ .../azurebfs/ITestAzureBlobFileSystemE2E.java | 2 +- .../ITestAzureBlobFileSystemE2EScale.java | 4 +- 
.../ITestAzureBlobFileSystemFinalize.java | 60 ++++ .../azurebfs/ITestAzureBlobFileSystemFlush.java | 136 ++++++- .../ITestAzureBlobFileSystemInitAndCreate.java | 4 +- .../ITestAzureBlobFileSystemRename.java | 3 +- .../fs/azurebfs/ITestFileSystemProperties.java | 4 - .../TestAbfsConfigurationFieldsValidation.java | 149 ++++++++ .../contract/AbfsFileSystemContract.java | 5 +- .../services/ITestAbfsReadWriteAndSeek.java | 91 ----- .../fs/azurebfs/services/TestAbfsClient.java | 60 ++++ .../TestAbfsConfigurationFieldsValidation.java | 147 -------- .../TestShellDecryptionKeyProvider.java | 89 +++++ 36 files changed, 1344 insertions(+), 632 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee6866de/hadoop-tools/hadoop-azure/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/pom.xml b/hadoop-tools/hadoop-azure/pom.xml index cbd4dfb..7d0406c 100644 --- a/hadoop-tools/hadoop-azure/pom.xml +++ b/hadoop-tools/hadoop-azure/pom.xml @@ -149,17 +149,6 @@ <scope>provided</scope> </dependency> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-core</artifactId> - <scope>compile</scope> - </dependency> - - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - <scope>compile</scope> - </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> @@ -198,17 +187,24 @@ </dependency> <dependency> - <groupId>joda-time</groupId> - <artifactId>joda-time</artifactId> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-util-ajax</artifactId> <scope>compile</scope> </dependency> <dependency> - <groupId>org.eclipse.jetty</groupId> - <artifactId>jetty-util-ajax</artifactId> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + <scope>compile</scope> + </dependency> + <dependency> + 
<groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-core-asl</artifactId> <scope>compile</scope> </dependency> + + <!-- dependencies use for test only --> <dependency> <groupId>junit</groupId> http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee6866de/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java new file mode 100644 index 0000000..1fb5df9 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -0,0 +1,356 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.azurebfs; + +import java.lang.reflect.Field; +import java.util.Map; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; +import org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations; +import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.IntegerConfigurationValidatorAnnotation; +import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.LongConfigurationValidatorAnnotation; +import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.StringConfigurationValidatorAnnotation; +import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.Base64StringConfigurationValidatorAnnotation; +import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.BooleanConfigurationValidatorAnnotation; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConfigurationPropertyNotFoundException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.KeyProviderException; +import org.apache.hadoop.fs.azurebfs.diagnostics.Base64StringConfigurationBasicValidator; +import org.apache.hadoop.fs.azurebfs.diagnostics.BooleanConfigurationBasicValidator; +import org.apache.hadoop.fs.azurebfs.diagnostics.IntegerConfigurationBasicValidator; +import org.apache.hadoop.fs.azurebfs.diagnostics.LongConfigurationBasicValidator; +import org.apache.hadoop.fs.azurebfs.diagnostics.StringConfigurationBasicValidator; +import 
org.apache.hadoop.fs.azurebfs.services.KeyProvider; +import org.apache.hadoop.fs.azurebfs.services.SimpleKeyProvider; + +/** + * Configuration for Azure Blob FileSystem. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class AbfsConfiguration{ + private final Configuration configuration; + private final boolean isSecure; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_WRITE_BUFFER_SIZE, + MinValue = FileSystemConfigurations.MIN_BUFFER_SIZE, + MaxValue = FileSystemConfigurations.MAX_BUFFER_SIZE, + DefaultValue = FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE) + private int writeBufferSize; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_READ_BUFFER_SIZE, + MinValue = FileSystemConfigurations.MIN_BUFFER_SIZE, + MaxValue = FileSystemConfigurations.MAX_BUFFER_SIZE, + DefaultValue = FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE) + private int readBufferSize; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_MIN_BACKOFF_INTERVAL, + DefaultValue = FileSystemConfigurations.DEFAULT_MIN_BACKOFF_INTERVAL) + private int minBackoffInterval; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_MAX_BACKOFF_INTERVAL, + DefaultValue = FileSystemConfigurations.DEFAULT_MAX_BACKOFF_INTERVAL) + private int maxBackoffInterval; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BACKOFF_INTERVAL, + DefaultValue = FileSystemConfigurations.DEFAULT_BACKOFF_INTERVAL) + private int backoffInterval; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_MAX_IO_RETRIES, + MinValue = 0, + DefaultValue = FileSystemConfigurations.DEFAULT_MAX_RETRY_ATTEMPTS) + private int maxIoRetries; + + @LongConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BLOCK_SIZE_PROPERTY_NAME, + MinValue = 0, + MaxValue = 
FileSystemConfigurations.MAX_AZURE_BLOCK_SIZE, + DefaultValue = FileSystemConfigurations.MAX_AZURE_BLOCK_SIZE) + private long azureBlockSize; + + @StringConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME, + DefaultValue = FileSystemConfigurations.AZURE_BLOCK_LOCATION_HOST_DEFAULT) + private String azureBlockLocationHost; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CONCURRENT_CONNECTION_VALUE_OUT, + MinValue = 1, + DefaultValue = FileSystemConfigurations.MAX_CONCURRENT_WRITE_THREADS) + private int maxConcurrentWriteThreads; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CONCURRENT_CONNECTION_VALUE_IN, + MinValue = 1, + DefaultValue = FileSystemConfigurations.MAX_CONCURRENT_READ_THREADS) + private int maxConcurrentReadThreads; + + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_TOLERATE_CONCURRENT_APPEND, + DefaultValue = FileSystemConfigurations.DEFAULT_READ_TOLERATE_CONCURRENT_APPEND) + private boolean tolerateOobAppends; + + @StringConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_ATOMIC_RENAME_KEY, + DefaultValue = FileSystemConfigurations.DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES) + private String azureAtomicDirs; + + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, + DefaultValue = FileSystemConfigurations.DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION) + private boolean createRemoteFileSystemDuringInitialization; + + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION, + DefaultValue = FileSystemConfigurations.DEFAULT_AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION) + private boolean skipUserGroupMetadataDuringInitialization; + + + 
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH, + DefaultValue = FileSystemConfigurations.DEFAULT_READ_AHEAD_QUEUE_DEPTH) + private int readAheadQueueDepth; + + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_ENABLE_FLUSH, + DefaultValue = FileSystemConfigurations.DEFAULT_ENABLE_FLUSH) + private boolean enableFlush; + + @StringConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_USER_AGENT_PREFIX_KEY, + DefaultValue = "") + private String userAgentId; + + private Map<String, String> storageAccountKeys; + + public AbfsConfiguration(final Configuration configuration) throws IllegalAccessException, InvalidConfigurationValueException { + this.configuration = configuration; + this.isSecure = this.configuration.getBoolean(ConfigurationKeys.FS_AZURE_SECURE_MODE, false); + + validateStorageAccountKeys(); + Field[] fields = this.getClass().getDeclaredFields(); + for (Field field : fields) { + field.setAccessible(true); + if (field.isAnnotationPresent(IntegerConfigurationValidatorAnnotation.class)) { + field.set(this, validateInt(field)); + } else if (field.isAnnotationPresent(LongConfigurationValidatorAnnotation.class)) { + field.set(this, validateLong(field)); + } else if (field.isAnnotationPresent(StringConfigurationValidatorAnnotation.class)) { + field.set(this, validateString(field)); + } else if (field.isAnnotationPresent(Base64StringConfigurationValidatorAnnotation.class)) { + field.set(this, validateBase64String(field)); + } else if (field.isAnnotationPresent(BooleanConfigurationValidatorAnnotation.class)) { + field.set(this, validateBoolean(field)); + } + } + } + + public boolean isEmulator() { + return this.getConfiguration().getBoolean(ConfigurationKeys.FS_AZURE_EMULATOR_ENABLED, false); + } + + public boolean isSecureMode() { + return this.isSecure; + } + + public String getStorageAccountKey(final String accountName) throws 
AzureBlobFileSystemException { + String key; + String keyProviderClass = + configuration.get(ConfigurationKeys.AZURE_KEY_ACCOUNT_KEYPROVIDER_PREFIX + accountName); + KeyProvider keyProvider; + + if (keyProviderClass == null) { + // No key provider was provided so use the provided key as is. + keyProvider = new SimpleKeyProvider(); + } else { + // create an instance of the key provider class and verify it + // implements KeyProvider + Object keyProviderObject; + try { + Class<?> clazz = configuration.getClassByName(keyProviderClass); + keyProviderObject = clazz.newInstance(); + } catch (Exception e) { + throw new KeyProviderException("Unable to load key provider class.", e); + } + if (!(keyProviderObject instanceof KeyProvider)) { + throw new KeyProviderException(keyProviderClass + + " specified in config is not a valid KeyProvider class."); + } + keyProvider = (KeyProvider) keyProviderObject; + } + key = keyProvider.getStorageAccountKey(accountName, configuration); + + if (key == null) { + throw new ConfigurationPropertyNotFoundException(accountName); + } + + return key; + } + + public Configuration getConfiguration() { + return this.configuration; + } + + public int getWriteBufferSize() { + return this.writeBufferSize; + } + + public int getReadBufferSize() { + return this.readBufferSize; + } + + public int getMinBackoffIntervalMilliseconds() { + return this.minBackoffInterval; + } + + public int getMaxBackoffIntervalMilliseconds() { + return this.maxBackoffInterval; + } + + public int getBackoffIntervalMilliseconds() { + return this.backoffInterval; + } + + public int getMaxIoRetries() { + return this.maxIoRetries; + } + + public long getAzureBlockSize() { + return this.azureBlockSize; + } + + public String getAzureBlockLocationHost() { + return this.azureBlockLocationHost; + } + + public int getMaxConcurrentWriteThreads() { + return this.maxConcurrentWriteThreads; + } + + public int getMaxConcurrentReadThreads() { + return this.maxConcurrentReadThreads; + } + + 
public boolean getTolerateOobAppends() { + return this.tolerateOobAppends; + } + + public String getAzureAtomicRenameDirs() { + return this.azureAtomicDirs; + } + + public boolean getCreateRemoteFileSystemDuringInitialization() { + return this.createRemoteFileSystemDuringInitialization; + } + + public boolean getSkipUserGroupMetadataDuringInitialization() { + return this.skipUserGroupMetadataDuringInitialization; + } + + public int getReadAheadQueueDepth() { + return this.readAheadQueueDepth; + } + + public boolean isFlushEnabled() { + return this.enableFlush; + } + + public String getCustomUserAgentPrefix() { + return this.userAgentId; + } + + void validateStorageAccountKeys() throws InvalidConfigurationValueException { + Base64StringConfigurationBasicValidator validator = new Base64StringConfigurationBasicValidator( + ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME, "", true); + this.storageAccountKeys = this.configuration.getValByRegex(ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX); + + for (Map.Entry<String, String> account : this.storageAccountKeys.entrySet()) { + validator.validate(account.getValue()); + } + } + + int validateInt(Field field) throws IllegalAccessException, InvalidConfigurationValueException { + IntegerConfigurationValidatorAnnotation validator = field.getAnnotation(IntegerConfigurationValidatorAnnotation.class); + String value = this.configuration.get(validator.ConfigurationKey()); + + // validate + return new IntegerConfigurationBasicValidator( + validator.MinValue(), + validator.MaxValue(), + validator.DefaultValue(), + validator.ConfigurationKey(), + validator.ThrowIfInvalid()).validate(value); + } + + long validateLong(Field field) throws IllegalAccessException, InvalidConfigurationValueException { + LongConfigurationValidatorAnnotation validator = field.getAnnotation(LongConfigurationValidatorAnnotation.class); + String value = this.configuration.get(validator.ConfigurationKey()); + + // validate + return new 
LongConfigurationBasicValidator( + validator.MinValue(), + validator.MaxValue(), + validator.DefaultValue(), + validator.ConfigurationKey(), + validator.ThrowIfInvalid()).validate(value); + } + + String validateString(Field field) throws IllegalAccessException, InvalidConfigurationValueException { + StringConfigurationValidatorAnnotation validator = field.getAnnotation(StringConfigurationValidatorAnnotation.class); + String value = this.configuration.get(validator.ConfigurationKey()); + + // validate + return new StringConfigurationBasicValidator( + validator.ConfigurationKey(), + validator.DefaultValue(), + validator.ThrowIfInvalid()).validate(value); + } + + String validateBase64String(Field field) throws IllegalAccessException, InvalidConfigurationValueException { + Base64StringConfigurationValidatorAnnotation validator = field.getAnnotation((Base64StringConfigurationValidatorAnnotation.class)); + String value = this.configuration.get(validator.ConfigurationKey()); + + // validate + return new Base64StringConfigurationBasicValidator( + validator.ConfigurationKey(), + validator.DefaultValue(), + validator.ThrowIfInvalid()).validate(value); + } + + boolean validateBoolean(Field field) throws IllegalAccessException, InvalidConfigurationValueException { + BooleanConfigurationValidatorAnnotation validator = field.getAnnotation(BooleanConfigurationValidatorAnnotation.class); + String value = this.configuration.get(validator.ConfigurationKey()); + + // validate + return new BooleanConfigurationBasicValidator( + validator.ConfigurationKey(), + validator.DefaultValue(), + validator.ThrowIfInvalid()).validate(value); + } + + @VisibleForTesting + void setReadBufferSize(int bufferSize) { + this.readBufferSize = bufferSize; + } + + @VisibleForTesting + void setWriteBufferSize(int bufferSize) { + this.writeBufferSize = bufferSize; + } + + @VisibleForTesting + void setEnableFlush(boolean enableFlush) { + this.enableFlush = enableFlush; + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee6866de/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 9f58f6b..b0a30a0 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -36,6 +36,7 @@ import java.util.concurrent.Future; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.apache.hadoop.fs.azurebfs.services.AbfsClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,7 +91,6 @@ public class AzureBlobFileSystem extends FileSystem { this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); this.userGroupInformation = UserGroupInformation.getCurrentUser(); this.user = userGroupInformation.getUserName(); - this.primaryUserGroup = userGroupInformation.getPrimaryGroupName(); this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecure(), configuration, userGroupInformation); LOG.debug("Initializing NativeAzureFileSystem for {}", uri); @@ -98,7 +98,16 @@ public class AzureBlobFileSystem extends FileSystem { this.setWorkingDirectory(this.getHomeDirectory()); if (abfsStore.getAbfsConfiguration().getCreateRemoteFileSystemDuringInitialization()) { - this.createFileSystem(); + if (!this.fileSystemExists()) { + this.createFileSystem(); + } + } + + if (!abfsStore.getAbfsConfiguration().getSkipUserGroupMetadataDuringInitialization()) { + this.primaryUserGroup = userGroupInformation.getPrimaryGroupName(); + } else { + //Provide a default group name + this.primaryUserGroup = this.user; } } @@ -375,7 +384,7 
@@ public class AzureBlobFileSystem extends FileSystem { if (file.getLen() < start) { return new BlockLocation[0]; } - final String blobLocationHost = this.abfsStore.getAbfsConfiguration().getAzureBlockLocationHost(); + final String blobLocationHost = abfsStore.getAbfsConfiguration().getAzureBlockLocationHost(); final String[] name = { blobLocationHost }; final String[] host = { blobLocationHost }; @@ -397,6 +406,13 @@ public class AzureBlobFileSystem extends FileSystem { return locations; } + @Override + protected void finalize() throws Throwable { + LOG.debug("finalize() called."); + close(); + super.finalize(); + } + public String getOwnerUser() { return user; } @@ -450,13 +466,31 @@ public class AzureBlobFileSystem extends FileSystem { } } + private boolean fileSystemExists() throws IOException { + LOG.debug( + "AzureBlobFileSystem.fileSystemExists uri: {}", uri); + try { + abfsStore.getFilesystemProperties(); + } catch (AzureBlobFileSystemException ex) { + try { + checkException(null, ex); + // Because HEAD request won't contain message body, + // there is no way to get the storage error code + // workaround here is to check its status code. + } catch (FileNotFoundException e) { + return false; + } + } + return true; + } + private void createFileSystem() throws IOException { LOG.debug( "AzureBlobFileSystem.createFileSystem uri: {}", uri); try { - this.abfsStore.createFilesystem(); + abfsStore.createFilesystem(); } catch (AzureBlobFileSystemException ex) { - checkException(null, ex, AzureServiceErrorCode.FILE_SYSTEM_ALREADY_EXISTS); + checkException(null, ex); } } @@ -556,10 +590,10 @@ public class AzureBlobFileSystem extends FileSystem { //AbfsRestOperationException.getMessage() contains full error info including path/uri. 
if (statusCode == HttpURLConnection.HTTP_NOT_FOUND) { - throw (IOException)new FileNotFoundException(ere.getMessage()) + throw (IOException) new FileNotFoundException(ere.getMessage()) .initCause(exception); } else if (statusCode == HttpURLConnection.HTTP_CONFLICT) { - throw (IOException)new FileAlreadyExistsException(ere.getMessage()) + throw (IOException) new FileAlreadyExistsException(ere.getMessage()) .initCause(exception); } else { throw ere; @@ -615,6 +649,11 @@ public class AzureBlobFileSystem extends FileSystem { @VisibleForTesting AzureBlobFileSystemStore getAbfsStore() { - return this.abfsStore; + return abfsStore; + } + + @VisibleForTesting + AbfsClient getAbfsClient() { + return abfsStore.getClient(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee6866de/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 8ac31ce..ba72149 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -31,8 +31,11 @@ import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; import java.nio.charset.CharsetEncoder; +import java.text.ParseException; +import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; +import java.util.Date; import java.util.HashSet; import java.util.Hashtable; import java.util.Map; @@ -65,7 +68,6 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema; 
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema; import org.apache.hadoop.fs.azurebfs.services.AbfsClient; -import org.apache.hadoop.fs.azurebfs.services.AbfsConfiguration; import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; @@ -75,8 +77,6 @@ import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.UserGroupInformation; import org.apache.http.client.utils.URIBuilder; -import org.joda.time.DateTime; -import org.joda.time.format.DateTimeFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -223,7 +223,7 @@ public class AzureBlobFileSystemStore { final OutputStream outputStream; outputStream = new FSDataOutputStream( new AbfsOutputStream(client, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), 0, - abfsConfiguration.getWriteBufferSize()), null); + abfsConfiguration.getWriteBufferSize(), abfsConfiguration.isFlushEnabled()), null); return outputStream; } @@ -287,7 +287,7 @@ public class AzureBlobFileSystemStore { final OutputStream outputStream; outputStream = new FSDataOutputStream( new AbfsOutputStream(client, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), - offset, abfsConfiguration.getWriteBufferSize()), null); + offset, abfsConfiguration.getWriteBufferSize(), abfsConfiguration.isFlushEnabled()), null); return outputStream; } @@ -366,7 +366,7 @@ public class AzureBlobFileSystemStore { true, 1, blockSize, - parseLastModifiedTime(lastModified).getMillis(), + parseLastModifiedTime(lastModified), path, eTag); } else { @@ -385,7 +385,7 @@ public class AzureBlobFileSystemStore { parseIsDirectory(resourceType), 1, blockSize, - parseLastModifiedTime(lastModified).getMillis(), + parseLastModifiedTime(lastModified), path, eTag); } @@ -419,10 +419,7 @@ public class AzureBlobFileSystemStore { long 
contentLength = entry.contentLength() == null ? 0 : entry.contentLength(); boolean isDirectory = entry.isDirectory() == null ? false : entry.isDirectory(); if (entry.lastModified() != null && !entry.lastModified().isEmpty()) { - final DateTime dateTime = DateTime.parse( - entry.lastModified(), - DateTimeFormat.forPattern(DATE_TIME_PATTERN).withZoneUTC()); - lastModifiedMillis = dateTime.getMillis(); + lastModifiedMillis = parseLastModifiedTime(entry.lastModified()); } Path entryPath = new Path(File.separator + entry.name()); @@ -534,10 +531,16 @@ public class AzureBlobFileSystemStore { && resourceType.equalsIgnoreCase(AbfsHttpConstants.DIRECTORY); } - private DateTime parseLastModifiedTime(final String lastModifiedTime) { - return DateTime.parse( - lastModifiedTime, - DateTimeFormat.forPattern(DATE_TIME_PATTERN).withZoneUTC()); + private long parseLastModifiedTime(final String lastModifiedTime) { + long parsedTime = 0; + try { + Date utcDate = new SimpleDateFormat(DATE_TIME_PATTERN).parse(lastModifiedTime); + parsedTime = utcDate.getTime(); + } catch (ParseException e) { + LOG.error("Failed to parse the date {0}", lastModifiedTime); + } finally { + return parsedTime; + } } private String convertXmsPropertiesToCommaSeparatedString(final Hashtable<String, String> properties) throws @@ -663,7 +666,7 @@ public class AzureBlobFileSystemStore { } if (other instanceof VersionedFileStatus) { - return this.version.equals(((VersionedFileStatus)other).version); + return this.version.equals(((VersionedFileStatus) other).version); } return true; @@ -702,5 +705,9 @@ public class AzureBlobFileSystemStore { } } + @VisibleForTesting + AbfsClient getClient() { + return this.client; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee6866de/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java ---------------------------------------------------------------------- diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index ead1003..9c805a2 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -49,9 +49,15 @@ public final class ConfigurationKeys { public static final String AZURE_CONCURRENT_CONNECTION_VALUE_IN = "fs.azure.concurrentRequestCount.in"; public static final String AZURE_TOLERATE_CONCURRENT_APPEND = "fs.azure.io.read.tolerate.concurrent.append"; public static final String AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION = "fs.azure.createRemoteFileSystemDuringInitialization"; + public static final String AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = "fs.azure.skipUserGroupMetadataDuringInitialization"; public static final String FS_AZURE_AUTOTHROTTLING_ENABLE = "fs.azure.autothrottling.enable"; public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key"; public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth"; + public static final String FS_AZURE_ENABLE_FLUSH = "fs.azure.enable.flush"; + public static final String FS_AZURE_USER_AGENT_PREFIX_KEY = "fs.azure.user.agent.prefix"; + + public static final String AZURE_KEY_ACCOUNT_KEYPROVIDER_PREFIX = "fs.azure.account.keyprovider."; + public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script"; private ConfigurationKeys() {} } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee6866de/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java ---------------------------------------------------------------------- diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 482158c..1655d04 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -39,7 +39,7 @@ public final class FileSystemConfigurations { private static final int ONE_MB = ONE_KB * ONE_KB; // Default upload and download buffer size - public static final int DEFAULT_WRITE_BUFFER_SIZE = 4 * ONE_MB; // 4 MB + public static final int DEFAULT_WRITE_BUFFER_SIZE = 8 * ONE_MB; // 8 MB public static final int DEFAULT_READ_BUFFER_SIZE = 4 * ONE_MB; // 4 MB public static final int MIN_BUFFER_SIZE = 16 * ONE_KB; // 16 KB public static final int MAX_BUFFER_SIZE = 100 * ONE_MB; // 100 MB @@ -50,10 +50,12 @@ public final class FileSystemConfigurations { public static final int MAX_CONCURRENT_WRITE_THREADS = 8; public static final boolean DEFAULT_READ_TOLERATE_CONCURRENT_APPEND = false; public static final boolean DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION = false; + public static final boolean DEFAULT_AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = false; public static final String DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES = "/hbase"; public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1; + public static final boolean DEFAULT_ENABLE_FLUSH = true; private FileSystemConfigurations() {} } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee6866de/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/KeyProviderException.java ---------------------------------------------------------------------- diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/KeyProviderException.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/KeyProviderException.java new file mode 100644 index 0000000..6723d69 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/KeyProviderException.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.contracts.exceptions; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Thrown if there is a problem instantiating a KeyProvider or retrieving a key + * using a KeyProvider object. 
+ */ +@InterfaceAudience.Private +public class KeyProviderException extends AzureBlobFileSystemException { + private static final long serialVersionUID = 1L; + + public KeyProviderException(String message) { + super(message); + } + + public KeyProviderException(String message, Throwable cause) { + super(message); + // Attach the cause so the underlying failure stays in the stack trace. + initCause(cause); + } + + public KeyProviderException(Throwable t) { + super(t.getMessage()); + // Attach the cause so the underlying failure stays in the stack trace. + initCause(t); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee6866de/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java index 90e580f..a89f339 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java @@ -43,6 +43,7 @@ public enum AzureServiceErrorCode { INVALID_RENAME_SOURCE_PATH("InvalidRenameSourcePath", HttpURLConnection.HTTP_CONFLICT, null), INGRESS_OVER_ACCOUNT_LIMIT(null, HttpURLConnection.HTTP_UNAVAILABLE, "Ingress is over the account limit."), EGRESS_OVER_ACCOUNT_LIMIT(null, HttpURLConnection.HTTP_UNAVAILABLE, "Egress is over the account limit."), + INVALID_QUERY_PARAMETER_VALUE("InvalidQueryParameterValue", HttpURLConnection.HTTP_BAD_REQUEST, null), UNKNOWN(null, -1, null); private final String errorCode; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee6866de/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultEntrySchema.java ---------------------------------------------------------------------- diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultEntrySchema.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultEntrySchema.java index 02a7ac9..903ff69 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultEntrySchema.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultEntrySchema.java @@ -18,7 +18,7 @@ package org.apache.hadoop.fs.azurebfs.contracts.services; -import com.fasterxml.jackson.annotation.JsonProperty; +import org.codehaus.jackson.annotate.JsonProperty; import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee6866de/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultSchema.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultSchema.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultSchema.java index baf06dc..3259742 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultSchema.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultSchema.java @@ -20,7 +20,7 @@ package org.apache.hadoop.fs.azurebfs.contracts.services; import java.util.List; -import com.fasterxml.jackson.annotation.JsonProperty; +import org.codehaus.jackson.annotate.JsonProperty; import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee6866de/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java ---------------------------------------------------------------------- diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index 2b3ccc0..60369be 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -26,12 +26,13 @@ import java.util.ArrayList; import java.util.List; import java.util.Locale; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException; - +import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*; @@ -44,7 +45,7 @@ public class AbfsClient { public static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class); private final URL baseUrl; private final SharedKeyCredentials sharedKeyCredentials; - private final String xMsVersion = "2018-03-28"; + private final String xMsVersion = "2018-06-17"; private final ExponentialRetryPolicy retryPolicy; private final String filesystem; private final AbfsConfiguration abfsConfiguration; @@ -59,7 +60,7 @@ public class AbfsClient { this.filesystem = baseUrlString.substring(baseUrlString.lastIndexOf(FORWARD_SLASH) + 1); this.abfsConfiguration = abfsConfiguration; this.retryPolicy = exponentialRetryPolicy; - this.userAgent = initializeUserAgent(); + this.userAgent = initializeUserAgent(abfsConfiguration); } public String getFileSystem() { @@ -137,7 +138,7 @@ public class AbfsClient { final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, 
FILESYSTEM); - abfsUriQueryBuilder.addQuery(QUERY_PARAM_DIRECTORY, relativePath == null ? "" : urlEncode(relativePath)); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_DIRECTORY, relativePath == null ? "" : relativePath); abfsUriQueryBuilder.addQuery(QUERY_PARAM_RECURSIVE, String.valueOf(recursive)); abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation); abfsUriQueryBuilder.addQuery(QUERY_PARAM_MAXRESULTS, String.valueOf(listMaxResults)); @@ -380,8 +381,8 @@ public class AbfsClient { return url; } - private static String urlEncode(final String value) throws AzureBlobFileSystemException { - String encodedString = null; + public static String urlEncode(final String value) throws AzureBlobFileSystemException { + String encodedString; try { encodedString = URLEncoder.encode(value, UTF_8) .replace(PLUS, PLUS_ENCODE) @@ -393,14 +394,23 @@ public class AbfsClient { return encodedString; } - private String initializeUserAgent() { + @VisibleForTesting + String initializeUserAgent(final AbfsConfiguration abfsConfiguration) { final String userAgentComment = String.format(Locale.ROOT, "(JavaJRE %s; %s %s)", System.getProperty(JAVA_VERSION), System.getProperty(OS_NAME) .replaceAll(SINGLE_WHITE_SPACE, EMPTY_STRING), System.getProperty(OS_VERSION)); - + String customUserAgentId = abfsConfiguration.getCustomUserAgentPrefix(); + if (customUserAgentId != null && !customUserAgentId.isEmpty()) { + return String.format(Locale.ROOT, CLIENT_VERSION + " %s %s", userAgentComment, customUserAgentId); + } return String.format(CLIENT_VERSION + " %s", userAgentComment); } + + @VisibleForTesting + URL getBaseUrl() { + return baseUrl; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee6866de/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConfiguration.java 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConfiguration.java deleted file mode 100644 index 8def1bb..0000000 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConfiguration.java +++ /dev/null @@ -1,297 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.fs.azurebfs.services; - -import java.lang.reflect.Field; -import java.util.Map; - -import com.google.common.annotations.VisibleForTesting; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; -import org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations; -import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.IntegerConfigurationValidatorAnnotation; -import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.LongConfigurationValidatorAnnotation; -import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.StringConfigurationValidatorAnnotation; -import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.Base64StringConfigurationValidatorAnnotation; -import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.BooleanConfigurationValidatorAnnotation; -import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConfigurationPropertyNotFoundException; -import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException; -import org.apache.hadoop.fs.azurebfs.diagnostics.Base64StringConfigurationBasicValidator; -import org.apache.hadoop.fs.azurebfs.diagnostics.BooleanConfigurationBasicValidator; -import org.apache.hadoop.fs.azurebfs.diagnostics.IntegerConfigurationBasicValidator; -import org.apache.hadoop.fs.azurebfs.diagnostics.LongConfigurationBasicValidator; -import org.apache.hadoop.fs.azurebfs.diagnostics.StringConfigurationBasicValidator; - -/** - * Configuration for Azure Blob FileSystem. 
- */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class AbfsConfiguration{ - private final Configuration configuration; - private final boolean isSecure; - - @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_WRITE_BUFFER_SIZE, - MinValue = FileSystemConfigurations.MIN_BUFFER_SIZE, - MaxValue = FileSystemConfigurations.MAX_BUFFER_SIZE, - DefaultValue = FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE) - private int writeBufferSize; - - @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_READ_BUFFER_SIZE, - MinValue = FileSystemConfigurations.MIN_BUFFER_SIZE, - MaxValue = FileSystemConfigurations.MAX_BUFFER_SIZE, - DefaultValue = FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE) - private int readBufferSize; - - @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_MIN_BACKOFF_INTERVAL, - DefaultValue = FileSystemConfigurations.DEFAULT_MIN_BACKOFF_INTERVAL) - private int minBackoffInterval; - - @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_MAX_BACKOFF_INTERVAL, - DefaultValue = FileSystemConfigurations.DEFAULT_MAX_BACKOFF_INTERVAL) - private int maxBackoffInterval; - - @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BACKOFF_INTERVAL, - DefaultValue = FileSystemConfigurations.DEFAULT_BACKOFF_INTERVAL) - private int backoffInterval; - - @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_MAX_IO_RETRIES, - MinValue = 0, - DefaultValue = FileSystemConfigurations.DEFAULT_MAX_RETRY_ATTEMPTS) - private int maxIoRetries; - - @LongConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BLOCK_SIZE_PROPERTY_NAME, - MinValue = 0, - MaxValue = FileSystemConfigurations.MAX_AZURE_BLOCK_SIZE, - DefaultValue = FileSystemConfigurations.MAX_AZURE_BLOCK_SIZE) - private long azureBlockSize; - - 
@StringConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME, - DefaultValue = FileSystemConfigurations.AZURE_BLOCK_LOCATION_HOST_DEFAULT) - private String azureBlockLocationHost; - - @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CONCURRENT_CONNECTION_VALUE_OUT, - MinValue = 1, - DefaultValue = FileSystemConfigurations.MAX_CONCURRENT_WRITE_THREADS) - private int maxConcurrentWriteThreads; - - @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CONCURRENT_CONNECTION_VALUE_IN, - MinValue = 1, - DefaultValue = FileSystemConfigurations.MAX_CONCURRENT_READ_THREADS) - private int maxConcurrentReadThreads; - - @BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_TOLERATE_CONCURRENT_APPEND, - DefaultValue = FileSystemConfigurations.DEFAULT_READ_TOLERATE_CONCURRENT_APPEND) - private boolean tolerateOobAppends; - - @StringConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_ATOMIC_RENAME_KEY, - DefaultValue = FileSystemConfigurations.DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES) - private String azureAtomicDirs; - - @BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, - DefaultValue = FileSystemConfigurations.DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION) - private boolean createRemoteFileSystemDuringInitialization; - - @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH, - DefaultValue = FileSystemConfigurations.DEFAULT_READ_AHEAD_QUEUE_DEPTH) - private int readAheadQueueDepth; - - private Map<String, String> storageAccountKeys; - - public AbfsConfiguration(final Configuration configuration) throws IllegalAccessException, InvalidConfigurationValueException { - this.configuration = configuration; - this.isSecure = 
this.configuration.getBoolean(ConfigurationKeys.FS_AZURE_SECURE_MODE, false); - - validateStorageAccountKeys(); - Field[] fields = this.getClass().getDeclaredFields(); - for (Field field : fields) { - field.setAccessible(true); - if (field.isAnnotationPresent(IntegerConfigurationValidatorAnnotation.class)) { - field.set(this, validateInt(field)); - } else if (field.isAnnotationPresent(LongConfigurationValidatorAnnotation.class)) { - field.set(this, validateLong(field)); - } else if (field.isAnnotationPresent(StringConfigurationValidatorAnnotation.class)) { - field.set(this, validateString(field)); - } else if (field.isAnnotationPresent(Base64StringConfigurationValidatorAnnotation.class)) { - field.set(this, validateBase64String(field)); - } else if (field.isAnnotationPresent(BooleanConfigurationValidatorAnnotation.class)) { - field.set(this, validateBoolean(field)); - } - } - } - - public boolean isEmulator() { - return this.getConfiguration().getBoolean(ConfigurationKeys.FS_AZURE_EMULATOR_ENABLED, false); - } - - public boolean isSecureMode() { - return this.isSecure; - } - - public String getStorageAccountKey(final String accountName) throws ConfigurationPropertyNotFoundException { - String accountKey = this.storageAccountKeys.get(ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME + accountName); - if (accountKey == null) { - throw new ConfigurationPropertyNotFoundException(accountName); - } - - return accountKey; - } - - public Configuration getConfiguration() { - return this.configuration; - } - - public int getWriteBufferSize() { - return this.writeBufferSize; - } - - public int getReadBufferSize() { - return this.readBufferSize; - } - - public int getMinBackoffIntervalMilliseconds() { - return this.minBackoffInterval; - } - - public int getMaxBackoffIntervalMilliseconds() { - return this.maxBackoffInterval; - } - - public int getBackoffIntervalMilliseconds() { - return this.backoffInterval; - } - - public int getMaxIoRetries() { - return this.maxIoRetries; 
- } - - public long getAzureBlockSize() { - return this.azureBlockSize; - } - - public String getAzureBlockLocationHost() { - return this.azureBlockLocationHost; - } - - public int getMaxConcurrentWriteThreads() { - return this.maxConcurrentWriteThreads; - } - - public int getMaxConcurrentReadThreads() { - return this.maxConcurrentReadThreads; - } - - public boolean getTolerateOobAppends() { - return this.tolerateOobAppends; - } - - public String getAzureAtomicRenameDirs() { - return this.azureAtomicDirs; - } - - public boolean getCreateRemoteFileSystemDuringInitialization() { - return this.createRemoteFileSystemDuringInitialization; - } - - public int getReadAheadQueueDepth() { - return this.readAheadQueueDepth; - } - - void validateStorageAccountKeys() throws InvalidConfigurationValueException { - Base64StringConfigurationBasicValidator validator = new Base64StringConfigurationBasicValidator( - ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME, "", true); - this.storageAccountKeys = this.configuration.getValByRegex(ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX); - - for (Map.Entry<String, String> account : this.storageAccountKeys.entrySet()) { - validator.validate(account.getValue()); - } - } - - int validateInt(Field field) throws IllegalAccessException, InvalidConfigurationValueException { - IntegerConfigurationValidatorAnnotation validator = field.getAnnotation(IntegerConfigurationValidatorAnnotation.class); - String value = this.configuration.get(validator.ConfigurationKey()); - - // validate - return new IntegerConfigurationBasicValidator( - validator.MinValue(), - validator.MaxValue(), - validator.DefaultValue(), - validator.ConfigurationKey(), - validator.ThrowIfInvalid()).validate(value); - } - - long validateLong(Field field) throws IllegalAccessException, InvalidConfigurationValueException { - LongConfigurationValidatorAnnotation validator = field.getAnnotation(LongConfigurationValidatorAnnotation.class); - String value = 
this.configuration.get(validator.ConfigurationKey()); - - // validate - return new LongConfigurationBasicValidator( - validator.MinValue(), - validator.MaxValue(), - validator.DefaultValue(), - validator.ConfigurationKey(), - validator.ThrowIfInvalid()).validate(value); - } - - String validateString(Field field) throws IllegalAccessException, InvalidConfigurationValueException { - StringConfigurationValidatorAnnotation validator = field.getAnnotation(StringConfigurationValidatorAnnotation.class); - String value = this.configuration.get(validator.ConfigurationKey()); - - // validate - return new StringConfigurationBasicValidator( - validator.ConfigurationKey(), - validator.DefaultValue(), - validator.ThrowIfInvalid()).validate(value); - } - - String validateBase64String(Field field) throws IllegalAccessException, InvalidConfigurationValueException { - Base64StringConfigurationValidatorAnnotation validator = field.getAnnotation((Base64StringConfigurationValidatorAnnotation.class)); - String value = this.configuration.get(validator.ConfigurationKey()); - - // validate - return new Base64StringConfigurationBasicValidator( - validator.ConfigurationKey(), - validator.DefaultValue(), - validator.ThrowIfInvalid()).validate(value); - } - - boolean validateBoolean(Field field) throws IllegalAccessException, InvalidConfigurationValueException { - BooleanConfigurationValidatorAnnotation validator = field.getAnnotation(BooleanConfigurationValidatorAnnotation.class); - String value = this.configuration.get(validator.ConfigurationKey()); - - // validate - return new BooleanConfigurationBasicValidator( - validator.ConfigurationKey(), - validator.DefaultValue(), - validator.ThrowIfInvalid()).validate(value); - } - - @VisibleForTesting - void setReadBufferSize(int bufferSize) { - this.readBufferSize = bufferSize; - } - - @VisibleForTesting - void setWriteBufferSize(int bufferSize) { - this.writeBufferSize = bufferSize; - } -} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee6866de/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java index 53f6900..2bfcff2 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java @@ -26,10 +26,11 @@ import java.net.URL; import java.util.List; import java.util.UUID; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; -import com.fasterxml.jackson.databind.ObjectMapper; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonParser; +import org.codehaus.jackson.JsonToken; +import org.codehaus.jackson.map.ObjectMapper; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -167,7 +168,7 @@ public class AbfsHttpOperation { */ public AbfsHttpOperation(final URL url, final String method, final List<AbfsHttpHeader> requestHeaders) throws IOException { - this.isTraceEnabled = this.LOG.isTraceEnabled(); + this.isTraceEnabled = LOG.isTraceEnabled(); this.url = url; this.method = method; this.clientRequestId = UUID.randomUUID().toString(); @@ -303,7 +304,7 @@ public class AbfsHttpOperation { } } } catch (IOException ex) { - this.LOG.error("UnexpectedError: ", ex); + LOG.error("UnexpectedError: ", ex); throw ex; } finally { if (this.isTraceEnabled) { @@ -355,7 +356,7 @@ public class AbfsHttpOperation { return; } JsonFactory jf = new JsonFactory(); - try (JsonParser jp = jf.createParser(stream)) { + try (JsonParser jp = jf.createJsonParser(stream)) { 
String fieldName, fieldValue; jp.nextToken(); // START_OBJECT - { jp.nextToken(); // FIELD_NAME - "error": @@ -384,7 +385,7 @@ public class AbfsHttpOperation { // Ignore errors that occur while attempting to parse the storage // error, since the response may have been handled by the HTTP driver // or for other reasons have an unexpected - this.LOG.debug("ExpectedError: ", ex); + LOG.debug("ExpectedError: ", ex); } } @@ -415,7 +416,7 @@ public class AbfsHttpOperation { final ObjectMapper objectMapper = new ObjectMapper(); this.listResultSchema = objectMapper.readValue(stream, ListResultSchema.class); } catch (IOException ex) { - this.LOG.error("Unable to deserialize list results", ex); + LOG.error("Unable to deserialize list results", ex); throw ex; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee6866de/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 848ce8a..960579d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -64,7 +64,7 @@ public class AbfsInputStream extends FSInputStream { this.path = path; this.contentLength = contentLength; this.bufferSize = bufferSize; - this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : 2 * Runtime.getRuntime().availableProcessors(); + this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? 
readAheadQueueDepth : Runtime.getRuntime().availableProcessors(); this.eTag = eTag; this.tolerateOobAppends = false; this.readAheadEnabled = true; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee6866de/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index 2dbcee5..b69ec83 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -43,6 +43,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable { private final String path; private long position; private boolean closed; + private boolean supportFlush; private volatile IOException lastError; private long lastFlushOffset; @@ -61,11 +62,13 @@ public class AbfsOutputStream extends OutputStream implements Syncable { final AbfsClient client, final String path, final long position, - final int bufferSize) { + final int bufferSize, + final boolean supportFlush) { this.client = client; this.path = path; this.position = position; this.closed = false; + this.supportFlush = supportFlush; this.lastError = null; this.lastFlushOffset = 0; this.bufferSize = bufferSize; @@ -162,7 +165,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable { */ @Override public void flush() throws IOException { - flushInternalAsync(); + if (supportFlush) { + flushInternalAsync(); + } } /** Similar to posix fsync, flush out the data in client's user buffer @@ -171,7 +176,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable { */ @Override public void hsync() throws IOException { - 
flushInternal(); + if (supportFlush) { + flushInternal(); + } } /** Flush out the data in client's user buffer. After the return of @@ -180,7 +187,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable { */ @Override public void hflush() throws IOException { - flushInternal(); + if (supportFlush) { + flushInternal(); + } } /** @@ -262,7 +271,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable { writeOperation.task.get(); } catch (Exception ex) { if (ex.getCause() instanceof AzureBlobFileSystemException) { - ex = (AzureBlobFileSystemException)ex.getCause(); + ex = (AzureBlobFileSystemException) ex.getCause(); } lastError = new IOException(ex); throw lastError; @@ -277,8 +286,6 @@ public class AbfsOutputStream extends OutputStream implements Syncable { if (this.lastTotalAppendOffset > this.lastFlushOffset) { this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset, true); } - - this.lastTotalAppendOffset = 0; } private synchronized void flushWrittenBytesToServiceInternal(final long offset, @@ -304,7 +311,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable { } } catch (Exception e) { if (e.getCause() instanceof AzureBlobFileSystemException) { - lastError = (AzureBlobFileSystemException)e.getCause(); + lastError = (AzureBlobFileSystemException) e.getCause(); } else { lastError = new IOException(e); } @@ -322,7 +329,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable { try { completionService.take(); } catch (InterruptedException e) { - lastError = (IOException)new InterruptedIOException(e.toString()).initCause(e); + lastError = (IOException) new InterruptedIOException(e.toString()).initCause(e); throw lastError; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee6866de/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java ---------------------------------------------------------------------- diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java index 6126398..6dd32fa 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java @@ -121,7 +121,7 @@ public class AbfsRestOperation { } } - if (result.getStatusCode() > HttpURLConnection.HTTP_BAD_REQUEST) { + if (result.getStatusCode() >= HttpURLConnection.HTTP_BAD_REQUEST) { throw new AbfsRestOperationException(result.getStatusCode(), result.getStorageErrorCode(), result.getStorageErrorMessage(), null, result); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee6866de/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsUriQueryBuilder.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsUriQueryBuilder.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsUriQueryBuilder.java index 3624853..a200b40 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsUriQueryBuilder.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsUriQueryBuilder.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; /** * The UrlQueryBuilder for Rest AbfsClient. 
@@ -51,7 +52,12 @@ public class AbfsUriQueryBuilder { } else { sb.append(AbfsHttpConstants.AND_MARK); } - sb.append(entry.getKey()).append(AbfsHttpConstants.EQUAL).append(entry.getValue()); + try { + sb.append(entry.getKey()).append(AbfsHttpConstants.EQUAL).append(AbfsClient.urlEncode(entry.getValue())); + } + catch (AzureBlobFileSystemException ex) { + throw new IllegalArgumentException("Query string param is not encode-able: " + entry.getKey() + "=" + entry.getValue()); + } } return sb.toString(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee6866de/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeyProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeyProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeyProvider.java new file mode 100644 index 0000000..27f76f8 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/KeyProvider.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.azurebfs.services; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.KeyProviderException; + +/** + * The interface that every Azure file system key provider must implement. + */ +public interface KeyProvider { + /** + * Key providers must implement this method. Given a list of configuration + * parameters for the specified Azure storage account, retrieve the plaintext + * storage account key. + * + * @param accountName + * the storage account name + * @param conf + * Hadoop configuration parameters + * @return the plaintext storage account key + * @throws KeyProviderException + */ + String getStorageAccountKey(String accountName, Configuration conf) + throws KeyProviderException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee6866de/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ShellDecryptionKeyProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ShellDecryptionKeyProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ShellDecryptionKeyProvider.java new file mode 100644 index 0000000..3fc05ff --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ShellDecryptionKeyProvider.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.KeyProviderException; +import org.apache.hadoop.util.Shell; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Shell decryption key provider which invokes an external script that will + * perform the key decryption. + */ +public class ShellDecryptionKeyProvider extends SimpleKeyProvider { + private static final Logger LOG = LoggerFactory.getLogger(ShellDecryptionKeyProvider.class); + + @Override + public String getStorageAccountKey(String accountName, Configuration conf) + throws KeyProviderException { + String envelope = super.getStorageAccountKey(accountName, conf); + + final String command = conf.get(ConfigurationKeys.AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT); + if (command == null) { + throw new KeyProviderException( + "Script path is not specified via fs.azure.shellkeyprovider.script"); + } + + String[] cmd = command.split(" "); + String[] cmdWithEnvelope = Arrays.copyOf(cmd, cmd.length + 1); + cmdWithEnvelope[cmdWithEnvelope.length - 1] = envelope; + + String decryptedKey = null; + try { + decryptedKey = Shell.execCommand(cmdWithEnvelope); + } catch (IOException ex) { + throw new KeyProviderException(ex); + } + + // trim any whitespace + return decryptedKey.trim(); + } +} 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee6866de/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SimpleKeyProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SimpleKeyProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SimpleKeyProvider.java new file mode 100644 index 0000000..cedae57 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SimpleKeyProvider.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; +import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.KeyProviderException; +import org.apache.hadoop.security.ProviderUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Key provider that simply returns the storage account key from the + * configuration as plaintext. + */ +public class SimpleKeyProvider implements KeyProvider { + private static final Logger LOG = LoggerFactory.getLogger(SimpleKeyProvider.class); + + @Override + public String getStorageAccountKey(String accountName, Configuration conf) + throws KeyProviderException { + String key = null; + try { + Configuration c = ProviderUtils.excludeIncompatibleCredentialProviders( + conf, AzureBlobFileSystem.class); + char[] keyChars = c.getPassword(ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME + accountName); + if (keyChars != null) { + key = new String(keyChars); + } + } catch(IOException ioe) { + LOG.warn("Unable to get key from credential providers. 
{}", ioe); + } + return key; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee6866de/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java index 106fa09..b1f1485 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java @@ -27,10 +27,6 @@ import java.util.concurrent.Callable; import com.google.common.base.Preconditions; import org.junit.After; import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.rules.TestName; -import org.junit.rules.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,9 +37,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azure.AbstractWasbTestWithTimeout; import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore; import org.apache.hadoop.fs.azure.NativeAzureFileSystem; -import org.apache.hadoop.fs.azure.integration.AzureTestConstants; import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation; -import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.utils.UriUtils; @@ -175,6 +169,17 @@ public abstract class AbstractAbfsIntegrationTest extends return abfs; } + public AzureBlobFileSystem getFileSystem(Configuration configuration) throws Exception{ + final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.get(configuration); + return 
fs; + } + + public AzureBlobFileSystem getFileSystem(String abfsUri) throws Exception { + configuration.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, abfsUri); + final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.get(configuration); + return fs; + } + /** * Creates the filesystem; updates the {@link #abfs} field. * @return the created filesystem. http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee6866de/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java new file mode 100644 index 0000000..9c369bb --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.azurebfs; + +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.services.AbfsClient; +import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test continuation token which has equal sign. + */ +public final class ITestAbfsClient extends AbstractAbfsIntegrationTest { + private static final int LIST_MAX_RESULTS = 5000; + @Test + public void testContinuationTokenHavingEqualSign() throws Exception { + final AzureBlobFileSystem fs = this.getFileSystem(); + AbfsClient abfsClient = fs.getAbfsClient(); + + try { + AbfsRestOperation op = abfsClient.listPath("/", true, LIST_MAX_RESULTS, "==========="); + Assert.assertTrue(false); + } catch (AbfsRestOperationException ex) { + Assert.assertEquals("InvalidQueryParameterValue", ex.getErrorCode().getErrorCode()); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee6866de/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java new file mode 100644 index 0000000..f62ea6e --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.util.Arrays; +import java.util.Random; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; + +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_BUFFER_SIZE; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE; + +/** + * Test read, write and seek. + * Uses package-private methods in AbfsConfiguration, which is why it is in + * this package. 
+ */ +@RunWith(Parameterized.class) +public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest { + private static final Path TEST_PATH = new Path("/testfile"); + + @Parameterized.Parameters(name = "Size={0}") + public static Iterable<Object[]> sizes() { + return Arrays.asList(new Object[][]{{MIN_BUFFER_SIZE}, + {DEFAULT_READ_BUFFER_SIZE}, + {MAX_BUFFER_SIZE}}); + } + + private final int size; + + public ITestAbfsReadWriteAndSeek(final int size) { + this.size = size; + } + + @Test + public void testReadAndWriteWithDifferentBufferSizesAndSeek() throws Exception { + testReadWriteAndSeek(size); + } + + private void testReadWriteAndSeek(int bufferSize) throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + final AbfsConfiguration abfsConfiguration = new AbfsConfiguration(getConfiguration()); + + abfsConfiguration.setWriteBufferSize(bufferSize); + abfsConfiguration.setReadBufferSize(bufferSize); + + + final byte[] b = new byte[2 * bufferSize]; + new Random().nextBytes(b); + try (FSDataOutputStream stream = fs.create(TEST_PATH)) { + stream.write(b); + } + + final byte[] readBuffer = new byte[2 * bufferSize]; + int result; + try (FSDataInputStream inputStream = fs.open(TEST_PATH)) { + inputStream.seek(bufferSize); + result = inputStream.read(readBuffer, bufferSize, bufferSize); + assertNotEquals(-1, result); + inputStream.seek(0); + result = inputStream.read(readBuffer, 0, bufferSize); + } + assertNotEquals("data read in final read()", -1, result); + assertArrayEquals(readBuffer, b); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee6866de/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java index 
057dfa0..f1800c0 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java @@ -108,7 +108,7 @@ public class ITestAzureBlobFileSystemE2E extends AbstractAbfsIntegrationTest { final byte[] b = new byte[1024 * 1000]; new Random().nextBytes(b); - try(final FSDataOutputStream stream = fs.create(TEST_FILE)) { + try (FSDataOutputStream stream = fs.create(TEST_FILE)) { stream.write(b, TEST_OFFSET, b.length - TEST_OFFSET); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee6866de/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2EScale.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2EScale.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2EScale.java index 04690de..522b635 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2EScale.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2EScale.java @@ -91,7 +91,7 @@ public class ITestAzureBlobFileSystemE2EScale extends final FileSystem.Statistics abfsStatistics; int testBufferSize; final byte[] sourceData; - try(final FSDataOutputStream stream = fs.create(TEST_FILE)) { + try (FSDataOutputStream stream = fs.create(TEST_FILE)) { abfsStatistics = fs.getFsStatistics(); abfsStatistics.reset(); @@ -112,7 +112,7 @@ public class ITestAzureBlobFileSystemE2EScale extends remoteData.length, abfsStatistics.getBytesRead()); assertEquals("bytes written in " + stats, sourceData.length, abfsStatistics.getBytesWritten()); - assertEquals("bytesRead from read() call", testBufferSize, bytesRead ); + assertEquals("bytesRead from read() 
call", testBufferSize, bytesRead); assertArrayEquals("round tripped data", sourceData, remoteData); } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org