http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce03a93f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index a78e7af..2b3ccc0 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -26,14 +26,17 @@ import java.util.ArrayList; import java.util.List; import java.util.Locale; -import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; -import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException; -import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; -import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; -import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException; + + +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*; +import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*; + /** * AbfsClient */ @@ -53,7 +56,7 @@ public class AbfsClient { this.baseUrl = baseUrl; this.sharedKeyCredentials = sharedKeyCredentials; String baseUrlString = baseUrl.toString(); - this.filesystem = baseUrlString.substring(baseUrlString.lastIndexOf(AbfsHttpConstants.FORWARD_SLASH) + 1); + this.filesystem = baseUrlString.substring(baseUrlString.lastIndexOf(FORWARD_SLASH) + 1); this.abfsConfiguration = abfsConfiguration; this.retryPolicy = exponentialRetryPolicy; this.userAgent = initializeUserAgent(); @@ -73,19 +76,19 @@ public class AbfsClient { List<AbfsHttpHeader> createDefaultHeaders() { final List<AbfsHttpHeader> requestHeaders = new ArrayList<AbfsHttpHeader>(); - requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_VERSION, xMsVersion)); - requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.ACCEPT, AbfsHttpConstants.APPLICATION_JSON - + AbfsHttpConstants.COMMA + AbfsHttpConstants.SINGLE_WHITE_SPACE + AbfsHttpConstants.APPLICATION_OCTET_STREAM)); - requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.ACCEPT_CHARSET, - AbfsHttpConstants.UTF_8)); - requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.CONTENT_TYPE, AbfsHttpConstants.EMPTY_STRING)); - requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.USER_AGENT, userAgent)); + requestHeaders.add(new AbfsHttpHeader(X_MS_VERSION, xMsVersion)); + requestHeaders.add(new AbfsHttpHeader(ACCEPT, APPLICATION_JSON + + COMMA + SINGLE_WHITE_SPACE + APPLICATION_OCTET_STREAM)); + requestHeaders.add(new AbfsHttpHeader(ACCEPT_CHARSET, + UTF_8)); + requestHeaders.add(new AbfsHttpHeader(CONTENT_TYPE, EMPTY_STRING)); + requestHeaders.add(new AbfsHttpHeader(USER_AGENT, userAgent)); return requestHeaders; } AbfsUriQueryBuilder createDefaultUriQueryBuilder() { final AbfsUriQueryBuilder abfsUriQueryBuilder = new AbfsUriQueryBuilder(); - abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_TIMEOUT, 
AbfsHttpConstants.DEFAULT_TIMEOUT); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_TIMEOUT, DEFAULT_TIMEOUT); return abfsUriQueryBuilder; } @@ -93,12 +96,12 @@ public class AbfsClient { final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); final AbfsUriQueryBuilder abfsUriQueryBuilder = new AbfsUriQueryBuilder(); - abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RESOURCE, AbfsHttpConstants.FILESYSTEM); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM); final URL url = createRequestUrl(abfsUriQueryBuilder.toString()); final AbfsRestOperation op = new AbfsRestOperation( this, - AbfsHttpConstants.HTTP_METHOD_PUT, + HTTP_METHOD_PUT, url, requestHeaders); op.execute(); @@ -109,19 +112,19 @@ public class AbfsClient { final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); // JDK7 does not support PATCH, so to workaround the issue we will use // PUT and specify the real method in the X-Http-Method-Override header. - requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE, - AbfsHttpConstants.HTTP_METHOD_PATCH)); + requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, + HTTP_METHOD_PATCH)); - requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PROPERTIES, + requestHeaders.add(new AbfsHttpHeader(X_MS_PROPERTIES, properties)); final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); - abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RESOURCE, AbfsHttpConstants.FILESYSTEM); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM); final URL url = createRequestUrl(abfsUriQueryBuilder.toString()); final AbfsRestOperation op = new AbfsRestOperation( this, - AbfsHttpConstants.HTTP_METHOD_PUT, + HTTP_METHOD_PUT, url, requestHeaders); op.execute(); @@ -133,16 +136,16 @@ public class AbfsClient { final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); - abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RESOURCE, AbfsHttpConstants.FILESYSTEM); - abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_DIRECTORY, relativePath == null ? "" : urlEncode(relativePath)); - abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RECURSIVE, String.valueOf(recursive)); - abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_CONTINUATION, continuation); - abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_MAXRESULTS, String.valueOf(listMaxResults)); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_DIRECTORY, relativePath == null ? 
"" : urlEncode(relativePath)); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_RECURSIVE, String.valueOf(recursive)); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_MAXRESULTS, String.valueOf(listMaxResults)); final URL url = createRequestUrl(abfsUriQueryBuilder.toString()); final AbfsRestOperation op = new AbfsRestOperation( this, - AbfsHttpConstants.HTTP_METHOD_GET, + HTTP_METHOD_GET, url, requestHeaders); op.execute(); @@ -153,12 +156,12 @@ public class AbfsClient { final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); - abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RESOURCE, AbfsHttpConstants.FILESYSTEM); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM); final URL url = createRequestUrl(abfsUriQueryBuilder.toString()); final AbfsRestOperation op = new AbfsRestOperation( this, - AbfsHttpConstants.HTTP_METHOD_HEAD, + HTTP_METHOD_HEAD, url, requestHeaders); op.execute(); @@ -169,12 +172,12 @@ public class AbfsClient { final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); - abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RESOURCE, AbfsHttpConstants.FILESYSTEM); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM); final URL url = createRequestUrl(abfsUriQueryBuilder.toString()); final AbfsRestOperation op = new AbfsRestOperation( this, - AbfsHttpConstants.HTTP_METHOD_DELETE, + HTTP_METHOD_DELETE, url, requestHeaders); op.execute(); @@ -185,16 +188,16 @@ public class AbfsClient { throws AzureBlobFileSystemException { final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); if (!overwrite) { - requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_NONE_MATCH, "*")); + requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, "*")); } final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); - abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RESOURCE, isFile ? AbfsHttpConstants.FILE : AbfsHttpConstants.DIRECTORY); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, isFile ? 
FILE : DIRECTORY); final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); final AbfsRestOperation op = new AbfsRestOperation( this, - AbfsHttpConstants.HTTP_METHOD_PUT, + HTTP_METHOD_PUT, url, requestHeaders); op.execute(); @@ -205,17 +208,17 @@ public class AbfsClient { throws AzureBlobFileSystemException { final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); - final String encodedRenameSource = urlEncode(AbfsHttpConstants.FORWARD_SLASH + this.getFileSystem() + source); - requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_RENAME_SOURCE, encodedRenameSource)); - requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_NONE_MATCH, AbfsHttpConstants.STAR)); + final String encodedRenameSource = urlEncode(FORWARD_SLASH + this.getFileSystem() + source); + requestHeaders.add(new AbfsHttpHeader(X_MS_RENAME_SOURCE, encodedRenameSource)); + requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, STAR)); final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); - abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_CONTINUATION, continuation); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation); final URL url = createRequestUrl(destination, abfsUriQueryBuilder.toString()); final AbfsRestOperation op = new AbfsRestOperation( this, - AbfsHttpConstants.HTTP_METHOD_PUT, + HTTP_METHOD_PUT, url, requestHeaders); op.execute(); @@ -227,17 +230,17 @@ public class AbfsClient { final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); // JDK7 does not support PATCH, so to workaround the issue we will use // PUT and specify the real method in the X-Http-Method-Override header. - requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE, - AbfsHttpConstants.HTTP_METHOD_PATCH)); + requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, + HTTP_METHOD_PATCH)); final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); - abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.APPEND_ACTION); - abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_POSITION, Long.toString(position)); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(position)); final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); final AbfsRestOperation op = new AbfsRestOperation( this, - AbfsHttpConstants.HTTP_METHOD_PUT, + HTTP_METHOD_PUT, url, requestHeaders, buffer, offset, length); op.execute(); @@ -245,44 +248,46 @@ public class AbfsClient { } - public AbfsRestOperation flush(final String path, final long position, boolean retainUncommittedData) throws AzureBlobFileSystemException { + public AbfsRestOperation flush(final String path, final long position, boolean retainUncommittedData) + throws AzureBlobFileSystemException { final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); // JDK7 does not support PATCH, so to workaround the issue we will use // PUT and specify the real method in the X-Http-Method-Override header. 
- requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE, - AbfsHttpConstants.HTTP_METHOD_PATCH)); + requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, + HTTP_METHOD_PATCH)); final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); - abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.FLUSH_ACTION); - abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_POSITION, Long.toString(position)); - abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RETAIN_UNCOMMITTED_DATA, String.valueOf(retainUncommittedData)); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, FLUSH_ACTION); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(position)); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_RETAIN_UNCOMMITTED_DATA, String.valueOf(retainUncommittedData)); final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); final AbfsRestOperation op = new AbfsRestOperation( this, - AbfsHttpConstants.HTTP_METHOD_PUT, + HTTP_METHOD_PUT, url, requestHeaders); op.execute(); return op; } - public AbfsRestOperation setPathProperties(final String path, final String properties) throws AzureBlobFileSystemException { + public AbfsRestOperation setPathProperties(final String path, final String properties) + throws AzureBlobFileSystemException { final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); // JDK7 does not support PATCH, so to workaround the issue we will use // PUT and specify the real method in the X-Http-Method-Override header. - requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE, - AbfsHttpConstants.HTTP_METHOD_PATCH)); + requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE, + HTTP_METHOD_PATCH)); - requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PROPERTIES, properties)); + requestHeaders.add(new AbfsHttpHeader(X_MS_PROPERTIES, properties)); final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); - abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.SET_PROPERTIES_ACTION); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, SET_PROPERTIES_ACTION); final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); final AbfsRestOperation op = new AbfsRestOperation( this, - AbfsHttpConstants.HTTP_METHOD_PUT, + HTTP_METHOD_PUT, url, requestHeaders); op.execute(); @@ -297,7 +302,7 @@ public class AbfsClient { final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); final AbfsRestOperation op = new AbfsRestOperation( this, - AbfsHttpConstants.HTTP_METHOD_HEAD, + HTTP_METHOD_HEAD, url, requestHeaders); op.execute(); @@ -307,9 +312,9 @@ public class AbfsClient { public AbfsRestOperation read(final String path, final long position, final byte[] buffer, final int bufferOffset, final int bufferLength, final String eTag) throws AzureBlobFileSystemException { final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); - requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.RANGE, + requestHeaders.add(new AbfsHttpHeader(RANGE, String.format("bytes=%d-%d", position, position + bufferLength - 1))); - requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_MATCH, eTag)); + requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag)); final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); @@ -317,7 +322,7 @@ public class AbfsClient { final AbfsRestOperation op = new AbfsRestOperation( this, - 
AbfsHttpConstants.HTTP_METHOD_GET, + HTTP_METHOD_GET, url, requestHeaders, buffer, @@ -333,13 +338,13 @@ public class AbfsClient { final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); - abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_RECURSIVE, String.valueOf(recursive)); - abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_CONTINUATION, continuation); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_RECURSIVE, String.valueOf(recursive)); + abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation); final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString()); final AbfsRestOperation op = new AbfsRestOperation( this, - AbfsHttpConstants.HTTP_METHOD_DELETE, + HTTP_METHOD_DELETE, url, requestHeaders); op.execute(); @@ -347,7 +352,7 @@ public class AbfsClient { } private URL createRequestUrl(final String query) throws AzureBlobFileSystemException { - return createRequestUrl(AbfsHttpConstants.EMPTY_STRING, query); + return createRequestUrl(EMPTY_STRING, query); } private URL createRequestUrl(final String path, final String query) @@ -357,8 +362,8 @@ public class AbfsClient { try { encodedPath = urlEncode(path); } catch (AzureBlobFileSystemException ex) { - this.LOG.debug( - "Unexpected error.", ex); + LOG.debug("Unexpected error.", ex); + throw new InvalidUriException(path); } final StringBuilder sb = new StringBuilder(); @@ -378,9 +383,9 @@ public class AbfsClient { private static String urlEncode(final String value) throws AzureBlobFileSystemException { String encodedString = null; try { - encodedString = URLEncoder.encode(value, AbfsHttpConstants.UTF_8) - .replace(AbfsHttpConstants.PLUS, AbfsHttpConstants.PLUS_ENCODE) - .replace(AbfsHttpConstants.FORWARD_SLASH_ENCODE, AbfsHttpConstants.FORWARD_SLASH); + encodedString = URLEncoder.encode(value, UTF_8) + .replace(PLUS, PLUS_ENCODE) + .replace(FORWARD_SLASH_ENCODE, FORWARD_SLASH); } catch (UnsupportedEncodingException ex) { throw new InvalidUriException(value); } @@ -391,11 +396,11 @@ public class AbfsClient { private String initializeUserAgent() { final String userAgentComment = String.format(Locale.ROOT, "(JavaJRE %s; %s %s)", - System.getProperty(AbfsHttpConstants.JAVA_VERSION), - System.getProperty(AbfsHttpConstants.OS_NAME) - .replaceAll(AbfsHttpConstants.SINGLE_WHITE_SPACE, AbfsHttpConstants.EMPTY_STRING), - System.getProperty(AbfsHttpConstants.OS_VERSION)); + System.getProperty(JAVA_VERSION), + System.getProperty(OS_NAME) + .replaceAll(SINGLE_WHITE_SPACE, EMPTY_STRING), + System.getProperty(OS_VERSION)); - return String.format(AbfsHttpConstants.CLIENT_VERSION + " %s", userAgentComment); + return String.format(CLIENT_VERSION + " %s", userAgentComment); } -} \ No newline at end of file +}
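A note for readers skimming the diff above: the append, flush and set-properties operations all carry the same comment about JDK7 lacking PATCH support. The workaround is generic HTTP rather than anything ABFS-specific. Below is a minimal, self-contained sketch of the technique; the endpoint URL and payload are illustrative placeholders, not values from this patch.

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;

public class PatchOverPutSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical endpoint; any service honouring X-Http-Method-Override works.
    URL url = new URL("https://example.dfs.core.windows.net/myfs/myfile?action=append&position=0");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    // JDK7's HttpURLConnection throws ProtocolException on setRequestMethod("PATCH"),
    // so the request goes out as PUT and the override header names the real verb.
    conn.setRequestMethod("PUT");
    conn.setRequestProperty("X-Http-Method-Override", "PATCH");
    conn.setDoOutput(true);
    try (OutputStream out = conn.getOutputStream()) {
      out.write("data to append".getBytes("UTF-8"));
    }
    System.out.println("HTTP status: " + conn.getResponseCode());
  }
}
```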
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce03a93f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java index 0ea9365..53f6900 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java @@ -30,12 +30,12 @@ import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonToken; import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Represents an HTTP operation. @@ -427,4 +427,4 @@ public class AbfsHttpOperation { private boolean isNullInputStream(InputStream stream) { return stream == null ? true : false; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce03a93f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 6554380..848ce8a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -29,7 +29,7 @@ import org.apache.hadoop.fs.FileSystem.Statistics; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; /** - * The AbfsInputStream for AbfsClient + * The AbfsInputStream for AbfsClient. 
*/ public class AbfsInputStream extends FSInputStream { private final AbfsClient client; @@ -59,7 +59,6 @@ public class AbfsInputStream extends FSInputStream { final int bufferSize, final int readAheadQueueDepth, final String eTag) { - super(); this.client = client; this.statistics = statistics; this.path = path; @@ -379,4 +378,4 @@ public class AbfsInputStream extends FSInputStream { public boolean markSupported() { return false; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce03a93f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index de5c934..2dbcee5 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -19,6 +19,7 @@ package org.apache.hadoop.fs.azurebfs.services; import java.io.IOException; +import java.io.InterruptedIOException; import java.io.OutputStream; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.LinkedBlockingQueue; @@ -35,7 +36,7 @@ import org.apache.hadoop.fs.Syncable; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; /** - * The BlobFsOutputStream for Rest AbfsClient + * The BlobFsOutputStream for Rest AbfsClient. */ public class AbfsOutputStream extends OutputStream implements Syncable { private final AbfsClient client; @@ -79,8 +80,8 @@ public class AbfsOutputStream extends OutputStream implements Syncable { maxConcurrentRequestCount, 10L, TimeUnit.SECONDS, - new LinkedBlockingQueue()); - this.completionService = new ExecutorCompletionService(this.threadExecutor); + new LinkedBlockingQueue<>()); + this.completionService = new ExecutorCompletionService<>(this.threadExecutor); } /** @@ -111,9 +112,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable { @Override public synchronized void write(final byte[] data, final int off, final int length) throws IOException { - if (this.lastError != null) { - throw this.lastError; - } + maybeThrowLastError(); Preconditions.checkArgument(data != null, "null data"); @@ -143,6 +142,19 @@ public class AbfsOutputStream extends OutputStream implements Syncable { } /** + * Throw the last error recorded if not null. + * After the stream is closed, this is always set to + * an exception, so acts as a guard against method invocation once + * closed. + * @throws IOException if lastError is set + */ + private void maybeThrowLastError() throws IOException { + if (lastError != null) { + throw lastError; + } + } + + /** * Flushes this output stream and forces any buffered output bytes to be * written out. If any data remains in the payload it is committed to the * service. 
Data is queued for writing and forced out to the service @@ -150,7 +162,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable { */ @Override public void flush() throws IOException { - this.flushInternalAsync(); + flushInternalAsync(); } /** Similar to posix fsync, flush out the data in client's user buffer @@ -159,7 +171,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable { */ @Override public void hsync() throws IOException { - this.flushInternal(); + flushInternal(); } /** Flush out the data in client's user buffer. After the return of @@ -168,7 +180,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable { */ @Override public void hflush() throws IOException { - this.flushInternal(); + flushInternal(); } /** @@ -186,34 +198,30 @@ public class AbfsOutputStream extends OutputStream implements Syncable { } try { - this.flushInternal(); - this.threadExecutor.shutdown(); + flushInternal(); + threadExecutor.shutdown(); } finally { - this.lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED); - this.buffer = null; - this.bufferIndex = 0; - this.closed = true; - this.writeOperations.clear(); - if (!this.threadExecutor.isShutdown()) { - this.threadExecutor.shutdownNow(); + lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED); + buffer = null; + bufferIndex = 0; + closed = true; + writeOperations.clear(); + if (!threadExecutor.isShutdown()) { + threadExecutor.shutdownNow(); } } } private synchronized void flushInternal() throws IOException { - if (this.lastError != null) { - throw this.lastError; - } - this.writeCurrentBufferToService(); - this.flushWrittenBytesToService(); + maybeThrowLastError(); + writeCurrentBufferToService(); + flushWrittenBytesToService(); } private synchronized void flushInternalAsync() throws IOException { - if (this.lastError != null) { - throw this.lastError; - } - this.writeCurrentBufferToService(); - this.flushWrittenBytesToServiceAsync(); + maybeThrowLastError(); + writeCurrentBufferToService(); + flushWrittenBytesToServiceAsync(); } private synchronized void writeCurrentBufferToService() throws IOException { @@ -221,19 +229,19 @@ public class AbfsOutputStream extends OutputStream implements Syncable { return; } - final byte[] bytes = this.buffer; + final byte[] bytes = buffer; final int bytesLength = bufferIndex; - this.buffer = new byte[bufferSize]; - this.bufferIndex = 0; - final long offset = this.position; - this.position += bytesLength; + buffer = new byte[bufferSize]; + bufferIndex = 0; + final long offset = position; + position += bytesLength; - if (this.threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) { - this.waitForTaskToComplete(); + if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) { + waitForTaskToComplete(); } - final Future job = this.completionService.submit(new Callable<Void>() { + final Future<Void> job = completionService.submit(new Callable<Void>() { @Override public Void call() throws Exception { client.append(path, offset, bytes, 0, @@ -242,25 +250,25 @@ public class AbfsOutputStream extends OutputStream implements Syncable { } }); - this.writeOperations.add(new WriteOperation(job, offset, bytesLength)); + writeOperations.add(new WriteOperation(job, offset, bytesLength)); // Try to shrink the queue shrinkWriteOperationQueue(); } private synchronized void flushWrittenBytesToService() throws IOException { - for (WriteOperation writeOperation : this.writeOperations) { + for (WriteOperation writeOperation : 
writeOperations) { try { writeOperation.task.get(); } catch (Exception ex) { - if (AzureBlobFileSystemException.class.isInstance(ex.getCause())) { - ex = AzureBlobFileSystemException.class.cast(ex.getCause()); + if (ex.getCause() instanceof AzureBlobFileSystemException) { + ex = (AzureBlobFileSystemException)ex.getCause(); } - this.lastError = new IOException(ex); - throw this.lastError; + lastError = new IOException(ex); + throw lastError; } } - flushWrittenBytesToServiceInternal(this.position, false); + flushWrittenBytesToServiceInternal(position, false); } private synchronized void flushWrittenBytesToServiceAsync() throws IOException { @@ -273,7 +281,8 @@ public class AbfsOutputStream extends OutputStream implements Syncable { this.lastTotalAppendOffset = 0; } - private synchronized void flushWrittenBytesToServiceInternal(final long offset, final boolean retainUncommitedData) throws IOException { + private synchronized void flushWrittenBytesToServiceInternal(final long offset, + final boolean retainUncommitedData) throws IOException { try { client.flush(path, offset, retainUncommitedData); } catch (AzureBlobFileSystemException ex) { @@ -288,31 +297,33 @@ public class AbfsOutputStream extends OutputStream implements Syncable { */ private synchronized void shrinkWriteOperationQueue() throws IOException { try { - while (this.writeOperations.peek() != null && this.writeOperations.peek().task.isDone()) { - this.writeOperations.peek().task.get(); - this.lastTotalAppendOffset += this.writeOperations.peek().length; - this.writeOperations.remove(); + while (writeOperations.peek() != null && writeOperations.peek().task.isDone()) { + writeOperations.peek().task.get(); + lastTotalAppendOffset += writeOperations.peek().length; + writeOperations.remove(); } } catch (Exception e) { - if (AzureBlobFileSystemException.class.isInstance(e.getCause())) { - this.lastError = IOException.class.cast(e.getCause()); + if (e.getCause() instanceof AzureBlobFileSystemException) { + lastError = (AzureBlobFileSystemException)e.getCause(); } else { - this.lastError = new IOException(e); + lastError = new IOException(e); } - throw this.lastError; + throw lastError; } } private void waitForTaskToComplete() throws IOException { boolean completed; - for (completed = false; this.completionService.poll() != null; completed = true) {} + for (completed = false; completionService.poll() != null; completed = true) { + // keep polling until there is no data + } if (!completed) { try { - this.completionService.take(); + completionService.take(); } catch (InterruptedException e) { - this.lastError = new IOException(e); - throw this.lastError; + lastError = (IOException)new InterruptedIOException(e.toString()).initCause(e); + throw lastError; } } } @@ -332,4 +343,4 @@ public class AbfsOutputStream extends OutputStream implements Syncable { this.length = length; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce03a93f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java index 17fc35a..6126398 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java +++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java @@ -23,15 +23,16 @@ import java.net.HttpURLConnection; import java.net.URL; import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; -import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException; -import org.slf4j.Logger; - /** - * The AbfsRestOperation for Rest AbfsClient + * The AbfsRestOperation for Rest AbfsClient. */ public class AbfsRestOperation { // Blob FS client, which has the credentials, retry policy, and logs. @@ -47,7 +48,7 @@ public class AbfsRestOperation { // request body and all the download methods have a response body. private final boolean hasRequestBody; - private final Logger logger; + private final Logger LOG = LoggerFactory.getLogger(AbfsClient.class); // For uploads, this is the request entity body. For downloads, // this will hold the response entity body. @@ -79,7 +80,6 @@ public class AbfsRestOperation { this.requestHeaders = requestHeaders; this.hasRequestBody = (AbfsHttpConstants.HTTP_METHOD_PUT.equals(method) || AbfsHttpConstants.HTTP_METHOD_PATCH.equals(method)); - this.logger = client.LOG; } /** @@ -150,11 +150,11 @@ public class AbfsRestOperation { httpOperation.processResponse(buffer, bufferOffset, bufferLength); } catch (IOException ex) { - if (logger.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { if (httpOperation != null) { - logger.debug("HttpRequestFailure: " + httpOperation.toString(), ex); + LOG.debug("HttpRequestFailure: " + httpOperation.toString(), ex); } else { - logger.debug("HttpRequestFailure: " + method + "," + url, ex); + LOG.debug("HttpRequestFailure: " + method + "," + url, ex); } } if (!client.getRetryPolicy().shouldRetry(retryCount, -1)) { @@ -163,8 +163,8 @@ public class AbfsRestOperation { return false; } - if (logger.isDebugEnabled()) { - logger.debug("HttpRequest: " + httpOperation.toString()); + if (LOG.isDebugEnabled()) { + LOG.debug("HttpRequest: " + httpOperation.toString()); } if (client.getRetryPolicy().shouldRetry(retryCount, httpOperation.getStatusCode())) { @@ -175,4 +175,4 @@ public class AbfsRestOperation { return true; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce03a93f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsUriQueryBuilder.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsUriQueryBuilder.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsUriQueryBuilder.java index bac66af..3624853 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsUriQueryBuilder.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsUriQueryBuilder.java @@ -18,13 +18,13 @@ package org.apache.hadoop.fs.azurebfs.services; -import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; - import java.util.HashMap; import java.util.Map; +import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; + /** - * The UrlQueryBuilder for Rest 
AbfsClient + * The UrlQueryBuilder for Rest AbfsClient. */ public class AbfsUriQueryBuilder { private Map<String, String> parameters; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce03a93f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java index 54aa1ab..5eb7a66 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java @@ -43,7 +43,7 @@ public class ExponentialRetryPolicy { private static final int DEFAULT_MAX_BACKOFF = 1000 * 30; /** - *Represents the default minimum amount of time used when calculating the exponential + * Represents the default minimum amount of time used when calculating the exponential * delay between retries. */ private static final int DEFAULT_MIN_BACKOFF = 1000 * 3; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce03a93f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java index 1fac13d..00e4f00 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java @@ -18,10 +18,10 @@ package org.apache.hadoop.fs.azurebfs.services; -import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; - import java.util.concurrent.CountDownLatch; +import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; + class ReadBuffer { private AbfsInputStream stream; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce03a93f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java index 164e549..5b71cf0 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java @@ -28,7 +28,7 @@ import java.util.Stack; import java.util.concurrent.CountDownLatch; /** - * The Read Buffer Manager for Rest AbfsClient + * The Read Buffer Manager for Rest AbfsClient. 
*/ final class ReadBufferManager { private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManager.class); @@ -40,11 +40,11 @@ final class ReadBufferManager { private Thread[] threads = new Thread[NUM_THREADS]; private byte[][] buffers; // array of byte[] buffers, to hold the data that is read - private Stack<Integer> freeList = new Stack<Integer>(); // indices in buffers[] array that are available + private Stack<Integer> freeList = new Stack<>(); // indices in buffers[] array that are available - private Queue<ReadBuffer> readAheadQueue = new LinkedList<ReadBuffer>(); // queue of requests that are not picked up by any worker thread yet - private LinkedList<ReadBuffer> inProgressList = new LinkedList<ReadBuffer>(); // requests being processed by worker threads - private LinkedList<ReadBuffer> completedReadList = new LinkedList<ReadBuffer>(); // buffers available for reading + private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet + private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // requests being processed by worker threads + private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // buffers available for reading private static final ReadBufferManager BUFFER_MANAGER; // singleton, initialized in static initialization block static { @@ -85,7 +85,7 @@ final class ReadBufferManager { /** - * {@link AbfsInputStream} calls this method to queue read-aheads + * {@link AbfsInputStream} calls this method to queue read-aheads. * * @param stream The {@link AbfsInputStream} for which to do the read-ahead * @param requestedOffset The offset in the file which should be read */ void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, final int requestedLength) { if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Start Queueing readAhead for " + stream.getPath() + " offset " + requestedOffset - + " length " + requestedLength); + LOGGER.trace("Start Queueing readAhead for {} offset {} length {}", + stream.getPath(), requestedOffset, requestedLength); } ReadBuffer buffer; synchronized (this) { if (isAlreadyQueued(stream, requestedOffset)) { return; // already queued, do not queue again } - if (freeList.size() == 0 && !tryEvict()) { + if (freeList.isEmpty() && !tryEvict()) { return; // no buffers available, cannot queue anything } @@ -121,8 +121,8 @@ final class ReadBufferManager { notifyAll(); } if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Done q-ing readAhead for file " + stream.getPath() + " offset " + requestedOffset - + " buffer idx " + buffer.getBufferindex()); + LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}", + stream.getPath(), requestedOffset, buffer.getBufferindex()); } } @@ -144,7 +144,8 @@ final class ReadBufferManager { int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer) { // not synchronized, so have to be careful with locking if (LOGGER.isTraceEnabled()) { - LOGGER.trace("getBlock for file " + stream.getPath() + " position " + position + " thread " + Thread.currentThread().getName()); + LOGGER.trace("getBlock for file {} position {} thread {}", + stream.getPath(), position, Thread.currentThread().getName()); } waitForProcess(stream, position); @@ -155,12 +156,13 @@ final class ReadBufferManager { } if (bytesRead > 0) { if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Done read from Cache for " + stream.getPath() + " position
" + position + " length " + bytesRead); + LOGGER.trace("Done read from Cache for {} position {} length {}", + stream.getPath(), position, bytesRead); } return bytesRead; } - // otherwise, just say we got nothing - calling thread can do it's own read + // otherwise, just say we got nothing - calling thread can do its own read return 0; } @@ -179,8 +181,8 @@ final class ReadBufferManager { if (readBuf != null) { // if in in-progress queue, then block for it try { if (LOGGER.isTraceEnabled()) { - LOGGER.trace("got a relevant read buffer for file " + stream.getPath() + " offset " + readBuf.getOffset() - + " buffer idx " + readBuf.getBufferindex()); + LOGGER.trace("got a relevant read buffer for file {} offset {} buffer idx {}", + stream.getPath(), readBuf.getOffset(), readBuf.getBufferindex()); } readBuf.getLatch().await(); // blocking wait on the caller stream's thread // Note on correctness: readBuf gets out of inProgressList only in 1 place: after worker thread @@ -193,8 +195,8 @@ final class ReadBufferManager { Thread.currentThread().interrupt(); } if (LOGGER.isTraceEnabled()) { - LOGGER.trace("latch done for file " + stream.getPath() + " buffer idx " + readBuf.getBufferindex() - + " length " + readBuf.getLength()); + LOGGER.trace("latch done for file {} buffer idx {} length {}", + stream.getPath(), readBuf.getBufferindex(), readBuf.getLength()); } } } @@ -254,8 +256,8 @@ final class ReadBufferManager { freeList.push(buf.getBufferindex()); completedReadList.remove(buf); if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Evicting buffer idx " + buf.getBufferindex() + "; was used for file " + buf.getStream().getPath() - + " offset " + buf.getOffset() + " length " + buf.getLength()); + LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}", + buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), buf.getLength()); } return true; } @@ -344,13 +346,14 @@ final class ReadBufferManager { inProgressList.add(buffer); } if (LOGGER.isTraceEnabled()) { - LOGGER.trace("ReadBufferWorker picked file " + buffer.getStream().getPath() + " for offset " + buffer.getOffset()); + LOGGER.trace("ReadBufferWorker picked file {} for offset {}", + buffer.getStream().getPath(), buffer.getOffset()); } return buffer; } /** - * ReadBufferWorker thread calls this method to post completion + * ReadBufferWorker thread calls this method to post completion. * * @param buffer the buffer whose read was completed * @param result the {@link ReadBufferStatus} after the read operation in the worker thread @@ -358,8 +361,8 @@ final class ReadBufferManager { */ void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) { if (LOGGER.isTraceEnabled()) { - LOGGER.trace("ReadBufferWorker completed file " + buffer.getStream().getPath() + " for offset " + buffer.getOffset() - + " bytes " + bytesActuallyRead); + LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}", + buffer.getStream().getPath(), buffer.getOffset(), bytesActuallyRead); } synchronized (this) { inProgressList.remove(buffer); @@ -380,8 +383,9 @@ final class ReadBufferManager { /** * Similar to System.currentTimeMillis, except implemented with System.nanoTime(). * System.currentTimeMillis can go backwards when system clock is changed (e.g., with NTP time synchronization), - * making it unsuitable for measuring time intervals. nanotime is strictly monotonically increasing, - * so it is much more suitable to measuring intervals. + * making it unsuitable for measuring time intervals. 
nanotime is strictly monotonically increasing per CPU core. + Note: it is not monotonic across sockets, and even within a CPU, it's only the + more recent parts which share a clock across all cores. * * @return current time in milliseconds */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce03a93f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java index 2d0c96e..af69de0 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java @@ -18,10 +18,10 @@ package org.apache.hadoop.fs.azurebfs.services; -import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; - import java.util.concurrent.CountDownLatch; +import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; + class ReadBufferWorker implements Runnable { protected static final CountDownLatch UNLEASH_WORKERS = new CountDownLatch(1); http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce03a93f/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SharedKeyCredentials.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SharedKeyCredentials.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SharedKeyCredentials.java index dd59892..105a1a2 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SharedKeyCredentials.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/SharedKeyCredentials.java @@ -22,6 +22,7 @@ import javax.crypto.Mac; import javax.crypto.spec.SecretKeySpec; import java.io.UnsupportedEncodingException; import java.net.HttpURLConnection; +import java.net.URL; import java.net.URLDecoder; import java.text.DateFormat; import java.text.SimpleDateFormat; @@ -38,11 +39,11 @@ import java.util.TimeZone; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.commons.codec.Charsets; +import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; -import org.apache.commons.codec.binary.Base64; -import org.apache.commons.codec.Charsets; /** * Represents the shared key credentials used to access an Azure Storage * account. @@ -89,7 +90,7 @@ public class SharedKeyCredentials { } private String computeHmac256(final String stringToSign) { - byte[] utf8Bytes = null; + byte[] utf8Bytes; try { utf8Bytes = stringToSign.getBytes(AbfsHttpConstants.UTF_8); } catch (final UnsupportedEncodingException e) { @@ -158,7 +159,7 @@ public class SharedKeyCredentials { } /** - * Initialie the HmacSha256 associated with the account key. + * Initialize the HmacSha256 associated with the account key. */ private void initializeMac() { // Initializes the HMAC-SHA256 Mac and SecretKey.
@@ -171,7 +172,7 @@ public class SharedKeyCredentials { } /** - * Append a string to a string builder with a newline constant + * Append a string to a string builder with a newline constant. * * @param builder the StringBuilder object * @param element the string to append. @@ -194,9 +195,10 @@ public class SharedKeyCredentials { * @param conn the HttpURLConnection for the operation. * @return A canonicalized string. */ - private static String canonicalizeHttpRequest(final java.net.URL address, final String accountName, - final String method, final String contentType, final long contentLength, final String date, - final HttpURLConnection conn) throws UnsupportedEncodingException { + private static String canonicalizeHttpRequest(final URL address, + final String accountName, final String method, final String contentType, + final long contentLength, final String date, final HttpURLConnection conn) + throws UnsupportedEncodingException { // The first element should be the Method of the request. // I.e. GET, POST, PUT, or HEAD. @@ -246,7 +248,8 @@ public class SharedKeyCredentials { * @param accountName the account name for the request. * @return the canonicalized resource string. */ - private static String getCanonicalizedResource(final java.net.URL address, final String accountName) throws UnsupportedEncodingException { + private static String getCanonicalizedResource(final URL address, + final String accountName) throws UnsupportedEncodingException { // Resource path final StringBuilder resourcepath = new StringBuilder(AbfsHttpConstants.FORWARD_SLASH); resourcepath.append(accountName); @@ -263,7 +266,7 @@ public class SharedKeyCredentials { final Map<String, String[]> queryVariables = parseQueryString(address.getQuery()); - final Map<String, String> lowercasedKeyNameValue = new HashMap<String, String>(); + final Map<String, String> lowercasedKeyNameValue = new HashMap<>(); for (final Entry<String, String[]> entry : queryVariables.entrySet()) { // sort the value and organize it as comma separated values @@ -303,14 +306,17 @@ public class SharedKeyCredentials { } /** - * Gets all the values for the given header in the one to many map, performs a trimStart() on each return value + * Gets all the values for the given header in the one to many map, + * performs a trimStart() on each return value. * * @param headers a one to many map of key / values representing the header values for the connection. * @param headerName the name of the header to lookup * @return an ArrayList<String> of all trimmed values corresponding to the requested headerName. This may be empty * if the header is not found. */ - private static ArrayList<String> getHeaderValues(final Map<String, List<String>> headers, final String headerName) { + private static ArrayList<String> getHeaderValues( + final Map<String, List<String>> headers, + final String headerName) { final ArrayList<String> arrayOfValues = new ArrayList<String>(); List<String> values = null; @@ -338,7 +344,7 @@ public class SharedKeyCredentials { * @return a HashMap<String, String[]> of the key values. 
*/ private static HashMap<String, String[]> parseQueryString(String parseString) throws UnsupportedEncodingException { - final HashMap<String, String[]> retVals = new HashMap<String, String[]>(); + final HashMap<String, String[]> retVals = new HashMap<>(); if (parseString == null || parseString.isEmpty()) { return retVals; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce03a93f/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md new file mode 100644 index 0000000..a4b3483 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md @@ -0,0 +1,72 @@ +<!--- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. +--> + +# Hadoop Azure Support: ABFS — Azure Data Lake Storage Gen2 + +<!-- MACRO{toc|fromDepth=1|toDepth=3} --> + +## Introduction + +The `hadoop-azure` module provides support for the Azure Data Lake Storage Gen2 +storage layer through the "abfs" connector. + +To make it part of Apache Hadoop's default classpath, simply make sure that +`HADOOP_OPTIONAL_TOOLS` in `hadoop-env.sh` has `hadoop-azure` in the list. + +## Features + +* Read and write data stored in an Azure Blob Storage account. +* *Fully Consistent* view of the storage across all clients. +* Can read data written through the wasb: connector. +* Present a hierarchical file system view by implementing the standard Hadoop + [`FileSystem`](../api/org/apache/hadoop/fs/FileSystem.html) interface. +* Supports configuration of multiple Azure Blob Storage accounts. +* Can act as a source or destination of data in Hadoop MapReduce, Apache Hive, Apache Spark. +* Tested at scale on both Linux and Windows. +* Can be used as a replacement for HDFS on Hadoop clusters deployed in Azure infrastructure. + + + +## Limitations + +* File last access time is not tracked. + + +## Technical notes + +### Security + +### Consistency and Concurrency + +*TODO*: complete/review + +The abfs client has a fully consistent view of the store, which has complete Create Read Update and Delete consistency for data and metadata. +(Compare and contrast with S3 which only offers Create consistency; S3Guard adds CRUD to metadata, but not the underlying data). + +### Performance + +*TODO*: check these. + +* File Rename: `O(1)`. +* Directory Rename: `O(files)`. +* Directory Delete: `O(files)`. + +## Testing ABFS + +See the relevant section in [Testing Azure](testing_azure.html). + +## References + +* [A closer look at Azure Data Lake Storage Gen2](https://azure.microsoft.com/en-gb/blog/a-closer-look-at-azure-data-lake-storage-gen2/); +MSDN Article from June 28, 2018.
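Since the connector implements the standard Hadoop `FileSystem` contract listed in the features above, ordinary client code runs unchanged against an `abfs://` URI once the account key is configured. Below is a minimal usage sketch, assuming the `hadoop-azure` module is on the classpath; the account, container and key are placeholders in the same style as the test configuration that follows.

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AbfsUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Placeholder account and key, mirroring the fs.azure.account.key.* pattern.
    conf.set("fs.azure.account.key.ACCOUNTNAME.dfs.core.windows.net", "SECRETKEY==");
    FileSystem fs = FileSystem.get(
        new URI("abfs://testcontainer@ACCOUNTNAME.dfs.core.windows.net/"), conf);
    Path path = new Path("/demo/hello.txt");
    // Create (overwriting if present) and write through the standard API.
    try (FSDataOutputStream out = fs.create(path, true)) {
      out.write("hello, abfs".getBytes("UTF-8"));
    }
    System.out.println("exists: " + fs.exists(path));
  }
}
```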
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce03a93f/hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md b/hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md index b58e68b..c148807 100644 --- a/hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md +++ b/hadoop-tools/hadoop-azure/src/site/markdown/testing_azure.md @@ -574,3 +574,79 @@ mvn test -Dtest=CleanupTestContainers This will delete the containers; the output log of the test run will provide the details and summary of the operation. + + +## Testing ABFS + +The ABFS Connector tests share the same account as the wasb tests; this is +needed for cross-connector compatibility tests. + +This makes for a somewhat complex set of configuration options. + +Here are the settings for an account `ACCOUNTNAME` + +```xml +<property> + <name>abfs.account.name</name> + <value>ACCOUNTNAME</value> +</property> + +<property> + <name>abfs.account.full.name</name> + <value>${abfs.account.name}.dfs.core.windows.net</value> +</property> + +<property> + <name>abfs.account.key</name> + <value>SECRETKEY==</value> +</property> + +<property> + <name>fs.azure.account.key.ACCOUNTNAME.dfs.core.windows.net</name> + <value>${abfs.account.key}</value> +</property> + +<property> + <name>fs.azure.account.key.ACCOUNTNAME.blob.core.windows.net</name> + <value>${abfs.account.key}</value> +</property> + +<property> + <name>fs.azure.test.account.key.ACCOUNTNAME.dfs.core.windows.net</name> + <value>${abfs.account.key}</value> +</property> + +<property> + <name>fs.azure.test.account.key.ACCOUNTNAME.blob.core.windows.net</name> + <value>${abfs.account.key}</value> +</property> + +<property> + <name>fs.azure.account.key.ACCOUNTNAME</name> + <value>${abfs.account.key}</value> +</property> + +<property> + <name>fs.azure.test.account.key.ACCOUNTNAME</name> + <value>${abfs.account.key}</value> +</property> + +<property> + <name>fs.azure.test.account.name</name> + <value>${abfs.account.full.name}</value> +</property> + +<property> + <name>fs.contract.test.fs.abfs</name> + <value>abfs://testcontai...@accountname.dfs.core.windows.net</value> + <description>Container for contract tests</description> +</property> + +<property> + <name>fs.contract.test.fs.abfss</name> + <value>abfss://testcontai...@accountname.dfs.core.windows.net</value> + <description>Container for contract tests</description> +</property> + + +``` http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce03a93f/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/contract/ITestAzureNativeContractAppend.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/contract/ITestAzureNativeContractAppend.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/contract/ITestAzureNativeContractAppend.java index fd21bd2..db4a843 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/contract/ITestAzureNativeContractAppend.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/contract/ITestAzureNativeContractAppend.java @@ -18,10 +18,19 @@ package org.apache.hadoop.fs.azure.contract; +import java.io.FileNotFoundException; +import java.io.IOException; + +import org.junit.Test; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileAlreadyExistsException; 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.AbstractContractAppendTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
 /**
  * Append test, skipping one of them.
@@ -38,4 +47,18 @@ public class ITestAzureNativeContractAppend extends AbstractContractAppendTest {
   public void testRenameFileBeingAppended() throws Throwable {
     skip("Skipping as renaming an opened file is not supported");
   }
+
+  /**
+   * Wasb returns a different exception, so change the intercept logic here.
+   */
+  @Override
+  @Test
+  public void testAppendDirectory() throws Exception {
+    final FileSystem fs = getFileSystem();
+
+    final Path folderPath = path("testAppendDirectory");
+    fs.mkdirs(folderPath);
+    intercept(FileNotFoundException.class,
+        () -> fs.append(folderPath));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce03a93f/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
new file mode 100644
index 0000000..106fa09
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Hashtable;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+
+import com.google.common.base.Preconditions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azure.AbstractWasbTestWithTimeout;
+import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore;
+import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
+import org.apache.hadoop.fs.azure.integration.AzureTestConstants;
+import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
+import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
+import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.io.IOUtils;
+
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.*;
+import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.FILE_SYSTEM_NOT_FOUND;
+import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.*;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.junit.Assume.assumeTrue;
+import static org.hamcrest.CoreMatchers.*;
+
+/**
+ * Base for AzureBlobFileSystem Integration tests.
+ *
+ * <I>Important: This is for integration tests only.</I>
+ */
+public abstract class AbstractAbfsIntegrationTest extends
+    AbstractWasbTestWithTimeout {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AbstractAbfsIntegrationTest.class);
+
+  private final boolean isEmulator;
+  private NativeAzureFileSystem wasb;
+  private AzureBlobFileSystem abfs;
+  private String abfsScheme;
+
+  private Configuration configuration;
+  private String fileSystemName;
+  private String accountName;
+  private String testUrl;
+
+  protected AbstractAbfsIntegrationTest(final boolean secure) {
+    this(secure ? FileSystemUriSchemes.ABFS_SECURE_SCHEME
+        : FileSystemUriSchemes.ABFS_SCHEME);
+  }
+
+  protected AbstractAbfsIntegrationTest() {
+    this(FileSystemUriSchemes.ABFS_SCHEME);
+  }
+
+  private AbstractAbfsIntegrationTest(final String scheme) {
+    abfsScheme = scheme;
+    fileSystemName = ABFS_TEST_CONTAINER_PREFIX + UUID.randomUUID().toString();
+    configuration = new Configuration();
+    configuration.addResource(ABFS_TEST_RESOURCE_XML);
+
+    String accountName = configuration.get(FS_AZURE_TEST_ACCOUNT_NAME, "");
+    assumeTrue("Not set: " + FS_AZURE_TEST_ACCOUNT_NAME,
+        !accountName.isEmpty());
+    assertThat("The key in " + FS_AZURE_TEST_ACCOUNT_KEY_PREFIX
+            + " is not bound to an ABFS account",
+        accountName, containsString("dfs.core.windows.net"));
+    String fullKey = FS_AZURE_TEST_ACCOUNT_KEY_PREFIX
+        + accountName;
+    assumeTrue("Not set: " + fullKey,
+        configuration.get(fullKey) != null);
+
+    final String abfsUrl = this.getFileSystemName() + "@" + this.getAccountName();
+    URI defaultUri = null;
+
+    try {
+      defaultUri = new URI(abfsScheme, abfsUrl, null, null, null);
+    } catch (Exception ex) {
+      throw new AssertionError(ex);
+    }
+
+    this.testUrl = defaultUri.toString();
+    configuration.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultUri.toString());
+    configuration.setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, true);
+    this.isEmulator = this.configuration.getBoolean(FS_AZURE_EMULATOR_ENABLED, false);
+    this.accountName = this.configuration.get(FS_AZURE_TEST_ACCOUNT_NAME);
+  }
+
+
+  @Before
+  public void setup() throws Exception {
+    //Create filesystem first to make sure getWasbFileSystem() can return an existing filesystem.
+    createFileSystem();
+
+    if (!isEmulator) {
+      final URI wasbUri = new URI(abfsUrlToWasbUrl(getTestUrl()));
+      final AzureNativeFileSystemStore azureNativeFileSystemStore =
+          new AzureNativeFileSystemStore();
+      azureNativeFileSystemStore.initialize(
+          wasbUri,
+          getConfiguration(),
+          new AzureFileSystemInstrumentation(getConfiguration()));
+
+      wasb = new NativeAzureFileSystem(azureNativeFileSystemStore);
+      wasb.initialize(wasbUri, configuration);
+    }
+  }
+
+  @After
+  public void teardown() throws Exception {
+    try {
+      IOUtils.closeStream(wasb);
+      wasb = null;
+
+      if (abfs == null) {
+        return;
+      }
+
+      final AzureBlobFileSystemStore abfsStore = abfs.getAbfsStore();
+      abfsStore.deleteFilesystem();
+
+      AbfsRestOperationException ex = intercept(
+          AbfsRestOperationException.class,
+          new Callable<Hashtable<String, String>>() {
+            @Override
+            public Hashtable<String, String> call() throws Exception {
+              return abfsStore.getFilesystemProperties();
+            }
+          });
+      if (FILE_SYSTEM_NOT_FOUND.getStatusCode() != ex.getStatusCode()) {
+        LOG.warn("Deleted test filesystem may still exist: {}", abfs, ex);
+      }
+    } catch (Exception e) {
+      LOG.warn("During cleanup: {}", e, e);
+    } finally {
+      IOUtils.closeStream(abfs);
+      abfs = null;
+    }
+  }
+
+  public AzureBlobFileSystem getFileSystem() throws IOException {
+    return abfs;
+  }
+
+  /**
+   * Creates the filesystem; updates the {@link #abfs} field.
+   * @return the created filesystem.
+   * @throws IOException failure during create/init.
+   */
+  public AzureBlobFileSystem createFileSystem() throws IOException {
+    Preconditions.checkState(abfs == null,
+        "existing ABFS instance exists: %s", abfs);
+    abfs = (AzureBlobFileSystem) FileSystem.newInstance(configuration);
+    return abfs;
+  }
+
+
+  protected NativeAzureFileSystem getWasbFileSystem() {
+    return wasb;
+  }
+
+  protected String getHostName() {
+    return configuration.get(FS_AZURE_TEST_HOST_NAME);
+  }
+
+  protected void setTestUrl(String testUrl) {
+    this.testUrl = testUrl;
+  }
+
+  protected String getTestUrl() {
+    return testUrl;
+  }
+
+  protected void setFileSystemName(String fileSystemName) {
+    this.fileSystemName = fileSystemName;
+  }
+
+  protected String getFileSystemName() {
+    return fileSystemName;
+  }
+
+  protected String getAccountName() {
+    return configuration.get(FS_AZURE_TEST_ACCOUNT_NAME);
+  }
+
+  protected String getAccountKey() {
+    return configuration.get(
+        FS_AZURE_TEST_ACCOUNT_KEY_PREFIX
+            + getAccountName());
+  }
+
+  protected Configuration getConfiguration() {
+    return configuration;
+  }
+
+  protected boolean isEmulator() {
+    return isEmulator;
+  }
+
+  /**
+   * Write a buffer to a file.
+   * @param path path
+   * @param buffer buffer
+   * @throws IOException failure
+   */
+  protected void write(Path path, byte[] buffer) throws IOException {
+    ContractTestUtils.writeDataset(getFileSystem(), path, buffer, buffer.length,
+        CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT, false);
+  }
+
+  /**
+   * Touch a file in the test store. Will overwrite any existing file.
+   * @param path path
+   * @throws IOException failure.
+   */
+  protected void touch(Path path) throws IOException {
+    ContractTestUtils.touch(getFileSystem(), path);
+  }
+
+  protected static String wasbUrlToAbfsUrl(final String wasbUrl) {
+    return convertTestUrls(
+        wasbUrl, FileSystemUriSchemes.WASB_SCHEME, FileSystemUriSchemes.WASB_SECURE_SCHEME, FileSystemUriSchemes.WASB_DNS_PREFIX,
+        FileSystemUriSchemes.ABFS_SCHEME, FileSystemUriSchemes.ABFS_SECURE_SCHEME, FileSystemUriSchemes.ABFS_DNS_PREFIX);
+  }
+
+  protected static String abfsUrlToWasbUrl(final String abfsUrl) {
+    return convertTestUrls(
+        abfsUrl, FileSystemUriSchemes.ABFS_SCHEME, FileSystemUriSchemes.ABFS_SECURE_SCHEME, FileSystemUriSchemes.ABFS_DNS_PREFIX,
+        FileSystemUriSchemes.WASB_SCHEME, FileSystemUriSchemes.WASB_SECURE_SCHEME, FileSystemUriSchemes.WASB_DNS_PREFIX);
+  }
+
+  private static String convertTestUrls(
+      final String url,
+      final String fromNonSecureScheme,
+      final String fromSecureScheme,
+      final String fromDnsPrefix,
+      final String toNonSecureScheme,
+      final String toSecureScheme,
+      final String toDnsPrefix) {
+    String data = null;
+    if (url.startsWith(fromNonSecureScheme + "://")) {
+      data = url.replace(fromNonSecureScheme + "://", toNonSecureScheme + "://");
+    } else if (url.startsWith(fromSecureScheme + "://")) {
+      data = url.replace(fromSecureScheme + "://", toSecureScheme + "://");
+    }
+
+    if (data != null) {
+      data = data.replace("." + fromDnsPrefix + ".",
+          "." + toDnsPrefix + ".");
+    }
+    return data;
+  }
+
+  public Path getTestPath() {
+    Path path = new Path(UriUtils.generateUniqueTestPath());
+    return path;
+  }
+
+  /**
+   * Create a path under the test path provided by
+   * {@link #getTestPath()}.
+   * @param filepath path string in
+   * @return a path qualified by the test filesystem
+   * @throws IOException IO problems
+   */
+  protected Path path(String filepath) throws IOException {
+    return getFileSystem().makeQualified(
+        new Path(getTestPath(), filepath));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce03a93f/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsScaleTest.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsScaleTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsScaleTest.java
new file mode 100644
index 0000000..cfda7a7
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsScaleTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.azure.integration.AzureTestConstants;
+
+import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.assumeScaleTestsEnabled;
+
+/**
+ * Integration tests at bigger scale; configurable as to
+ * size, off by default.
+ */
+public class AbstractAbfsScaleTest extends AbstractAbfsIntegrationTest {
+
+  protected static final Logger LOG =
+      LoggerFactory.getLogger(AbstractAbfsScaleTest.class);
+
+  @Override
+  protected int getTestTimeoutMillis() {
+    return AzureTestConstants.SCALE_TEST_TIMEOUT_MILLIS;
+  }
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    LOG.debug("Scale test operation count = {}", getOperationCount());
+    assumeScaleTestsEnabled(getConfiguration());
+  }
+
+  protected long getOperationCount() {
+    return getConfiguration().getLong(AzureTestConstants.KEY_OPERATION_COUNT,
+        AzureTestConstants.DEFAULT_OPERATION_COUNT);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce03a93f/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/DependencyInjectedTest.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/DependencyInjectedTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/DependencyInjectedTest.java
deleted file mode 100644
index 74a530c..0000000
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/DependencyInjectedTest.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azurebfs;
-
-import java.net.URI;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore;
-import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
-import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
-import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
-import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
-import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys;
-
-import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.FILE_SYSTEM_NOT_FOUND;
-import static org.apache.hadoop.test.LambdaTestUtils.intercept;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assume.assumeNotNull;
-
-/**
- * Provide dependencies for AzureBlobFileSystem tests.
- */
-public abstract class DependencyInjectedTest {
-  private final boolean isEmulator;
-  private NativeAzureFileSystem wasb;
-  private String abfsScheme;
-
-  private Configuration configuration;
-  private String fileSystemName;
-  private String accountName;
-  private String testUrl;
-
-  public static final String TEST_CONTAINER_PREFIX = "abfs-testcontainer-";
-
-  public DependencyInjectedTest(final boolean secure) {
-    this(secure ? FileSystemUriSchemes.ABFS_SECURE_SCHEME
-        : FileSystemUriSchemes.ABFS_SCHEME);
-  }
-
-  protected DependencyInjectedTest() {
-    this(FileSystemUriSchemes.ABFS_SCHEME);
-  }
-
-  private DependencyInjectedTest(final String scheme) {
-    abfsScheme = scheme;
-    fileSystemName = TEST_CONTAINER_PREFIX + UUID.randomUUID().toString();
-    configuration = new Configuration();
-    configuration.addResource("azure-bfs-test.xml");
-
-    assumeNotNull(configuration.get(TestConfigurationKeys.FS_AZURE_TEST_ACCOUNT_NAME));
-    assumeNotNull(configuration.get(TestConfigurationKeys.FS_AZURE_TEST_ACCOUNT_KEY_PREFIX + configuration.get(TestConfigurationKeys
-        .FS_AZURE_TEST_ACCOUNT_NAME)));
-
-    final String abfsUrl = this.getFileSystemName() + "@" + this.getAccountName();
-    URI defaultUri = null;
-
-    try {
-      defaultUri = new URI(abfsScheme, abfsUrl, null, null, null);
-    } catch (Exception ex) {
-      Assert.fail(ex.getMessage());
-    }
-
-    this.testUrl = defaultUri.toString();
-    configuration.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultUri.toString());
-    configuration.setBoolean(ConfigurationKeys.AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, true);
-    this.isEmulator = this.configuration.getBoolean(ConfigurationKeys.FS_AZURE_EMULATOR_ENABLED, false);
-    this.accountName = this.configuration.get(TestConfigurationKeys.FS_AZURE_TEST_ACCOUNT_NAME);
-  }
-
-  @Before
-  public void initialize() throws Exception {
-    //Create filesystem first to make sure getWasbFileSystem() can return an existed filesystem.
-    this.getFileSystem();
-
-    if (!this.isEmulator) {
-      final URI wasbUri = new URI(abfsUrlToWasbUrl(this.getTestUrl()));
-      final AzureNativeFileSystemStore azureNativeFileSystemStore = new AzureNativeFileSystemStore();
-      azureNativeFileSystemStore.initialize(
-          wasbUri,
-          this.getConfiguration(),
-          new AzureFileSystemInstrumentation(this.getConfiguration()));
-
-      this.wasb = new NativeAzureFileSystem(azureNativeFileSystemStore);
-      this.wasb.initialize(wasbUri, configuration);
-    }
-  }
-
-  @After
-  public void testCleanup() throws Exception {
-    if (this.wasb != null) {
-      this.wasb.close();
-    }
-
-    FileSystem.closeAll();
-
-    final AzureBlobFileSystem fs = this.getFileSystem();
-    final AzureBlobFileSystemStore abfsStore = fs.getAbfsStore();
-    abfsStore.deleteFilesystem();
-
-    AbfsRestOperationException ex = intercept(
-        AbfsRestOperationException.class,
-        new Callable<Void>() {
-          @Override
-          public Void call() throws Exception {
-            fs.getAbfsStore().getFilesystemProperties();
-            return null;
-          }
-        });
-
-    assertEquals(FILE_SYSTEM_NOT_FOUND.getStatusCode(), ex.getStatusCode());
-  }
-
-  public AzureBlobFileSystem getFileSystem() throws Exception {
-    return (AzureBlobFileSystem) FileSystem.get(this.configuration);
-  }
-
-  protected NativeAzureFileSystem getWasbFileSystem() {
-    return this.wasb;
-  }
-
-  protected String getHostName() {
-    return configuration.get(TestConfigurationKeys.FS_AZURE_TEST_HOST_NAME);
-  }
-
-  protected void updateTestUrl(String testUrl) {
-    this.testUrl = testUrl;
-  }
-
-  protected String getTestUrl() {
-    return testUrl;
-  }
-
-  protected void updateFileSystemName(String fileSystemName) {
-    this.fileSystemName = fileSystemName;
-  }
-
-  protected String getFileSystemName() {
-    return fileSystemName;
-  }
-
-  protected String getAccountName() {
-    return configuration.get(TestConfigurationKeys.FS_AZURE_TEST_ACCOUNT_NAME);
-  }
-
-  protected String getAccountKey() {
-    return configuration.get(
-        TestConfigurationKeys.FS_AZURE_TEST_ACCOUNT_KEY_PREFIX
-            + getAccountName());
-  }
-
-  protected Configuration getConfiguration() {
-    return this.configuration;
-  }
-
-  protected boolean isEmulator() {
-    return isEmulator;
-  }
-
-  protected static String wasbUrlToAbfsUrl(final String wasbUrl) {
-    return convertTestUrls(
-        wasbUrl, FileSystemUriSchemes.WASB_SCHEME, FileSystemUriSchemes.WASB_SECURE_SCHEME, FileSystemUriSchemes.WASB_DNS_PREFIX,
-        FileSystemUriSchemes.ABFS_SCHEME, FileSystemUriSchemes.ABFS_SECURE_SCHEME, FileSystemUriSchemes.ABFS_DNS_PREFIX);
-  }
-
-  protected static String abfsUrlToWasbUrl(final String abfsUrl) {
-    return convertTestUrls(
-        abfsUrl, FileSystemUriSchemes.ABFS_SCHEME, FileSystemUriSchemes.ABFS_SECURE_SCHEME, FileSystemUriSchemes.ABFS_DNS_PREFIX,
-        FileSystemUriSchemes.WASB_SCHEME, FileSystemUriSchemes.WASB_SECURE_SCHEME, FileSystemUriSchemes.WASB_DNS_PREFIX);
-  }
-
-  private static String convertTestUrls(
-      final String url, final String fromNonSecureScheme, final String fromSecureScheme, final String fromDnsPrefix,
-      final String toNonSecureScheme, final String toSecureScheme, final String toDnsPrefix) {
-    String data = null;
-    if (url.startsWith(fromNonSecureScheme + "://")) {
-      data = url.replace(fromNonSecureScheme + "://", toNonSecureScheme + "://");
-    } else if (url.startsWith(fromSecureScheme + "://")) {
-      data = url.replace(fromSecureScheme + "://", toSecureScheme + "://");
-    }
-
-    data = data.replace("." + fromDnsPrefix + ".", "." + toDnsPrefix + ".");
-    return data;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce03a93f/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java
index 10d42d1..f2e26ec 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java
@@ -25,13 +25,13 @@ import org.junit.Test;
 
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
-
-import static org.junit.Assert.assertEquals;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
 
 /**
  * Test append operations.
  */
-public class ITestAzureBlobFileSystemAppend extends DependencyInjectedTest {
+public class ITestAzureBlobFileSystemAppend extends
+    AbstractAbfsIntegrationTest {
   private static final Path TEST_FILE_PATH = new Path("testfile");
   private static final Path TEST_FOLDER_PATH = new Path("testFolder");
   public ITestAzureBlobFileSystemAppend() {
@@ -40,7 +40,7 @@ public class ITestAzureBlobFileSystemAppend extends DependencyInjectedTest {
 
   @Test(expected = FileNotFoundException.class)
   public void testAppendDirShouldFail() throws Exception {
-    final AzureBlobFileSystem fs = this.getFileSystem();
+    final AzureBlobFileSystem fs = getFileSystem();
     final Path filePath = TEST_FILE_PATH;
     fs.mkdirs(filePath);
     fs.append(filePath, 0);
@@ -48,21 +48,21 @@ public class ITestAzureBlobFileSystemAppend extends DependencyInjectedTest {
 
   @Test
   public void testAppendWithLength0() throws Exception {
-    final AzureBlobFileSystem fs = this.getFileSystem();
-    FSDataOutputStream stream = fs.create(TEST_FILE_PATH);
-    final byte[] b = new byte[1024];
-    new Random().nextBytes(b);
-    stream.write(b, 1000, 0);
-
-    assertEquals(0, stream.getPos());
+    final AzureBlobFileSystem fs = getFileSystem();
+    try(FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) {
+      final byte[] b = new byte[1024];
+      new Random().nextBytes(b);
+      stream.write(b, 1000, 0);
+      assertEquals(0, stream.getPos());
+    }
   }
 
   @Test(expected = FileNotFoundException.class)
   public void testAppendFileAfterDelete() throws Exception {
-    final AzureBlobFileSystem fs = this.getFileSystem();
+    final AzureBlobFileSystem fs = getFileSystem();
     final Path filePath = TEST_FILE_PATH;
-    fs.create(filePath);
+    ContractTestUtils.touch(fs, filePath);
     fs.delete(filePath, false);
     fs.append(filePath);
@@ -70,7 +70,7 @@ public class ITestAzureBlobFileSystemAppend extends DependencyInjectedTest {
 
   @Test(expected = FileNotFoundException.class)
   public void testAppendDirectory() throws Exception {
-    final AzureBlobFileSystem fs = this.getFileSystem();
+    final AzureBlobFileSystem fs = getFileSystem();
     final Path folderPath = TEST_FOLDER_PATH;
     fs.mkdirs(folderPath);
     fs.append(folderPath);
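A recurring pattern across these test diffs is the move from loosely checked failure paths toward `LambdaTestUtils.intercept()`, which runs a closure, asserts the expected exception type, and returns the caught exception for further checks. A minimal sketch of the idiom, assuming a filesystem supplied by a test base class such as `AbstractAbfsIntegrationTest`; the class and method names here are illustrative, not part of the patch:

```java
import java.io.FileNotFoundException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.test.LambdaTestUtils.intercept;

public class AppendDirectorySketch {

  /**
   * Mirror of the testAppendDirectory checks above: appending to a
   * directory must fail with FileNotFoundException.
   */
  public static void expectAppendToDirectoryFails(FileSystem fs)
      throws Exception {
    Path folderPath = new Path("/testAppendDirectory");
    fs.mkdirs(folderPath);
    // intercept() fails the test if no exception (or the wrong type) is
    // raised, and returns the caught exception for extra assertions.
    intercept(FileNotFoundException.class, () -> fs.append(folderPath));
  }
}
```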