HADOOP-15560. ABFS: removed dependency injection and unnecessary dependencies. Contributed by Da Zhou.
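Summary for readers skimming the patch: the Guice-based injection layer (AbfsServiceProviderImpl, AbfsServiceInjectorImpl, AbfsHttpService, ConfigurationService, TracingService and their implementations) is deleted, AzureBlobFileSystem now constructs a single AzureBlobFileSystemStore directly, and the threadly and htrace dependencies are removed from the pom. The condensed sketch below is illustrative only and is not part of the patch; it follows the signatures visible in the diff (AzureBlobFileSystemStore(URI, boolean, Configuration, UserGroupInformation), getAbfsConfiguration()), with logging and some details omitted.

  // Condensed, illustrative sketch of the refactored initialization path
  // (not the verbatim method body from the commit).
  @Override
  public void initialize(URI uri, Configuration configuration)
      throws IOException {
    uri = ensureAuthority(uri, configuration);
    super.initialize(uri, configuration);

    this.userGroupInformation = UserGroupInformation.getCurrentUser();
    this.user = userGroupInformation.getUserName();
    this.primaryUserGroup = userGroupInformation.getPrimaryGroupName();

    // One concrete store object replaces the injected AbfsHttpService,
    // ConfigurationService and TracingService previously resolved through
    // AbfsServiceProviderImpl.
    this.abfsStore = new AzureBlobFileSystemStore(
        uri, this.isSecure(), configuration, userGroupInformation);

    this.setWorkingDirectory(this.getHomeDirectory());

    // Settings are now read from the store's AbfsConfiguration.
    if (abfsStore.getAbfsConfiguration()
        .getCreateRemoteFileSystemDuringInitialization()) {
      this.createFileSystem();
    }
  }

Individual operations are rewired the same way in the diff below, e.g. open() now calls abfsStore.openFileForRead(makeQualified(path), statistics) instead of abfsHttpService.openFileForRead(this, makeQualified(path), statistics).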
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a271fd0e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a271fd0e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a271fd0e

Branch: refs/heads/HADOOP-15407
Commit: a271fd0eca75cef8b8ba940cdac8ad4fd21b4462
Parents: f044dee
Author: Steve Loughran <ste...@apache.org>
Authored: Tue Jul 3 18:55:10 2018 +0200
Committer: Thomas Marquardt <tm...@microsoft.com>
Committed: Mon Sep 17 19:54:01 2018 +0000

----------------------------------------------------------------------
 hadoop-tools/hadoop-azure/pom.xml                  |  18 -
 .../src/config/checkstyle-suppressions.xml         |   2 +-
 .../hadoop/fs/azurebfs/AzureBlobFileSystem.java    |  88 ++-
 .../fs/azurebfs/AzureBlobFileSystemStore.java      | 701 +++++++++++++++++++
 .../exceptions/ServiceResolutionException.java     |  36 -
 .../services/AbfsHttpClientFactory.java            |  39 --
 .../contracts/services/AbfsHttpService.java        | 162 -----
 .../contracts/services/AbfsServiceProvider.java    |  40 --
 .../services/ConfigurationService.java             | 143 ----
 .../contracts/services/InjectableService.java      |  30 -
 .../contracts/services/TracingService.java         |  66 --
 .../hadoop/fs/azurebfs/services/AbfsClient.java    |   7 +-
 .../fs/azurebfs/services/AbfsConfiguration.java    | 297 ++++
 .../services/AbfsHttpClientFactoryImpl.java        | 116 ---
 .../azurebfs/services/AbfsHttpServiceImpl.java     | 693 ------------------
 .../services/AbfsServiceInjectorImpl.java          |  81 ---
 .../services/AbfsServiceProviderImpl.java          |  96 ---
 .../services/ConfigurationServiceImpl.java         | 317 ---------
 .../services/ExponentialRetryPolicy.java           |   9 +-
 .../azurebfs/services/LoggerSpanReceiver.java      |  74 --
 .../azurebfs/services/TracingServiceImpl.java      | 134 ----
 .../fs/azurebfs/DependencyInjectedTest.java        |  59 +-
 .../azurebfs/ITestAzureBlobFileSystemE2E.java      |   7 +-
 .../ITestAzureBlobFileSystemRandomRead.java        |   7 +-
 .../azurebfs/ITestFileSystemInitialization.java    |  23 +-
 .../fs/azurebfs/ITestFileSystemProperties.java     | 126 ++++
 .../azurebfs/ITestFileSystemRegistration.java      |  23 +-
 .../ITestAzureBlobFileSystemBasics.java            |  11 +-
 .../services/ITestAbfsHttpServiceImpl.java         | 122 ----
 .../services/ITestReadWriteAndSeek.java            |   8 +-
 .../services/ITestTracingServiceImpl.java          |  79 ---
 .../services/MockAbfsHttpClientFactoryImpl.java    |  69 --
 .../services/MockAbfsServiceInjectorImpl.java      |  50 --
 .../services/MockServiceProviderImpl.java          |  36 -
 .../TestAbfsConfigurationFieldsValidation.java     | 149 ++++
 ...estConfigurationServiceFieldsValidation.java    | 149 ----
 .../utils/CleanUpAbfsTestContainer.java            |  68 ++
 37 files changed, 1432 insertions(+), 2703 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/pom.xml b/hadoop-tools/hadoop-azure/pom.xml
index d4046ef..cbd4dfb 100644
--- a/hadoop-tools/hadoop-azure/pom.xml
+++ b/hadoop-tools/hadoop-azure/pom.xml
@@ -150,12 +150,6 @@ </dependency> <dependency> - <groupId>org.threadly</groupId> - <artifactId>threadly</artifactId> - <scope>compile</scope> - </dependency> - - <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> <scope>compile</scope> @@ -186,18 +180,6 @@ </dependency> <dependency> - <groupId>org.apache.htrace</groupId> - <artifactId>htrace-core</artifactId> - <scope>compile</scope> - </dependency> - - 
<dependency> - <groupId>org.apache.htrace</groupId> - <artifactId>htrace-core4</artifactId> - <scope>compile</scope> - </dependency> - - <dependency> <groupId>com.google.inject</groupId> <artifactId>guice</artifactId> <scope>compile</scope> http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml b/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml index 0204355..751a227 100644 --- a/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml +++ b/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml @@ -43,5 +43,5 @@ <suppressions> <suppress checks="ParameterNumber|MagicNumber" - files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]AbfsHttpServiceImpl.java"/> + files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]AzureBlobFileSystemStore.java"/> </suppressions> http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 707c81e..cf5acbb 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -39,10 +39,8 @@ import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.threadly.util.ExceptionUtils; import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; -import org.apache.hadoop.fs.azurebfs.services.AbfsServiceProviderImpl; import org.apache.commons.lang.ArrayUtils; import org.apache.hadoop.classification.InterfaceAudience; @@ -58,10 +56,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations; import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes; -import org.apache.hadoop.fs.azurebfs.contracts.services.TracingService; -import org.apache.hadoop.fs.azurebfs.contracts.services.ConfigurationService; -import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsHttpService; -import org.apache.hadoop.fs.azurebfs.contracts.services.AbfsServiceProvider; import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.FileSystemOperationUnhandledException; @@ -70,7 +64,6 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; -import org.apache.htrace.core.TraceScope; /** * A {@link org.apache.hadoop.fs.FileSystem} for reading and writing files stored on <a @@ -85,10 +78,7 @@ public class AzureBlobFileSystem extends FileSystem { private UserGroupInformation userGroupInformation; private String user; private String primaryUserGroup; - private 
AbfsServiceProvider abfsServiceProvider; - private TracingService tracingService; - private AbfsHttpService abfsHttpService; - private ConfigurationService configurationService; + private AzureBlobFileSystemStore abfsStore; private boolean isClosed; @Override @@ -96,18 +86,8 @@ public class AzureBlobFileSystem extends FileSystem { throws IOException { uri = ensureAuthority(uri, configuration); super.initialize(uri, configuration); - setConf(configuration); - try { - this.abfsServiceProvider = AbfsServiceProviderImpl.create(configuration); - this.tracingService = abfsServiceProvider.get(TracingService.class); - this.abfsHttpService = abfsServiceProvider.get(AbfsHttpService.class); - this.configurationService = abfsServiceProvider.get(ConfigurationService.class); - } catch (AzureBlobFileSystemException exception) { - throw new IOException(exception); - } - this.LOG.debug( "Initializing AzureBlobFileSystem for {}", uri); @@ -115,13 +95,14 @@ public class AzureBlobFileSystem extends FileSystem { this.userGroupInformation = UserGroupInformation.getCurrentUser(); this.user = userGroupInformation.getUserName(); this.primaryUserGroup = userGroupInformation.getPrimaryGroupName(); + this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecure(), configuration, userGroupInformation); this.LOG.debug( "Initializing NativeAzureFileSystem for {}", uri); this.setWorkingDirectory(this.getHomeDirectory()); - if (this.configurationService.getCreateRemoteFileSystemDuringInitialization()) { + if (abfsStore.getAbfsConfiguration().getCreateRemoteFileSystemDuringInitialization()) { this.createFileSystem(); } @@ -143,7 +124,7 @@ public class AzureBlobFileSystem extends FileSystem { "AzureBlobFileSystem.open path: {} bufferSize: {}", path.toString(), bufferSize); try { - InputStream inputStream = abfsHttpService.openFileForRead(this, makeQualified(path), statistics); + InputStream inputStream = abfsStore.openFileForRead(makeQualified(path), statistics); return new FSDataInputStream(inputStream); } catch(AzureBlobFileSystemException ex) { checkException(path, ex); @@ -162,7 +143,7 @@ public class AzureBlobFileSystem extends FileSystem { blockSize); try { - OutputStream outputStream = abfsHttpService.createFile(this, makeQualified(f), overwrite); + OutputStream outputStream = abfsStore.createFile(makeQualified(f), overwrite); return new FSDataOutputStream(outputStream, statistics); } catch(AzureBlobFileSystemException ex) { checkException(f, ex); @@ -221,7 +202,7 @@ public class AzureBlobFileSystem extends FileSystem { bufferSize); try { - OutputStream outputStream = abfsHttpService.openFileForWrite(this, makeQualified(f), false); + OutputStream outputStream = abfsStore.openFileForWrite(makeQualified(f), false); return new FSDataOutputStream(outputStream, statistics); } catch(AzureBlobFileSystemException ex) { checkException(f, ex); @@ -251,7 +232,7 @@ public class AzureBlobFileSystem extends FileSystem { adjustedDst = new Path(dst, sourceFileName); } - abfsHttpService.rename(this, makeQualified(src), makeQualified(adjustedDst)); + abfsStore.rename(makeQualified(src), makeQualified(adjustedDst)); return true; } catch(AzureBlobFileSystemException ex) { checkException( @@ -281,7 +262,7 @@ public class AzureBlobFileSystem extends FileSystem { } try { - abfsHttpService.delete(this, makeQualified(f), recursive); + abfsStore.delete(makeQualified(f), recursive); return true; } catch (AzureBlobFileSystemException ex) { checkException(f, ex, AzureServiceErrorCode.PATH_NOT_FOUND); @@ -296,7 +277,7 @@ public class 
AzureBlobFileSystem extends FileSystem { "AzureBlobFileSystem.listStatus path: {}", f.toString()); try { - FileStatus[] result = abfsHttpService.listStatus(this, makeQualified(f)); + FileStatus[] result = abfsStore.listStatus(makeQualified(f)); return result; } catch (AzureBlobFileSystemException ex) { checkException(f, ex); @@ -316,7 +297,7 @@ public class AzureBlobFileSystem extends FileSystem { } try { - abfsHttpService.createDirectory(this, makeQualified(f)); + abfsStore.createDirectory(makeQualified(f)); return true; } catch (AzureBlobFileSystemException ex) { checkException(f, ex, AzureServiceErrorCode.PATH_ALREADY_EXISTS); @@ -332,13 +313,7 @@ public class AzureBlobFileSystem extends FileSystem { super.close(); this.LOG.debug("AzureBlobFileSystem.close"); - - try { - abfsHttpService.closeFileSystem(this); - } catch (AzureBlobFileSystemException ex) { - checkException(null, ex); - this.isClosed = true; - } + this.isClosed = true; } @Override @@ -346,7 +321,7 @@ public class AzureBlobFileSystem extends FileSystem { this.LOG.debug("AzureBlobFileSystem.getFileStatus path: {}", f.toString()); try { - return abfsHttpService.getFileStatus(this, makeQualified(f)); + return abfsStore.getFileStatus(makeQualified(f)); } catch(AzureBlobFileSystemException ex) { checkException(f, ex); return null; @@ -397,7 +372,7 @@ public class AzureBlobFileSystem extends FileSystem { if (file.getLen() < start) { return new BlockLocation[0]; } - final String blobLocationHost = this.configurationService.getAzureBlockLocationHost(); + final String blobLocationHost = this.abfsStore.getAbfsConfiguration().getAzureBlockLocationHost(); final String[] name = { blobLocationHost }; final String[] host = { blobLocationHost }; @@ -477,12 +452,10 @@ public class AzureBlobFileSystem extends FileSystem { this.LOG.debug( "AzureBlobFileSystem.createFileSystem uri: {}", uri); try { - abfsHttpService.createFilesystem(this); + this.abfsStore.createFilesystem(); } catch (AzureBlobFileSystemException ex) { checkException(null, ex, AzureServiceErrorCode.FILE_SYSTEM_ALREADY_EXISTS); } - - } private URI ensureAuthority(URI uri, final Configuration conf) { @@ -540,25 +513,19 @@ public class AzureBlobFileSystem extends FileSystem { final Callable<T> callableFileOperation, T defaultResultValue) throws IOException { - final TraceScope traceScope = tracingService.traceBegin(scopeDescription); try { final T executionResult = callableFileOperation.call(); return new FileSystemOperation(executionResult, null); } catch (AbfsRestOperationException abfsRestOperationException) { return new FileSystemOperation(defaultResultValue, abfsRestOperationException); } catch (AzureBlobFileSystemException azureBlobFileSystemException) { - tracingService.traceException(traceScope, azureBlobFileSystemException); throw new IOException(azureBlobFileSystemException); } catch (Exception exception) { if (exception instanceof ExecutionException) { - exception = (Exception) ExceptionUtils.getRootCause(exception); + exception = (Exception) getRootCause(exception); } - final FileSystemOperationUnhandledException fileSystemOperationUnhandledException = new FileSystemOperationUnhandledException(exception); - tracingService.traceException(traceScope, fileSystemOperationUnhandledException); throw new IOException(fileSystemOperationUnhandledException); - } finally { - tracingService.traceEnd(traceScope); } } @@ -590,6 +557,26 @@ public class AzureBlobFileSystem extends FileSystem { } } + /** + * Gets the root cause of a provided {@link Throwable}. 
If there is no cause for the + * {@link Throwable} provided into this function, the original {@link Throwable} is returned. + * + * @param throwable starting {@link Throwable} + * @return root cause {@link Throwable} + */ + private Throwable getRootCause(Throwable throwable) { + if (throwable == null) { + throw new IllegalArgumentException("throwable can not be null"); + } + + Throwable result = throwable; + while (result.getCause() != null) { + result = result.getCause(); + } + + return result; + } + @VisibleForTesting FileSystem.Statistics getFsStatistics() { return this.statistics; @@ -609,4 +596,9 @@ public class AzureBlobFileSystem extends FileSystem { return this.exception != null; } } + + @VisibleForTesting + AzureBlobFileSystemStore getAbfsStore() { + return this.abfsStore; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java new file mode 100644 index 0000000..134277f --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -0,0 +1,701 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.fs.azurebfs; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CharsetEncoder; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Hashtable; +import java.util.Map; +import java.util.Set; +import javax.xml.bind.DatatypeConverter; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; +import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes; +import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.FileSystemOperationUnhandledException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidFileSystemPropertyException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriAuthorityException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException; +import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; +import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema; +import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema; +import org.apache.hadoop.fs.azurebfs.services.AbfsClient; +import org.apache.hadoop.fs.azurebfs.services.AbfsConfiguration; +import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream; +import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; +import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; +import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy; +import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.http.client.utils.URIBuilder; +import org.joda.time.DateTime; +import org.joda.time.format.DateTimeFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.util.Time.now; + +/** + * Provides the bridging logic between Hadoop's abstract filesystem and Azure Storage + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class AzureBlobFileSystemStore { + private static final Logger LOG = LoggerFactory.getLogger(AzureBlobFileSystemStore.class); + + private AbfsClient client; + private URI uri; + private 
final UserGroupInformation userGroupInformation; + private static final String DATE_TIME_PATTERN = "E, dd MMM yyyy HH:mm:ss 'GMT'"; + private static final String XMS_PROPERTIES_ENCODING = "ISO-8859-1"; + private static final int LIST_MAX_RESULTS = 5000; + private static final int DELETE_DIRECTORY_TIMEOUT_MILISECONDS = 180000; + private static final int RENAME_TIMEOUT_MILISECONDS = 180000; + + private final AbfsConfiguration abfsConfiguration; + private final Set<String> azureAtomicRenameDirSet; + + + public AzureBlobFileSystemStore(URI uri, boolean isSeure, Configuration configuration, UserGroupInformation userGroupInformation) + throws AzureBlobFileSystemException { + this.uri = uri; + try { + this.abfsConfiguration = new AbfsConfiguration(configuration); + } catch (IllegalAccessException exception) { + throw new FileSystemOperationUnhandledException(exception); + } + + this.userGroupInformation = userGroupInformation; + this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(abfsConfiguration.getAzureAtomicRenameDirs().split(AbfsHttpConstants.COMMA))); + + initializeClient(uri, isSeure); + } + + @VisibleForTesting + URIBuilder getURIBuilder(final String hostName, boolean isSecure) { + String scheme = isSecure ? FileSystemUriSchemes.HTTPS_SCHEME : FileSystemUriSchemes.HTTP_SCHEME; + + final URIBuilder uriBuilder = new URIBuilder(); + uriBuilder.setScheme(scheme); + uriBuilder.setHost(hostName); + + return uriBuilder; + } + + public AbfsConfiguration getAbfsConfiguration() { + return this.abfsConfiguration; + } + + public Hashtable<String, String> getFilesystemProperties() throws AzureBlobFileSystemException { + this.LOG.debug( + "getFilesystemProperties for filesystem: {}", + client.getFileSystem()); + + final Hashtable<String, String> parsedXmsProperties; + + final AbfsRestOperation op = client.getFilesystemProperties(); + final String xMsProperties = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PROPERTIES); + + parsedXmsProperties = parseCommaSeparatedXmsProperties(xMsProperties); + + return parsedXmsProperties; + } + + public void setFilesystemProperties(final Hashtable<String, String> properties) throws AzureBlobFileSystemException { + if (properties == null || properties.size() == 0) { + return; + } + + this.LOG.debug( + "setFilesystemProperties for filesystem: {} with properties: {}", + client.getFileSystem(), + properties); + + final String commaSeparatedProperties; + try { + commaSeparatedProperties = convertXmsPropertiesToCommaSeparatedString(properties); + } catch (CharacterCodingException ex) { + throw new InvalidAbfsRestOperationException(ex); + } + + client.setFilesystemProperties(commaSeparatedProperties); + } + + public Hashtable<String, String> getPathProperties(final Path path) throws AzureBlobFileSystemException { + this.LOG.debug( + "getPathProperties for filesystem: {} path: {}", + client.getFileSystem(), + path.toString()); + + final Hashtable<String, String> parsedXmsProperties; + final AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path)); + + final String xMsProperties = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_PROPERTIES); + + parsedXmsProperties = parseCommaSeparatedXmsProperties(xMsProperties); + + return parsedXmsProperties; + } + + public void setPathProperties(final Path path, final Hashtable<String, String> properties) throws AzureBlobFileSystemException { + this.LOG.debug( + "setFilesystemProperties for filesystem: {} path: {} with properties: {}", + 
client.getFileSystem(), + path.toString(), + properties); + + final String commaSeparatedProperties; + try { + commaSeparatedProperties = convertXmsPropertiesToCommaSeparatedString(properties); + } catch (CharacterCodingException ex) { + throw new InvalidAbfsRestOperationException(ex); + } + client.setPathProperties("/" + getRelativePath(path), commaSeparatedProperties); + } + + public void createFilesystem() throws AzureBlobFileSystemException { + this.LOG.debug( + "createFilesystem for filesystem: {}", + client.getFileSystem()); + + client.createFilesystem(); + } + + public void deleteFilesystem() throws AzureBlobFileSystemException { + this.LOG.debug( + "deleteFilesystem for filesystem: {}", + client.getFileSystem()); + + client.deleteFilesystem(); + } + + public OutputStream createFile(final Path path, final boolean overwrite) throws AzureBlobFileSystemException { + this.LOG.debug( + "createFile filesystem: {} path: {} overwrite: {}", + client.getFileSystem(), + path.toString(), + overwrite); + + client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), true, overwrite); + + final OutputStream outputStream; + outputStream = new FSDataOutputStream( + new AbfsOutputStream(client, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), 0, + abfsConfiguration.getWriteBufferSize()), null); + return outputStream; + } + + public Void createDirectory(final Path path) throws AzureBlobFileSystemException { + this.LOG.debug( + "createDirectory filesystem: {} path: {} overwrite: {}", + client.getFileSystem(), + path.toString()); + + client.createPath("/" + getRelativePath(path), false, true); + + return null; + } + + public InputStream openFileForRead(final Path path, final FileSystem.Statistics statistics) throws AzureBlobFileSystemException { + + this.LOG.debug( + "openFileForRead filesystem: {} path: {}", + client.getFileSystem(), + path.toString()); + + final AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path)); + + final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); + final long contentLength = Long.parseLong(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH)); + final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); + + if (parseIsDirectory(resourceType)) { + throw new AbfsRestOperationException( + AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(), + AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(), + "openFileForRead must be used with files and not directories", + null); + } + + // Add statistics for InputStream + return new FSDataInputStream( + new AbfsInputStream(client, statistics, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength, + abfsConfiguration.getReadBufferSize(), abfsConfiguration.getReadAheadQueueDepth(), eTag)); + } + + public OutputStream openFileForWrite(final Path path, final boolean overwrite) throws + AzureBlobFileSystemException { + this.LOG.debug( + "openFileForWrite filesystem: {} path: {} overwrite: {}", + client.getFileSystem(), + path.toString(), + overwrite); + + final AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path)); + + final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); + final Long contentLength = Long.valueOf(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH)); + + if (parseIsDirectory(resourceType)) { + throw new 
AbfsRestOperationException( + AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(), + AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(), + "openFileForRead must be used with files and not directories", + null); + } + + final long offset = overwrite ? 0 : contentLength; + + final OutputStream outputStream; + outputStream = new FSDataOutputStream( + new AbfsOutputStream(client, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), + offset, abfsConfiguration.getWriteBufferSize()), null); + return outputStream; + } + + public void rename(final Path source, final Path destination) throws + AzureBlobFileSystemException { + + if (isAtomicRenameKey(source.getName())) { + this.LOG.warn("The atomic rename feature is not supported by the ABFS scheme; however rename," + +" create and delete operations are atomic if Namespace is enabled for your Azure Storage account."); + } + + this.LOG.debug( + "renameAsync filesystem: {} source: {} destination: {}", + client.getFileSystem(), + source.toString(), + destination.toString()); + + String continuation = null; + long deadline = now() + RENAME_TIMEOUT_MILISECONDS; + + do { + if (now() > deadline) { + LOG.debug( + "Rename {} to {} timed out.", + source, + destination); + + throw new TimeoutException("Rename timed out."); + } + + AbfsRestOperation op = client.renamePath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(source), + AbfsHttpConstants.FORWARD_SLASH + getRelativePath(destination), continuation); + continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION); + + } while (continuation != null && !continuation.isEmpty()); + } + + public void delete(final Path path, final boolean recursive) throws + AzureBlobFileSystemException { + + this.LOG.debug( + "delete filesystem: {} path: {} recursive: {}", + client.getFileSystem(), + path.toString(), + String.valueOf(recursive)); + + String continuation = null; + long deadline = now() + DELETE_DIRECTORY_TIMEOUT_MILISECONDS; + + do { + if (now() > deadline) { + this.LOG.debug( + "Delete directory {} timed out.", path); + + throw new TimeoutException("Delete directory timed out."); + } + + AbfsRestOperation op = client.deletePath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), recursive, continuation); + continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION); + + } while (continuation != null && !continuation.isEmpty()); + } + + public FileStatus getFileStatus(final Path path) throws IOException { + + this.LOG.debug( + "getFileStatus filesystem: {} path: {}", + client.getFileSystem(), + path.toString()); + + if (path.isRoot()) { + AbfsRestOperation op = client.getFilesystemProperties(); + final long blockSize = abfsConfiguration.getAzureBlockSize(); + final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); + final String lastModified = op.getResult().getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED); + return new VersionedFileStatus( + userGroupInformation.getUserName(), + userGroupInformation.getPrimaryGroupName(), + 0, + true, + 1, + blockSize, + parseLastModifiedTime(lastModified).getMillis(), + path, + eTag); + } else { + AbfsRestOperation op = client.getPathProperties(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path)); + + final long blockSize = abfsConfiguration.getAzureBlockSize(); + final String eTag = op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG); + final String lastModified = op.getResult().getResponseHeader(HttpHeaderConfigurations.LAST_MODIFIED); + final 
String contentLength = op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH); + final String resourceType = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); + + return new VersionedFileStatus( + userGroupInformation.getUserName(), + userGroupInformation.getPrimaryGroupName(), + parseContentLength(contentLength), + parseIsDirectory(resourceType), + 1, + blockSize, + parseLastModifiedTime(lastModified).getMillis(), + path, + eTag); + } + } + + public FileStatus[] listStatus(final Path path) throws IOException { + this.LOG.debug( + "listStatus filesystem: {} path: {}", + client.getFileSystem(), + path.toString()); + + String relativePath = path.isRoot() ? AbfsHttpConstants.EMPTY_STRING : getRelativePath(path); + String continuation = null; + ArrayList<FileStatus> fileStatuses = new ArrayList<>(); + + do { + AbfsRestOperation op = client.listPath(relativePath, false, LIST_MAX_RESULTS, continuation); + continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION); + ListResultSchema retrievedSchema = op.getResult().getListResultSchema(); + if (retrievedSchema == null) { + throw new AbfsRestOperationException( + AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(), + AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(), + "listStatusAsync path not found", + null, op.getResult()); + } + + long blockSize = abfsConfiguration.getAzureBlockSize(); + + for (ListResultEntrySchema entry : retrievedSchema.paths()) { + long lastModifiedMillis = 0; + long contentLength = entry.contentLength() == null ? 0 : entry.contentLength(); + boolean isDirectory = entry.isDirectory() == null ? false : entry.isDirectory(); + if (entry.lastModified() != null && !entry.lastModified().isEmpty()) { + final DateTime dateTime = DateTime.parse( + entry.lastModified(), + DateTimeFormat.forPattern(DATE_TIME_PATTERN).withZoneUTC()); + lastModifiedMillis = dateTime.getMillis(); + } + + Path entryPath = new Path(File.separator + entry.name()); + entryPath = entryPath.makeQualified(this.uri, entryPath); + + fileStatuses.add( + new VersionedFileStatus( + userGroupInformation.getUserName(), + userGroupInformation.getPrimaryGroupName(), + contentLength, + isDirectory, + 1, + blockSize, + lastModifiedMillis, + entryPath, + entry.eTag())); + } + + } while (continuation != null && !continuation.isEmpty()); + + return fileStatuses.toArray(new FileStatus[0]); + } + + public boolean isAtomicRenameKey(String key) { + return isKeyForDirectorySet(key, azureAtomicRenameDirSet); + } + + private void initializeClient(URI uri, boolean isSeure) throws AzureBlobFileSystemException { + if (this.client != null) { + return; + } + + final String authority = uri.getRawAuthority(); + if (null == authority) { + throw new InvalidUriAuthorityException(uri.toString()); + } + + if (!authority.contains(AbfsHttpConstants.AZURE_DISTRIBUTED_FILE_SYSTEM_AUTHORITY_DELIMITER)) { + throw new InvalidUriAuthorityException(uri.toString()); + } + + final String[] authorityParts = authority.split(AbfsHttpConstants.AZURE_DISTRIBUTED_FILE_SYSTEM_AUTHORITY_DELIMITER, 2); + + if (authorityParts.length < 2 || "".equals(authorityParts[0])) { + final String errMsg = String + .format("URI '%s' has a malformed authority, expected container name. 
" + + "Authority takes the form "+ FileSystemUriSchemes.ABFS_SCHEME + "://[<container name>@]<account name>", + uri.toString()); + throw new InvalidUriException(errMsg); + } + + final String fileSystemName = authorityParts[0]; + final String accountName = authorityParts[1]; + + final URIBuilder uriBuilder = getURIBuilder(accountName, isSeure); + + final String url = uriBuilder.toString() + AbfsHttpConstants.FORWARD_SLASH + fileSystemName; + + URL baseUrl; + try { + baseUrl = new URL(url); + } catch (MalformedURLException e) { + throw new InvalidUriException(String.format("URI '%s' is malformed", uri.toString())); + } + + SharedKeyCredentials creds = + new SharedKeyCredentials(accountName.substring(0, accountName.indexOf(AbfsHttpConstants.DOT)), + this.abfsConfiguration.getStorageAccountKey(accountName)); + + this.client = new AbfsClient(baseUrl, creds, abfsConfiguration, new ExponentialRetryPolicy()); + } + + private String getRelativePath(final Path path) { + Preconditions.checkNotNull(path, "path"); + final String relativePath = path.toUri().getPath(); + + if (relativePath.length() == 0) { + return relativePath; + } + + if (relativePath.charAt(0) == Path.SEPARATOR_CHAR) { + if (relativePath.length() == 1) { + return AbfsHttpConstants.EMPTY_STRING; + } + + return relativePath.substring(1); + } + + return relativePath; + } + + private long parseContentLength(final String contentLength) { + if (contentLength == null) { + return -1; + } + + return Long.parseLong(contentLength); + } + + private boolean parseIsDirectory(final String resourceType) { + return resourceType == null ? false : resourceType.equalsIgnoreCase(AbfsHttpConstants.DIRECTORY); + } + + private DateTime parseLastModifiedTime(final String lastModifiedTime) { + return DateTime.parse( + lastModifiedTime, + DateTimeFormat.forPattern(DATE_TIME_PATTERN).withZoneUTC()); + } + + private String convertXmsPropertiesToCommaSeparatedString(final Hashtable<String, String> properties) throws + CharacterCodingException { + StringBuilder commaSeparatedProperties = new StringBuilder(); + + final CharsetEncoder encoder = Charset.forName(XMS_PROPERTIES_ENCODING).newEncoder(); + + for (Map.Entry<String, String> propertyEntry : properties.entrySet()) { + String key = propertyEntry.getKey(); + String value = propertyEntry.getValue(); + + Boolean canEncodeValue = encoder.canEncode(value); + if (!canEncodeValue) { + throw new CharacterCodingException(); + } + + String encodedPropertyValue = DatatypeConverter.printBase64Binary(encoder.encode(CharBuffer.wrap(value)).array()); + commaSeparatedProperties.append(key) + .append(AbfsHttpConstants.EQUAL) + .append(encodedPropertyValue); + + commaSeparatedProperties.append(AbfsHttpConstants.COMMA); + } + + if (commaSeparatedProperties.length() != 0) { + commaSeparatedProperties.deleteCharAt(commaSeparatedProperties.length() - 1); + } + + return commaSeparatedProperties.toString(); + } + + private Hashtable<String, String> parseCommaSeparatedXmsProperties(String xMsProperties) throws + InvalidFileSystemPropertyException, InvalidAbfsRestOperationException { + Hashtable<String, String> properties = new Hashtable<>(); + + final CharsetDecoder decoder = Charset.forName(XMS_PROPERTIES_ENCODING).newDecoder(); + + if (xMsProperties != null && !xMsProperties.isEmpty()) { + String[] userProperties = xMsProperties.split(AbfsHttpConstants.COMMA); + + if (userProperties.length == 0) { + return properties; + } + + for (String property : userProperties) { + if (property.isEmpty()) { + throw new 
InvalidFileSystemPropertyException(xMsProperties); + } + + String[] nameValue = property.split(AbfsHttpConstants.EQUAL, 2); + if (nameValue.length != 2) { + throw new InvalidFileSystemPropertyException(xMsProperties); + } + + byte[] decodedValue = DatatypeConverter.parseBase64Binary(nameValue[1]); + + final String value; + try { + value = decoder.decode(ByteBuffer.wrap(decodedValue)).toString(); + } catch (CharacterCodingException ex) { + throw new InvalidAbfsRestOperationException(ex); + } + properties.put(nameValue[0], value); + } + } + + return properties; + } + + private boolean isKeyForDirectorySet(String key, Set<String> dirSet) { + for (String dir : dirSet) { + if (dir.isEmpty() || key.startsWith(dir + AbfsHttpConstants.FORWARD_SLASH)) { + return true; + } + + try { + URI uri = new URI(dir); + if (null == uri.getAuthority()) { + if (key.startsWith(dir + "/")){ + return true; + } + } + } catch (URISyntaxException e) { + this.LOG.info("URI syntax error creating URI for {}", dir); + } + } + + return false; + } + + private static class VersionedFileStatus extends FileStatus { + private final String version; + + VersionedFileStatus( + final String owner, final String group, + final long length, final boolean isdir, final int blockReplication, + final long blocksize, final long modificationTime, final Path path, + String version) { + super(length, isdir, blockReplication, blocksize, modificationTime, 0, + new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL), + owner, + group, + path); + + this.version = version; + } + + /** Compare if this object is equal to another object. + * @param obj the object to be compared. + * @return true if two file status has the same path name; false if not. + */ + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (obj == null) { + return false; + } + + if (this.getClass() == obj.getClass()) { + VersionedFileStatus other = (VersionedFileStatus) obj; + return this.getPath().equals(other.getPath()) && this.version.equals(other.version); + } + + return false; + } + + /** + * Returns a hash code value for the object, which is defined as + * the hash code of the path name. + * + * @return a hash code value for the path name and version + */ + @Override + public int hashCode() { + int hash = getPath().hashCode(); + hash = 89 * hash + (this.version != null ? this.version.hashCode() : 0); + return hash; + } + + /** + * Returns the version of this FileStatus + * + * @return a string value for the FileStatus version + */ + public String getVersion() { + return this.version; + } + } + + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/ServiceResolutionException.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/ServiceResolutionException.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/ServiceResolutionException.java deleted file mode 100644 index 694d902..0000000 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/ServiceResolutionException.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.azurebfs.contracts.exceptions; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.fs.azurebfs.services.AbfsServiceProviderImpl; - -/** - * Thrown a service is either not configured to be injected or the service is not existing. - * For service registration - * @see AbfsServiceProviderImpl - */ -@InterfaceAudience.Public -@InterfaceStability.Evolving -public final class ServiceResolutionException extends AzureBlobFileSystemException { - public ServiceResolutionException(String serviceName, Exception innerException) { - super(String.format("%s cannot be resolved.", serviceName), innerException); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AbfsHttpClientFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AbfsHttpClientFactory.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AbfsHttpClientFactory.java deleted file mode 100644 index c433f9a..0000000 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AbfsHttpClientFactory.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.hadoop.fs.azurebfs.contracts.services; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; -import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; -import org.apache.hadoop.fs.azurebfs.services.AbfsClient; - -/** - * AbfsClient factory. 
- */ -@InterfaceAudience.Public -@InterfaceStability.Evolving -public interface AbfsHttpClientFactory extends InjectableService { - /** - * Creates and configures an instance of new AbfsClient - * @return AbfsClient instance - */ - AbfsClient create(AzureBlobFileSystem fs) throws AzureBlobFileSystemException; -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AbfsHttpService.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AbfsHttpService.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AbfsHttpService.java deleted file mode 100644 index 3107fa3..0000000 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AbfsHttpService.java +++ /dev/null @@ -1,162 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.azurebfs.contracts.services; - -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Hashtable; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; -import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; - -/** - * File System http service to provide network calls for file system operations. - */ -@InterfaceAudience.Public -@InterfaceStability.Evolving -public interface AbfsHttpService extends InjectableService { - /** - * Gets filesystem properties on the Azure service. - * @param azureBlobFileSystem filesystem to get the properties. - * @return Hashtable<String, String> hash table containing all the filesystem properties. - */ - Hashtable<String, String> getFilesystemProperties(AzureBlobFileSystem azureBlobFileSystem) throws AzureBlobFileSystemException; - - - /** - * Sets filesystem properties on the Azure service. - * @param azureBlobFileSystem filesystem to get the properties. - * @param properties file system properties to set. - */ - void setFilesystemProperties(AzureBlobFileSystem azureBlobFileSystem, Hashtable<String, String> properties) throws - AzureBlobFileSystemException; - - /** - * Gets path properties on the Azure service. - * @param azureBlobFileSystem filesystem to get the properties of the path. - * @param path path to get properties. - * @return Hashtable<String, String> hash table containing all the path properties. 
- */ - Hashtable<String, String> getPathProperties(AzureBlobFileSystem azureBlobFileSystem, Path path) throws AzureBlobFileSystemException; - - /** - * Sets path properties on the Azure service. - * @param azureBlobFileSystem filesystem to get the properties of the path. - * @param path path to set properties. - * @param properties hash table containing all the path properties. - */ - void setPathProperties(AzureBlobFileSystem azureBlobFileSystem, Path path, Hashtable<String, String> properties) throws - AzureBlobFileSystemException; - - /** - * Creates filesystem on the Azure service. - * @param azureBlobFileSystem filesystem to be created. - */ - void createFilesystem(AzureBlobFileSystem azureBlobFileSystem) throws AzureBlobFileSystemException; - - /** - * Deletes filesystem on the Azure service. - * @param azureBlobFileSystem filesystem to be deleted. - */ - void deleteFilesystem(AzureBlobFileSystem azureBlobFileSystem) throws AzureBlobFileSystemException; - - /** - * Creates a file on the Azure service. - * @param azureBlobFileSystem filesystem to create file or directory. - * @param path path of the file to be created. - * @param overwrite should overwrite. - * @return OutputStream stream to the file. - */ - OutputStream createFile(AzureBlobFileSystem azureBlobFileSystem, Path path, boolean overwrite) throws AzureBlobFileSystemException; - - /** - * Creates a directory on the Azure service. - * @param azureBlobFileSystem filesystem to create file or directory. - * @param path path of the directory to be created. - * @return OutputStream stream to the file. - */ - Void createDirectory(AzureBlobFileSystem azureBlobFileSystem, Path path) throws AzureBlobFileSystemException; - - /** - * Opens a file to read and returns the stream. - * @param azureBlobFileSystem filesystem to read a file from. - * @param path file path to read. - * @return InputStream a stream to the file to read. - */ - InputStream openFileForRead(AzureBlobFileSystem azureBlobFileSystem, Path path, FileSystem.Statistics statistics) throws AzureBlobFileSystemException; - - /** - * Opens a file to write and returns the stream. - * @param azureBlobFileSystem filesystem to write a file to. - * @param path file path to write. - * @param overwrite should overwrite. - * @return OutputStream a stream to the file to write. - */ - OutputStream openFileForWrite(AzureBlobFileSystem azureBlobFileSystem, Path path, boolean overwrite) throws AzureBlobFileSystemException; - - /** - * Renames a file or directory from source to destination. - * @param azureBlobFileSystem filesystem to rename a path. - * @param source source path. - * @param destination destination path. - */ - void rename(AzureBlobFileSystem azureBlobFileSystem, Path source, Path destination) throws AzureBlobFileSystemException; - - /** - * Deletes a file or directory. - * @param azureBlobFileSystem filesystem to delete the path. - * @param path file path to be deleted. - * @param recursive true if path is a directory and recursive deletion is desired. - */ - void delete(AzureBlobFileSystem azureBlobFileSystem, Path path, boolean recursive) throws AzureBlobFileSystemException; - - /** - * Gets path's status under the provided path on the Azure service. - * @param azureBlobFileSystem filesystem to perform the get file status operation. - * @param path path delimiter. - * @return FileStatus FileStatus of the path in the file system. 
- */ - FileStatus getFileStatus(AzureBlobFileSystem azureBlobFileSystem, Path path) throws AzureBlobFileSystemException; - - /** - * Lists all the paths under the provided path on the Azure service. - * @param azureBlobFileSystem filesystem to perform the list operation. - * @param path path delimiter. - * @return FileStatus[] list of all paths in the file system. - */ - FileStatus[] listStatus(AzureBlobFileSystem azureBlobFileSystem, Path path) throws AzureBlobFileSystemException; - - /** - * Closes the client to filesystem to Azure service. - * @param azureBlobFileSystem filesystem to perform the list operation. - */ - void closeFileSystem(AzureBlobFileSystem azureBlobFileSystem) throws AzureBlobFileSystemException; - - /** - * Checks for the given path if it is marked as atomic rename directory or not. - * @param key - * @return True if the given path is listed under atomic rename property otherwise False. - */ - boolean isAtomicRenameKey(String key); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AbfsServiceProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AbfsServiceProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AbfsServiceProvider.java deleted file mode 100644 index bd98bae..0000000 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AbfsServiceProvider.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.azurebfs.contracts.services; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ServiceResolutionException; - -/** - * Dependency injected Azure Storage services provider interface. - */ -@InterfaceAudience.Public -@InterfaceStability.Evolving -public interface AbfsServiceProvider { - /** - * Returns an instance of resolved injectable service by class name. - * The injectable service must be configured first to be resolvable. - * @param clazz the injectable service which is expected to be returned. - * @param <T> The type of injectable service. - * @return T instance - * @throws ServiceResolutionException if the service is not resolvable. 
- */ - <T extends InjectableService> T get(Class<T> clazz) throws ServiceResolutionException; -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ConfigurationService.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ConfigurationService.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ConfigurationService.java deleted file mode 100644 index ee40c9d..0000000 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ConfigurationService.java +++ /dev/null @@ -1,143 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.azurebfs.contracts.services; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConfigurationPropertyNotFoundException; - -/** - * Configuration service collects required Azure Hadoop configurations and provides it to the consumers. - */ -@InterfaceAudience.Public -@InterfaceStability.Evolving -public interface ConfigurationService extends InjectableService { - /** - * Checks if ABFS is running from Emulator; - * @return is emulator mode. - */ - boolean isEmulator(); - - /** - * Retrieves storage secure mode from Hadoop configuration; - * @return storage secure mode; - */ - boolean isSecureMode(); - - /** - * Retrieves storage account key for provided account name from Hadoop configuration. - * @param accountName the account name to retrieve the key. - * @return storage account key; - */ - String getStorageAccountKey(String accountName) throws ConfigurationPropertyNotFoundException; - - /** - * Returns Hadoop configuration. - * @return Hadoop configuration. 
- */ - Configuration getConfiguration(); - - /** - * Retrieves configured write buffer size - * @return the size of the write buffer - */ - int getWriteBufferSize(); - - /** - * Retrieves configured read buffer size - * @return the size of the read buffer - */ - int getReadBufferSize(); - - /** - * Retrieves configured min backoff interval - * @return min backoff interval - */ - int getMinBackoffIntervalMilliseconds(); - - /** - * Retrieves configured max backoff interval - * @return max backoff interval - */ - int getMaxBackoffIntervalMilliseconds(); - - /** - * Retrieves configured backoff interval - * @return backoff interval - */ - int getBackoffIntervalMilliseconds(); - - /** - * Retrieves configured num of retries - * @return num of retries - */ - int getMaxIoRetries(); - - /** - * Retrieves configured azure block size - * @return azure block size - */ - long getAzureBlockSize(); - - /** - * Retrieves configured azure block location host - * @return azure block location host - */ - String getAzureBlockLocationHost(); - - /** - * Retrieves configured number of concurrent threads - * @return number of concurrent write threads - */ - int getMaxConcurrentWriteThreads(); - - /** - * Retrieves configured number of concurrent threads - * @return number of concurrent read threads - */ - int getMaxConcurrentReadThreads(); - - /** - * Retrieves configured boolean for tolerating out of band writes to files - * @return configured boolean for tolerating out of band writes to files - */ - boolean getTolerateOobAppends(); - - /** - * Retrieves the comma-separated list of directories to receive special treatment so that folder - * rename is made atomic. The default value for this setting is just '/hbase'. - * Example directories list : <value>/hbase,/data</value> - * @see <a href="https://hadoop.apache.org/docs/stable/hadoop-azure/index.html#Configuring_Credentials">AtomicRenameProperty</a> - * @return atomic rename directories - */ - String getAzureAtomicRenameDirs(); - - /** - * Retrieves configured boolean for creating remote file system during initialization - * @return configured boolean for creating remote file system during initialization - */ - boolean getCreateRemoteFileSystemDuringInitialization(); - - /** - * Retrieves configured value of read ahead queue - * @return depth of read ahead - */ - int getReadAheadQueueDepth(); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/InjectableService.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/InjectableService.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/InjectableService.java deleted file mode 100644 index 8b3801f..0000000 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/InjectableService.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.azurebfs.contracts.services; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - -/** - * Marker interface for all the injectable services. - */ -@InterfaceAudience.Public -@InterfaceStability.Evolving -public interface InjectableService { -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/TracingService.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/TracingService.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/TracingService.java deleted file mode 100644 index 267d11f..0000000 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/TracingService.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.azurebfs.contracts.services; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; -import org.apache.htrace.core.SpanId; -import org.apache.htrace.core.TraceScope; - -/** - * Azure Blob File System tracing service. - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public interface TracingService extends InjectableService { - /** - * Creates a {@link TraceScope} object with the provided description. - * @param description the trace description. - * @return created traceScope. - */ - TraceScope traceBegin(String description); - - /** - * Creates a {@link TraceScope} object with the provided description. - * @param description the trace description. - * @param parentSpanId the span id of the parent trace scope. - * @return create traceScope - */ - TraceScope traceBegin(String description, SpanId parentSpanId); - - /** - * Gets current thread latest generated traceScope id. - * @return current thread latest generated traceScope id. - */ - SpanId getCurrentTraceScopeSpanId(); - - /** - * Appends the provided exception to the trace scope. 
- * @param traceScope the scope which exception needs to be attached to. - * @param azureBlobFileSystemException the exception to be attached to the scope. - */ - void traceException(TraceScope traceScope, AzureBlobFileSystemException azureBlobFileSystemException); - - /** - * Ends the provided traceScope. - * @param traceScope the scope that needs to be ended. - */ - void traceEnd(TraceScope traceScope); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index c17a5c1..a78e7af 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -28,7 +28,6 @@ import java.util.Locale; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException; -import org.apache.hadoop.fs.azurebfs.contracts.services.ConfigurationService; import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams; @@ -45,17 +44,17 @@ public class AbfsClient { private final String xMsVersion = "2018-03-28"; private final ExponentialRetryPolicy retryPolicy; private final String filesystem; - private final ConfigurationService configurationService; + private final AbfsConfiguration abfsConfiguration; private final String userAgent; public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials, - final ConfigurationService configurationService, + final AbfsConfiguration abfsConfiguration, final ExponentialRetryPolicy exponentialRetryPolicy) { this.baseUrl = baseUrl; this.sharedKeyCredentials = sharedKeyCredentials; String baseUrlString = baseUrl.toString(); this.filesystem = baseUrlString.substring(baseUrlString.lastIndexOf(AbfsHttpConstants.FORWARD_SLASH) + 1); - this.configurationService = configurationService; + this.abfsConfiguration = abfsConfiguration; this.retryPolicy = exponentialRetryPolicy; this.userAgent = initializeUserAgent(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a271fd0e/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConfiguration.java new file mode 100644 index 0000000..8def1bb --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConfiguration.java @@ -0,0 +1,297 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.lang.reflect.Field; +import java.util.Map; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; +import org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations; +import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.IntegerConfigurationValidatorAnnotation; +import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.LongConfigurationValidatorAnnotation; +import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.StringConfigurationValidatorAnnotation; +import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.Base64StringConfigurationValidatorAnnotation; +import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.BooleanConfigurationValidatorAnnotation; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConfigurationPropertyNotFoundException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException; +import org.apache.hadoop.fs.azurebfs.diagnostics.Base64StringConfigurationBasicValidator; +import org.apache.hadoop.fs.azurebfs.diagnostics.BooleanConfigurationBasicValidator; +import org.apache.hadoop.fs.azurebfs.diagnostics.IntegerConfigurationBasicValidator; +import org.apache.hadoop.fs.azurebfs.diagnostics.LongConfigurationBasicValidator; +import org.apache.hadoop.fs.azurebfs.diagnostics.StringConfigurationBasicValidator; + +/** + * Configuration for Azure Blob FileSystem. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class AbfsConfiguration{ + private final Configuration configuration; + private final boolean isSecure; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_WRITE_BUFFER_SIZE, + MinValue = FileSystemConfigurations.MIN_BUFFER_SIZE, + MaxValue = FileSystemConfigurations.MAX_BUFFER_SIZE, + DefaultValue = FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE) + private int writeBufferSize; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_READ_BUFFER_SIZE, + MinValue = FileSystemConfigurations.MIN_BUFFER_SIZE, + MaxValue = FileSystemConfigurations.MAX_BUFFER_SIZE, + DefaultValue = FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE) + private int readBufferSize; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_MIN_BACKOFF_INTERVAL, + DefaultValue = FileSystemConfigurations.DEFAULT_MIN_BACKOFF_INTERVAL) + private int minBackoffInterval; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_MAX_BACKOFF_INTERVAL, + DefaultValue = FileSystemConfigurations.DEFAULT_MAX_BACKOFF_INTERVAL) + private int maxBackoffInterval; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BACKOFF_INTERVAL, + DefaultValue = FileSystemConfigurations.DEFAULT_BACKOFF_INTERVAL) + private int backoffInterval; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_MAX_IO_RETRIES, + MinValue = 0, + DefaultValue = FileSystemConfigurations.DEFAULT_MAX_RETRY_ATTEMPTS) + private int maxIoRetries; + + @LongConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BLOCK_SIZE_PROPERTY_NAME, + MinValue = 0, + MaxValue = FileSystemConfigurations.MAX_AZURE_BLOCK_SIZE, + DefaultValue = FileSystemConfigurations.MAX_AZURE_BLOCK_SIZE) + private long azureBlockSize; + + @StringConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME, + DefaultValue = FileSystemConfigurations.AZURE_BLOCK_LOCATION_HOST_DEFAULT) + private String azureBlockLocationHost; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CONCURRENT_CONNECTION_VALUE_OUT, + MinValue = 1, + DefaultValue = FileSystemConfigurations.MAX_CONCURRENT_WRITE_THREADS) + private int maxConcurrentWriteThreads; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CONCURRENT_CONNECTION_VALUE_IN, + MinValue = 1, + DefaultValue = FileSystemConfigurations.MAX_CONCURRENT_READ_THREADS) + private int maxConcurrentReadThreads; + + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_TOLERATE_CONCURRENT_APPEND, + DefaultValue = FileSystemConfigurations.DEFAULT_READ_TOLERATE_CONCURRENT_APPEND) + private boolean tolerateOobAppends; + + @StringConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_ATOMIC_RENAME_KEY, + DefaultValue = FileSystemConfigurations.DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES) + private String azureAtomicDirs; + + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, + DefaultValue = FileSystemConfigurations.DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION) + private boolean createRemoteFileSystemDuringInitialization; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = 
ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH, + DefaultValue = FileSystemConfigurations.DEFAULT_READ_AHEAD_QUEUE_DEPTH) + private int readAheadQueueDepth; + + private Map<String, String> storageAccountKeys; + + public AbfsConfiguration(final Configuration configuration) throws IllegalAccessException, InvalidConfigurationValueException { + this.configuration = configuration; + this.isSecure = this.configuration.getBoolean(ConfigurationKeys.FS_AZURE_SECURE_MODE, false); + + validateStorageAccountKeys(); + Field[] fields = this.getClass().getDeclaredFields(); + for (Field field : fields) { + field.setAccessible(true); + if (field.isAnnotationPresent(IntegerConfigurationValidatorAnnotation.class)) { + field.set(this, validateInt(field)); + } else if (field.isAnnotationPresent(LongConfigurationValidatorAnnotation.class)) { + field.set(this, validateLong(field)); + } else if (field.isAnnotationPresent(StringConfigurationValidatorAnnotation.class)) { + field.set(this, validateString(field)); + } else if (field.isAnnotationPresent(Base64StringConfigurationValidatorAnnotation.class)) { + field.set(this, validateBase64String(field)); + } else if (field.isAnnotationPresent(BooleanConfigurationValidatorAnnotation.class)) { + field.set(this, validateBoolean(field)); + } + } + } + + public boolean isEmulator() { + return this.getConfiguration().getBoolean(ConfigurationKeys.FS_AZURE_EMULATOR_ENABLED, false); + } + + public boolean isSecureMode() { + return this.isSecure; + } + + public String getStorageAccountKey(final String accountName) throws ConfigurationPropertyNotFoundException { + String accountKey = this.storageAccountKeys.get(ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME + accountName); + if (accountKey == null) { + throw new ConfigurationPropertyNotFoundException(accountName); + } + + return accountKey; + } + + public Configuration getConfiguration() { + return this.configuration; + } + + public int getWriteBufferSize() { + return this.writeBufferSize; + } + + public int getReadBufferSize() { + return this.readBufferSize; + } + + public int getMinBackoffIntervalMilliseconds() { + return this.minBackoffInterval; + } + + public int getMaxBackoffIntervalMilliseconds() { + return this.maxBackoffInterval; + } + + public int getBackoffIntervalMilliseconds() { + return this.backoffInterval; + } + + public int getMaxIoRetries() { + return this.maxIoRetries; + } + + public long getAzureBlockSize() { + return this.azureBlockSize; + } + + public String getAzureBlockLocationHost() { + return this.azureBlockLocationHost; + } + + public int getMaxConcurrentWriteThreads() { + return this.maxConcurrentWriteThreads; + } + + public int getMaxConcurrentReadThreads() { + return this.maxConcurrentReadThreads; + } + + public boolean getTolerateOobAppends() { + return this.tolerateOobAppends; + } + + public String getAzureAtomicRenameDirs() { + return this.azureAtomicDirs; + } + + public boolean getCreateRemoteFileSystemDuringInitialization() { + return this.createRemoteFileSystemDuringInitialization; + } + + public int getReadAheadQueueDepth() { + return this.readAheadQueueDepth; + } + + void validateStorageAccountKeys() throws InvalidConfigurationValueException { + Base64StringConfigurationBasicValidator validator = new Base64StringConfigurationBasicValidator( + ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME, "", true); + this.storageAccountKeys = this.configuration.getValByRegex(ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX); + + for (Map.Entry<String, String> account : 
+        this.storageAccountKeys.entrySet()) {
+      validator.validate(account.getValue());
+    }
+  }
+
+  int validateInt(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
+    IntegerConfigurationValidatorAnnotation validator = field.getAnnotation(IntegerConfigurationValidatorAnnotation.class);
+    String value = this.configuration.get(validator.ConfigurationKey());
+
+    // validate
+    return new IntegerConfigurationBasicValidator(
+        validator.MinValue(),
+        validator.MaxValue(),
+        validator.DefaultValue(),
+        validator.ConfigurationKey(),
+        validator.ThrowIfInvalid()).validate(value);
+  }
+
+  long validateLong(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
+    LongConfigurationValidatorAnnotation validator = field.getAnnotation(LongConfigurationValidatorAnnotation.class);
+    String value = this.configuration.get(validator.ConfigurationKey());
+
+    // validate
+    return new LongConfigurationBasicValidator(
+        validator.MinValue(),
+        validator.MaxValue(),
+        validator.DefaultValue(),
+        validator.ConfigurationKey(),
+        validator.ThrowIfInvalid()).validate(value);
+  }
+
+  String validateString(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
+    StringConfigurationValidatorAnnotation validator = field.getAnnotation(StringConfigurationValidatorAnnotation.class);
+    String value = this.configuration.get(validator.ConfigurationKey());
+
+    // validate
+    return new StringConfigurationBasicValidator(
+        validator.ConfigurationKey(),
+        validator.DefaultValue(),
+        validator.ThrowIfInvalid()).validate(value);
+  }
+
+  String validateBase64String(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
+    Base64StringConfigurationValidatorAnnotation validator = field.getAnnotation((Base64StringConfigurationValidatorAnnotation.class));
+    String value = this.configuration.get(validator.ConfigurationKey());
+
+    // validate
+    return new Base64StringConfigurationBasicValidator(
+        validator.ConfigurationKey(),
+        validator.DefaultValue(),
+        validator.ThrowIfInvalid()).validate(value);
+  }
+
+  boolean validateBoolean(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
+    BooleanConfigurationValidatorAnnotation validator = field.getAnnotation(BooleanConfigurationValidatorAnnotation.class);
+    String value = this.configuration.get(validator.ConfigurationKey());
+
+    // validate
+    return new BooleanConfigurationBasicValidator(
+        validator.ConfigurationKey(),
+        validator.DefaultValue(),
+        validator.ThrowIfInvalid()).validate(value);
+  }
+
+  @VisibleForTesting
+  void setReadBufferSize(int bufferSize) {
+    this.readBufferSize = bufferSize;
+  }
+
+  @VisibleForTesting
+  void setWriteBufferSize(int bufferSize) {
+    this.writeBufferSize = bufferSize;
+  }
+}
\ No newline at end of file
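
The new AbfsConfiguration replaces the injected ConfigurationService: its constructor walks the annotated fields via reflection and runs the matching validator for each one, so every getter returns a value that has already been range-checked. A minimal usage sketch, assuming the buffer-size value below falls inside the MIN_BUFFER_SIZE/MAX_BUFFER_SIZE bounds defined in FileSystemConfigurations; the class and key names come from the patch, everything else is illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
import org.apache.hadoop.fs.azurebfs.services.AbfsConfiguration;

public class AbfsConfigurationSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Illustrative value (8 MB), assumed to sit inside the validator's min/max bounds.
    conf.setInt(ConfigurationKeys.AZURE_READ_BUFFER_SIZE, 8 * 1024 * 1024);

    // The constructor reflects over the annotated fields and validates each one,
    // throwing InvalidConfigurationValueException if a supplied value is rejected.
    AbfsConfiguration abfsConfiguration = new AbfsConfiguration(conf);

    System.out.println("read buffer  = " + abfsConfiguration.getReadBufferSize());   // 8388608, if accepted
    System.out.println("write buffer = " + abfsConfiguration.getWriteBufferSize());  // default
    System.out.println("max retries  = " + abfsConfiguration.getMaxIoRetries());     // default
  }
}

The @VisibleForTesting setters at the end of the class let tests override the two buffer sizes after construction without going back through the Hadoop Configuration.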
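
The validation pattern itself, annotate a field with its key, bounds and default, then let the constructor discover and populate it reflectively, is independent of ABFS. The cut-down, self-contained sketch below mirrors the approach with a hypothetical IntSetting annotation and a plain Map standing in for Hadoop's Configuration; the names and defaults are invented for illustration, and it clamps out-of-range values where the real validators can instead throw:

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

public class AnnotatedSettingsDemo {

  // A cut-down analogue of IntegerConfigurationValidatorAnnotation.
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.FIELD)
  @interface IntSetting {
    String key();
    int min();
    int max();
    int defaultValue();
  }

  @IntSetting(key = "demo.read.buffer.size", min = 1024, max = 8 * 1024 * 1024, defaultValue = 4 * 1024 * 1024)
  private int readBufferSize;

  @IntSetting(key = "demo.retries", min = 0, max = 30, defaultValue = 5)
  private int retries;

  AnnotatedSettingsDemo(Map<String, String> rawConfig) throws IllegalAccessException {
    for (Field field : getClass().getDeclaredFields()) {
      IntSetting setting = field.getAnnotation(IntSetting.class);
      if (setting == null) {
        continue;
      }
      field.setAccessible(true);
      String raw = rawConfig.get(setting.key());
      int value = raw == null ? setting.defaultValue() : Integer.parseInt(raw);
      // Clamp out-of-range values to the declared bounds; the real validators can
      // alternatively reject them with InvalidConfigurationValueException.
      value = Math.max(setting.min(), Math.min(setting.max(), value));
      field.set(this, value);
    }
  }

  public static void main(String[] args) throws IllegalAccessException {
    Map<String, String> raw = new HashMap<>();
    raw.put("demo.read.buffer.size", "65536");
    AnnotatedSettingsDemo demo = new AnnotatedSettingsDemo(raw);
    System.out.println(demo.readBufferSize + " / " + demo.retries); // prints: 65536 / 5
  }
}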
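
The AbfsClient change is mechanical, the constructor now takes the concrete AbfsConfiguration instead of the injected ConfigurationService, but it means callers construct the client directly rather than resolving it through the removed AbfsServiceProvider. A hedged wiring sketch; the endpoint, account name and key are placeholders, and the SharedKeyCredentials and ExponentialRetryPolicy constructor signatures (and the SharedKeyCredentials package) are assumptions not shown in this patch:

import java.net.URL;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials;

public class AbfsClientWiringSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    AbfsConfiguration abfsConfiguration = new AbfsConfiguration(conf);

    // Placeholder endpoint, account and Base64 key; constructor signatures below are assumed.
    URL baseUrl = new URL("https://myaccount.dfs.core.windows.net/myfilesystem");
    SharedKeyCredentials credentials = new SharedKeyCredentials("myaccount", "dGVzdEtleQ==");
    ExponentialRetryPolicy retryPolicy = new ExponentialRetryPolicy();

    AbfsClient client = new AbfsClient(baseUrl, credentials, abfsConfiguration, retryPolicy);
    // The client can now be handed to the store layer that issues the REST calls.
  }
}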