This is an automated email from the ASF dual-hosted git repository.

anujmodi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 66adc68b620 HADOOP-19572. [ABFS][BugFix] Empty Page Issue on Subsequent ListBlob call with NextMarker (#7698) 66adc68b620 is described below commit 66adc68b62058a63598172cb14bf514287f6584a Author: Anuj Modi <anujmodi2...@gmail.com> AuthorDate: Wed May 21 21:53:00 2025 +0530 HADOOP-19572. [ABFS][BugFix] Empty Page Issue on Subsequent ListBlob call with NextMarker (#7698) Contributed by Anuj Modi Reviewed by Anmol Asrani, Manish Bhatt, Manika Joshi Signed off by Anuj Mod<anujm...@apache.org> --- .../fs/azurebfs/AzureBlobFileSystemStore.java | 11 +-- .../fs/azurebfs/services/AbfsBlobClient.java | 92 ++++++++++--------- .../hadoop/fs/azurebfs/services/AbfsClient.java | 70 +++++++-------- .../hadoop/fs/azurebfs/services/AbfsDfsClient.java | 16 ++++ .../ITestAzureBlobFileSystemDelegationSAS.java | 4 +- .../azurebfs/ITestAzureBlobFileSystemDelete.java | 2 +- .../ITestAzureBlobFileSystemFileStatus.java | 2 +- .../ITestAzureBlobFileSystemListStatus.java | 100 +++++++++++++++++++++ .../azurebfs/contract/TestBlobListXmlParser.java | 42 ++++++++- .../fs/azurebfs/services/ITestAbfsClient.java | 81 +++++++++++++++++ .../fs/azurebfs/utils/DirectoryStateHelper.java | 2 +- 11 files changed, 324 insertions(+), 98 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 23212cd80ae..e3df8034805 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -76,7 +76,6 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TrileanConversionException; import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; -import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient; import org.apache.hadoop.fs.azurebfs.services.ListResponseData; import org.apache.hadoop.fs.azurebfs.enums.Trilean; import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider; @@ -118,7 +117,6 @@ import org.apache.hadoop.fs.azurebfs.utils.CRC64; import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils; import org.apache.hadoop.fs.azurebfs.utils.EncryptionType; -import org.apache.hadoop.fs.azurebfs.utils.ListUtils; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.hadoop.fs.azurebfs.utils.UriUtils; import org.apache.hadoop.fs.impl.BackReference; @@ -1298,13 +1296,8 @@ public String listStatus(final Path path, final String startFrom, } } while (shouldContinue); - if (listingClient instanceof AbfsBlobClient) { - fileStatuses.addAll(ListUtils.getUniqueListResult(fileStatusList)); - LOG.debug("ListBlob API returned a total of {} elements including duplicates." 
- + "Number of unique Elements are {}", fileStatusList.size(), fileStatuses.size()); - } else { - fileStatuses.addAll(fileStatusList); - } + fileStatuses.addAll(listingClient.postListProcessing( + relativePath, fileStatusList, tracingContext, uri)); return continuation; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java index 7c4fc69bfc6..5d8c2efb94e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java @@ -51,6 +51,7 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.fs.Path; @@ -77,6 +78,7 @@ import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider; import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider; import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter; +import org.apache.hadoop.fs.azurebfs.utils.ListUtils; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import static java.net.HttpURLConnection.HTTP_CONFLICT; @@ -348,13 +350,9 @@ public AbfsRestOperation deleteFilesystem(TracingContext tracingContext) */ @Override public ListResponseData listPath(final String relativePath, final boolean recursive, - final int listMaxResults, final String continuation, TracingContext tracingContext, URI uri) throws IOException { - return listPath(relativePath, recursive, listMaxResults, continuation, tracingContext, uri, true); - } - - public ListResponseData listPath(final String relativePath, final boolean recursive, - final int listMaxResults, final String continuation, TracingContext tracingContext, URI uri, boolean is404CheckRequired) + final int listMaxResults, final String continuation, TracingContext tracingContext, URI uri) throws AzureBlobFileSystemException { + final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); @@ -400,37 +398,46 @@ public ListResponseData listPath(final String relativePath, final boolean recurs listResponseData.setOp(retryListOp); } } + return listResponseData; + } - if (isEmptyListResults(listResponseData) && is404CheckRequired) { + /** + * Post-processing of the list operation on Blob endpoint. + * There are two client handing to be done on list output. + * 1. Empty List returned on server could potentially mean path is a file. + * 2. There can be duplicates returned from the server for explicit non-empty directory. + * @param relativePath relative path to be listed. + * @param fileStatuses list of file statuses returned from the server. + * @param tracingContext tracing context to trace server calls. + * @param uri URI to be used for path conversion. + * @return rectified list of file statuses. + * @throws AzureBlobFileSystemException if any failure occurs. 
+ */ + @Override + public List<FileStatus> postListProcessing(String relativePath, List<FileStatus> fileStatuses, + TracingContext tracingContext, URI uri) throws AzureBlobFileSystemException { + List<FileStatus> rectifiedFileStatuses = new ArrayList<>(); + if (fileStatuses.isEmpty() && !ROOT_PATH.equals(relativePath)) { // If the list operation returns no paths, we need to check if the path is a file. // If it is a file, we need to return the file in the list. + // If it is a directory or root path, we need to return an empty list. // If it is a non-existing path, we need to throw a FileNotFoundException. - if (relativePath.equals(ROOT_PATH)) { - // Root Always exists as directory. It can be an empty listing. - return listResponseData; - } AbfsRestOperation pathStatus = this.getPathStatus(relativePath, tracingContext, null, false); BlobListResultSchema listResultSchema = getListResultSchemaFromPathStatus(relativePath, pathStatus); - LOG.debug("ListBlob attempted on a file path. Returning file status."); - List<VersionedFileStatus> fileStatusList = new ArrayList<>(); + LOG.debug("ListStatus attempted on a file path {}. Returning file status.", relativePath); for (BlobListResultEntrySchema entry : listResultSchema.paths()) { - fileStatusList.add(getVersionedFileStatusFromEntry(entry, uri)); + rectifiedFileStatuses.add(getVersionedFileStatusFromEntry(entry, uri)); } - AbfsRestOperation listOp = getAbfsRestOperation( - AbfsRestOperationType.ListBlobs, - HTTP_METHOD_GET, - url, - requestHeaders); - listOp.hardSetGetListStatusResult(HTTP_OK, listResultSchema); - listResponseData.setFileStatusList(fileStatusList); - listResponseData.setContinuationToken(null); - listResponseData.setRenamePendingJsonPaths(null); - listResponseData.setOp(listOp); - return listResponseData; + } else { + // Remove duplicates from the non-empty list output only. + rectifiedFileStatuses.addAll(ListUtils.getUniqueListResult(fileStatuses)); + LOG.debug( + "ListBlob API returned a total of {} elements including duplicates." + + "Number of unique Elements are {}", fileStatuses.size(), + rectifiedFileStatuses.size()); } - return listResponseData; + return rectifiedFileStatuses; } - /** * Filter the paths for which no rename redo operation is performed. * Update BlobListResultSchema path with filtered entries. @@ -2013,6 +2020,8 @@ private static String decodeMetadataAttribute(String encoded) /** * Checks if the listing of the specified path is non-empty. + * Since listing is incomplete as long as continuation token is returned by server, + * we need to iterate until either we get one entry or continuation token becomes null. * * @param path The path to be listed. * @param tracingContext The tracing context for tracking the operation. @@ -2024,26 +2033,15 @@ public boolean isNonEmptyDirectory(String path, TracingContext tracingContext) throws AzureBlobFileSystemException { // This method is only called internally to determine state of a path // and hence don't need identity transformation to happen. - ListResponseData listResponseData = listPath(path, false, 1, null, tracingContext, null, false); - return !isEmptyListResults(listResponseData); - } - - /** - * Check if the list call returned empty results without any continuation token. - * @param listResponseData The response of listing API from the server. - * @return True if empty results without continuation token. 
- */ - private boolean isEmptyListResults(ListResponseData listResponseData) { - AbfsHttpOperation result = listResponseData.getOp().getResult(); - boolean isEmptyList = result != null && result.getStatusCode() == HTTP_OK && // List Call was successful - result.getListResultSchema() != null && // Parsing of list response was successful - listResponseData.getFileStatusList().isEmpty() && listResponseData.getRenamePendingJsonPaths().isEmpty() &&// No paths were returned - StringUtils.isEmpty(listResponseData.getContinuationToken()); // No continuation token was returned - if (isEmptyList) { - LOG.debug("List call returned empty results without any continuation token."); - return true; - } - return false; + String continuationToken = null; + List<FileStatus> fileStatusList = new ArrayList<>(); + // We need to loop on continuation token until we get an entry or continuation token becomes null. + do { + ListResponseData listResponseData = listPath(path, false, 1, null, tracingContext, null); + fileStatusList.addAll(listResponseData.getFileStatusList()); + continuationToken = listResponseData.getContinuationToken(); + } while (StringUtils.isNotEmpty(continuationToken) && fileStatusList.isEmpty()); + return !fileStatusList.isEmpty(); } /** diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index c13538b11ed..64d33d40a67 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -51,6 +51,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; @@ -164,6 +165,8 @@ public abstract class AbfsClient implements Closeable { private String clientProvidedEncryptionKey = null; private String clientProvidedEncryptionKeySHA = null; private final IdentityTransformerInterface identityTransformer; + private final String userName; + private String primaryUserGroup; private final String accountName; private final AuthType authType; @@ -305,6 +308,22 @@ private AbfsClient(final URL baseUrl, throw new IOException(e); } LOG.trace("IdentityTransformer init complete"); + + UserGroupInformation userGroupInformation = UserGroupInformation.getCurrentUser(); + this.userName = userGroupInformation.getShortUserName(); + LOG.trace("UGI init complete"); + if (!abfsConfiguration.getSkipUserGroupMetadataDuringInitialization()) { + try { + this.primaryUserGroup = userGroupInformation.getPrimaryGroupName(); + } catch (IOException ex) { + LOG.error("Failed to get primary group for {}, using user name as primary group name", userName); + this.primaryUserGroup = userName; + } + } else { + //Provide a default group name + this.primaryUserGroup = userName; + } + LOG.trace("primaryUserGroup is {}", this.primaryUserGroup); } public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials, @@ -524,6 +543,18 @@ public abstract AbfsRestOperation setFilesystemProperties(Hashtable<String, Stri public abstract ListResponseData listPath(String relativePath, boolean recursive, int listMaxResults, String continuation, TracingContext tracingContext, URI uri) 
throws IOException; + /** + * Post-processing of the list operation. + * @param relativePath which is used to list the blobs. + * @param fileStatuses list of file statuses to be processed. + * @param tracingContext for tracing the server calls. + * @param uri to be used for the path conversion. + * @return list of file statuses to be returned. + * @throws AzureBlobFileSystemException if rest operation fails. + */ + public abstract List<FileStatus> postListProcessing(String relativePath, + List<FileStatus> fileStatuses, TracingContext tracingContext, URI uri) throws AzureBlobFileSystemException; + /** * Retrieves user-defined metadata on filesystem. * @param tracingContext for tracing the server calls. @@ -1772,37 +1803,6 @@ protected AbfsRestOperation getSuccessOp(final AbfsRestOperationType operationTy return successOp; } - /** - * Get the primary user group name. - * @return primary user group name - * @throws AzureBlobFileSystemException if unable to get the primary user group - */ - private String getPrimaryUserGroup() throws AzureBlobFileSystemException { - if (!getAbfsConfiguration().getSkipUserGroupMetadataDuringInitialization()) { - try { - return UserGroupInformation.getCurrentUser().getPrimaryGroupName(); - } catch (IOException ex) { - LOG.error("Failed to get primary group for {}, using user name as primary group name", - getPrimaryUser()); - } - } - //Provide a default group name - return getPrimaryUser(); - } - - /** - * Get the primary username. - * @return primary username - * @throws AzureBlobFileSystemException if unable to get the primary user - */ - private String getPrimaryUser() throws AzureBlobFileSystemException { - try { - return UserGroupInformation.getCurrentUser().getUserName(); - } catch (IOException ex) { - throw new AbfsDriverException(ex); - } - } - /** * Creates a VersionedFileStatus object from the ListResultEntrySchema. * @param entry ListResultEntrySchema object. @@ -1816,10 +1816,10 @@ protected VersionedFileStatus getVersionedFileStatusFromEntry( String owner = null, group = null; try{ if (identityTransformer != null) { - owner = identityTransformer.transformIdentityForGetRequest( - entry.owner(), true, getPrimaryUser()); - group = identityTransformer.transformIdentityForGetRequest( - entry.group(), false, getPrimaryUserGroup()); + owner = identityTransformer.transformIdentityForGetRequest(entry.owner(), + true, userName); + group = identityTransformer.transformIdentityForGetRequest(entry.group(), + false, primaryUserGroup); } } catch (IOException ex) { LOG.error("Failed to get owner/group for path {}", entry.name(), ex); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java index 4080299e067..2b1fa877563 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java @@ -45,6 +45,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileAlreadyExistsException; @@ -348,6 +349,21 @@ public ListResponseData listPath(final String relativePath, return listResponseData; } + /** + * Non-functional implementation. 
+ * Client side handling to remove duplicates not needed in DFSClient. + * @param relativePath on which listing was attempted. + * @param fileStatuses result of listing operation. + * @param tracingContext for tracing the server calls. + * @param uri to be used for path conversion. + * @return fileStatuses as it is without any processing. + */ + @Override + public List<FileStatus> postListProcessing(String relativePath, + List<FileStatus> fileStatuses, TracingContext tracingContext, URI uri){ + return fileStatuses; + } + /** * Get Rest Operation for API * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/create"> diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java index ed6ffc00f3f..378655405da 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java @@ -445,8 +445,8 @@ public void testSetFileSystemProperties() throws Exception { //Test list and delete operation on implicit paths public void testListAndDeleteImplicitPaths() throws Exception { AzureBlobFileSystem fs = getFileSystem(); - AbfsBlobClient client = ((AbfsBlobClient) getFileSystem().getAbfsClient()); assumeBlobServiceType(); + AbfsBlobClient client = ((AbfsBlobClient) getFileSystem().getAbfsClient()); Path file1 = new Path("/testDir/dir1/file1"); Path file2 = new Path("/testDir/dir1/file2"); @@ -458,7 +458,7 @@ public void testListAndDeleteImplicitPaths() throws Exception { AbfsRestOperation op = client.listPath( implicitDir.toString(), false, 2, null, - getTestTracingContext(getFileSystem(), false), null, false).getOp(); + getTestTracingContext(getFileSystem(), false), null).getOp(); List<? 
extends ListResultEntrySchema> list = op.getResult() .getListResultSchema() .paths(); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java index e20a2c43279..10d6606d243 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java @@ -316,7 +316,7 @@ public void testDeleteIdempotencyTriggerHttp404() throws Exception { doCallRealMethod().when((AbfsBlobClient) mockClient) .listPath(Mockito.nullable(String.class), Mockito.anyBoolean(), Mockito.anyInt(), Mockito.nullable(String.class), - Mockito.nullable(TracingContext.class), Mockito.nullable(URI.class), Mockito.anyBoolean()); + Mockito.nullable(TracingContext.class), Mockito.nullable(URI.class)); doCallRealMethod().when((AbfsBlobClient) mockClient) .getPathStatus(Mockito.nullable(String.class), Mockito.nullable(TracingContext.class), Mockito.nullable(ContextEncryptionAdapter.class), Mockito.anyBoolean()); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java index 2d7298f1e1b..2075ba81b72 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java @@ -259,7 +259,7 @@ public void testListStatusIsCalledForImplicitPathOnBlobEndpoint() throws Excepti fs.getFileStatus(implicitPath); Mockito.verify(abfsClient, Mockito.times(1)).getPathStatus(any(), eq(false), any(), any()); - Mockito.verify(abfsClient, Mockito.times(1)).listPath(any(), eq(false), eq(1), any(), any(), any(), eq(false)); + Mockito.verify(abfsClient, Mockito.times(1)).listPath(any(), eq(false), eq(1), any(), any(), any()); } /** diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java index d415dd25334..a533f352a97 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java @@ -51,10 +51,12 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsClientHandler; import org.apache.hadoop.fs.azurebfs.services.AbfsHttpHeader; import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation; +import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperationType; import org.apache.hadoop.fs.azurebfs.services.ListResponseData; import org.apache.hadoop.fs.azurebfs.services.AbfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil; +import org.apache.hadoop.fs.azurebfs.services.VersionedFileStatus; import org.apache.hadoop.fs.azurebfs.utils.DirectoryStateHelper; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat; @@ -62,6 +64,7 @@ import org.apache.hadoop.fs.contract.ContractTestUtils; import static 
java.net.HttpURLConnection.HTTP_OK; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TRUE; @@ -373,7 +376,104 @@ public void testRenameTrailingPeriodFile() throws IOException { + " throw IllegalArgumentException", exceptionThrown); } + @Test + public void testEmptyListingInSubsequentCall() throws IOException { + testEmptyListingInSubsequentCallInternal(EMPTY_STRING, true, EMPTY_STRING, + true, 1, 0); + testEmptyListingInSubsequentCallInternal(EMPTY_STRING, true, EMPTY_STRING, + false, 1, 0); + testEmptyListingInSubsequentCallInternal(EMPTY_STRING, true, TEST_CONTINUATION_TOKEN, + true, 1, 0); + testEmptyListingInSubsequentCallInternal(EMPTY_STRING, true, TEST_CONTINUATION_TOKEN, + false, 1, 0); + + testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN, true, EMPTY_STRING, + true, 2, 0); + testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN, true, EMPTY_STRING, + false, 2, 1); + testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN, true, TEST_CONTINUATION_TOKEN, + true, 3, 0); + testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN, true, TEST_CONTINUATION_TOKEN, + false, 3, 1); + + testEmptyListingInSubsequentCallInternal(EMPTY_STRING, false, EMPTY_STRING, + true, 1, 1); + testEmptyListingInSubsequentCallInternal(EMPTY_STRING, false, EMPTY_STRING, + false, 1, 1); + testEmptyListingInSubsequentCallInternal(EMPTY_STRING, false, TEST_CONTINUATION_TOKEN, + true, 1, 1); + testEmptyListingInSubsequentCallInternal(EMPTY_STRING, false, TEST_CONTINUATION_TOKEN, + false, 1, 1); + + testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN, false, EMPTY_STRING, + true, 2, 1); + testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN, false, EMPTY_STRING, + false, 2, 2); + testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN, false, TEST_CONTINUATION_TOKEN, + true, 3, 1); + testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN, false, TEST_CONTINUATION_TOKEN, + false, 3, 2); + } + private void testEmptyListingInSubsequentCallInternal(String firstCT, + boolean isfirstEmpty, String secondCT, boolean isSecondEmpty, + int expectedInvocations, int expectedSize) throws IOException { + assumeBlobServiceType(); + AzureBlobFileSystem spiedFs = Mockito.spy(getFileSystem()); + AzureBlobFileSystemStore spiedStore = Mockito.spy(spiedFs.getAbfsStore()); + spiedStore.getAbfsConfiguration().setListMaxResults(1); + AbfsBlobClient spiedClient = Mockito.spy(spiedStore.getClientHandler().getBlobClient()); + Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore(); + Mockito.doReturn(spiedClient).when(spiedStore).getClient(); + spiedFs.mkdirs(new Path("/testPath")); + VersionedFileStatus status1 = new VersionedFileStatus( + "owner", "group", null, false, 0, false, 0, 0, 0, + new Path("/testPath/file1"), "version", "encryptionContext"); + VersionedFileStatus status2 = new VersionedFileStatus( + "owner", "group", null, false, 0, false, 0, 0, 0, + new Path("/testPath/file2"), "version", "encryptionContext"); + + List<VersionedFileStatus> mockedList1 = new ArrayList<>(); + mockedList1.add(status1); + List<VersionedFileStatus> mockedList2 = new ArrayList<>(); + mockedList2.add(status2); + + ListResponseData listResponseData1 = new ListResponseData(); + 
listResponseData1.setContinuationToken(firstCT); + listResponseData1.setFileStatusList(isfirstEmpty ? new ArrayList<>() : mockedList1); + listResponseData1.setOp(Mockito.mock(AbfsRestOperation.class)); + + ListResponseData listResponseData2 = new ListResponseData(); + listResponseData2.setContinuationToken(secondCT); + listResponseData2.setFileStatusList(isSecondEmpty ? new ArrayList<>() : mockedList2); + listResponseData2.setOp(Mockito.mock(AbfsRestOperation.class)); + + ListResponseData listResponseData3 = new ListResponseData(); + listResponseData3.setContinuationToken(EMPTY_STRING); + listResponseData3.setFileStatusList(new ArrayList<>()); + listResponseData3.setOp(Mockito.mock(AbfsRestOperation.class)); + + Mockito.doReturn(listResponseData1).doReturn(listResponseData2).doReturn(listResponseData3) + .when(spiedClient).listPath(eq("/testPath"), eq(false), eq(1), + any(), any(), any()); + + FileStatus[] list = spiedFs.listStatus(new Path("/testPath")); + + Mockito.verify(spiedClient, times(expectedInvocations)) + .listPath(eq("/testPath"), eq(false), eq(1), + any(), any(TracingContext.class), any()); + Mockito.verify(spiedClient, times(1)) + .postListProcessing(eq("/testPath"), any(), any(), any()); + Assertions.assertThat(list).hasSize(expectedSize); + + if (expectedSize == 0) { + Mockito.verify(spiedClient, times(1)) + .getPathStatus(eq("/testPath"), any(), eq(null), eq(false)); + } else { + Mockito.verify(spiedClient, times(0)) + .getPathStatus(eq("/testPath"), any(), eq(null), eq(false)); + } + } /** * Test to verify that listStatus returns the correct file status all types diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/TestBlobListXmlParser.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/TestBlobListXmlParser.java index 6dabd2467e8..2b7e04f45df 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/TestBlobListXmlParser.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/TestBlobListXmlParser.java @@ -101,7 +101,7 @@ public void testXMLParser() throws Exception { + "<OrMetadata />" + "</Blob>" + "</Blobs>" - + "<NextMarker />" + + "<NextMarker>TEST_CONTINUATION_TOKEN</NextMarker>" + "</EnumerationResults>"; BlobListResultSchema listResultSchema = getResultSchema(xmlResponseWithDelimiter); List<BlobListResultEntrySchema> paths = listResultSchema.paths(); @@ -110,10 +110,11 @@ public void testXMLParser() throws Exception { Assertions.assertThat(paths.get(1).isDirectory()).isEqualTo(true); Assertions.assertThat(paths.get(2).isDirectory()).isEqualTo(true); Assertions.assertThat(paths.get(3).isDirectory()).isEqualTo(false); + Assertions.assertThat(listResultSchema.getNextMarker()).isNotNull(); } @Test - public void testEmptyBlobList() throws Exception { + public void testEmptyBlobListNullCT() throws Exception { String xmlResponse = "" + "<?xml version=\"1.0\" encoding=\"utf-8\"?><" + "EnumerationResults ServiceEndpoint=\"https://anujtestfns.blob.core.windows.net/\" ContainerName=\"manualtest\">" @@ -124,6 +125,43 @@ public void testEmptyBlobList() throws Exception { BlobListResultSchema listResultSchema = getResultSchema(xmlResponse); List<BlobListResultEntrySchema> paths = listResultSchema.paths(); Assertions.assertThat(paths.size()).isEqualTo(0); + Assertions.assertThat(listResultSchema.getNextMarker()).isNull(); + } + + @Test + public void testEmptyBlobListValidCT() throws Exception { + String xmlResponse = "" + + "<?xml 
version=\"1.0\" encoding=\"utf-8\"?><" + + "EnumerationResults ServiceEndpoint=\"https://anujtestfns.blob.core.windows.net/\" ContainerName=\"manualtest\">" + + "<Prefix>abc/</Prefix>" + + "<Delimiter>/</Delimiter>" + + "<Blobs />" + + "<NextMarker>TEST_CONTINUATION_TOKEN</NextMarker>" + + "</EnumerationResults>"; + BlobListResultSchema listResultSchema = getResultSchema(xmlResponse); + List<BlobListResultEntrySchema> paths = listResultSchema.paths(); + Assertions.assertThat(paths.size()).isEqualTo(0); + Assertions.assertThat(listResultSchema.getNextMarker()).isNotNull(); + } + + @Test + public void testNonEmptyBlobListNullCT() throws Exception { + String xmlResponse = "" + + "<?xml version=\"1.0\" encoding=\"utf-8\"?><" + + "EnumerationResults ServiceEndpoint=\"https://anujtestfns.blob.core.windows.net/\" ContainerName=\"manualtest\">" + + "<Prefix>abc/</Prefix>" + + "<Delimiter>/</Delimiter>" + + "<Blobs>" + + "<BlobPrefix>" + + "<Name>bye/</Name>" + + "</BlobPrefix>" + + "</Blobs>" + + "<NextMarker />" + + "</EnumerationResults>"; + BlobListResultSchema listResultSchema = getResultSchema(xmlResponse); + List<BlobListResultEntrySchema> paths = listResultSchema.paths(); + Assertions.assertThat(paths.size()).isEqualTo(1); + Assertions.assertThat(listResultSchema.getNextMarker()).isNull(); } private static final ThreadLocal<SAXParser> SAX_PARSER_THREAD_LOCAL diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java index a786b77a793..1083df3ba57 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java @@ -23,6 +23,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.URL; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Random; @@ -42,6 +43,7 @@ import org.apache.hadoop.fs.azurebfs.AbfsCountersImpl; import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore; import org.apache.hadoop.fs.azurebfs.TestAbfsConfigurationFieldsValidation; import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType; import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; @@ -58,6 +60,7 @@ import org.apache.http.HttpResponse; import static java.net.HttpURLConnection.HTTP_NOT_FOUND; +import static org.apache.hadoop.fs.azurebfs.ITestAzureBlobFileSystemListStatus.TEST_CONTINUATION_TOKEN; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH; @@ -75,6 +78,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.when; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APN_VERSION; @@ -729,4 +733,81 @@ public void testExpectHundredContinue() throws Exception { .describedAs("The expect header is not false") .isFalse(); } + + @Test + public void testIsNonEmptyDirectory() throws IOException { + 
testIsNonEmptyDirectoryInternal(EMPTY_STRING, true, EMPTY_STRING, + true, 1, false); + testIsNonEmptyDirectoryInternal(EMPTY_STRING, false, EMPTY_STRING, + false, 1, true); + + testIsNonEmptyDirectoryInternal(TEST_CONTINUATION_TOKEN, true, EMPTY_STRING, + true, 2, false); + testIsNonEmptyDirectoryInternal(TEST_CONTINUATION_TOKEN, true, EMPTY_STRING, + false, 2, true); + testIsNonEmptyDirectoryInternal(TEST_CONTINUATION_TOKEN, true, TEST_CONTINUATION_TOKEN, + true, 3, false); + testIsNonEmptyDirectoryInternal(TEST_CONTINUATION_TOKEN, true, TEST_CONTINUATION_TOKEN, + false, 2, true); + + testIsNonEmptyDirectoryInternal(TEST_CONTINUATION_TOKEN, false, EMPTY_STRING, + true, 1, true); + testIsNonEmptyDirectoryInternal(TEST_CONTINUATION_TOKEN, false, EMPTY_STRING, + false, 1, true); + testIsNonEmptyDirectoryInternal(TEST_CONTINUATION_TOKEN, false, TEST_CONTINUATION_TOKEN, + true, 1, true); + testIsNonEmptyDirectoryInternal(TEST_CONTINUATION_TOKEN, false, TEST_CONTINUATION_TOKEN, + false, 1, true); + } + + private void testIsNonEmptyDirectoryInternal(String firstCT, + boolean isfirstEmpty, String secondCT, boolean isSecondEmpty, + int expectedInvocations, boolean isNonEmpty) throws IOException { + + assumeBlobServiceType(); + AzureBlobFileSystem spiedFs = Mockito.spy(getFileSystem()); + AzureBlobFileSystemStore spiedStore = Mockito.spy(spiedFs.getAbfsStore()); + AbfsBlobClient spiedClient = Mockito.spy(spiedStore.getClientHandler().getBlobClient()); + Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore(); + Mockito.doReturn(spiedClient).when(spiedStore).getClient(); + VersionedFileStatus status1 = new VersionedFileStatus( + "owner", "group", null, false, 0, false, 0, 0, 0, + new Path("/testPath/file1"), "version", "encryptionContext"); + VersionedFileStatus status2 = new VersionedFileStatus( + "owner", "group", null, false, 0, false, 0, 0, 0, + new Path("/testPath/file2"), "version", "encryptionContext"); + + List<VersionedFileStatus> mockedList1 = new ArrayList<>(); + mockedList1.add(status1); + List<VersionedFileStatus> mockedList2 = new ArrayList<>(); + mockedList2.add(status2); + + ListResponseData listResponseData1 = new ListResponseData(); + listResponseData1.setContinuationToken(firstCT); + listResponseData1.setFileStatusList(isfirstEmpty ? new ArrayList<>() : mockedList1); + listResponseData1.setOp(Mockito.mock(AbfsRestOperation.class)); + + ListResponseData listResponseData2 = new ListResponseData(); + listResponseData2.setContinuationToken(secondCT); + listResponseData2.setFileStatusList(isSecondEmpty ? 
new ArrayList<>() : mockedList2); + listResponseData2.setOp(Mockito.mock(AbfsRestOperation.class)); + + ListResponseData listResponseData3 = new ListResponseData(); + listResponseData3.setContinuationToken(EMPTY_STRING); + listResponseData3.setFileStatusList(new ArrayList<>()); + listResponseData3.setOp(Mockito.mock(AbfsRestOperation.class)); + + Mockito.doReturn(listResponseData1).doReturn(listResponseData2).doReturn(listResponseData3) + .when(spiedClient).listPath(eq("/testPath"), eq(false), eq(1), + any(), any(), any()); + + Assertions.assertThat(spiedClient.isNonEmptyDirectory("/testPath", + Mockito.mock(TracingContext.class))) + .describedAs("isNonEmptyDirectory in client giving unexpected results") + .isEqualTo(isNonEmpty); + + Mockito.verify(spiedClient, times(expectedInvocations)) + .listPath(eq("/testPath"), eq(false), eq(1), + any(), any(TracingContext.class), any()); + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/DirectoryStateHelper.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/DirectoryStateHelper.java index df65f7f19e2..22eb8e1e68a 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/DirectoryStateHelper.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/DirectoryStateHelper.java @@ -73,7 +73,7 @@ public static boolean isImplicitDirectory(Path path, AzureBlobFileSystem fs, // 2nd condition: listPaths should return some entries. AbfsRestOperation op = client.listPath( - relativePath, false, 1, null, testTracingContext, null, false).getOp(); + relativePath, false, 1, null, testTracingContext, null).getOp(); if (op != null && op.getResult() != null) { int listSize = op.getResult().getListResultSchema().paths().size(); if (listSize > 0) { --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org
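Note on the fix, for readers skimming the flattened diff above: on the Blob endpoint a ListBlobs page can come back empty while still carrying a NextMarker, and treating that first empty page as "directory is empty" is the bug this commit addresses. The patched AbfsBlobClient#isNonEmptyDirectory instead keeps paging until it either finds an entry or the continuation token runs out. Below is a minimal, self-contained sketch of that loop, written against made-up stand-in types; ListPage, PagedLister and NonEmptyDirectoryCheck are illustrative names only and are not the actual ABFS classes (the real ones in the diff are ListResponseData, AbfsBlobClient#listPath and AbfsBlobClient#isNonEmptyDirectory).

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/*
 * Stand-in types for illustration only; the real code lives in
 * AbfsBlobClient / ListResponseData in hadoop-azure.
 */
final class ListPage {
    final List<String> entries;         // stands in for List<FileStatus>
    final String continuationToken;     // null or empty once the listing is complete

    ListPage(List<String> entries, String continuationToken) {
        this.entries = entries;
        this.continuationToken = continuationToken;
    }
}

interface PagedLister {
    // One page of at most maxResults entries, resuming from the given continuation token.
    ListPage listPage(String path, int maxResults, String continuation);
}

public final class NonEmptyDirectoryCheck {

    /*
     * An empty page that still carries a NextMarker must not be read as
     * "directory is empty"; keep paging until an entry shows up or the
     * continuation token becomes null/empty.
     */
    static boolean isNonEmptyDirectory(PagedLister client, String path) {
        String token = null;
        List<String> found = new ArrayList<>();
        do {
            ListPage page = client.listPage(path, 1, token);
            found.addAll(page.entries);
            token = page.continuationToken;
        } while (token != null && !token.isEmpty() && found.isEmpty());
        return !found.isEmpty();
    }

    public static void main(String[] args) {
        // Fake server: the first page is empty but has a NextMarker,
        // the second page contains one blob.
        PagedLister fakeClient = (path, max, cont) ->
            cont == null
                ? new ListPage(Collections.emptyList(), "marker-1")
                : new ListPage(Arrays.asList(path + "/file1"), "");
        System.out.println(isNonEmptyDirectory(fakeClient, "/testDir")); // prints: true
    }
}

The other half of the change, AbfsBlobClient#postListProcessing, applies the same care at the listStatus level: a non-empty Blob listing is de-duplicated via ListUtils.getUniqueListResult, while an empty listing on a non-root path triggers a getPathStatus call to distinguish a file from a non-existent path; the DFS client's override returns the list unchanged since neither issue applies there.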