This is an automated email from the ASF dual-hosted git repository.

anujmodi pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new ea5f65700a7  HADOOP-19474: [ABFS][FnsOverBlob] Listing Optimizations to avoid multiple iteration over list response. (#7421) (#7581)
ea5f65700a7 is described below

commit ea5f65700a77ce86654aa4176d9db93fbffc779f
Author: Anuj Modi <anujmodi2...@gmail.com>
AuthorDate: Mon Apr 7 09:48:13 2025 +0530

    HADOOP-19474: [ABFS][FnsOverBlob] Listing Optimizations to avoid multiple iteration over list response. (#7421) (#7581)

    Contributed by Anuj Modi
    Reviewed by: Anmol Asrani, Manish Bhatt, Manika Joshi
    Signed off by: Anuj Modi<anujm...@apache.org>
---
 .../src/config/checkstyle-suppressions.xml         |   2 +
 .../fs/azurebfs/AzureBlobFileSystemStore.java      | 175 ++------------
 .../fs/azurebfs/services/AbfsAHCHttpOperation.java |  28 +--
 .../fs/azurebfs/services/AbfsBlobClient.java       | 245 +++++++++++---------
 .../hadoop/fs/azurebfs/services/AbfsClient.java    | 135 +++++++++--
 .../hadoop/fs/azurebfs/services/AbfsDfsClient.java |  61 +++--
 .../hadoop/fs/azurebfs/services/AbfsErrors.java    |   2 +
 .../fs/azurebfs/services/AbfsHttpOperation.java    |  32 +--
 .../fs/azurebfs/services/ListActionTaker.java      |   2 +-
 .../fs/azurebfs/services/ListResponseData.java     | 102 ++++++++
 .../fs/azurebfs/services/VersionedFileStatus.java  | 139 +++++++++++
 .../apache/hadoop/fs/azurebfs/ITestAbfsClient.java |   8 +-
 .../fs/azurebfs/ITestAbfsCustomEncryption.java     |   7 +-
 .../azurebfs/ITestAzureBlobFileSystemCreate.java   |   5 +-
 .../azurebfs/ITestAzureBlobFileSystemDelete.java   |  14 +-
 .../ITestAzureBlobFileSystemFileStatus.java        |   3 +-
 .../ITestAzureBlobFileSystemListStatus.java        | 257 ++++++++++++++++++---
 .../azurebfs/ITestAzureBlobFileSystemRename.java   |  30 +--
 .../fs/azurebfs/services/AbfsClientTestUtil.java   |  27 ++-
 .../TestAbfsRestOperationMockFailures.java         |   9 +-
 .../fs/azurebfs/services/TestListActionTaker.java  |   7 +-
 .../fs/azurebfs/utils/DirectoryStateHelper.java    |   2 +-
 22 files changed, 875 insertions(+), 417 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml b/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml
index 07aa26d2381..1cedf0c54b0 100644
--- a/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml
+++ b/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml
@@ -48,6 +48,8 @@
               files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]AbfsClient.java"/>
     <suppress checks="ParameterNumber"
               files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]AbfsBlobClient.java"/>
+    <suppress checks="ParameterNumber|MagicNumber"
+              files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]VersionedFileStatus.java"/>
     <suppress checks="ParameterNumber"
               files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]contracts[\\/]services[\\/]AppendRequestParameters.java"/>
     <suppress checks="ParameterNumber|MagicNumber"
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 867edfd1438..eef6a8108d5 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.fs.azurebfs;
 
 import java.io.Closeable;
-import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
@@
-59,7 +58,6 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.EtagSource; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -78,8 +76,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TrileanConversionException; import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; -import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema; -import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema; +import org.apache.hadoop.fs.azurebfs.services.ListResponseData; import org.apache.hadoop.fs.azurebfs.enums.Trilean; import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider; import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper; @@ -115,6 +112,7 @@ import org.apache.hadoop.fs.azurebfs.services.ListingSupport; import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials; import org.apache.hadoop.fs.azurebfs.services.StaticRetryPolicy; +import org.apache.hadoop.fs.azurebfs.services.VersionedFileStatus; import org.apache.hadoop.fs.azurebfs.utils.Base64; import org.apache.hadoop.fs.azurebfs.utils.CRC64; import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils; @@ -1266,7 +1264,7 @@ public String listStatus(final Path path, final String startFrom, if (startFrom != null && !startFrom.isEmpty()) { /* * Blob Endpoint Does not support startFrom yet. Fallback to DFS Client. - * startFrom remains null for all HDFS APIs. This is only for internal use. + * startFrom remains null for all HDFS APIs. This is used only for tests. */ listingClient = getClient(AbfsServiceType.DFS); continuation = getIsNamespaceEnabled(tracingContext) @@ -1277,58 +1275,16 @@ public String listStatus(final Path path, final String startFrom, do { try (AbfsPerfInfo perfInfo = startTracking("listStatus", "listPath")) { - AbfsRestOperation op = listingClient.listPath(relativePath, false, - abfsConfiguration.getListMaxResults(), continuation, - tracingContext); + ListResponseData listResponseData = listingClient.listPath(relativePath, + false, abfsConfiguration.getListMaxResults(), continuation, + tracingContext, this.uri); + AbfsRestOperation op = listResponseData.getOp(); perfInfo.registerResult(op.getResult()); - continuation = listingClient.getContinuationFromResponse(op.getResult()); - ListResultSchema retrievedSchema = op.getResult().getListResultSchema(); - if (retrievedSchema == null) { - throw new AbfsRestOperationException( - AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(), - AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(), - "listStatusAsync path not found", - null, op.getResult()); - } - - long blockSize = abfsConfiguration.getAzureBlockSize(); - - for (ListResultEntrySchema entry : retrievedSchema.paths()) { - final String owner = identityTransformer.transformIdentityForGetRequest(entry.owner(), true, userName); - final String group = identityTransformer.transformIdentityForGetRequest(entry.group(), false, primaryUserGroup); - final String encryptionContext = entry.getXMsEncryptionContext(); - final FsPermission fsPermission = entry.permissions() == null - ? 
new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL) - : AbfsPermission.valueOf(entry.permissions()); - final boolean hasAcl = AbfsPermission.isExtendedAcl(entry.permissions()); - - long lastModifiedMillis = 0; - long contentLength = entry.contentLength() == null ? 0 : entry.contentLength(); - boolean isDirectory = entry.isDirectory() == null ? false : entry.isDirectory(); - if (entry.lastModified() != null && !entry.lastModified().isEmpty()) { - lastModifiedMillis = DateTimeUtils.parseLastModifiedTime( - entry.lastModified()); - } - - Path entryPath = new Path(File.separator + entry.name()); - entryPath = entryPath.makeQualified(this.uri, entryPath); - - fileStatuses.add( - new VersionedFileStatus( - owner, - group, - fsPermission, - hasAcl, - contentLength, - isDirectory, - 1, - blockSize, - lastModifiedMillis, - entryPath, - entry.eTag(), - encryptionContext)); + continuation = listResponseData.getContinuationToken(); + List<FileStatus> fileStatusListInCurrItr = listResponseData.getFileStatusList(); + if (fileStatusListInCurrItr != null && !fileStatusListInCurrItr.isEmpty()) { + fileStatuses.addAll(fileStatusListInCurrItr); } - perfInfo.registerSuccess(true); countAggregate++; shouldContinue = @@ -1931,110 +1887,6 @@ private AbfsPerfInfo startTracking(String callerName, String calleeName) { return new AbfsPerfInfo(abfsPerfTracker, callerName, calleeName); } - /** - * A File status with version info extracted from the etag value returned - * in a LIST or HEAD request. - * The etag is included in the java serialization. - */ - static final class VersionedFileStatus extends FileStatus - implements EtagSource { - - /** - * The superclass is declared serializable; this subclass can also - * be serialized. - */ - private static final long serialVersionUID = -2009013240419749458L; - - /** - * The etag of an object. - * Not-final so that serialization via reflection will preserve the value. - */ - private String version; - - private String encryptionContext; - - private VersionedFileStatus( - final String owner, final String group, final FsPermission fsPermission, final boolean hasAcl, - final long length, final boolean isdir, final int blockReplication, - final long blocksize, final long modificationTime, final Path path, - final String version, final String encryptionContext) { - super(length, isdir, blockReplication, blocksize, modificationTime, 0, - fsPermission, - owner, - group, - null, - path, - hasAcl, false, false); - - this.version = version; - this.encryptionContext = encryptionContext; - } - - /** Compare if this object is equal to another object. - * @param obj the object to be compared. - * @return true if two file status has the same path name; false if not. - */ - @Override - public boolean equals(Object obj) { - if (!(obj instanceof FileStatus)) { - return false; - } - - FileStatus other = (FileStatus) obj; - - if (!this.getPath().equals(other.getPath())) {// compare the path - return false; - } - - if (other instanceof VersionedFileStatus) { - return this.version.equals(((VersionedFileStatus) other).version); - } - - return true; - } - - /** - * Returns a hash code value for the object, which is defined as - * the hash code of the path name. - * - * @return a hash code value for the path name and version - */ - @Override - public int hashCode() { - int hash = getPath().hashCode(); - hash = 89 * hash + (this.version != null ? 
this.version.hashCode() : 0); - return hash; - } - - /** - * Returns the version of this FileStatus - * - * @return a string value for the FileStatus version - */ - public String getVersion() { - return this.version; - } - - @Override - public String getEtag() { - return getVersion(); - } - - public String getEncryptionContext() { - return encryptionContext; - } - - @Override - public String toString() { - final StringBuilder sb = new StringBuilder( - "VersionedFileStatus{"); - sb.append(super.toString()); - sb.append("; version='").append(version).append('\''); - sb.append('}'); - return sb.toString(); - } - } - /** * Permissions class contain provided permission and umask in octalNotation. * If the object is created for namespace-disabled account, the permission and @@ -2176,6 +2028,11 @@ void setNamespaceEnabled(Trilean isNamespaceEnabled){ this.isNamespaceEnabled = isNamespaceEnabled; } + @VisibleForTesting + public URI getUri() { + return this.uri; + } + private void updateInfiniteLeaseDirs() { this.azureInfiniteLeaseDirSet = new HashSet<>(Arrays.asList( abfsConfiguration.getAzureInfiniteLeaseDirs().split(AbfsHttpConstants.COMMA))); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java index 3ed70965db9..dd585d32fb7 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAHCHttpOperation.java @@ -38,7 +38,6 @@ import org.apache.http.Header; import org.apache.http.HttpEntity; import org.apache.http.HttpResponse; -import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpDelete; import org.apache.http.client.methods.HttpEntityEnclosingRequestBase; import org.apache.http.client.methods.HttpGet; @@ -48,7 +47,6 @@ import org.apache.http.client.methods.HttpPut; import org.apache.http.client.methods.HttpRequestBase; import org.apache.http.entity.ByteArrayEntity; -import org.apache.http.util.EntityUtils; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APACHE_IMPL; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; @@ -194,26 +192,14 @@ String getConnResponseMessage() throws IOException { public void processResponse(final byte[] buffer, final int offset, final int length) throws IOException { - try { - if (!isPayloadRequest) { - prepareRequest(); - LOG.debug("Sending request: {}", httpRequestBase); - httpResponse = executeRequest(); - LOG.debug("Request sent: {}; response {}", httpRequestBase, - httpResponse); - } - parseResponseHeaderAndBody(buffer, offset, length); - } finally { - if (httpResponse != null) { - try { - EntityUtils.consume(httpResponse.getEntity()); - } finally { - if (httpResponse instanceof CloseableHttpResponse) { - ((CloseableHttpResponse) httpResponse).close(); - } - } - } + if (!isPayloadRequest) { + prepareRequest(); + LOG.debug("Sending request: {}", httpRequestBase); + httpResponse = executeRequest(); + LOG.debug("Request sent: {}; response {}", httpRequestBase, + httpResponse); } + parseResponseHeaderAndBody(buffer, offset, length); } /** diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java 
index 2bc6397c369..8166cfb60df 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java @@ -27,6 +27,7 @@ import java.io.InputStream; import java.io.UnsupportedEncodingException; import java.net.HttpURLConnection; +import java.net.URI; import java.net.URL; import java.net.URLDecoder; import java.net.URLEncoder; @@ -36,6 +37,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.Hashtable; import java.util.List; @@ -50,6 +52,7 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.fs.Path; @@ -60,6 +63,7 @@ import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; @@ -70,7 +74,6 @@ import org.apache.hadoop.fs.azurebfs.contracts.services.BlobListResultEntrySchema; import org.apache.hadoop.fs.azurebfs.contracts.services.BlobListResultSchema; import org.apache.hadoop.fs.azurebfs.contracts.services.BlobListXmlParser; -import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema; import org.apache.hadoop.fs.azurebfs.contracts.services.StorageErrorResponseSchema; import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider; import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider; @@ -163,6 +166,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_MAX_RESULTS; import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_PREFIX; import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_RESTYPE; +import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_BLOB_LIST_PARSING; import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.PATH_EXISTS; import static org.apache.hadoop.fs.azurebfs.utils.UriUtils.isKeyForDirectorySet; import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ATOMIC_DIR_RENAME_RECOVERY_ON_GET_PATH_EXCEPTION; @@ -340,18 +344,19 @@ public AbfsRestOperation deleteFilesystem(TracingContext tracingContext) * @param listMaxResults maximum number of blobs to return. * @param continuation marker to specify the continuation token. * @param tracingContext for tracing the service call. - * @return executed rest operation containing response from server. + * @param uri to be used for path conversion. + * @return {@link ListResponseData} containing listing response. * @throws AzureBlobFileSystemException if rest operation or response parsing fails. 
*/ - public AbfsRestOperation listPath(final String relativePath, final boolean recursive, - final int listMaxResults, final String continuation, TracingContext tracingContext) - throws IOException { - return listPath(relativePath, recursive, listMaxResults, continuation, tracingContext, true); + @Override + public ListResponseData listPath(final String relativePath, final boolean recursive, + final int listMaxResults, final String continuation, TracingContext tracingContext, URI uri) throws IOException { + return listPath(relativePath, recursive, listMaxResults, continuation, tracingContext, uri, true); } - public AbfsRestOperation listPath(final String relativePath, final boolean recursive, - final int listMaxResults, final String continuation, TracingContext tracingContext, - boolean is404CheckRequired) throws AzureBlobFileSystemException { + public ListResponseData listPath(final String relativePath, final boolean recursive, + final int listMaxResults, final String continuation, TracingContext tracingContext, URI uri, boolean is404CheckRequired) + throws AzureBlobFileSystemException { final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); @@ -374,63 +379,63 @@ public AbfsRestOperation listPath(final String relativePath, final boolean recur requestHeaders); op.execute(tracingContext); - // Filter the paths for which no rename redo operation is performed. - fixAtomicEntriesInListResults(op, tracingContext); - if (isEmptyListResults(op.getResult()) && is404CheckRequired) { + ListResponseData listResponseData = parseListPathResults(op.getResult(), uri); + listResponseData.setOp(op); + + // Perform Pending Rename Redo Operation on Atomic Rename Paths. + // Crashed HBase log rename recovery can be done by Filesystem.listStatus. + if (tracingContext.getOpType() == FSOperationType.LISTSTATUS + && op.getResult() != null + && op.getResult().getStatusCode() == HTTP_OK) { + retryRenameOnAtomicEntriesInListResults(tracingContext, + listResponseData.getRenamePendingJsonPaths()); + } + + if (isEmptyListResults(listResponseData) && is404CheckRequired) { // If the list operation returns no paths, we need to check if the path is a file. // If it is a file, we need to return the file in the list. // If it is a non-existing path, we need to throw a FileNotFoundException. if (relativePath.equals(ROOT_PATH)) { - // Root Always exists as directory. It can be a empty listing. - return op; + // Root Always exists as directory. It can be an empty listing. + return listResponseData; } AbfsRestOperation pathStatus = this.getPathStatus(relativePath, tracingContext, null, false); BlobListResultSchema listResultSchema = getListResultSchemaFromPathStatus(relativePath, pathStatus); + LOG.debug("ListBlob attempted on a file path. 
Returning file status."); + List<FileStatus> fileStatusList = new ArrayList<>(); + for (BlobListResultEntrySchema entry : listResultSchema.paths()) { + fileStatusList.add(getVersionedFileStatusFromEntry(entry, uri)); + } AbfsRestOperation listOp = getAbfsRestOperation( AbfsRestOperationType.ListBlobs, HTTP_METHOD_GET, url, requestHeaders); listOp.hardSetGetListStatusResult(HTTP_OK, listResultSchema); - return listOp; + listResponseData.setFileStatusList(fileStatusList); + listResponseData.setContinuationToken(null); + listResponseData.setRenamePendingJsonPaths(null); + listResponseData.setOp(listOp); + return listResponseData; } - return op; + return listResponseData; } /** * Filter the paths for which no rename redo operation is performed. * Update BlobListResultSchema path with filtered entries. - * - * @param op blob list operation * @param tracingContext tracing context * @throws AzureBlobFileSystemException if rest operation or response parsing fails. */ - private void fixAtomicEntriesInListResults(final AbfsRestOperation op, - final TracingContext tracingContext) throws AzureBlobFileSystemException { - /* - * Crashed HBase log rename recovery is done by Filesystem.getFileStatus and - * Filesystem.listStatus. - */ - if (tracingContext == null - || tracingContext.getOpType() != FSOperationType.LISTSTATUS - || op == null || op.getResult() == null - || op.getResult().getStatusCode() != HTTP_OK) { + private void retryRenameOnAtomicEntriesInListResults(TracingContext tracingContext, + Map<Path, Integer> renamePendingJsonPaths) throws AzureBlobFileSystemException { + if (renamePendingJsonPaths == null || renamePendingJsonPaths.isEmpty()) { return; } - BlobListResultSchema listResultSchema - = (BlobListResultSchema) op.getResult().getListResultSchema(); - if (listResultSchema == null) { - return; - } - List<BlobListResultEntrySchema> filteredEntries = new ArrayList<>(); - for (BlobListResultEntrySchema entry : listResultSchema.paths()) { - if (!takeListPathAtomicRenameKeyAction(entry.path(), entry.isDirectory(), - entry.contentLength().intValue(), tracingContext)) { - filteredEntries.add(entry); - } - } - listResultSchema.withPaths(filteredEntries); + for (Map.Entry<Path, Integer> entry : renamePendingJsonPaths.entrySet()) { + retryRenameOnAtomicKeyPath(entry.getKey(), entry.getValue(), tracingContext); + } } /**{@inheritDoc}*/ @@ -1162,10 +1167,7 @@ public AbfsRestOperation getPathStatus(final String path, throws AzureBlobFileSystemException { AbfsRestOperation op = this.getPathStatus(path, tracingContext, contextEncryptionAdapter, true); - /* - * Crashed HBase log-folder rename can be recovered by FileSystem#getFileStatus - * and FileSystem#listStatus calls. - */ + // Crashed HBase log-folder rename can be recovered by FileSystem#getFileStatus if (tracingContext.getOpType() == FSOperationType.GET_FILESTATUS && op.getResult() != null && checkIsDir(op.getResult())) { takeGetPathStatusAtomicRenameKeyAction(new Path(path), tracingContext); @@ -1212,6 +1214,8 @@ public AbfsRestOperation getPathStatus(final String path, if (op.getResult().getStatusCode() == HTTP_NOT_FOUND && isImplicitCheckRequired && isNonEmptyDirectory(path, tracingContext)) { // Implicit path found. + // Create a marker blob at this path. 
+ this.createMarkerAtPath(path, null, contextEncryptionAdapter, tracingContext); AbfsRestOperation successOp = getSuccessOp( AbfsRestOperationType.GetPathStatus, HTTP_METHOD_HEAD, url, requestHeaders); @@ -1562,18 +1566,6 @@ public boolean checkUserError(int responseStatusCode) { && responseStatusCode != HTTP_CONFLICT); } - /** - * Get the continuation token from the response from BLOB Endpoint Listing. - * Continuation Token will be present in XML List response body. - * @param result The response from the server. - * @return The continuation token. - */ - @Override - public String getContinuationFromResponse(AbfsHttpOperation result) { - BlobListResultSchema listResultSchema = (BlobListResultSchema) result.getListResultSchema(); - return listResultSchema.getNextMarker(); - } - /** * Get the User-defined metadata on a path from response headers of * GetBlobProperties API on Blob Endpoint. @@ -1604,26 +1596,43 @@ public Hashtable<String, String> getXMSProperties(AbfsHttpOperation result) /** * Parse the XML response body returned by ListBlob API on Blob Endpoint. - * @param stream InputStream contains the response from server. - * @return BlobListResultSchema containing the list of entries. - * @throws IOException if parsing fails. + * @param result AbfsHttpOperation contains the response from server. + * @param uri to be used for path conversion. + * @return {@link ListResponseData} containing listing response. + * @throws AzureBlobFileSystemException if parsing fails. */ @Override - public ListResultSchema parseListPathResults(final InputStream stream) throws IOException { - if (stream == null) { - return null; - } + public ListResponseData parseListPathResults(AbfsHttpOperation result, URI uri) + throws AzureBlobFileSystemException { BlobListResultSchema listResultSchema; - try { - final SAXParser saxParser = saxParserThreadLocal.get(); - saxParser.reset(); - listResultSchema = new BlobListResultSchema(); - saxParser.parse(stream, new BlobListXmlParser(listResultSchema, getBaseUrl().toString())); - } catch (SAXException | IOException e) { - throw new RuntimeException(e); + try (InputStream stream = result.getListResultStream()) { + if (stream == null) { + return null; + } + try { + final SAXParser saxParser = saxParserThreadLocal.get(); + saxParser.reset(); + listResultSchema = new BlobListResultSchema(); + saxParser.parse(stream, + new BlobListXmlParser(listResultSchema, getBaseUrl().toString())); + result.setListResultSchema(listResultSchema); + LOG.debug("ListBlobs listed {} blobs with {} as continuation token", + listResultSchema.paths().size(), + listResultSchema.getNextMarker()); + } catch (SAXException | IOException e) { + throw new AbfsDriverException(e); + } + } catch (IOException e) { + LOG.error("Unable to deserialize list results for uri {}", uri.toString(), e); + throw new AbfsDriverException(ERR_BLOB_LIST_PARSING, e); } - return removeDuplicateEntries(listResultSchema); + try { + return filterDuplicateEntriesAndRenamePendingFiles(listResultSchema, uri); + } catch (IOException e) { + LOG.error("Unable to filter list results for uri {}", uri.toString(), e); + throw new AbfsDriverException(ERR_BLOB_LIST_PARSING, e); + } } /** @@ -1764,13 +1773,14 @@ public void takeGetPathStatusAtomicRenameKeyAction(final Path path, AbfsRestOperation pendingJsonFileStatus; Path pendingJsonPath = new Path(path.getParent(), path.toUri().getPath() + RenameAtomicity.SUFFIX); + int pendingJsonFileContentLength = 0; try { - pendingJsonFileStatus = getPathStatus( - pendingJsonPath.toUri().getPath(), 
tracingContext, - null, false); + pendingJsonFileStatus = getPathStatus(pendingJsonPath.toUri().getPath(), + tracingContext, null, false); if (checkIsDir(pendingJsonFileStatus.getResult())) { return; } + pendingJsonFileContentLength = Integer.parseInt(pendingJsonFileStatus.getResult().getResponseHeader(CONTENT_LENGTH)); } catch (AbfsRestOperationException ex) { if (ex.getStatusCode() == HTTP_NOT_FOUND) { return; @@ -1781,9 +1791,7 @@ public void takeGetPathStatusAtomicRenameKeyAction(final Path path, boolean renameSrcHasChanged; try { RenameAtomicity renameAtomicity = getRedoRenameAtomicity( - pendingJsonPath, Integer.parseInt(pendingJsonFileStatus.getResult() - .getResponseHeader(CONTENT_LENGTH)), - tracingContext); + pendingJsonPath, pendingJsonFileContentLength, tracingContext); renameAtomicity.redo(); renameSrcHasChanged = false; } catch (AbfsRestOperationException ex) { @@ -1818,23 +1826,16 @@ public void takeGetPathStatusAtomicRenameKeyAction(final Path path, * @param renamePendingJsonLen length of the pendingJson file. * @param tracingContext tracing context. * - * @return true if action is taken. * @throws AzureBlobFileSystemException server error */ - private boolean takeListPathAtomicRenameKeyAction(final Path path, - final boolean isDirectory, final int renamePendingJsonLen, + + private void retryRenameOnAtomicKeyPath(final Path path, + final int renamePendingJsonLen, final TracingContext tracingContext) throws AzureBlobFileSystemException { - if (path == null || path.isRoot() || !isAtomicRenameKey( - path.toUri().getPath()) || isDirectory || !path.toUri() - .getPath() - .endsWith(RenameAtomicity.SUFFIX)) { - return false; - } try { - RenameAtomicity renameAtomicity - = getRedoRenameAtomicity(path, renamePendingJsonLen, - tracingContext); + RenameAtomicity renameAtomicity = getRedoRenameAtomicity(path, + renamePendingJsonLen, tracingContext); renameAtomicity.redo(); } catch (AbfsRestOperationException ex) { /* @@ -1850,7 +1851,6 @@ private boolean takeListPathAtomicRenameKeyAction(final Path path, throw ex; } } - return true; } @VisibleForTesting @@ -1924,39 +1924,65 @@ private List<AbfsHttpHeader> getMetadataHeadersList(final Hashtable<String, Stri * This is to handle duplicate listing entries returned by Blob Endpoint for * implicit paths that also has a marker file created for them. * This will retain entry corresponding to marker file and remove the BlobPrefix entry. + * This will also filter out all the rename pending json files in listing output. * @param listResultSchema List of entries returned by Blob Endpoint. + * @param uri URI to be used for path conversion. * @return List of entries after removing duplicates. */ - private BlobListResultSchema removeDuplicateEntries(BlobListResultSchema listResultSchema) { - List<BlobListResultEntrySchema> uniqueEntries = new ArrayList<>(); + private ListResponseData filterDuplicateEntriesAndRenamePendingFiles( + BlobListResultSchema listResultSchema, URI uri) throws IOException { + List<FileStatus> fileStatuses = new ArrayList<>(); + Map<Path, Integer> renamePendingJsonPaths = new HashMap<>(); TreeMap<String, BlobListResultEntrySchema> nameToEntryMap = new TreeMap<>(); for (BlobListResultEntrySchema entry : listResultSchema.paths()) { if (StringUtils.isNotEmpty(entry.eTag())) { // This is a blob entry. It is either a file or a marker blob. // In both cases we will add this. 
- nameToEntryMap.put(entry.name(), entry); + if (isRenamePendingJsonPathEntry(entry)) { + renamePendingJsonPaths.put(entry.path(), entry.contentLength().intValue()); + } else { + nameToEntryMap.put(entry.name(), entry); + fileStatuses.add(getVersionedFileStatusFromEntry(entry, uri)); + } } else { // This is a BlobPrefix entry. It is a directory with file inside // This might have already been added as a marker blob. if (!nameToEntryMap.containsKey(entry.name())) { nameToEntryMap.put(entry.name(), entry); + fileStatuses.add(getVersionedFileStatusFromEntry(entry, uri)); } } } - uniqueEntries.addAll(nameToEntryMap.values()); - listResultSchema.withPaths(uniqueEntries); - return listResultSchema; + ListResponseData listResponseData = new ListResponseData(); + listResponseData.setFileStatusList(fileStatuses); + listResponseData.setRenamePendingJsonPaths(renamePendingJsonPaths); + listResponseData.setContinuationToken(listResultSchema.getNextMarker()); + return listResponseData; + } + + /** + * Check if the entry is a rename pending json file path. + * @param entry to be checked. + * @return true if it is a rename pending json file path. + */ + private boolean isRenamePendingJsonPathEntry(BlobListResultEntrySchema entry) { + String path = entry.path() != null ? entry.path().toUri().getPath() : null; + return path != null + && !entry.path().isRoot() + && isAtomicRenameKey(path) + && !entry.isDirectory() + && path.endsWith(RenameAtomicity.SUFFIX); } /** * When listing is done on a file, Blob Endpoint returns the empty listing * but DFS Endpoint returns the file status as one of the entries. * This is to convert file status into ListResultSchema. - * @param relativePath - * @param pathStatus - * @return + * @param relativePath relative path of the file. + * @param pathStatus AbfsRestOperation containing the file status. + * @return BlobListResultSchema containing the file status. */ private BlobListResultSchema getListResultSchemaFromPathStatus(String relativePath, AbfsRestOperation pathStatus) { BlobListResultSchema listResultSchema = new BlobListResultSchema(); @@ -2001,27 +2027,26 @@ private static String decodeMetadataAttribute(String encoded) @VisibleForTesting public boolean isNonEmptyDirectory(String path, TracingContext tracingContext) throws AzureBlobFileSystemException { - AbfsRestOperation listOp = listPath(path, false, 1, null, tracingContext, - false); - return !isEmptyListResults(listOp.getResult()); + // This method is only called internally to determine state of a path + // and hence don't need identity transformation to happen. + ListResponseData listResponseData = listPath(path, false, 1, null, tracingContext, null, false); + return !isEmptyListResults(listResponseData); } /** * Check if the list call returned empty results without any continuation token. - * @param result The response of listing API from the server. + * @param listResponseData The response of listing API from the server. * @return True if empty results without continuation token. 
*/ - private boolean isEmptyListResults(AbfsHttpOperation result) { + private boolean isEmptyListResults(ListResponseData listResponseData) { + AbfsHttpOperation result = listResponseData.getOp().getResult(); boolean isEmptyList = result != null && result.getStatusCode() == HTTP_OK && // List Call was successful result.getListResultSchema() != null && // Parsing of list response was successful - result.getListResultSchema().paths().isEmpty() && // No paths were returned - result.getListResultSchema() instanceof BlobListResultSchema && // It is safe to typecast to BlobListResultSchema - ((BlobListResultSchema) result.getListResultSchema()).getNextMarker() == null; // No continuation token was returned + listResponseData.getFileStatusList().isEmpty() && listResponseData.getRenamePendingJsonPaths().isEmpty() &&// No paths were returned + StringUtils.isEmpty(listResponseData.getContinuationToken()); // No continuation token was returned if (isEmptyList) { LOG.debug("List call returned empty results without any continuation token."); return true; - } else if (result != null && !(result.getListResultSchema() instanceof BlobListResultSchema)) { - throw new RuntimeException("List call returned unexpected results over Blob Endpoint."); } return false; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index 5143a4a1954..8dcd78ea6b3 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -19,12 +19,15 @@ package org.apache.hadoop.fs.azurebfs.services; import java.io.Closeable; +import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.UnsupportedEncodingException; +import java.lang.reflect.InvocationTargetException; import java.net.HttpURLConnection; import java.net.InetAddress; import java.net.MalformedURLException; +import java.net.URI; import java.net.URL; import java.net.URLEncoder; import java.net.UnknownHostException; @@ -47,6 +50,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; @@ -66,18 +70,24 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.SASTokenProviderException; import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters; import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; -import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema; +import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema; import org.apache.hadoop.fs.azurebfs.contracts.services.StorageErrorResponseSchema; import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider; import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper; import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider; import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider; +import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformer; +import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformerInterface; import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter; +import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils; import 
org.apache.hadoop.fs.azurebfs.utils.EncryptionType; import org.apache.hadoop.fs.azurebfs.utils.MetricFormat; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.store.LogExactlyOnce; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory; import org.apache.hadoop.thirdparty.com.google.common.base.Strings; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.FutureCallback; @@ -115,6 +125,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SEMICOLON; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.UTF_8; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_IDENTITY_TRANSFORM_CLASS; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_DELETE_CONSIDERED_IDEMPOTENT; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SERVER_SIDE_ENCRYPTION_ALGORITHM; @@ -151,6 +162,7 @@ public abstract class AbfsClient implements Closeable { private final AbfsPerfTracker abfsPerfTracker; private String clientProvidedEncryptionKey = null; private String clientProvidedEncryptionKeySHA = null; + private final IdentityTransformerInterface identityTransformer; private final String accountName; private final AuthType authType; @@ -285,6 +297,19 @@ private AbfsClient(final URL baseUrl, metricIdlePeriod); } this.abfsMetricUrl = abfsConfiguration.getMetricUri(); + + final Class<? extends IdentityTransformerInterface> identityTransformerClass = + abfsConfiguration.getRawConfiguration().getClass(FS_AZURE_IDENTITY_TRANSFORM_CLASS, IdentityTransformer.class, + IdentityTransformerInterface.class); + try { + this.identityTransformer = identityTransformerClass.getConstructor( + Configuration.class).newInstance(abfsConfiguration.getRawConfiguration()); + } catch (IllegalAccessException | InstantiationException | IllegalArgumentException + | InvocationTargetException | NoSuchMethodException e) { + LOG.error("IdentityTransformer Init Failed", e); + throw new IOException(e); + } + LOG.trace("IdentityTransformer init complete"); } public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials, @@ -501,12 +526,12 @@ public abstract AbfsRestOperation setFilesystemProperties(Hashtable<String, Stri * @param listMaxResults maximum number of blobs to return. * @param continuation marker to specify the continuation token. * @param tracingContext for tracing the server calls. - * @return executed rest operation containing response from server. + * @param uri to be used for the path conversion. + * @return {@link ListResponseData} containing listing response. + * @throws AzureBlobFileSystemException if rest operation or response parsing fails. 
*/ - public abstract AbfsRestOperation listPath(String relativePath, boolean recursive, - int listMaxResults, String continuation, TracingContext tracingContext) - throws IOException; + public abstract ListResponseData listPath(String relativePath, boolean recursive, + int listMaxResults, String continuation, TracingContext tracingContext, URI uri) throws IOException; /** * Retrieves user-defined metadata on filesystem. @@ -1685,11 +1710,12 @@ protected boolean isRenameResilience() { /** * Parses response of Listing API from server based on Endpoint used. - * @param stream InputStream of the response - * @return ListResultSchema + * @param result AbfsHttpOperation of list Operation. + * @param uri to be used for the path conversion. + * @return {@link ListResponseData} containing the list of entries. * @throws IOException if parsing fails */ - public abstract ListResultSchema parseListPathResults(InputStream stream) throws IOException; + public abstract ListResponseData parseListPathResults(AbfsHttpOperation result, URI uri) throws IOException; /** * Parses response of Get Block List from server based on Endpoint used. @@ -1707,13 +1733,6 @@ protected boolean isRenameResilience() { */ public abstract StorageErrorResponseSchema processStorageErrorResponse(InputStream stream) throws IOException; - /** - * Returns continuation token from server response based on Endpoint used. - * @param result response from server - * @return continuation token - */ - public abstract String getContinuationFromResponse(AbfsHttpOperation result); - /** * Returns user-defined metadata from server response based on Endpoint used. * @param result response from server @@ -1757,4 +1776,90 @@ protected AbfsRestOperation getSuccessOp(final AbfsRestOperationType operationTy successOp.hardSetResult(HttpURLConnection.HTTP_OK); return successOp; } + + /** + * Get the primary user group name. + * @return primary user group name + * @throws AzureBlobFileSystemException if unable to get the primary user group + */ + private String getPrimaryUserGroup() throws AzureBlobFileSystemException { + if (!getAbfsConfiguration().getSkipUserGroupMetadataDuringInitialization()) { + try { + return UserGroupInformation.getCurrentUser().getPrimaryGroupName(); + } catch (IOException ex) { + LOG.error("Failed to get primary group for {}, using user name as primary group name", + getPrimaryUser()); + } + } + //Provide a default group name + return getPrimaryUser(); + } + + /** + * Get the primary username. + * @return primary username + * @throws AzureBlobFileSystemException if unable to get the primary user + */ + private String getPrimaryUser() throws AzureBlobFileSystemException { + try { + return UserGroupInformation.getCurrentUser().getUserName(); + } catch (IOException ex) { + throw new AbfsDriverException(ex); + } + } + + /** + * Creates a VersionedFileStatus object from the ListResultEntrySchema. + * @param entry ListResultEntrySchema object. + * @param uri to be used for the path conversion. + * @return VersionedFileStatus object. + * @throws AzureBlobFileSystemException if transformation fails. 
+ */ + protected VersionedFileStatus getVersionedFileStatusFromEntry( + ListResultEntrySchema entry, URI uri) throws AzureBlobFileSystemException { + long blockSize = abfsConfiguration.getAzureBlockSize(); + String owner = null, group = null; + try{ + if (identityTransformer != null) { + owner = identityTransformer.transformIdentityForGetRequest( + entry.owner(), true, getPrimaryUser()); + group = identityTransformer.transformIdentityForGetRequest( + entry.group(), false, getPrimaryUserGroup()); + } + } catch (IOException ex) { + LOG.error("Failed to get owner/group for path {}", entry.name(), ex); + throw new AbfsDriverException(ex); + } + final String encryptionContext = entry.getXMsEncryptionContext(); + final FsPermission fsPermission = entry.permissions() == null + ? new AbfsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL) + : AbfsPermission.valueOf(entry.permissions()); + final boolean hasAcl = AbfsPermission.isExtendedAcl(entry.permissions()); + + long lastModifiedMillis = 0; + long contentLength = entry.contentLength() == null ? 0 : entry.contentLength(); + boolean isDirectory = entry.isDirectory() != null && entry.isDirectory(); + if (entry.lastModified() != null && !entry.lastModified().isEmpty()) { + lastModifiedMillis = DateTimeUtils.parseLastModifiedTime( + entry.lastModified()); + } + + Path entryPath = new Path(File.separator + entry.name()); + if (uri != null) { + entryPath = entryPath.makeQualified(uri, entryPath); + } + return new VersionedFileStatus( + owner, + group, + fsPermission, + hasAcl, + contentLength, + isDirectory, + 1, + blockSize, + lastModifiedMillis, + entryPath, + entry.eTag(), + encryptionContext); + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java index 05acaa78f48..ef4194179dc 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java @@ -23,6 +23,7 @@ import java.io.InputStream; import java.io.UnsupportedEncodingException; import java.net.HttpURLConnection; +import java.net.URI; import java.net.URL; import java.nio.ByteBuffer; import java.nio.CharBuffer; @@ -30,6 +31,7 @@ import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; import java.nio.charset.CharsetEncoder; +import java.util.ArrayList; import java.util.Hashtable; import java.util.List; import java.util.Map; @@ -41,6 +43,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileAlreadyExistsException; @@ -58,8 +61,8 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidFileSystemPropertyException; import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters; +import org.apache.hadoop.fs.azurebfs.contracts.services.DfsListResultEntrySchema; import org.apache.hadoop.fs.azurebfs.contracts.services.DfsListResultSchema; -import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema; import org.apache.hadoop.fs.azurebfs.contracts.services.StorageErrorResponseSchema; import 
org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider; import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider; @@ -138,6 +141,7 @@ import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.UNAUTHORIZED_BLOB_OVERWRITE; import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_CREATE_RECOVERY; +import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_DFS_LIST_PARSING; import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_FILE_ALREADY_EXISTS; import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_RENAME_RECOVERY; @@ -308,15 +312,16 @@ public AbfsRestOperation deleteFilesystem(TracingContext tracingContext) * @param listMaxResults maximum number of blobs to return. * @param continuation marker to specify the continuation token. * @param tracingContext for tracing the server calls. - * @return executed rest operation containing response from server. + * @param uri to be used for path conversion. + * @return {@link ListResponseData} containing listing response. * @throws AzureBlobFileSystemException if rest operation or response parsing fails. */ @Override - public AbfsRestOperation listPath(final String relativePath, + public ListResponseData listPath(final String relativePath, final boolean recursive, final int listMaxResults, final String continuation, - TracingContext tracingContext) throws IOException { + TracingContext tracingContext, URI uri) throws IOException { final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); @@ -337,7 +342,9 @@ public AbfsRestOperation listPath(final String relativePath, AbfsRestOperationType.ListPaths, HTTP_METHOD_GET, url, requestHeaders); op.execute(tracingContext); - return op; + ListResponseData listResponseData = parseListPathResults(op.getResult(), uri); + listResponseData.setOp(op); + return listResponseData; } /** @@ -1444,8 +1451,7 @@ public boolean checkUserError(int responseStatusCode) { * @param result The response from the server. * @return The continuation token. */ - @Override - public String getContinuationFromResponse(AbfsHttpOperation result) { + private String getContinuationFromResponse(AbfsHttpOperation result) { return result.getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION); } @@ -1464,20 +1470,41 @@ public Hashtable<String, String> getXMSProperties(AbfsHttpOperation result) /** * Parse the list file response from DFS ListPath API in Json format - * @param stream InputStream contains the list results. - * @throws IOException if parsing fails. + * @param result AbfsHttpOperation contains the list results. + * @param uri to be used for path conversion. + * @return {@link ListResponseData} containing listing response. + * @throws AzureBlobFileSystemException if parsing fails. 
*/ @Override - public ListResultSchema parseListPathResults(final InputStream stream) throws IOException { - DfsListResultSchema listResultSchema; - try { - final ObjectMapper objectMapper = new ObjectMapper(); - listResultSchema = objectMapper.readValue(stream, DfsListResultSchema.class); + public ListResponseData parseListPathResults(AbfsHttpOperation result, URI uri) throws AzureBlobFileSystemException { + try (InputStream listResultInputStream = result.getListResultStream()) { + DfsListResultSchema listResultSchema; + try { + final ObjectMapper objectMapper = new ObjectMapper(); + listResultSchema = objectMapper.readValue(listResultInputStream, + DfsListResultSchema.class); + result.setListResultSchema(listResultSchema); + LOG.debug("ListPath listed {} paths with {} as continuation token", + listResultSchema.paths().size(), + getContinuationFromResponse(result)); + } catch (IOException ex) { + throw new AbfsDriverException(ex); + } + + List<FileStatus> fileStatuses = new ArrayList<>(); + for (DfsListResultEntrySchema entry : listResultSchema.paths()) { + fileStatuses.add(getVersionedFileStatusFromEntry(entry, uri)); + } + ListResponseData listResponseData = new ListResponseData(); + listResponseData.setFileStatusList(fileStatuses); + listResponseData.setRenamePendingJsonPaths(null); + listResponseData.setContinuationToken( + getContinuationFromResponse(result)); + return listResponseData; } catch (IOException ex) { - LOG.error("Unable to deserialize list results", ex); - throw ex; + LOG.error("Unable to deserialize list results for Uri {}", uri.toString(), ex); + throw new AbfsDriverException(ERR_DFS_LIST_PARSING, ex); } - return listResultSchema; } @Override diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java index 511848c4d0a..da761f7b4e0 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java @@ -73,6 +73,8 @@ public final class AbfsErrors { "Error while recovering from create failure."; public static final String ERR_RENAME_RECOVERY = "Error while recovering from rename failure."; + public static final String ERR_BLOB_LIST_PARSING = "Parsing of XML List Response Failed in BlobClient."; + public static final String ERR_DFS_LIST_PARSING = "Parsing of Json List Response Failed in DfsClient."; public static final String INCORRECT_INGRESS_TYPE = "Ingress Type Cannot be DFS for Blob endpoint configured filesystem."; private AbfsErrors() {} } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java index 017a4f41807..fc35aecd587 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java @@ -75,6 +75,7 @@ public abstract class AbfsHttpOperation implements AbfsPerfLoggable { private String requestId = ""; private String expectedAppendPos = ""; private ListResultSchema listResultSchema = null; + private InputStream listResultStream = null; private List<String> blockIdList = null; // metrics @@ -220,6 +221,10 @@ public ListResultSchema getListResultSchema() { return 
listResultSchema; } + public final InputStream getListResultStream() { + return listResultStream; + } + /** * Get response header value for the given headerKey. * @@ -383,7 +388,8 @@ final void parseResponse(final byte[] buffer, // consume the input stream to release resources int totalBytesRead = 0; - try (InputStream stream = getContentInputStream()) { + try { + InputStream stream = getContentInputStream(); if (isNullInputStream(stream)) { return; } @@ -395,7 +401,7 @@ final void parseResponse(final byte[] buffer, if (url.toString().contains(QUERY_PARAM_COMP + EQUAL + BLOCKLIST)) { parseBlockListResponse(stream); } else { - parseListFilesResponse(stream); + listResultStream = stream; } } else { if (buffer != null) { @@ -479,19 +485,6 @@ private void processStorageErrorResponse() { */ protected abstract InputStream getErrorStream() throws IOException; - /** - * Parse the list file response - * - * @param stream InputStream contains the list results. - * @throws IOException if the response cannot be deserialized. - */ - private void parseListFilesResponse(final InputStream stream) throws IOException { - if (stream == null || listResultSchema != null) { - return; - } - listResultSchema = client.parseListPathResults(stream); - } - private void parseBlockListResponse(final InputStream stream) throws IOException { if (stream == null || blockIdList != null) { return; @@ -579,7 +572,6 @@ public final long getSendLatency() { public final long getRecvLatency() { return recvResponseTimeMs; } - /** * Set response status code for the server call. * @@ -668,6 +660,14 @@ protected boolean isConnectionDisconnectedOnError() { return connectionDisconnectedOnError; } + /** + * Sets the list result schema after parsing done on Client. + * @param listResultSchema ListResultSchema + */ + protected void setListResultSchema(final ListResultSchema listResultSchema) { + this.listResultSchema = listResultSchema; + } + public static class AbfsHttpOperationWithFixedResult extends AbfsHttpOperation { /** * Creates an instance to represent fixed results. diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListActionTaker.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListActionTaker.java index ed3d464e9b6..74f5aa4ffb5 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListActionTaker.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListActionTaker.java @@ -239,7 +239,7 @@ protected String listAndEnqueue(final ListBlobQueue listBlobQueue, op = getAbfsClient().listPath(path.toUri().getPath(), true, queueAvailableSizeForProduction, continuationToken, - tracingContext); + tracingContext, null).getOp(); } catch (AzureBlobFileSystemException ex) { throw ex; } catch (IOException ex) { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListResponseData.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListResponseData.java new file mode 100644 index 00000000000..28f10226d95 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListResponseData.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; + +/** + * This class is used to hold the response data for list operations. + * It contains a list of FileStatus objects, a map of rename pending JSON paths, + * continuation token and the executed REST operation. + */ +public class ListResponseData { + + private List<FileStatus> fileStatusList; + private Map<Path, Integer> renamePendingJsonPaths; + private AbfsRestOperation executedRestOperation; + private String continuationToken; + + /** + * Returns the list of FileStatus objects. + * @return the list of FileStatus objects + */ + public List<FileStatus> getFileStatusList() { + return fileStatusList; + } + + /** + * Sets the list of FileStatus objects. + * @param fileStatusList the list of FileStatus objects + */ + public void setFileStatusList(final List<FileStatus> fileStatusList) { + this.fileStatusList = fileStatusList; + } + + /** + * Returns the map of rename pending JSON paths. + * @return the map of rename pending JSON paths + */ + public Map<Path, Integer> getRenamePendingJsonPaths() { + return renamePendingJsonPaths; + } + + /** + * Sets the map of rename pending JSON paths. + * @param renamePendingJsonPaths the map of rename pending JSON paths + */ + public void setRenamePendingJsonPaths(final Map<Path, Integer> renamePendingJsonPaths) { + this.renamePendingJsonPaths = renamePendingJsonPaths; + } + + /** + * Returns the executed REST operation. + * @return the executed REST operation + */ + public AbfsRestOperation getOp() { + return executedRestOperation; + } + + /** + * Sets the executed REST operation. + * @param executedRestOperation the executed REST operation + */ + public void setOp(final AbfsRestOperation executedRestOperation) { + this.executedRestOperation = executedRestOperation; + } + + /** + * Returns the continuation token. + * @return the continuation token + */ + public String getContinuationToken() { + return continuationToken; + } + + /** + * Sets the continuation token. + * @param continuationToken the continuation token + */ + public void setContinuationToken(final String continuationToken) { + this.continuationToken = continuationToken; + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/VersionedFileStatus.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/VersionedFileStatus.java new file mode 100644 index 00000000000..2db6e9dd83e --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/VersionedFileStatus.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import org.apache.hadoop.fs.EtagSource; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; + +/** + * A File status with version info extracted from the etag value returned + * in a LIST or HEAD request. + * The etag is included in the Java serialization. + */ +public class VersionedFileStatus extends FileStatus implements EtagSource { + + /** + * The superclass is declared serializable; this subclass can also + * be serialized. + */ + private static final long serialVersionUID = -2009013240419749458L; + + /** + * The etag of an object. + * Not-final so that serialization via reflection will preserve the value. + */ + private String version; + + private String encryptionContext; + + public VersionedFileStatus( + final String owner, final String group, final FsPermission fsPermission, final boolean hasAcl, + final long length, final boolean isdir, final int blockReplication, + final long blocksize, final long modificationTime, final Path path, + final String version, final String encryptionContext) { + super(length, isdir, blockReplication, blocksize, modificationTime, 0, + fsPermission, + owner, + group, + null, + path, + hasAcl, false, false); + + this.version = version; + this.encryptionContext = encryptionContext; + } + + /** Compare if this object is equal to another object. + * @param obj the object to be compared. + * @return true if the two file statuses have the same path name; false if not. + */ + @Override + public boolean equals(Object obj) { + if (!(obj instanceof FileStatus)) { + return false; + } + + FileStatus other = (FileStatus) obj; + + if (!this.getPath().equals(other.getPath())) { // compare the path + return false; + } + + if (other instanceof VersionedFileStatus) { + return this.version.equals(((VersionedFileStatus) other).version); + } + + return true; + } + + /** + * Returns a hash code value for the object, which is defined as + * the hash code of the path name. + * + * @return a hash code value for the path name and version + */ + @Override + public int hashCode() { + int hash = getPath().hashCode(); + hash = 89 * hash + (this.version != null ? this.version.hashCode() : 0); + return hash; + } + + /** + * Returns the version of this FileStatus. + * + * @return a string value for the FileStatus version. + */ + public String getVersion() { + return this.version; + } + + /** + * Returns the etag of this FileStatus. + * @return a string value for the FileStatus etag. + */ + @Override + public String getEtag() { + return getVersion(); + } + + /** + * Returns the encryption context of this FileStatus. + * @return a string value for the FileStatus encryption context. + */ + public String getEncryptionContext() { + return encryptionContext; + } + + /** + * Returns a string representation of the object.
+ * @return a string representation of the object + */ + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "VersionedFileStatus{"); + sb.append(super.toString()); + sb.append("; version='").append(version).append('\''); + sb.append('}'); + return sb.toString(); + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java index baa57da2881..c180689b267 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java @@ -65,7 +65,7 @@ public void testContinuationTokenHavingEqualSign() throws Exception { try { AbfsRestOperation op = abfsClient .listPath("/", true, LIST_MAX_RESULTS, "===========", - getTestTracingContext(fs, true)); + getTestTracingContext(fs, true), null).getOp(); Assert.assertTrue(false); } catch (AbfsRestOperationException ex) { Assert.assertEquals("InvalidQueryParameterValue", ex.getErrorCode().getErrorCode()); @@ -106,7 +106,7 @@ public void testListPathWithValidListMaxResultsValues() AbfsRestOperation op = getFileSystem().getAbfsClient().listPath( directory.toString(), false, getListMaxResults(), null, - getTestTracingContext(getFileSystem(), true)); + getTestTracingContext(getFileSystem(), true), null).getOp(); List<? extends ListResultEntrySchema> list = op.getResult().getListResultSchema().paths(); String continuationToken = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION); @@ -141,7 +141,7 @@ public void testListPathWithValueGreaterThanServerMaximum() AbfsRestOperation op = getFileSystem().getAbfsClient().listPath( directory.toString(), false, getListMaxResults(), null, - getTestTracingContext(getFileSystem(), true)); + getTestTracingContext(getFileSystem(), true), null).getOp(); List<? extends ListResultEntrySchema> list = op.getResult().getListResultSchema().paths(); String continuationToken = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION); @@ -179,7 +179,7 @@ private List<? 
extends ListResultEntrySchema> listPath(String directory) throws IOException { return getFileSystem().getAbfsClient() .listPath(directory, false, getListMaxResults(), null, - getTestTracingContext(getFileSystem(), true)).getResult() + getTestTracingContext(getFileSystem(), true), null).getOp().getResult() .getListResultSchema().paths(); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsCustomEncryption.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsCustomEncryption.java index 2a06dfb4acc..781cd701400 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsCustomEncryption.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsCustomEncryption.java @@ -54,6 +54,7 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsDfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; +import org.apache.hadoop.fs.azurebfs.services.VersionedFileStatus; import org.apache.hadoop.fs.azurebfs.utils.EncryptionType; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.hadoop.fs.impl.OpenFileParameters; @@ -301,10 +302,10 @@ private AbfsRestOperation callOperation(AzureBlobFileSystem fs, */ FileStatus status = fs.listStatus(testPath)[0]; Assertions.assertThat(status) - .isInstanceOf(AzureBlobFileSystemStore.VersionedFileStatus.class); + .isInstanceOf(VersionedFileStatus.class); Assertions.assertThat( - ((AzureBlobFileSystemStore.VersionedFileStatus) status).getEncryptionContext()) + ((VersionedFileStatus) status).getEncryptionContext()) .isNotNull(); try (FSDataInputStream in = fs.openFileWithOptions(testPath, @@ -343,7 +344,7 @@ true, new BlobAppendRequestParameters(BLOCK_ID, null)), getTestTracingContext(fs, false)); case LISTSTATUS: return client.listPath(path, false, 5, null, - getTestTracingContext(fs, true)); + getTestTracingContext(fs, true), null).getOp(); case RENAME: TracingContext tc = getTestTracingContext(fs, true); return client.renamePath(path, new Path(path + "_2").toString(), diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java index f59c4a8b9f2..4b08e7d2246 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java @@ -66,6 +66,7 @@ import org.apache.hadoop.fs.azurebfs.services.ITestAbfsClient; import org.apache.hadoop.fs.azurebfs.services.RenameAtomicity; import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient; +import org.apache.hadoop.fs.azurebfs.services.VersionedFileStatus; import org.apache.hadoop.fs.azurebfs.utils.DirectoryStateHelper; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator; @@ -251,8 +252,8 @@ private AzureBlobFileSystem createJsonFile(Path path, Path renameJson) throws IO doReturn(client).when(store).getClient(); fs.setWorkingDirectory(new Path(ROOT_PATH)); fs.mkdirs(new Path(path, "test3")); - AzureBlobFileSystemStore.VersionedFileStatus fileStatus - = (AzureBlobFileSystemStore.VersionedFileStatus) fs.getFileStatus(path); + VersionedFileStatus fileStatus + = (VersionedFileStatus) 
fs.getFileStatus(path); new RenameAtomicity(path, new Path("/hbase/test4"), renameJson, getTestTracingContext(fs, true), fileStatus.getEtag(), diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java index 3f16e6be80f..e20a2c43279 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java @@ -20,6 +20,7 @@ import java.io.FileNotFoundException; import java.io.IOException; +import java.net.URI; import java.nio.file.AccessDeniedException; import java.util.ArrayList; import java.util.List; @@ -311,12 +312,11 @@ public void testDeleteIdempotencyTriggerHttp404() throws Exception { doCallRealMethod().when(mockClient) .listPath(Mockito.nullable(String.class), Mockito.anyBoolean(), Mockito.anyInt(), Mockito.nullable(String.class), - Mockito.nullable(TracingContext.class)); + Mockito.nullable(TracingContext.class), Mockito.nullable(URI.class)); doCallRealMethod().when((AbfsBlobClient) mockClient) .listPath(Mockito.nullable(String.class), Mockito.anyBoolean(), Mockito.anyInt(), Mockito.nullable(String.class), - Mockito.nullable(TracingContext.class), - Mockito.anyBoolean()); + Mockito.nullable(TracingContext.class), Mockito.nullable(URI.class), Mockito.anyBoolean()); doCallRealMethod().when((AbfsBlobClient) mockClient) .getPathStatus(Mockito.nullable(String.class), Mockito.nullable(TracingContext.class), Mockito.nullable(ContextEncryptionAdapter.class), Mockito.anyBoolean()); @@ -531,12 +531,12 @@ public void testDeleteImplicitDirWithSingleListResults() throws Exception { boolean recursive = answer.getArgument(1); String continuation = answer.getArgument(3); TracingContext context = answer.getArgument(4); - return client.listPath(path, recursive, 1, continuation, context); + return client.listPath(path, recursive, 1, continuation, context, null); }) .when(spiedClient) .listPath(Mockito.anyString(), Mockito.anyBoolean(), Mockito.anyInt(), Mockito.nullable(String.class), - Mockito.any(TracingContext.class)); + Mockito.any(TracingContext.class), Mockito.nullable(URI.class)); client.deleteBlobPath(new Path("/testDir/dir1"), null, getTestTracingContext(fs, true)); fs.delete(new Path("/testDir/dir1"), true); @@ -683,14 +683,14 @@ public void testProducerStopOnDeleteFailure() throws Exception { }) .when(spiedClient) .listPath(Mockito.anyString(), Mockito.anyBoolean(), Mockito.anyInt(), - Mockito.nullable(String.class), Mockito.any(TracingContext.class)); + Mockito.nullable(String.class), Mockito.any(TracingContext.class), Mockito.nullable(URI.class)); intercept(AccessDeniedException.class, () -> { fs.delete(new Path("/src"), true); }); Mockito.verify(spiedClient, Mockito.times(1)) .listPath(Mockito.anyString(), Mockito.anyBoolean(), Mockito.anyInt(), - Mockito.nullable(String.class), Mockito.any(TracingContext.class)); + Mockito.nullable(String.class), Mockito.any(TracingContext.class), Mockito.nullable(URI.class)); } /** diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java index 87b1ea4321d..f226e5b61fb 100644 --- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java @@ -98,6 +98,7 @@ private FileStatus validateStatus(final AzureBlobFileSystem fs, final Path name, assertTrue(errorInStatus + "not a file", fileStatus.isFile()); } } + assertPathDns(fileStatus.getPath()); return fileStatus; } @@ -255,7 +256,7 @@ public void testListStatusIsCalledForImplicitPathOnBlobEndpoint() throws Excepti fs.getFileStatus(implicitPath); Mockito.verify(abfsClient, Mockito.times(1)).getPathStatus(any(), eq(false), any(), any()); - Mockito.verify(abfsClient, Mockito.times(1)).listPath(any(), eq(false), eq(1), any(), any(), eq(false)); + Mockito.verify(abfsClient, Mockito.times(1)).listPath(any(), eq(false), eq(1), any(), any(), any(), eq(false)); } /** diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java index e563e082b80..1c4ef6d4212 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java @@ -28,6 +28,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; +import org.assertj.core.api.Assertions; import org.junit.Test; import org.mockito.Mockito; import org.mockito.stubbing.Stubber; @@ -39,21 +40,20 @@ import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; -import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; -import org.apache.hadoop.fs.azurebfs.contracts.services.DfsListResultEntrySchema; -import org.apache.hadoop.fs.azurebfs.contracts.services.DfsListResultSchema; -import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema; -import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; +import org.apache.hadoop.fs.azurebfs.services.ListResponseData; import org.apache.hadoop.fs.azurebfs.services.AbfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil; +import org.apache.hadoop.fs.azurebfs.utils.DirectoryStateHelper; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat; import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator; import org.apache.hadoop.fs.contract.ContractTestUtils; import static java.net.HttpURLConnection.HTTP_OK; -import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_LIST_MAX_RESULTS; +import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicity.SUFFIX; import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_JDK_MESSAGE; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs; import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; @@ -72,7 +72,7 @@ public class ITestAzureBlobFileSystemListStatus extends AbstractAbfsIntegrationTest { private static final 
int TEST_FILES_NUMBER = 6000; - private static final String TEST_CONTINUATION_TOKEN = "continuation"; + public static final String TEST_CONTINUATION_TOKEN = "continuation"; public ITestAzureBlobFileSystemListStatus() throws Exception { super(); @@ -122,43 +122,20 @@ public Void call() throws Exception { */ @Test public void testListPathTracingContext() throws Exception { - assumeDfsServiceType(); - final AzureBlobFileSystem fs = getFileSystem(); - final AzureBlobFileSystem spiedFs = Mockito.spy(fs); - final AzureBlobFileSystemStore spiedStore = Mockito.spy(fs.getAbfsStore()); - final AbfsClient spiedClient = Mockito.spy(fs.getAbfsClient()); + final AzureBlobFileSystem spiedFs = Mockito.spy(getFileSystem()); + final AzureBlobFileSystemStore spiedStore = Mockito.spy(spiedFs.getAbfsStore()); + final AbfsClient spiedClient = Mockito.spy(spiedFs.getAbfsClient()); final TracingContext spiedTracingContext = Mockito.spy( new TracingContext( - fs.getClientCorrelationId(), fs.getFileSystemId(), + spiedFs.getClientCorrelationId(), spiedFs.getFileSystemId(), FSOperationType.LISTSTATUS, true, TracingHeaderFormat.ALL_ID_FORMAT, null)); Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore(); - spiedStore.setClient(spiedClient); + Mockito.doReturn(spiedClient).when(spiedStore).getClient(); spiedFs.setWorkingDirectory(new Path("/")); - AbfsClientTestUtil.setMockAbfsRestOperationForListPathOperation(spiedClient, + AbfsClientTestUtil.setMockAbfsRestOperationForListOperation(spiedClient, (httpOperation) -> { - - ListResultEntrySchema entry = new DfsListResultEntrySchema() - .withName("a") - .withIsDirectory(true); - List<ListResultEntrySchema> paths = new ArrayList<>(); - paths.add(entry); - paths.clear(); - entry = new DfsListResultEntrySchema() - .withName("abc.txt") - .withIsDirectory(false); - paths.add(entry); - ListResultSchema schema1 = new DfsListResultSchema().withPaths(paths); - ListResultSchema schema2 = new DfsListResultSchema().withPaths(paths); - - when(httpOperation.getListResultSchema()).thenReturn(schema1) - .thenReturn(schema2); - when(httpOperation.getResponseHeader( - HttpHeaderConfigurations.X_MS_CONTINUATION)) - .thenReturn(TEST_CONTINUATION_TOKEN) - .thenReturn(EMPTY_STRING); - Stubber stubber = Mockito.doThrow( new SocketTimeoutException(CONNECTION_TIMEOUT_JDK_MESSAGE)); stubber.doNothing().when(httpOperation).processResponse( @@ -176,12 +153,12 @@ public void testListPathTracingContext() throws Exception { Mockito.verify(spiedClient, times(1)).listPath( "/", false, spiedFs.getAbfsStore().getAbfsConfiguration().getListMaxResults(), - null, spiedTracingContext); + null, spiedTracingContext, spiedFs.getAbfsStore().getUri()); // 2. With continuation token Mockito.verify(spiedClient, times(1)).listPath( "/", false, spiedFs.getAbfsStore().getAbfsConfiguration().getListMaxResults(), - TEST_CONTINUATION_TOKEN, spiedTracingContext); + TEST_CONTINUATION_TOKEN, spiedTracingContext, spiedFs.getAbfsStore().getUri()); // Assert that none of the API calls used the same tracing header. Mockito.verify(spiedTracingContext, times(0)).constructHeader(any(), any(), any()); @@ -343,4 +320,208 @@ public void testRenameTrailingPeriodFile() throws IOException { assertTrue("Attempt to create file that ended with a dot should" + " throw IllegalArgumentException", exceptionThrown); } + + + + /** + * Test to verify that listStatus returns the correct file status for all + * types of paths, viz. implicit, explicit, and file. + * @throws Exception if there is an error or a test assertion fails.
+ */ + @Test + public void testListStatusWithImplicitExplicitChildren() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + fs.setWorkingDirectory(new Path(ROOT_PATH)); + Path root = new Path(ROOT_PATH); + + // Create an implicit directory under root + Path dir = new Path("a"); + Path fileInsideDir = new Path("a/file"); + createAzCopyFolder(dir); + + // Assert that the implicit directory is returned + FileStatus[] fileStatuses = fs.listStatus(root); + Assertions.assertThat(fileStatuses.length) + .describedAs("List size is not expected").isEqualTo(1); + assertImplicitDirectoryFileStatus(fileStatuses[0], fs.makeQualified(dir)); + + // Create a marker blob for the directory. + fs.create(fileInsideDir); + + // Assert that only one entry, now an explicit directory, is returned + fileStatuses = fs.listStatus(root); + Assertions.assertThat(fileStatuses.length) + .describedAs("List size is not expected").isEqualTo(1); + assertExplicitDirectoryFileStatus(fileStatuses[0], fs.makeQualified(dir)); + + // Create a file under root + Path file1 = new Path("b"); + fs.create(file1); + + // Assert that two entries are returned in alphabetic order. + fileStatuses = fs.listStatus(root); + Assertions.assertThat(fileStatuses.length) + .describedAs("List size is not expected").isEqualTo(2); + assertExplicitDirectoryFileStatus(fileStatuses[0], fs.makeQualified(dir)); + assertFilePathFileStatus(fileStatuses[1], fs.makeQualified(file1)); + + // Create another implicit directory under root. + Path dir2 = new Path("c"); + createAzCopyFolder(dir2); + + // Assert that three entries are returned in alphabetic order. + fileStatuses = fs.listStatus(root); + Assertions.assertThat(fileStatuses.length) + .describedAs("List size is not expected").isEqualTo(3); + assertExplicitDirectoryFileStatus(fileStatuses[0], fs.makeQualified(dir)); + assertFilePathFileStatus(fileStatuses[1], fs.makeQualified(file1)); + assertImplicitDirectoryFileStatus(fileStatuses[2], fs.makeQualified(dir2)); + } + + /** + * Test to verify that listStatus returns the correct file status when called on an implicit path. + * @throws Exception if there is an error or a test assertion fails.
+ */ + @Test + public void testListStatusOnImplicitDirectoryPath() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + Path implicitPath = new Path("/implicitDir"); + createAzCopyFolder(implicitPath); + + FileStatus[] statuses = fs.listStatus(implicitPath); + Assertions.assertThat(statuses.length) + .describedAs("List size is not expected").isGreaterThanOrEqualTo(1); + assertImplicitDirectoryFileStatus(statuses[0], fs.makeQualified(statuses[0].getPath())); + + FileStatus[] statuses1 = fs.listStatus(new Path(statuses[0].getPath().toString())); + Assertions.assertThat(statuses1.length) + .describedAs("List size is not expected").isGreaterThanOrEqualTo(1); + assertFilePathFileStatus(statuses1[0], fs.makeQualified(statuses1[0].getPath())); + } + + @Test + public void testListStatusOnEmptyDirectory() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + Path emptyDir = new Path("/emptyDir"); + fs.mkdirs(emptyDir); + + FileStatus[] statuses = fs.listStatus(emptyDir); + Assertions.assertThat(statuses.length) + .describedAs("List size is not expected").isEqualTo(0); + } + + @Test + public void testListStatusOnRenamePendingJsonFile() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + Path renamePendingJsonPath = new Path("/hbase/A/A-" + SUFFIX); + fs.create(renamePendingJsonPath); + + FileStatus[] statuses = fs.listStatus(renamePendingJsonPath); + Assertions.assertThat(statuses.length) + .describedAs("List size is not expected").isEqualTo(1); + assertFilePathFileStatus(statuses[0], fs.makeQualified(statuses[0].getPath())); + } + + @Test + public void testContinuationTokenAcrossListStatus() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + Path path = new Path("/testContinuationToken"); + fs.mkdirs(path); + fs.create(new Path(path + "/file1")); + fs.create(new Path(path + "/file2")); + + fs.listStatus(path); + + ListResponseData listResponseData = fs.getAbfsStore().getClient().listPath( + "/testContinuationToken", false, 1, null, getTestTracingContext(fs, true), + fs.getAbfsStore().getUri()); + + Assertions.assertThat(listResponseData.getContinuationToken()) + .describedAs("Continuation Token Should not be null").isNotNull(); + Assertions.assertThat(listResponseData.getFileStatusList()) + .describedAs("Listing Size Not as expected").hasSize(1); + + ListResponseData listResponseData1 = fs.getAbfsStore().getClient().listPath( + "/testContinuationToken", false, 1, listResponseData.getContinuationToken(), getTestTracingContext(fs, true), + fs.getAbfsStore().getUri()); + + Assertions.assertThat(listResponseData1.getContinuationToken()) + .describedAs("Continuation Token Should be null").isNull(); + Assertions.assertThat(listResponseData1.getFileStatusList()) + .describedAs("Listing Size Not as expected").hasSize(1); + } + + @Test + public void testInvalidContinuationToken() throws Exception { + assumeHnsDisabled(); + final AzureBlobFileSystem fs = getFileSystem(); + Path path = new Path("/testInvalidContinuationToken"); + fs.mkdirs(path); + fs.create(new Path(path + "/file1")); + fs.create(new Path(path + "/file2")); + + intercept(AbfsRestOperationException.class, + () -> fs.getAbfsStore().getClient().listPath( + "/testInvalidContinuationToken", false, 1, "invalidToken", + getTestTracingContext(fs, true), fs.getAbfsStore().getUri())); + } + + @Test + public void testEmptyContinuationToken() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + Path path = new Path("/testEmptyContinuationToken"); +
fs.mkdirs(path); + fs.create(new Path(path + "/file1")); + fs.create(new Path(path + "/file2")); + + ListResponseData listResponseData = fs.getAbfsStore().getClient().listPath( + "/testEmptyContinuationToken", false, 1, "", + getTestTracingContext(fs, true), fs.getAbfsStore().getUri()); + + Assertions.assertThat(listResponseData.getContinuationToken()) + .describedAs("Continuation Token Should Not be null").isNotNull(); + Assertions.assertThat(listResponseData.getFileStatusList()) + .describedAs("Listing Size Not as expected").hasSize(1); + } + + private void assertFilePathFileStatus(final FileStatus fileStatus, + final Path qualifiedPath) { + Assertions.assertThat(fileStatus.getPath()) + .describedAs("Path Not as expected").isEqualTo(qualifiedPath); + Assertions.assertThat(fileStatus.isFile()) + .describedAs("Expecting a File Path").isEqualTo(true); + Assertions.assertThat(fileStatus.isDirectory()) + .describedAs("Expecting a File Path").isEqualTo(false); + Assertions.assertThat(fileStatus.getModificationTime()).isNotEqualTo(0); + } + + private void assertImplicitDirectoryFileStatus(final FileStatus fileStatus, + final Path qualifiedPath) throws Exception { + assertDirectoryFileStatus(fileStatus, qualifiedPath); + DirectoryStateHelper.isImplicitDirectory(qualifiedPath, getFileSystem(), + getTestTracingContext(getFileSystem(), true)); + Assertions.assertThat(fileStatus.getModificationTime()) + .describedAs("Last Modified Time Not as Expected").isEqualTo(0); + } + + private void assertExplicitDirectoryFileStatus(final FileStatus fileStatus, + final Path qualifiedPath) throws Exception { + assertDirectoryFileStatus(fileStatus, qualifiedPath); + DirectoryStateHelper.isExplicitDirectory(qualifiedPath, getFileSystem(), + getTestTracingContext(getFileSystem(), true)); + Assertions.assertThat(fileStatus.getModificationTime()) + .describedAs("Last Modified Time Not as Expected").isNotEqualTo(0); + } + + private void assertDirectoryFileStatus(final FileStatus fileStatus, + final Path qualifiedPath) { + Assertions.assertThat(fileStatus.getPath()) + .describedAs("Path Not as Expected").isEqualTo(qualifiedPath); + Assertions.assertThat(fileStatus.isDirectory()) + .describedAs("Expecting a Directory Path").isEqualTo(true); + Assertions.assertThat(fileStatus.isFile()) + .describedAs("Expecting a Directory Path").isEqualTo(false); + Assertions.assertThat(fileStatus.getLen()) + .describedAs("Content Length Not as Expected").isEqualTo(0); + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java index ad352eef69b..fa9b63c0ef9 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.OutputStream; +import java.net.URI; import java.nio.charset.StandardCharsets; import java.nio.file.AccessDeniedException; import java.util.ArrayList; @@ -64,6 +65,7 @@ import org.apache.hadoop.fs.azurebfs.services.RenameAtomicity; import org.apache.hadoop.fs.azurebfs.services.RenameAtomicityTestUtils; import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient; +import org.apache.hadoop.fs.azurebfs.services.VersionedFileStatus; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import
org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat; import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator; @@ -869,8 +871,8 @@ public void testRenameCompleteBeforeRenameAtomicityRedo() throws Exception { /* * Create renameJson file. */ - AzureBlobFileSystemStore.VersionedFileStatus fileStatus - = (AzureBlobFileSystemStore.VersionedFileStatus) fs.getFileStatus(path); + VersionedFileStatus fileStatus + = (VersionedFileStatus) fs.getFileStatus(path); int jsonLen = new RenameAtomicity(path, new Path("/hbase/test4"), renameJson, getTestTracingContext(fs, true), fileStatus.getEtag(), @@ -1433,12 +1435,12 @@ public void testBlobRenameWithListGivingPaginatedResultWithOneObjectPerList() String continuation = answer.getArgument(3); TracingContext context = answer.getArgument(4); return getFileSystem().getAbfsClient() - .listPath(path, recursive, 1, continuation, context); + .listPath(path, recursive, 1, continuation, context, null); }) .when(spiedClient) .listPath(Mockito.anyString(), Mockito.anyBoolean(), Mockito.anyInt(), Mockito.nullable(String.class), - Mockito.any(TracingContext.class)); + Mockito.any(TracingContext.class), Mockito.nullable(URI.class)); fs.rename(new Path("/testDir/dir1"), new Path("/testDir/dir2")); for (int i = 0; i < 10; i++) { Assertions.assertThat(fs.exists(new Path("/testDir/dir2/file" + i))) @@ -1516,13 +1518,13 @@ public void testProducerStopOnRenameFailure() throws Exception { listCallInvocation[0]++; return getFileSystem().getAbfsClient().listPath(answer.getArgument(0), answer.getArgument(1), 1, - answer.getArgument(3), answer.getArgument(4)); + answer.getArgument(3), answer.getArgument(4), answer.getArgument(5)); } return answer.callRealMethod(); }) .when(spiedClient) .listPath(Mockito.anyString(), Mockito.anyBoolean(), Mockito.anyInt(), - Mockito.nullable(String.class), Mockito.any(TracingContext.class)); + Mockito.nullable(String.class), Mockito.any(TracingContext.class), Mockito.nullable(URI.class)); intercept(AccessDeniedException.class, () -> { fs.rename(new Path("/src"), new Path("/dst")); @@ -1780,15 +1782,15 @@ private Configuration createConfig(String producerQueueSize, String consumerMaxL private void validateRename(AzureBlobFileSystem fs, Path src, Path dst, boolean isSrcExist, boolean isDstExist, boolean isJsonExist) throws IOException { - Assertions.assertThat(fs.exists(dst)) - .describedAs("Renamed Destination directory should exist.") - .isEqualTo(isDstExist); Assertions.assertThat(fs.exists(new Path(src.getParent(), src.getName() + SUFFIX))) .describedAs("Renamed Pending Json file should exist.") .isEqualTo(isJsonExist); Assertions.assertThat(fs.exists(src)) - .describedAs("Renamed Destination directory should exist.") + .describedAs("Renamed Source directory should not exist.") .isEqualTo(isSrcExist); + Assertions.assertThat(fs.exists(dst)) + .describedAs("Renamed Destination directory should exist.") + .isEqualTo(isDstExist); } /** @@ -2087,8 +2089,8 @@ public AzureBlobFileSystem createJsonFile(Path path, Path renameJson) fs.setWorkingDirectory(new Path(ROOT_PATH)); fs.create(new Path(path, "file.txt")); - AzureBlobFileSystemStore.VersionedFileStatus fileStatus - = (AzureBlobFileSystemStore.VersionedFileStatus) fs.getFileStatus(path); + VersionedFileStatus fileStatus + = (VersionedFileStatus) fs.getFileStatus(path); new RenameAtomicity(path, new Path("/hbase/test4"), renameJson, getTestTracingContext(fs, true), @@ -2301,8 +2303,8 @@ public void testListCrashRecoveryWithMultipleJsonFile() throws Exception { fs.create(new Path(path2, 
"file3.txt")); Path renameJson2 = new Path(path2.getParent(), path2.getName() + SUFFIX); - AzureBlobFileSystemStore.VersionedFileStatus fileStatus - = (AzureBlobFileSystemStore.VersionedFileStatus) fs.getFileStatus(path2); + VersionedFileStatus fileStatus + = (VersionedFileStatus) fs.getFileStatus(path2); new RenameAtomicity(path2, new Path("/hbase/test4"), renameJson2, getTestTracingContext(fs, true), diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java index a92fd6f6ca1..a6652fe85f1 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java @@ -44,8 +44,10 @@ import static java.net.HttpURLConnection.HTTP_OK; import static java.net.HttpURLConnection.HTTP_UNAVAILABLE; +import static org.apache.hadoop.fs.azurebfs.ITestAzureBlobFileSystemListStatus.TEST_CONTINUATION_TOKEN; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPLICATION_XML; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOCKLIST; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_GET; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_LENGTH; @@ -75,7 +77,7 @@ private AbfsClientTestUtil() { } - public static void setMockAbfsRestOperationForListPathOperation( + public static void setMockAbfsRestOperationForListOperation( final AbfsClient spiedClient, FunctionRaisingIOE<AbfsJdkHttpOperation, AbfsJdkHttpOperation> functionRaisingIOE) throws Exception { @@ -91,13 +93,30 @@ public static void setMockAbfsRestOperationForListPathOperation( new ArrayList<>(), spiedClient.getAbfsConfiguration() )); + ListResponseData listResponseData1 = Mockito.spy(new ListResponseData()); + listResponseData1.setRenamePendingJsonPaths(null); + listResponseData1.setOp(abfsRestOperation); + listResponseData1.setFileStatusList(new ArrayList<>()); + listResponseData1.setContinuationToken(TEST_CONTINUATION_TOKEN); + + ListResponseData listResponseData2 = Mockito.spy(new ListResponseData()); + listResponseData2.setRenamePendingJsonPaths(null); + listResponseData2.setOp(abfsRestOperation); + listResponseData2.setFileStatusList(new ArrayList<>()); + listResponseData2.setContinuationToken(EMPTY_STRING); Mockito.doReturn(abfsRestOperation).when(spiedClient).getAbfsRestOperation( eq(AbfsRestOperationType.ListPaths), any(), any(), any()); + Mockito.doReturn(abfsRestOperation).when(spiedClient).getAbfsRestOperation( + eq(AbfsRestOperationType.ListBlobs), any(), any(), any()); - addGeneralMockBehaviourToAbfsClient(spiedClient, exponentialRetryPolicy, staticRetryPolicy, intercept); + addGeneralMockBehaviourToAbfsClient(spiedClient, exponentialRetryPolicy, staticRetryPolicy, intercept, listResponseData1); addGeneralMockBehaviourToRestOpAndHttpOp(abfsRestOperation, httpOperation); + Mockito.doReturn(listResponseData1).doReturn(listResponseData2) + .when(spiedClient) + .parseListPathResults(any(), any()); + functionRaisingIOE.apply(httpOperation); } @@ -204,7 +223,8 @@ public static void addMockBehaviourToRestOpAndHttpOp(final AbfsRestOperation abf public 
static void addGeneralMockBehaviourToAbfsClient(final AbfsClient abfsClient, final ExponentialRetryPolicy exponentialRetryPolicy, final StaticRetryPolicy staticRetryPolicy, - final AbfsThrottlingIntercept intercept) throws IOException, URISyntaxException { + final AbfsThrottlingIntercept intercept, + final ListResponseData listResponseData) throws IOException, URISyntaxException { Mockito.doReturn(OAuth).when(abfsClient).getAuthType(); Mockito.doReturn("").when(abfsClient).getAccessToken(); AbfsConfiguration abfsConfiguration = Mockito.mock(AbfsConfiguration.class); @@ -217,6 +237,7 @@ public static void addGeneralMockBehaviourToAbfsClient(final AbfsClient abfsClie .when(intercept) .sendingRequest(any(), nullable(AbfsCounters.class)); Mockito.doNothing().when(intercept).updateMetrics(any(), any()); + Mockito.doReturn(listResponseData).when(abfsClient).parseListPathResults(any(), any()); // Returning correct retry policy based on failure reason Mockito.doReturn(exponentialRetryPolicy).when(abfsClient).getExponentialRetryPolicy(); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java index 8ee3a71f358..4aaa53003d4 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java @@ -201,7 +201,8 @@ public void testRetryPolicyWithDifferentFailureReasons() throws Exception { StaticRetryPolicy staticRetryPolicy = Mockito.mock(StaticRetryPolicy.class); AbfsThrottlingIntercept intercept = Mockito.mock( AbfsThrottlingIntercept.class); - addGeneralMockBehaviourToAbfsClient(abfsClient, exponentialRetryPolicy, staticRetryPolicy, intercept); + addGeneralMockBehaviourToAbfsClient(abfsClient, exponentialRetryPolicy, staticRetryPolicy, intercept, Mockito.mock( + ListResponseData.class)); AbfsRestOperation abfsRestOperation = Mockito.spy(new AbfsRestOperation( AbfsRestOperationType.ReadFile, @@ -289,7 +290,8 @@ private void testClientRequestIdForStatusRetry(int status, StaticRetryPolicy staticRetryPolicy = Mockito.mock(StaticRetryPolicy.class); AbfsThrottlingIntercept intercept = Mockito.mock( AbfsThrottlingIntercept.class); - addGeneralMockBehaviourToAbfsClient(abfsClient, exponentialRetryPolicy, staticRetryPolicy, intercept); + addGeneralMockBehaviourToAbfsClient(abfsClient, exponentialRetryPolicy, staticRetryPolicy, intercept, Mockito.mock( + ListResponseData.class)); // Create a readfile operation that will fail AbfsRestOperation abfsRestOperation = Mockito.spy(new AbfsRestOperation( @@ -356,7 +358,8 @@ private void testClientRequestIdForTimeoutRetry(Exception[] exceptions, StaticRetryPolicy staticRetryPolicy = Mockito.mock(StaticRetryPolicy.class); AbfsThrottlingIntercept intercept = Mockito.mock( AbfsThrottlingIntercept.class); - addGeneralMockBehaviourToAbfsClient(abfsClient, exponentialRetryPolicy, staticRetryPolicy, intercept); + addGeneralMockBehaviourToAbfsClient(abfsClient, exponentialRetryPolicy, staticRetryPolicy, intercept, Mockito.mock( + ListResponseData.class)); AbfsRestOperation abfsRestOperation = Mockito.spy(new AbfsRestOperation( AbfsRestOperationType.ReadFile, diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestListActionTaker.java 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestListActionTaker.java index f9c774b7c32..f67ba45360c 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestListActionTaker.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestListActionTaker.java @@ -19,6 +19,7 @@ package org.apache.hadoop.fs.azurebfs.services; import java.io.IOException; +import java.net.URI; import java.util.List; import org.assertj.core.api.Assertions; @@ -63,9 +64,11 @@ public void testProducerResumeOnlyOnConsumerLagBecomesTolerable() throws Mockito.doReturn(DEFAULT_FS_AZURE_PRODUCER_QUEUE_MAX_SIZE) .when(abfsConfiguration) .getProducerQueueMaxSize(); + ListResponseData listResponseData = Mockito.mock(ListResponseData.class); AbfsRestOperation op = Mockito.mock(AbfsRestOperation.class); AbfsHttpOperation httpOperation = Mockito.mock(AbfsHttpOperation.class); Mockito.doReturn(httpOperation).when(op).getResult(); + Mockito.doReturn(op).when(listResponseData).getOp(); BlobListResultSchema listResultSchema = Mockito.mock( BlobListResultSchema.class); Mockito.doReturn(listResultSchema) @@ -132,10 +135,10 @@ protected void addPaths(final List<Path> paths, occurrences[0]++; Assertions.assertThat((int) answer.getArgument(2)) .isEqualTo(DEFAULT_AZURE_LIST_MAX_RESULTS); - return op; + return listResponseData; }).when(client) .listPath(Mockito.anyString(), Mockito.anyBoolean(), Mockito.anyInt(), - Mockito.nullable(String.class), Mockito.any(TracingContext.class)); + Mockito.nullable(String.class), Mockito.any(TracingContext.class), Mockito.nullable(URI.class)); listActionTaker.listRecursiveAndTakeAction(); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/DirectoryStateHelper.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/DirectoryStateHelper.java index 3becb274a48..df65f7f19e2 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/DirectoryStateHelper.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/DirectoryStateHelper.java @@ -73,7 +73,7 @@ public static boolean isImplicitDirectory(Path path, AzureBlobFileSystem fs, // 2nd condition: listPaths should return some entries. AbfsRestOperation op = client.listPath( - relativePath, false, 1, null, testTracingContext, false); + relativePath, false, 1, null, testTracingContext, null, false).getOp(); if (op != null && op.getResult() != null) { int listSize = op.getResult().getListResultSchema().paths().size(); if (listSize > 0) {
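
Usage note: with this change, AbfsClient#listPath returns a ListResponseData instead of a bare AbfsRestOperation, so callers read entries via getFileStatusList(), page via getContinuationToken(), and reach the underlying REST operation through getOp(). The following is a minimal paging sketch, not code from the patch: the helper name listAll, the maxResults value, and the throws clause are invented for illustration, while the listPath signature (path, recursive, maxResults, continuationToken, tracingContext, uri) is the one used throughout the diff above.

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.ListResponseData;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;

public final class AbfsListPagingSketch {
  private AbfsListPagingSketch() {
  }

  // Hypothetical helper: drain a listing page by page under the new contract.
  static List<FileStatus> listAll(AbfsClient client, String path,
      TracingContext tracingContext, URI storeUri) throws IOException {
    List<FileStatus> allEntries = new ArrayList<>();
    String continuationToken = null;            // null selects the first page
    do {
      ListResponseData page = client.listPath(
          path, false /* recursive */, 1000 /* maxResults, illustrative */,
          continuationToken, tracingContext, storeUri);
      allEntries.addAll(page.getFileStatusList());      // entries already parsed in one pass
      continuationToken = page.getContinuationToken();  // null or empty once exhausted
    } while (continuationToken != null && !continuationToken.isEmpty());
    return allEntries;
  }
}

The loop treats both null and the empty string as end-of-listing, which matches testContinuationTokenAcrossListStatus above (the second page returns a null token) while staying defensive about an empty token, which testEmptyContinuationToken shows is accepted like a first call.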
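ListResponseData itself is a plain holder; the mock setup in AbfsClientTestUtil above assembles one the same way the client does after its single pass over the response. A condensed sketch of that assembly follows; the entries, token, and op parameters stand in for values that the real client derives from the parsed response, and the method wrapper is invented for illustration.

import java.util.List;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.services.ListResponseData;

public final class ListResponseDataSketch {
  private ListResponseDataSketch() {
  }

  // Packaging one page of results (mirrors listResponseData1 in AbfsClientTestUtil).
  static ListResponseData packagePage(List<FileStatus> entries, String token,
      AbfsRestOperation op) {
    ListResponseData page = new ListResponseData();
    page.setFileStatusList(entries);       // FileStatus entries parsed from the response body
    page.setRenamePendingJsonPaths(null);  // rename-pending JSON paths found during the same pass
    page.setContinuationToken(token);      // e.g. TEST_CONTINUATION_TOKEN in the mocks; empty when done
    page.setOp(op);                        // the AbfsRestOperation that produced this page
    return page;
  }
}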
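Finally, VersionedFileStatus is now a public top-level class in the services package and implements org.apache.hadoop.fs.EtagSource, so callers can read the etag without the old nested AzureBlobFileSystemStore.VersionedFileStatus cast. A short illustrative example, assuming an initialized filesystem and an existing path:

import java.io.IOException;
import org.apache.hadoop.fs.EtagSource;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class EtagSketch {
  private EtagSketch() {
  }

  // Read the etag carried by a LIST/HEAD result without an ABFS-specific cast.
  static String etagOf(FileSystem fs, Path path) throws IOException {
    FileStatus status = fs.getFileStatus(path);
    return (status instanceof EtagSource)
        ? ((EtagSource) status).getEtag()  // VersionedFileStatus returns its version here
        : null;                            // status types without etag support
  }
}

This is the same pattern the rename tests above rely on when they seed RenameAtomicity with fileStatus.getEtag().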