This is an automated email from the ASF dual-hosted git repository. anujmodi pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new 2c0c1299fef HADOOP-19445: ABFS: [FnsOverBlob][Tests] Add Tests For Negative Scenarios Identified for Rename Operation (#7386) (#7525) 2c0c1299fef is described below commit 2c0c1299fef94c04d31684b940a03c9e862eccae Author: Manish Bhatt <52626736+bhattmanis...@users.noreply.github.com> AuthorDate: Thu Mar 20 05:01:29 2025 -0700 HADOOP-19445: ABFS: [FnsOverBlob][Tests] Add Tests For Negative Scenarios Identified for Rename Operation (#7386) (#7525) Contributed by Manish Bhatt Reviewed by Anmol Asrani, Anuj Modi Signed off by: Anuj Modi <anujm...@apache.org> --- .../contracts/services/AzureServiceErrorCode.java | 1 - .../fs/azurebfs/services/AbfsBlobClient.java | 57 +- .../fs/azurebfs/services/BlobRenameHandler.java | 25 +- .../fs/azurebfs/services/ListActionTaker.java | 12 +- .../apache/hadoop/fs/azurebfs/utils/UriUtils.java | 9 +- .../azurebfs/ITestAzureBlobFileSystemRename.java | 915 ++++++++++++++++++++- .../ITestAzureBlobFileSystemRenameUnicode.java | 17 - 7 files changed, 960 insertions(+), 76 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java index e60e645b3bb..ce03d794ddb 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java @@ -56,7 +56,6 @@ public enum AzureServiceErrorCode { OTHER_SERVER_THROTTLING("ServerBusy", HttpURLConnection.HTTP_UNAVAILABLE, "The server is currently unable to receive requests. 
Please retry your request."), INVALID_QUERY_PARAMETER_VALUE("InvalidQueryParameterValue", HttpURLConnection.HTTP_BAD_REQUEST, null), - INVALID_RENAME_DESTINATION("InvalidRenameDestinationPath", HttpURLConnection.HTTP_BAD_REQUEST, null), AUTHORIZATION_PERMISSION_MISS_MATCH("AuthorizationPermissionMismatch", HttpURLConnection.HTTP_FORBIDDEN, null), ACCOUNT_REQUIRES_HTTPS("AccountRequiresHttps", HttpURLConnection.HTTP_BAD_REQUEST, null), MD5_MISMATCH("Md5Mismatch", HttpURLConnection.HTTP_BAD_REQUEST, diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java index 7325a40ce85..08c78566e13 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java @@ -83,8 +83,8 @@ import static java.net.HttpURLConnection.HTTP_NOT_FOUND; import static java.net.HttpURLConnection.HTTP_OK; import static java.net.HttpURLConnection.HTTP_PRECON_FAILED; -import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader; import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_GET_FILE_STATUS; +import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ACQUIRE_LEASE_ACTION; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.AND_MARK; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_BLOB_TYPE; @@ -424,7 +424,7 @@ private void fixAtomicEntriesInListResults(final AbfsRestOperation op, } List<BlobListResultEntrySchema> filteredEntries = new ArrayList<>(); for (BlobListResultEntrySchema entry : listResultSchema.paths()) { - if (!takeListPathAtomicRenameKeyAction(entry.path(), + if 
(!takeListPathAtomicRenameKeyAction(entry.path(), entry.isDirectory(), entry.contentLength().intValue(), tracingContext)) { filteredEntries.add(entry); } @@ -442,8 +442,12 @@ public void createNonRecursivePreCheck(Path parentPath, if (isAtomicRenameKey(parentPath.toUri().getPath())) { takeGetPathStatusAtomicRenameKeyAction(parentPath, tracingContext); } - getPathStatus(parentPath.toUri().getPath(), false, - tracingContext, null); + try { + getPathStatus(parentPath.toUri().getPath(), false, + tracingContext, null); + } finally { + getAbfsCounters().incrementCounter(CALL_GET_FILE_STATUS, 1); + } } catch (AbfsRestOperationException ex) { if (ex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { throw new FileNotFoundException("Cannot create file " @@ -451,8 +455,6 @@ public void createNonRecursivePreCheck(Path parentPath, + " because parent folder does not exist."); } throw ex; - } finally { - getAbfsCounters().incrementCounter(CALL_GET_FILE_STATUS, 1); } } @@ -807,23 +809,26 @@ public AbfsClientRenameResult renamePath(final String source, BlobRenameHandler blobRenameHandler = getBlobRenameHandler(source, destination, sourceEtag, isAtomicRenameKey(source), tracingContext ); - incrementAbfsRenamePath(); - if (blobRenameHandler.execute()) { - final AbfsUriQueryBuilder abfsUriQueryBuilder - = createDefaultUriQueryBuilder(); - final URL url = createRequestUrl(destination, - abfsUriQueryBuilder.toString()); - final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); - final AbfsRestOperation successOp = getSuccessOp( - AbfsRestOperationType.RenamePath, HTTP_METHOD_PUT, - url, requestHeaders); - return new AbfsClientRenameResult(successOp, true, false); - } else { - throw new AbfsRestOperationException(HTTP_INTERNAL_ERROR, - AzureServiceErrorCode.UNKNOWN.getErrorCode(), - ERR_RENAME_BLOB + source + SINGLE_WHITE_SPACE + AND_MARK - + SINGLE_WHITE_SPACE + destination, - null); + try { + if (blobRenameHandler.execute()) { + final AbfsUriQueryBuilder 
abfsUriQueryBuilder + = createDefaultUriQueryBuilder(); + final URL url = createRequestUrl(destination, + abfsUriQueryBuilder.toString()); + final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); + final AbfsRestOperation successOp = getSuccessOp( + AbfsRestOperationType.RenamePath, HTTP_METHOD_PUT, + url, requestHeaders); + return new AbfsClientRenameResult(successOp, true, false); + } else { + throw new AbfsRestOperationException(HTTP_INTERNAL_ERROR, + AzureServiceErrorCode.UNKNOWN.getErrorCode(), + ERR_RENAME_BLOB + source + SINGLE_WHITE_SPACE + AND_MARK + + SINGLE_WHITE_SPACE + destination, + null); + } + } finally { + incrementAbfsRenamePath(); } } @@ -1817,11 +1822,11 @@ public void takeGetPathStatusAtomicRenameKeyAction(final Path path, * @throws AzureBlobFileSystemException server error */ private boolean takeListPathAtomicRenameKeyAction(final Path path, - final int renamePendingJsonLen, + final boolean isDirectory, final int renamePendingJsonLen, final TracingContext tracingContext) throws AzureBlobFileSystemException { if (path == null || path.isRoot() || !isAtomicRenameKey( - path.toUri().getPath()) || !path.toUri() + path.toUri().getPath()) || isDirectory || !path.toUri() .getPath() .endsWith(RenameAtomicity.SUFFIX)) { return false; @@ -1849,7 +1854,7 @@ private boolean takeListPathAtomicRenameKeyAction(final Path path, } @VisibleForTesting - RenameAtomicity getRedoRenameAtomicity(final Path renamePendingJsonPath, + public RenameAtomicity getRedoRenameAtomicity(final Path renamePendingJsonPath, int fileLen, final TracingContext tracingContext) { return new RenameAtomicity(renamePendingJsonPath, diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobRenameHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobRenameHandler.java index 695c0694cf5..f78228bfcff 100644 --- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobRenameHandler.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobRenameHandler.java @@ -31,7 +31,6 @@ import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException; @@ -257,7 +256,7 @@ private boolean containsColon(Path p) { private boolean preCheck(final Path src, final Path dst, final PathInformation pathInformation) throws AzureBlobFileSystemException { - validateDestinationPath(src, dst); + validateDestinationIsNotSubDir(src, dst); validateSourcePath(pathInformation); validateDestinationPathNotExist(src, dst, pathInformation); validateDestinationParentExist(src, dst, pathInformation); @@ -265,28 +264,6 @@ private boolean preCheck(final Path src, final Path dst, return true; } - /** - * Validate if the format of the destination path is correct and if the destination - * path is not a sub-directory of the source path. - * - * @param src source path - * @param dst destination path - * - * @throws AbfsRestOperationException if the destination path is invalid - */ - private void validateDestinationPath(final Path src, final Path dst) - throws AbfsRestOperationException { - if (containsColon(dst)) { - throw new AbfsRestOperationException( - HttpURLConnection.HTTP_BAD_REQUEST, - AzureServiceErrorCode.INVALID_RENAME_DESTINATION.getErrorCode(), null, - new PathIOException(dst.toUri().getPath(), - "Destination path contains colon")); - } - - validateDestinationIsNotSubDir(src, dst); - } - /** * Validate if the destination path is not a sub-directory of the source path. 
* diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListActionTaker.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListActionTaker.java index b87f16ae461..ed3d464e9b6 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListActionTaker.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListActionTaker.java @@ -35,6 +35,7 @@ import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.FileSystemOperationUnhandledException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException; import org.apache.hadoop.fs.azurebfs.contracts.services.BlobListResultSchema; import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema; @@ -119,7 +120,14 @@ private boolean takeAction(List<Path> paths) LOG.debug("Thread interrupted while taking action on path: {}", path.toUri().getPath()); } catch (ExecutionException e) { - executionException = (AzureBlobFileSystemException) e.getCause(); + LOG.debug("Execution exception while taking action on path: {}", + path.toUri().getPath()); + if (e.getCause() instanceof AzureBlobFileSystemException) { + executionException = (AzureBlobFileSystemException) e.getCause(); + } else { + executionException = + new FileSystemOperationUnhandledException(executionException); + } } } if (executionException != null) { @@ -261,7 +269,7 @@ protected String listAndEnqueue(final ListBlobQueue listBlobQueue, protected void addPaths(final List<Path> paths, final ListResultSchema retrievedSchema) { for (ListResultEntrySchema entry : retrievedSchema.paths()) { - Path entryPath = new Path(ROOT_PATH, entry.name()); + Path entryPath = 
new Path(ROOT_PATH + entry.name()); if (!entryPath.equals(this.path)) { paths.add(entryPath); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/UriUtils.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/UriUtils.java index e77336925f6..beb0d2d35f8 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/UriUtils.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/UriUtils.java @@ -248,8 +248,13 @@ private static String replacedUrl(String baseUrl, String oldString, String newSt */ public static boolean isKeyForDirectorySet(String key, Set<String> dirSet) { for (String dir : dirSet) { - if (dir.isEmpty() || key.startsWith( - dir + AbfsHttpConstants.FORWARD_SLASH)) { + // Ensure the directory ends with a forward slash + if (StringUtils.isNotEmpty(dir) + && !dir.endsWith(AbfsHttpConstants.FORWARD_SLASH)) { + dir += AbfsHttpConstants.FORWARD_SLASH; + } + // Return true if the directory is empty or the key starts with the directory + if (dir.isEmpty() || key.startsWith(dir)) { return true; } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java index afb66c054fc..ad352eef69b 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java @@ -23,8 +23,10 @@ import java.nio.charset.StandardCharsets; import java.nio.file.AccessDeniedException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import 
java.util.concurrent.Future; @@ -42,11 +44,13 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathIOException; import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter; import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient; import org.apache.hadoop.fs.azurebfs.services.AbfsClient; @@ -61,6 +65,7 @@ import org.apache.hadoop.fs.azurebfs.services.RenameAtomicityTestUtils; import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat; import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator; import org.apache.hadoop.fs.statistics.IOStatisticAssertions; import org.apache.hadoop.fs.statistics.IOStatistics; @@ -71,7 +76,9 @@ import static java.net.HttpURLConnection.HTTP_CONFLICT; import static java.net.HttpURLConnection.HTTP_FORBIDDEN; import static java.net.HttpURLConnection.HTTP_NOT_FOUND; +import static java.net.HttpURLConnection.HTTP_OK; import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_PATH_ATTEMPTS; +import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_ABORTED; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_FAILED; import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_PENDING; @@ -79,12 +86,19 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ATOMIC_RENAME_KEY; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CONSUMER_MAX_LAG; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_LEASE_THREADS; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_PRODUCER_QUEUE_MAX_SIZE; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.BLOB_ALREADY_EXISTS; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.BLOB_PATH_NOT_FOUND; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_TRANSACTION_ID; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_RESOURCE_TYPE; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.COPY_BLOB_ABORTED; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.COPY_BLOB_FAILED; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.PATH_NOT_FOUND; import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND; import static org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil.mockAddClientTransactionIdToHeader; import static 
org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_RENAME_RECOVERY; @@ -108,6 +122,12 @@ public class ITestAzureBlobFileSystemRename extends private static final int BLOB_COUNT = 11; + private static final int TOTAL_FILES = 25; + + private static final int TOTAL_THREADS_IN_POOL = 5; + + private static final int FAILED_CALL = 15; + public ITestAzureBlobFileSystemRename() throws Exception { super(); } @@ -304,12 +324,69 @@ public void testRenameNotFoundBlobToEmptyRoot() throws Exception { * * @throws Exception if an error occurs during test execution */ - @Test(expected = IOException.class) - public void testRenameBlobToDstWithColonInPath() throws Exception { + @Test + public void testRenameBlobToDstWithColonInSourcePath() throws Exception { + AzureBlobFileSystem fs = getFileSystem(); + fs.create(new Path("/src:/file")); + Assertions.assertThat( + fs.rename(new Path("/src:"), new Path("/dst"))) + .describedAs("Rename should succeed") + .isTrue(); + } + + /** + * Tests renaming a source path to a destination path that contains a colon in the path. + * This verifies that the rename operation handles paths with special characters like a colon. + * + * The test creates a source directory and renames it to a destination path that includes a colon, + * ensuring that the operation succeeds without errors. 
+ * + * @throws Exception if an error occurs during test execution + */ + @Test + public void testRenameWithColonInDestinationPath() throws Exception { AzureBlobFileSystem fs = getFileSystem(); - assumeBlobServiceType(); fs.create(new Path("/src")); - fs.rename(new Path("/src"), new Path("/dst:file")); + Assertions.assertThat( + fs.rename(new Path("/src"), new Path("/dst:"))) + .describedAs("Rename should succeed") + .isTrue(); + } + + @Test + public void testRenameWithColonInSourcePath() throws Exception { + AzureBlobFileSystem fs = getFileSystem(); + String sourceDirectory = "/src:"; + String destinationDirectory = "/dst"; + String fileName = "file"; + fs.create(new Path(sourceDirectory, fileName)); + fs.create(new Path(sourceDirectory + "/Test:", fileName)); + // Rename from source to destination + Assertions.assertThat( + fs.rename(new Path(sourceDirectory), new Path(destinationDirectory))) + .describedAs("Rename should succeed") + .isTrue(); + Assertions.assertThat( + fs.exists(new Path(sourceDirectory, fileName))) + .describedAs("Source directory should not exist after rename") + .isFalse(); + Assertions.assertThat( + fs.exists(new Path(destinationDirectory, fileName))) + .describedAs("Destination directory should exist after rename") + .isTrue(); + + // Rename from destination to source + Assertions.assertThat( + fs.rename(new Path(destinationDirectory), new Path(sourceDirectory))) + .describedAs("Rename should succeed").isTrue(); + Assertions.assertThat( + fs.exists(new Path(sourceDirectory, fileName))) + .describedAs("Destination directory should exist after rename") + .isTrue(); + Assertions.assertThat( + fs.exists(new Path(destinationDirectory, fileName))) + .describedAs("Source directory should not exist after rename") + .isFalse(); } /** @@ -1655,6 +1732,836 @@ public void testRenameSrcDirDeleteEmitDeletionCountInClientRequestId() fs.rename(new Path(dirPathStr), new Path("/dst/")); } + /** + * Helper method to configure the AzureBlobFileSystem and 
rename directories. + * + * @param currentFs The current AzureBlobFileSystem to use for renaming. + * @param producerQueueSize Maximum size of the producer queue. + * @param consumerMaxLag Maximum lag allowed for the consumer. + * @param maxThread Maximum threads for the rename operation. + * @param src The source path of the directory to rename. + * @param dst The destination path of the renamed directory. + * @throws IOException If an I/O error occurs during the operation. + */ + private void renameDir(AzureBlobFileSystem currentFs, String producerQueueSize, + String consumerMaxLag, String maxThread, Path src, Path dst) + throws IOException { + Configuration config = createConfig(producerQueueSize, consumerMaxLag, maxThread); + try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(), config)) { + fs.rename(src, dst); + validateRename(fs, src, dst, false, true, false); + } + } + + /** + * Helper method to create the configuration for the AzureBlobFileSystem. + * + * @param producerQueueSize Maximum size of the producer queue. + * @param consumerMaxLag Maximum lag allowed for the consumer. + * @param maxThread Maximum threads for the rename operation. + * @return The configuration object. + */ + private Configuration createConfig(String producerQueueSize, String consumerMaxLag, String maxThread) { + Configuration config = new Configuration(this.getRawConfiguration()); + config.set(FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, producerQueueSize); + config.set(FS_AZURE_CONSUMER_MAX_LAG, consumerMaxLag); + config.set(FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD, maxThread); + return config; + } + + /** + * Helper method to validate that the rename was successful and that the destination exists. + * + * @param fs The AzureBlobFileSystem instance to check the existence on. + * @param dst The destination path. + * @param src The source path. + * @throws IOException If an I/O error occurs during the validation. 
+ */ + private void validateRename(AzureBlobFileSystem fs, Path src, Path dst, + boolean isSrcExist, boolean isDstExist, boolean isJsonExist) + throws IOException { + Assertions.assertThat(fs.exists(dst)) + .describedAs("Renamed Destination directory should exist.") + .isEqualTo(isDstExist); + Assertions.assertThat(fs.exists(new Path(src.getParent(), src.getName() + SUFFIX))) + .describedAs("Renamed Pending Json file should exist.") + .isEqualTo(isJsonExist); + Assertions.assertThat(fs.exists(src)) + .describedAs("Renamed Destination directory should exist.") + .isEqualTo(isSrcExist); + } + + /** + * Test the renaming of a directory with different parallelism configurations. + */ + @Test + public void testRenameDirWithDifferentParallelismConfig() throws Exception { + try (AzureBlobFileSystem currentFs = getFileSystem()) { + assumeBlobServiceType(); + Path src = new Path("/hbase/A1/A2"); + Path dst = new Path("/hbase/A1/A3"); + + // Create sample files in the source directory + createFiles(currentFs, src, TOTAL_FILES); + + // Test renaming with different configurations + renameDir(currentFs, "10", "5", "2", src, dst); + renameDir(currentFs, "100", "5", "2", dst, src); + + String errorMessage = intercept(PathIOException.class, + () -> renameDir(currentFs, "50", "50", "5", src, dst)) + .getMessage(); + + // Validate error message for invalid configuration + Assertions.assertThat(errorMessage) + .describedAs("maxConsumptionLag should be lesser than maxSize") + .contains( + "Invalid configuration value detected for \"fs.azure.blob.dir.list.consumer.max.lag\". " + + "maxConsumptionLag should be lesser than maxSize"); + } + } + + /** + * Helper method to create files in the given directory. + * + * @param fs The AzureBlobFileSystem instance to use for file creation. + * @param src The source path (directory). + * @param numFiles The number of files to create. + * @throws ExecutionException, InterruptedException If an error occurs during file creation. 
+ */ + private void createFiles(AzureBlobFileSystem fs, Path src, int numFiles) + throws ExecutionException, InterruptedException { + ExecutorService executorService = Executors.newFixedThreadPool(TOTAL_THREADS_IN_POOL); + List<Future> futures = new ArrayList<>(); + for (int i = 0; i < numFiles; i++) { + final int iter = i; + Future future = executorService.submit(() -> + fs.create(new Path(src, "file" + iter + ".txt"))); + futures.add(future); + } + for (Future future : futures) { + future.get(); + } + executorService.shutdown(); + } + + /** + * Tests renaming a directory with a failure during the copy operation. + * Simulates an error when copying on the 6th call. + */ + @Test + public void testRenameCopyFailureInBetween() throws Exception { + try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem( + createConfig("5", "3", "2")))) { + assumeBlobServiceType(); + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + fs.getAbfsStore().setClient(client); + Path src = new Path("/hbase/A1/A2"); + Path dst = new Path("/hbase/A1/A3"); + + // Create sample files in the source directory + createFiles(fs, src, TOTAL_FILES); + + // Track the number of copy operations + AtomicInteger copyCall = new AtomicInteger(0); + Mockito.doAnswer(copyRequest -> { + if (copyCall.get() == FAILED_CALL) { + throw new AbfsRestOperationException( + BLOB_ALREADY_EXISTS.getStatusCode(), + BLOB_ALREADY_EXISTS.getErrorCode(), + BLOB_ALREADY_EXISTS.getErrorMessage(), + new Exception()); + } + copyCall.incrementAndGet(); + return copyRequest.callRealMethod(); + }).when(client).copyBlob(Mockito.any(Path.class), + Mockito.any(Path.class), Mockito.nullable(String.class), + Mockito.any(TracingContext.class)); + + fs.rename(src, dst); + // Validate copy operation count + Assertions.assertThat(copyCall.get()) + .describedAs("Copy operation count should be less than 10.") + .isLessThan(TOTAL_FILES); + + // Validate that rename redo operation was triggered + copyCall.set(0); + + // 
Assertions to validate renamed destination and source + validateRename(fs, src, dst, false, true, true); + + Assertions.assertThat(copyCall.get()) + .describedAs("Copy operation count should be greater than 0.") + .isGreaterThan(0); + + // Validate final state of destination and source + validateRename(fs, src, dst, false, true, false); + } + } + + /** + * Tests renaming a directory with a failure during the delete operation. + * Simulates an error on the 6th delete operation and verifies the behavior. + */ + @Test + public void testRenameDeleteFailureInBetween() throws Exception { + try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem( + createConfig("5", "3", "2")))) { + assumeBlobServiceType(); + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + fs.getAbfsStore().setClient(client); + Path src = new Path("/hbase/A1/A2"); + Path dst = new Path("/hbase/A1/A3"); + + // Create sample files in the source directory + createFiles(fs, src, TOTAL_FILES); + + // Track the number of delete operations + AtomicInteger deleteCall = new AtomicInteger(0); + Mockito.doAnswer(deleteRequest -> { + if (deleteCall.get() == FAILED_CALL) { + throw new AbfsRestOperationException( + BLOB_PATH_NOT_FOUND.getStatusCode(), + BLOB_PATH_NOT_FOUND.getErrorCode(), + BLOB_PATH_NOT_FOUND.getErrorMessage(), + new Exception()); + } + deleteCall.incrementAndGet(); + return deleteRequest.callRealMethod(); + }).when(client).deleteBlobPath(Mockito.any(Path.class), + Mockito.anyString(), Mockito.any(TracingContext.class)); + + fs.rename(src, dst); + + // Validate delete operation count + Assertions.assertThat(deleteCall.get()) + .describedAs("Delete operation count should be less than 10.") + .isLessThan(TOTAL_FILES); + + // Validate that delete redo operation was triggered + deleteCall.set(0); + // Assertions to validate renamed destination and source + validateRename(fs, src, dst, false, true, true); + + Assertions.assertThat(deleteCall.get()) + .describedAs("Delete operation 
count should be greater than 0.") + .isGreaterThan(0); + + // Validate final state of destination and source + // Validate that delete redo operation was triggered + validateRename(fs, src, dst, false, true, false); + } + } + + /** + * Tests renaming a file or directory when the destination path contains + * a colon (":"). The test ensures that: + * - The source directory exists before the rename. + * - The file is successfully renamed to the destination path. + * - The old source directory no longer exists after the rename. + * - The new destination directory exists after the rename. + * + * @throws Exception if an error occurs during file system operations + */ + @Test + public void testRenameWhenDestinationPathContainsColon() throws Exception { + AzureBlobFileSystem fs = getFileSystem(); + fs.setWorkingDirectory(new Path(ROOT_PATH)); + String fileName = "file"; + Path src = new Path("/test1/"); + Path dst = new Path("/test1:/"); + + // Create the file + fs.create(new Path(src, fileName)); + + // Perform the rename operation and validate the results + performRenameAndValidate(fs, src, dst, fileName); + } + + /** + * Performs the rename operation and validates the existence of the directories and files. 
+ * + * @param fs the AzureBlobFileSystem instance + * @param src the source path to be renamed + * @param dst the destination path for the rename + * @param fileName the name of the file to be renamed + */ + private void performRenameAndValidate(AzureBlobFileSystem fs, Path src, Path dst, String fileName) + throws IOException { + // Assert the source directory exists + Assertions.assertThat(fs.exists(src)) + .describedAs("Old directory should exist before rename") + .isTrue(); + + // Perform rename + fs.rename(src, dst); + + // Assert the destination directory and file exist after rename + Assertions.assertThat(fs.exists(new Path(dst, fileName))) + .describedAs("Rename should be successful") + .isTrue(); + + // Assert the source directory no longer exists + Assertions.assertThat(fs.exists(src)) + .describedAs("Old directory should not exist") + .isFalse(); + + // Assert the new destination directory exists + Assertions.assertThat(fs.exists(dst)) + .describedAs("New directory should exist") + .isTrue(); + } + + /** + * Tests the behavior of the atomic rename key for the root folder + * in Azure Blob File System. The test verifies that the atomic rename key + * returns false for the root folder path. + * + * @throws Exception if an error occurs during the atomic rename key check + */ + @Test + public void testGetAtomicRenameKeyForRootFolder() throws Exception { + AzureBlobFileSystem fs = getFileSystem(); + assumeBlobServiceType(); + AbfsBlobClient abfsBlobClient = (AbfsBlobClient) fs.getAbfsClient(); + Assertions.assertThat(abfsBlobClient.isAtomicRenameKey("/hbase")) + .describedAs("Atomic rename key should return false for Root folder") + .isFalse(); + } + + /** + * Tests the behavior of the atomic rename key for non-root folders + * in Azure Blob File System. The test verifies that the atomic rename key + * works for specific folders as defined in the configuration. 
+ * It checks the atomic rename key for various paths, + * ensuring it returns true for matching paths and false for others. + * + * @throws Exception if an error occurs during the atomic rename key check + */ + @Test + public void testGetAtomicRenameKeyForNonRootFolder() throws Exception { + final AzureBlobFileSystem currentFs = getFileSystem(); + Configuration config = new Configuration(this.getRawConfiguration()); + config.set(FS_AZURE_ATOMIC_RENAME_KEY, "/hbase,/a,/b"); + + final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(), config); + assumeBlobServiceType(); + AbfsBlobClient abfsBlobClient = (AbfsBlobClient) fs.getAbfsClient(); + + // Test for various paths + validateAtomicRenameKey(abfsBlobClient, "/hbase1/test", false); + validateAtomicRenameKey(abfsBlobClient, "/hbase/test", true); + validateAtomicRenameKey(abfsBlobClient, "/a/b/c", true); + validateAtomicRenameKey(abfsBlobClient, "/test/a", false); + } + + /** + * Validates the atomic rename key for a specific path. + * + * @param abfsBlobClient the AbfsBlobClient instance + * @param path the path to check for atomic rename key + * @param expected the expected value (true or false) + */ + private void validateAtomicRenameKey(AbfsBlobClient abfsBlobClient, String path, boolean expected) { + Assertions.assertThat(abfsBlobClient.isAtomicRenameKey(path)) + .describedAs("Atomic rename key check for path: " + path) + .isEqualTo(expected); + } + + /** + * Helper method to create a json file. 
+ * @param path parent path + * @param renameJson rename json path + * @return file system + * @throws IOException in case of failure + */ + public AzureBlobFileSystem createJsonFile(Path path, Path renameJson) + throws IOException { + final AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem()); + assumeBlobServiceType(); + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + AbfsClient client = Mockito.spy(store.getClient()); + Mockito.doReturn(client).when(store).getClient(); + + fs.setWorkingDirectory(new Path(ROOT_PATH)); + fs.create(new Path(path, "file.txt")); + + AzureBlobFileSystemStore.VersionedFileStatus fileStatus + = (AzureBlobFileSystemStore.VersionedFileStatus) fs.getFileStatus(path); + + new RenameAtomicity(path, new Path("/hbase/test4"), + renameJson, getTestTracingContext(fs, true), + fileStatus.getEtag(), client) + .preRename(); + + Assertions.assertThat(fs.exists(renameJson)) + .describedAs("Rename Pending Json file should exist.") + .isTrue(); + + return fs; + } + + /** + * Test case to verify crash recovery with a single child folder. + * + * This test simulates a scenario where a pending rename JSON file exists for a single child folder + * under the parent directory. It ensures that when listing the files in the parent directory, + * only the child folder (with the pending rename JSON file) is returned, and no additional files are listed. 
+ * + * @throws Exception if any error occurs during the test execution + */ + @Test + public void testListCrashRecoveryWithSingleChildFolder() throws Exception { + AzureBlobFileSystem fs = null; + try { + Path path = new Path("/hbase/A1/A2"); + Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX); + fs = createJsonFile(path, renameJson); + + FileStatus[] fileStatuses = fs.listStatus(new Path("/hbase/A1")); + + Assertions.assertThat(fileStatuses.length) + .describedAs("List should return 1 file") + .isEqualTo(1); + } finally { + if (fs != null) { + fs.close(); + } + } + } + + /** + * Test case to verify crash recovery with multiple child folders. + * + * This test simulates a scenario where a pending rename JSON file exists, and multiple files are + * created in the parent directory. It ensures that when listing the files in the parent directory, + * the expected number of entries (three) is returned; the pending rename JSON file itself is not counted. + * + * @throws Exception if any error occurs during the test execution + */ + @Test + public void testListCrashRecoveryWithMultipleChildFolder() throws Exception { + AzureBlobFileSystem fs = null; + try { + Path path = new Path("/hbase/A1/A2"); + Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX); + fs = createJsonFile(path, renameJson); + + fs.create(new Path("/hbase/A1/file1.txt")); + fs.create(new Path("/hbase/A1/file2.txt")); + + FileStatus[] fileStatuses = fs.listStatus(new Path("/hbase/A1")); + + Assertions.assertThat(fileStatuses.length) + .describedAs("List should return 3 files") + .isEqualTo(3); + } finally { + if (fs != null) { + fs.close(); + } + } + } + + /** + * Test case to verify crash recovery with a pending rename JSON file.
+ * + * This test simulates a scenario where a pending rename JSON file exists in the parent directory, + * and it ensures that after the rename source directory is deleted and new files are created, + * the listing operation correctly returns the remaining files without considering the pending rename. + * + * @throws Exception if any error occurs during the test execution + */ + @Test + public void testListCrashRecoveryWithPendingJsonFile() throws Exception { + AzureBlobFileSystem fs = null; + try { + Path path = new Path("/hbase/A1/A2"); + Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX); + fs = createJsonFile(path, renameJson); + + fs.delete(path, true); + fs.create(new Path("/hbase/A1/file1.txt")); + fs.create(new Path("/hbase/A1/file2.txt")); + + FileStatus[] fileStatuses = fs.listStatus(path.getParent()); + + Assertions.assertThat(fileStatuses.length) + .describedAs("List should return 2 files") + .isEqualTo(2); + } finally { + if (fs != null) { + fs.close(); + } + } + } + + /** + * Test case to verify crash recovery when no pending rename JSON file exists. + * + * This test simulates a scenario where there is no pending rename JSON file in the directory. + * It ensures that the listing operation correctly returns all files in the parent directory, including + * those created after the rename JSON file is deleted.
+ * + * @throws Exception if any error occurs during the test execution + */ + @Test + public void testListCrashRecoveryWithoutAnyPendingJsonFile() throws Exception { + AzureBlobFileSystem fs = null; + try { + Path path = new Path("/hbase/A1/A2"); + Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX); + fs = createJsonFile(path, renameJson); + + fs.delete(renameJson, true); + fs.create(new Path("/hbase/A1/file1.txt")); + fs.create(new Path("/hbase/A1/file2.txt")); + + FileStatus[] fileStatuses = fs.listStatus(path.getParent()); + + Assertions.assertThat(fileStatuses.length) + .describedAs("List should return 3 files") + .isEqualTo(3); + } finally { + if (fs != null) { + fs.close(); + } + } + } + + /** + * Test case to verify crash recovery when a pending rename JSON directory exists. + * + * This test simulates a scenario where a pending rename JSON directory exists, ensuring that the + * listing operation correctly returns all files in the parent directory without triggering a redo + * rename operation. It also checks that the directory with the suffix "-RenamePending.json" exists. 
+ * + * @throws Exception if any error occurs during the test execution + */ + @Test + public void testListCrashRecoveryWithPendingJsonDir() throws Exception { + try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem())) { + assumeBlobServiceType(); + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + + Path path = new Path("/hbase/A1/A2"); + Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX); + fs.mkdirs(renameJson); + + fs.create(new Path(path.getParent(), "file1.txt")); + fs.create(new Path(path, "file2.txt")); + + AtomicInteger redoRenameCall = new AtomicInteger(0); + Mockito.doAnswer(answer -> { + redoRenameCall.incrementAndGet(); + return answer.callRealMethod(); + }).when(client).getRedoRenameAtomicity(Mockito.any(Path.class), + Mockito.anyInt(), Mockito.any(TracingContext.class)); + + FileStatus[] fileStatuses = fs.listStatus(path.getParent()); + + Assertions.assertThat(fileStatuses.length) + .describedAs("List should return 3 files") + .isEqualTo(3); + + Assertions.assertThat(redoRenameCall.get()) + .describedAs("No redo rename call should be made") + .isEqualTo(0); + + Assertions.assertThat( + Arrays.stream(fileStatuses) + .anyMatch(status -> renameJson.toUri().getPath().equals(status.getPath().toUri().getPath()))) + .describedAs("Directory with suffix -RenamePending.json should exist.") + .isTrue(); + } + } + + /** + * Test case to verify crash recovery during listing with multiple pending rename JSON files. + * + * This test simulates a scenario where multiple pending rename JSON files exist, ensuring that + * crash recovery properly handles the situation. It verifies that two redo rename calls are made + * and that the list operation returns the correct number of paths. 
+ * + * @throws Exception if any error occurs during the test execution + */ + @Test + public void testListCrashRecoveryWithMultipleJsonFile() throws Exception { + AzureBlobFileSystem fs = null; + try { + Path path = new Path("/hbase/A1/A2"); + + // 1st Json file + Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX); + fs = createJsonFile(path, renameJson); + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + + // 2nd Json file + Path path2 = new Path("/hbase/A1/A3"); + fs.create(new Path(path2, "file3.txt")); + + Path renameJson2 = new Path(path2.getParent(), path2.getName() + SUFFIX); + AzureBlobFileSystemStore.VersionedFileStatus fileStatus + = (AzureBlobFileSystemStore.VersionedFileStatus) fs.getFileStatus(path2); + + new RenameAtomicity(path2, new Path("/hbase/test4"), + renameJson2, getTestTracingContext(fs, true), + fileStatus.getEtag(), client).preRename(); + + fs.create(new Path(path, "file2.txt")); + + AtomicInteger redoRenameCall = new AtomicInteger(0); + Mockito.doAnswer(answer -> { + redoRenameCall.incrementAndGet(); + return answer.callRealMethod(); + }).when(client).getRedoRenameAtomicity(Mockito.any(Path.class), + Mockito.anyInt(), Mockito.any(TracingContext.class)); + + FileStatus[] fileStatuses = fs.listStatus(path.getParent()); + + Assertions.assertThat(fileStatuses.length) + .describedAs("List should return 2 paths") + .isEqualTo(2); + + Assertions.assertThat(redoRenameCall.get()) + .describedAs("2 redo rename calls should be made") + .isEqualTo(2); + } finally { + if (fs != null) { + fs.close(); + } + } + } + + /** + * Test case to verify path status when a pending rename JSON file exists. + * + * This test simulates a scenario where a rename operation was pending, and ensures that + * the path status retrieval triggers a redo rename operation. The test also checks that + * the correct error code (`PATH_NOT_FOUND`) is returned. 
+ * + * @throws Exception if any error occurs during the test execution + */ + @Test + public void testGetPathStatusWithPendingJsonFile() throws Exception { + AzureBlobFileSystem fs = null; + try { + Path path = new Path("/hbase/A1/A2"); + Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX); + fs = createJsonFile(path, renameJson); + + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + + fs.create(new Path("/hbase/A1/file1.txt")); + fs.create(new Path("/hbase/A1/file2.txt")); + + AbfsConfiguration conf = fs.getAbfsStore().getAbfsConfiguration(); + + AtomicInteger redoRenameCall = new AtomicInteger(0); + Mockito.doAnswer(answer -> { + redoRenameCall.incrementAndGet(); + return answer.callRealMethod(); + }).when(client).getRedoRenameAtomicity(Mockito.any(Path.class), + Mockito.anyInt(), Mockito.any(TracingContext.class)); + + TracingContext tracingContext = new TracingContext( + conf.getClientCorrelationId(), fs.getFileSystemId(), + FSOperationType.GET_FILESTATUS, TracingHeaderFormat.ALL_ID_FORMAT, null); + + AzureServiceErrorCode azureServiceErrorCode = intercept( + AbfsRestOperationException.class, () -> client.getPathStatus( + path.toUri().getPath(), true, + tracingContext, null)).getErrorCode(); + + Assertions.assertThat(azureServiceErrorCode.getErrorCode()) + .describedAs("Path had to be recovered from atomic rename operation.") + .isEqualTo(PATH_NOT_FOUND.getErrorCode()); + + Assertions.assertThat(redoRenameCall.get()) + .describedAs("There should be one redo rename call") + .isEqualTo(1); + } finally { + if (fs != null) { + fs.close(); + } + } + } + + /** + * Test case to verify path status when there is no pending rename JSON file. + * + * This test ensures that when no rename pending JSON file is present, the path status is + * successfully retrieved, the ETag is present, and no redo rename operation is triggered. 
+ * + * @throws Exception if any error occurs during the test execution + */ + @Test + public void testGetPathStatusWithoutPendingJsonFile() throws Exception { + try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem())) { + assumeBlobServiceType(); + + Path path = new Path("/hbase/A1/A2"); + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + + fs.create(new Path(path, "file1.txt")); + fs.create(new Path(path, "file2.txt")); + + AbfsConfiguration conf = fs.getAbfsStore().getAbfsConfiguration(); + + AtomicInteger redoRenameCall = new AtomicInteger(0); + Mockito.doAnswer(answer -> { + redoRenameCall.incrementAndGet(); + return answer.callRealMethod(); + }).when(client).getRedoRenameAtomicity( + Mockito.any(Path.class), Mockito.anyInt(), + Mockito.any(TracingContext.class)); + + TracingContext tracingContext = new TracingContext( + conf.getClientCorrelationId(), fs.getFileSystemId(), + FSOperationType.GET_FILESTATUS, TracingHeaderFormat.ALL_ID_FORMAT, + null); + + AbfsHttpOperation abfsHttpOperation = client.getPathStatus( + path.toUri().getPath(), true, + tracingContext, null).getResult(); + + Assertions.assertThat(abfsHttpOperation.getStatusCode()) + .describedAs("Path should be found.") + .isEqualTo(HTTP_OK); + + Assertions.assertThat(extractEtagHeader(abfsHttpOperation)) + .describedAs("Etag should be present.") + .isNotNull(); + + Assertions.assertThat(redoRenameCall.get()) + .describedAs("There should be no redo rename call.") + .isEqualTo(0); + } + } + + /** + * Test case to verify path status when there is a pending rename JSON directory. + * + * This test simulates the scenario where a directory is created with a rename pending JSON + * file (indicated by a specific suffix). It ensures that the path is found, the ETag is present, + * and no redo rename operation is triggered. It also verifies that the rename pending directory + * exists. 
+ * + * @throws Exception if any error occurs during the test execution + */ + @Test + public void testGetPathStatusWithPendingJsonDir() throws Exception { + try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem())) { + assumeBlobServiceType(); + + Path path = new Path("/hbase/A1/A2"); + AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs); + + fs.create(new Path(path, "file1.txt")); + fs.create(new Path(path, "file2.txt")); + + fs.mkdirs(new Path(path.getParent(), path.getName() + SUFFIX)); + + AbfsConfiguration conf = fs.getAbfsStore().getAbfsConfiguration(); + + AtomicInteger redoRenameCall = new AtomicInteger(0); + Mockito.doAnswer(answer -> { + redoRenameCall.incrementAndGet(); + return answer.callRealMethod(); + }).when(client).getRedoRenameAtomicity(Mockito.any(Path.class), + Mockito.anyInt(), Mockito.any(TracingContext.class)); + + TracingContext tracingContext = new TracingContext( + conf.getClientCorrelationId(), fs.getFileSystemId(), + FSOperationType.GET_FILESTATUS, TracingHeaderFormat.ALL_ID_FORMAT, null); + + AbfsHttpOperation abfsHttpOperation + = client.getPathStatus(path.toUri().getPath(), true, + tracingContext, null).getResult(); + + Assertions.assertThat(abfsHttpOperation.getStatusCode()) + .describedAs("Path should be found.") + .isEqualTo(HTTP_OK); + + Assertions.assertThat(extractEtagHeader(abfsHttpOperation)) + .describedAs("Etag should be present.") + .isNotNull(); + + Assertions.assertThat(redoRenameCall.get()) + .describedAs("There should be no redo rename call.") + .isEqualTo(0); + + Assertions.assertThat(fs.exists(new Path(path.getParent(), path.getName() + SUFFIX))) + .describedAs("Directory with suffix -RenamePending.json should exist.") + .isTrue(); + } + } + + /** + * Test case to verify the behavior when the ETag of a file changes during a rename operation. + * + * This test simulates a scenario where the ETag of a file changes after the creation of a + * rename pending JSON file. 
The steps include: + * - Creating a rename pending JSON file with an old ETag. + * - Deleting the original directory for an ETag change. + * - Creating new files in the directory. + * - Verifying that the copy blob call is not triggered. + * - Verifying that the rename atomicity operation is called once. + * + * The test ensures that the system correctly handles the ETag change during the rename process. + * + * @throws Exception if any error occurs during the test execution + */ + @Test + public void testETagChangedDuringRename() throws Exception { + AzureBlobFileSystem fs = null; + try { + assumeBlobServiceType(); + Path path = new Path("/hbase/A1/A2"); + Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX); + // Create rename pending json file with old etag + fs = createJsonFile(path, renameJson); + AbfsBlobClient abfsBlobClient = (AbfsBlobClient) addSpyHooksOnClient(fs); + fs.getAbfsStore().setClient(abfsBlobClient); + + // Delete the directory to change etag + fs.delete(path, true); + + fs.create(new Path(path, "file1.txt")); + fs.create(new Path(path, "file2.txt")); + AtomicInteger numberOfCopyBlobCalls = new AtomicInteger(0); + Mockito.doAnswer(copyBlob -> { + numberOfCopyBlobCalls.incrementAndGet(); + return copyBlob.callRealMethod(); + }) + .when(abfsBlobClient) + .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class), + Mockito.nullable(String.class), + Mockito.any(TracingContext.class)); + + AtomicInteger numberOfRedoRenameAtomicityCalls = new AtomicInteger(0); + Mockito.doAnswer(redoRenameAtomicity -> { + numberOfRedoRenameAtomicityCalls.incrementAndGet(); + return redoRenameAtomicity.callRealMethod(); + }) + .when(abfsBlobClient) + .getRedoRenameAtomicity(Mockito.any(Path.class), Mockito.anyInt(), + Mockito.any(TracingContext.class)); + // Call list status to trigger rename redo + fs.listStatus(path.getParent()); + Assertions.assertThat(numberOfRedoRenameAtomicityCalls.get()) + .describedAs("There should be one call to 
getRedoRenameAtomicity") + .isEqualTo(1); + Assertions.assertThat(numberOfCopyBlobCalls.get()) + .describedAs("There should be no copy blob call") + .isEqualTo(0); + } finally { + if (fs != null) { + fs.close(); + } + } + } + /** * Test to verify the idempotency of the `rename` operation in Azure Blob File System when retrying * after a failure. The test simulates a "path not found" error (HTTP 404) on the first attempt, diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRenameUnicode.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRenameUnicode.java index b134c0e93bd..589ca5415fc 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRenameUnicode.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRenameUnicode.java @@ -24,21 +24,15 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType; -import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathIOException; -import static java.net.HttpURLConnection.HTTP_BAD_REQUEST; -import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsDirectory; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathExists; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertRenameOutcome; -import static org.apache.hadoop.test.LambdaTestUtils.intercept; /** * 
Parameterized test of rename operations of unicode paths. @@ -90,17 +84,6 @@ public void testRenameFileUsingUnicode() throws Exception { assertIsFile(fs, filePath); Path folderPath2 = new Path(destDir); - if (getAbfsServiceType() == AbfsServiceType.BLOB - && destDir.contains(COLON)) { - AbfsRestOperationException ex = intercept( - AbfsRestOperationException.class, () -> { - fs.rename(folderPath1, folderPath2); - return null; - }); - assertTrue(ex.getCause() instanceof PathIOException); - assertEquals(HTTP_BAD_REQUEST, ex.getStatusCode()); - return; - } assertRenameOutcome(fs, folderPath1, folderPath2, true); assertPathDoesNotExist(fs, "renamed", folderPath1); assertIsDirectory(fs, folderPath2); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org