This is an automated email from the ASF dual-hosted git repository.

anujmodi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f79f5bc2616 HADOOP-19522: ABFS: [FnsOverBlob] Rename Recovery Should Succeed When Marker File Exists with Destination Directory (#7559)
f79f5bc2616 is described below

commit f79f5bc2616370fa751d9a8f0876ecb3907f909d
Author: Manish Bhatt <52626736+bhattmanis...@users.noreply.github.com>
AuthorDate: Fri Apr 18 02:51:52 2025 -0700

    HADOOP-19522: ABFS: [FnsOverBlob] Rename Recovery Should Succeed When Marker File Exists with Destination Directory (#7559)
    
    Contributed by Manish Bhatt
    Reviewed by Anmol Asrani, Manika Joshi, Anuj Modi
    
    Signed off by Anuj Modi<anujm...@apache.org>
---
 .../hadoop/fs/azurebfs/AbfsCountersImpl.java       |    4 +-
 .../apache/hadoop/fs/azurebfs/AbfsStatistic.java   |    4 +-
 .../fs/azurebfs/services/AbfsBlobClient.java       |   21 +-
 .../hadoop/fs/azurebfs/services/AbfsClient.java    |    8 +-
 .../fs/azurebfs/services/BlobRenameHandler.java    |   30 +-
 .../fs/azurebfs/services/RenameAtomicity.java      |    2 +-
 .../azurebfs/ITestAzureBlobFileSystemRename.java   | 1639 ++++++++------------
 .../ITestAzureBlobFileSystemRenameRecovery.java    |  770 +++++++++
 .../fs/azurebfs/services/AbfsClientTestUtil.java   |    2 +-
 .../hadoop/fs/azurebfs/utils/AbfsTestUtils.java    |   37 +
 10 files changed, 1487 insertions(+), 1030 deletions(-)

diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java
index fdcbc2275ff..7a941171fa4 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java
@@ -37,6 +37,7 @@
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.metrics2.lib.MutableMetric;
 
+import static 
org.apache.hadoop.fs.azurebfs.AbfsStatistic.ATOMIC_RENAME_PATH_ATTEMPTS;
 import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.BYTES_RECEIVED;
 import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.BYTES_SENT;
 import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_APPEND;
@@ -134,7 +135,8 @@ public class AbfsCountersImpl implements AbfsCounters {
       SERVER_UNAVAILABLE,
       RENAME_RECOVERY,
       METADATA_INCOMPLETE_RENAME_FAILURES,
-      RENAME_PATH_ATTEMPTS
+      RENAME_PATH_ATTEMPTS,
+      ATOMIC_RENAME_PATH_ATTEMPTS
   };
 
   private static final AbfsStatistic[] DURATION_TRACKER_LIST = {
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java
index 3a77e82ffb4..eba46357335 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java
@@ -109,7 +109,9 @@ public enum AbfsStatistic {
       "Number of times rename operation failed due to metadata being "
           + "incomplete"),
   RENAME_PATH_ATTEMPTS("rename_path_attempts",
-      "Number of times we attempt to rename a path internally");
+      "Number of times we attempt to rename a path internally"),
+  ATOMIC_RENAME_PATH_ATTEMPTS("atomic_rename_path_attempts",
+      "Number of times atomic rename attempted");
 
   private String statName;
   private String statDescription;
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
index 229bf75fc3a..dea41ad38bd 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
@@ -385,8 +385,20 @@ public ListResponseData listPath(final String 
relativePath, final boolean recurs
     if (tracingContext.getOpType() == FSOperationType.LISTSTATUS
         && op.getResult() != null
         && op.getResult().getStatusCode() == HTTP_OK) {
-      retryRenameOnAtomicEntriesInListResults(tracingContext,
+      boolean isRenameRecovered = 
retryRenameOnAtomicEntriesInListResults(tracingContext,
           listResponseData.getRenamePendingJsonPaths());
+      if (isRenameRecovered) {
+        LOG.debug("Retrying list operation after rename recovery.");
+        // Retry the list operation to get the updated list of paths after 
rename recovery.
+        AbfsRestOperation retryListOp = getAbfsRestOperation(
+            AbfsRestOperationType.ListBlobs,
+            HTTP_METHOD_GET,
+            url,
+            requestHeaders);
+        retryListOp.execute(tracingContext);
+        listResponseData = parseListPathResults(retryListOp.getResult(), uri);
+        listResponseData.setOp(retryListOp);
+      }
     }
 
     if (isEmptyListResults(listResponseData) && is404CheckRequired) {
@@ -425,15 +437,16 @@ public ListResponseData listPath(final String 
relativePath, final boolean recurs
    * @param tracingContext tracing context
    * @throws AzureBlobFileSystemException if rest operation or response 
parsing fails.
    */
-  private void retryRenameOnAtomicEntriesInListResults(TracingContext 
tracingContext,
+  private boolean retryRenameOnAtomicEntriesInListResults(TracingContext 
tracingContext,
       Map<Path, Integer> renamePendingJsonPaths) throws 
AzureBlobFileSystemException {
     if (renamePendingJsonPaths == null || renamePendingJsonPaths.isEmpty()) {
-      return;
+      return false;
     }
 
     for (Map.Entry<Path, Integer> entry : renamePendingJsonPaths.entrySet()) {
       retryRenameOnAtomicKeyPath(entry.getKey(), entry.getValue(), 
tracingContext);
     }
+    return true;
   }
 
   /**{@inheritDoc}*/
@@ -813,7 +826,7 @@ public AbfsClientRenameResult renamePath(final String 
source,
         destination, sourceEtag, isAtomicRenameKey(source), tracingContext
     );
     try {
-      if (blobRenameHandler.execute()) {
+      if (blobRenameHandler.execute(false)) {
         final AbfsUriQueryBuilder abfsUriQueryBuilder
             = createDefaultUriQueryBuilder();
         final URL url = createRequestUrl(destination,
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index c4bc472441d..c13538b11ed 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -83,6 +83,7 @@
 import org.apache.hadoop.fs.azurebfs.utils.EncryptionType;
 import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.store.LogExactlyOnce;
@@ -1536,8 +1537,11 @@ public void getMetricCall(TracingContext tracingContext) 
throws IOException {
     final AbfsUriQueryBuilder abfsUriQueryBuilder = 
createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM);
 
-    final URL url = createRequestUrl(new URL(abfsMetricUrl), EMPTY_STRING, 
abfsUriQueryBuilder.toString());
-
+    // Construct the URL for the metric call
+    // In case of blob storage, the URL is changed to DFS URL
+    final URL url = UriUtils.changeUrlFromBlobToDfs(
+        createRequestUrl(new URL(abfsMetricUrl),
+            EMPTY_STRING, abfsUriQueryBuilder.toString()));
     final AbfsRestOperation op = getAbfsRestOperation(
             AbfsRestOperationType.GetFileSystemProperties,
             HTTP_METHOD_HEAD,
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobRenameHandler.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobRenameHandler.java
index f78228bfcff..1f22d049ece 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobRenameHandler.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobRenameHandler.java
@@ -31,6 +31,7 @@
 
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
 import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException;
@@ -115,14 +116,15 @@ int getMaxConsumptionParallelism() {
 
   /**
    * Orchestrates the rename operation.
+   * @param isRenameRecovery true if the rename operation is a recovery of a 
previous failed atomic rename operation
    *
    * @return AbfsClientRenameResult containing the result of the rename 
operation
    * @throws AzureBlobFileSystemException if server call fails
    */
-  public boolean execute() throws AzureBlobFileSystemException {
+  public boolean execute(final boolean isRenameRecovery) throws 
AzureBlobFileSystemException {
     PathInformation pathInformation = getPathInformation(src, tracingContext);
     boolean result = false;
-    if (preCheck(src, dst, pathInformation)) {
+    if (preCheck(src, dst, pathInformation, isRenameRecovery)) {
       RenameAtomicity renameAtomicity = null;
       if (pathInformation.getIsDirectory()
           && pathInformation.getIsImplicit()) {
@@ -147,6 +149,8 @@ public boolean execute() throws 
AzureBlobFileSystemException {
            * recovers the lease on a log file, to gain exclusive access to it, 
before
            * it splits it.
            */
+          getAbfsClient().getAbfsCounters()
+              .incrementCounter(AbfsStatistic.ATOMIC_RENAME_PATH_ATTEMPTS, 1);
           if (srcAbfsLease == null) {
             srcAbfsLease = takeLease(src, srcEtag);
           }
@@ -192,6 +196,13 @@ private boolean finalSrcRename() throws 
AzureBlobFileSystemException {
     tracingContext.setOperatedBlobCount(operatedBlobCount.get() + 1);
     try {
       return renameInternal(src, dst);
+    } catch(AbfsRestOperationException e) {
+      if (e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) {
+        // If the destination path already exists, then delete the source path.
+        getAbfsClient().deleteBlobPath(src, null, tracingContext);
+        return true;
+      }
+      throw e;
     } finally {
       tracingContext.setOperatedBlobCount(null);
     }
@@ -249,16 +260,17 @@ private boolean containsColon(Path p) {
    * @param src source path
    * @param dst destination path
    * @param pathInformation object in which path information of the source 
path would be stored
+   * @param isRenameRecovery true if the rename operation is a recovery of a 
previous failed atomic rename operation
    *
    * @return true if the pre-checks pass
    * @throws AzureBlobFileSystemException if server call fails or given paths 
are invalid.
    */
   private boolean preCheck(final Path src, final Path dst,
-      final PathInformation pathInformation)
+      final PathInformation pathInformation, final boolean isRenameRecovery)
       throws AzureBlobFileSystemException {
     validateDestinationIsNotSubDir(src, dst);
     validateSourcePath(pathInformation);
-    validateDestinationPathNotExist(src, dst, pathInformation);
+    validateDestinationPathNotExist(src, dst, pathInformation, 
isRenameRecovery);
     validateDestinationParentExist(src, dst, pathInformation);
 
     return true;
@@ -319,13 +331,13 @@ private void validateSourcePath(final PathInformation 
pathInformation)
    * @param src source path
    * @param dst destination path
    * @param pathInformation object containing the path information of the 
source path
+   * @param isRenameRecovery true if the rename operation is a recovery of a 
previous failed atomic rename operation
    *
    * @throws AbfsRestOperationException if the destination path already exists
    */
   private void validateDestinationPathNotExist(final Path src,
-      final Path dst,
-      final PathInformation pathInformation)
-      throws AzureBlobFileSystemException {
+      final Path dst, final PathInformation pathInformation,
+      final boolean isRenameRecovery) throws AzureBlobFileSystemException {
     /*
      * Destination path name can be same to that of source path name only in 
the
      * case of a directory rename.
@@ -333,8 +345,8 @@ private void validateDestinationPathNotExist(final Path src,
      * In case the directory is being renamed to some other name, the 
destination
      * check would happen on the AzureBlobFileSystem#rename method.
      */
-    if (pathInformation.getIsDirectory() && dst.getName()
-        .equals(src.getName())) {
+    if (!isRenameRecovery && pathInformation.getIsDirectory()
+        && dst.getName().equals(src.getName())) {
       PathInformation dstPathInformation = getPathInformation(
           dst,
           tracingContext);
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java
index 42e8f6ed3aa..860d9eb5277 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java
@@ -152,7 +152,7 @@ public void redo() throws AzureBlobFileSystemException {
             abfsClient, srcEtag, true,
             true, tracingContext);
 
-        blobRenameHandler.execute();
+        blobRenameHandler.execute(true);
       }
     } finally {
       deleteRenamePendingJson();
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java
index e2f8b679fc5..408f94d78d8 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java
@@ -24,10 +24,8 @@
 import java.nio.charset.StandardCharsets;
 import java.nio.file.AccessDeniedException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -51,7 +49,6 @@
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
 import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
-import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
 import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
 import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
@@ -94,17 +91,15 @@
 import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID;
 import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_LEASE_THREADS;
 import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_PRODUCER_QUEUE_MAX_SIZE;
-import static 
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.BLOB_ALREADY_EXISTS;
-import static 
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.BLOB_PATH_NOT_FOUND;
 import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_TRANSACTION_ID;
 import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_RESOURCE_TYPE;
 import static 
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.COPY_BLOB_ABORTED;
 import static 
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.COPY_BLOB_FAILED;
-import static 
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.PATH_NOT_FOUND;
 import static 
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND;
 import static 
org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil.mockAddClientTransactionIdToHeader;
 import static 
org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_RENAME_RECOVERY;
 import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicity.SUFFIX;
+import static org.apache.hadoop.fs.azurebfs.utils.AbfsTestUtils.createFiles;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs;
 import static 
org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist;
@@ -126,10 +121,6 @@ public class ITestAzureBlobFileSystemRename extends
 
   private static final int TOTAL_FILES = 25;
 
-  private static final int TOTAL_THREADS_IN_POOL = 5;
-
-  private static final int FAILED_CALL = 15;
-
   public ITestAzureBlobFileSystemRename() throws Exception {
     super();
   }
@@ -527,55 +518,6 @@ public void 
testRenamePendingJsonIsRemovedPostSuccessfulRename()
         (int) correctDeletePathCount[0]);
   }
 
-  /**
-   * Spies on the AzureBlobFileSystem's store and client to enable mocking and 
verification
-   * of client interactions in tests. It replaces the actual store and client 
with mocked versions.
-   *
-   * @param fs the AzureBlobFileSystem instance
-   * @return the spied AbfsClient for interaction verification
-   */
-  private AbfsClient addSpyHooksOnClient(final AzureBlobFileSystem fs) {
-    AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
-    Mockito.doReturn(store).when(fs).getAbfsStore();
-    AbfsClient client = Mockito.spy(store.getClient());
-    Mockito.doReturn(client).when(store).getClient();
-    return client;
-  }
-
-  /**
-   * A helper method to set up the test environment and execute the common 
logic for handling
-   * failed rename operations and recovery in HBase. This method performs the 
necessary setup
-   * (creating directories and files) and then triggers the 
`crashRenameAndRecover` method
-   * with a provided recovery action.
-   *
-   * This method is used by different tests that require different recovery 
actions, such as
-   * performing `listStatus` or checking the existence of a path after a 
failed rename.
-   *
-   * @param fs the AzureBlobFileSystem instance to be used in the test
-   * @param client the AbfsBlobClient instance to be used in the test
-   * @param srcPath the source path for the rename operation
-   * @param failedCopyPath the path that simulates a failed copy during rename
-   * @param recoveryAction the specific recovery action to be performed after 
the rename failure
-   *                       (e.g., listing directory status or checking path 
existence)
-   * @throws Exception if any error occurs during setup or execution of the 
recovery action
-   */
-  private void setupAndTestHBaseFailedRenameRecovery(
-      final AzureBlobFileSystem fs,
-      final AbfsBlobClient client,
-      final String srcPath,
-      final String failedCopyPath,
-      final FunctionRaisingIOE<AzureBlobFileSystem, Void> recoveryAction)
-      throws Exception {
-    fs.setWorkingDirectory(new Path("/"));
-    fs.mkdirs(new Path(srcPath));
-    fs.mkdirs(new Path(srcPath, "test3"));
-    fs.create(new Path(srcPath + "/test3/file"));
-    fs.create(new Path(failedCopyPath));
-    fs.mkdirs(new Path("hbase/test4/"));
-    fs.create(new Path("hbase/test4/file1"));
-    crashRenameAndRecover(fs, client, srcPath, recoveryAction);
-  }
-
   /**
    * Test for a directory in /hbase directory. To simulate the crash of 
process,
    * test will throw an exception with 403 on a copy of one of the blob.<br>
@@ -622,87 +564,6 @@ public void 
testHBaseHandlingForFailedRenameWithGetFileStatusRecovery()
         });
   }
 
-
-  /**
-   * Simulates a rename failure, performs a recovery action, and verifies that 
the "RenamePendingJson"
-   * file is deleted. It checks that the rename operation is successfully 
completed after recovery.
-   *
-   * @param fs the AzureBlobFileSystem instance
-   * @param client the AbfsBlobClient instance
-   * @param srcPath the source path for the rename operation
-   * @param recoveryCallable the recovery action to perform
-   * @throws Exception if an error occurs during recovery or verification
-   */
-  private void crashRenameAndRecover(final AzureBlobFileSystem fs,
-      AbfsBlobClient client,
-      final String srcPath,
-      final FunctionRaisingIOE<AzureBlobFileSystem, Void> recoveryCallable)
-      throws Exception {
-    crashRename(fs, client, srcPath);
-    AzureBlobFileSystem fs2 = Mockito.spy(getFileSystem());
-    fs2.setWorkingDirectory(new Path(ROOT_PATH));
-    client = (AbfsBlobClient) addSpyHooksOnClient(fs2);
-    int[] renameJsonDeleteCounter = new int[1];
-    Mockito.doAnswer(answer -> {
-          if ((ROOT_PATH + srcPath + SUFFIX)
-              .equalsIgnoreCase(((Path) 
answer.getArgument(0)).toUri().getPath())) {
-            renameJsonDeleteCounter[0] = 1;
-          }
-          return answer.callRealMethod();
-        })
-        .when(client)
-        .deleteBlobPath(Mockito.any(Path.class), 
Mockito.nullable(String.class),
-            Mockito.any(TracingContext.class));
-    recoveryCallable.apply(fs2);
-    Assertions.assertThat(renameJsonDeleteCounter[0])
-        .describedAs("RenamePendingJson should be deleted")
-        .isEqualTo(1);
-    //List would complete the rename orchestration.
-    assertFalse(fs2.exists(new Path("hbase/test1/test2")));
-    assertFalse(fs2.exists(new Path("hbase/test1/test2/test3")));
-    assertTrue(fs2.exists(new Path("hbase/test4/test2/test3")));
-    assertFalse(fs2.exists(new Path("hbase/test1/test2/test3/file")));
-    assertTrue(fs2.exists(new Path("hbase/test4/test2/test3/file")));
-    assertFalse(fs2.exists(new Path("hbase/test1/test2/test3/file1")));
-    assertTrue(fs2.exists(new Path("hbase/test4/test2/test3/file1")));
-  }
-
-  /**
-   * Simulates a rename failure by triggering an `AbfsRestOperationException` 
during the rename process.
-   * It intercepts the exception and ensures that all leases acquired during 
the atomic rename are released.
-   *
-   * @param fs the AzureBlobFileSystem instance used for the rename operation
-   * @param client the AbfsBlobClient instance used for mocking the rename 
failure
-   * @param srcPath the source path for the rename operation
-   * @throws Exception if an error occurs during the simulated failure or 
lease release
-   */
-  private void crashRename(final AzureBlobFileSystem fs,
-      final AbfsBlobClient client,
-      final String srcPath) throws Exception {
-    BlobRenameHandler[] blobRenameHandlers = new BlobRenameHandler[1];
-    AbfsClientTestUtil.mockGetRenameBlobHandler(client,
-        blobRenameHandler -> {
-          blobRenameHandlers[0] = blobRenameHandler;
-          return null;
-        });
-    //Fail rename orchestration on path hbase/test1/test2/test3/file1
-    Mockito.doThrow(new AbfsRestOperationException(HTTP_FORBIDDEN, "", "",
-            new Exception()))
-        .when(client)
-        .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class),
-            Mockito.nullable(String.class),
-            Mockito.any(TracingContext.class));
-    LambdaTestUtils.intercept(AccessDeniedException.class, () -> {
-      fs.rename(new Path(srcPath),
-          new Path("hbase/test4"));
-    });
-    //Release all the leases taken by atomic rename orchestration
-    List<AbfsLease> leases = new 
ArrayList<>(blobRenameHandlers[0].getLeases());
-    for (AbfsLease lease : leases) {
-      lease.free();
-    }
-  }
-
   /**
    * Simulates a scenario where HMaster in Hbase starts up and executes 
listStatus
    * API on the directory that has to be renamed by some other 
executor-machine.
@@ -721,93 +582,6 @@ public void 
testHbaseListStatusBeforeRenamePendingFileAppendedWithIngressOnBlob(
     testAtomicityRedoInvalidFile(fs);
   }
 
-  /**
-   * Tests renaming a directory in AzureBlobFileSystem when the creation of 
the "RenamePendingJson"
-   * file fails on the first attempt. It ensures the renaming operation is 
retried.
-   *
-   * The test verifies that the creation of the "RenamePendingJson" file is 
attempted twice:
-   * once on failure and once on retry.
-   *
-   * @param fs the AzureBlobFileSystem instance for the test
-   * @throws Exception if an error occurs during the test
-   */
-  private void testRenamePreRenameFailureResolution(final AzureBlobFileSystem 
fs)
-      throws Exception {
-    AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
-    Path src = new Path("hbase/test1/test2");
-    Path dest = new Path("hbase/test4");
-    fs.mkdirs(src);
-    fs.mkdirs(new Path(src, "test3"));
-    final int[] renamePendingJsonWriteCounter = new int[1];
-    /*
-     * Fail the creation of RenamePendingJson file on the first attempt.
-     */
-    Answer renamePendingJsonCreateAns = createAnswer -> {
-      Path path = createAnswer.getArgument(0);
-      Mockito.doAnswer(clientFlushAns -> {
-            if (renamePendingJsonWriteCounter[0]++ == 0) {
-              fs.delete(path, true);
-            }
-            return clientFlushAns.callRealMethod();
-          })
-          .when(client)
-          .flush(Mockito.any(byte[].class), Mockito.anyString(),
-              Mockito.anyBoolean(), Mockito.nullable(String.class),
-              Mockito.nullable(String.class), Mockito.anyString(),
-              Mockito.nullable(ContextEncryptionAdapter.class),
-              Mockito.any(TracingContext.class));
-      return createAnswer.callRealMethod();
-    };
-    RenameAtomicityTestUtils.addCreatePathMock(client,
-        renamePendingJsonCreateAns);
-    fs.rename(src, dest);
-    Assertions.assertThat(renamePendingJsonWriteCounter[0])
-        .describedAs("Creation of RenamePendingJson should be attempted twice")
-        .isEqualTo(2);
-  }
-
-  /**
-   * Tests the behavior of the redo operation when an invalid 
"RenamePendingJson" file exists.
-   * It verifies that the file is deleted and that no copy operation is 
performed.
-   *
-   * The test simulates a scenario where the "RenamePendingJson" file is 
partially written and
-   * ensures that the `redo` method correctly deletes the file and does not 
trigger a copy operation.
-   *
-   * @param fs the AzureBlobFileSystem instance for the test
-   * @throws Exception if an error occurs during the test
-   */
-  private void testAtomicityRedoInvalidFile(final AzureBlobFileSystem fs)
-      throws Exception {
-    AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
-    Path path = new Path("/hbase/test1/test2");
-    fs.mkdirs(new Path(path, "test3"));
-    Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
-    OutputStream os = fs.create(renameJson);
-    os.write("{".getBytes(StandardCharsets.UTF_8));
-    os.close();
-    int[] renameJsonDeleteCounter = new int[1];
-    Mockito.doAnswer(deleteAnswer -> {
-          Path ansPath = deleteAnswer.getArgument(0);
-          if (renameJson.toUri()
-              .getPath()
-              .equalsIgnoreCase(ansPath.toUri().getPath())) {
-            renameJsonDeleteCounter[0]++;
-          }
-          return deleteAnswer.callRealMethod();
-        })
-        .when(client)
-        .deleteBlobPath(Mockito.any(Path.class), 
Mockito.nullable(String.class),
-            Mockito.any(TracingContext.class));
-    new RenameAtomicity(renameJson, 1,
-        getTestTracingContext(fs, true), null, client).redo();
-    Assertions.assertThat(renameJsonDeleteCounter[0])
-        .describedAs("RenamePendingJson should be deleted")
-        .isEqualTo(1);
-    Mockito.verify(client, Mockito.times(0)).copyBlob(Mockito.any(Path.class),
-        Mockito.any(Path.class), Mockito.nullable(String.class),
-        Mockito.any(TracingContext.class));
-  }
-
   /**
    * Test to check the atomicity of rename operation. The rename operation 
should
    * be atomic and should not leave any intermediate state.
@@ -1031,31 +805,6 @@ public void testBlobRenameSrcDirHasNoMarker() throws 
Exception {
     assertTrue(fs.exists(new Path("/test2/test1")));
   }
 
-  /**
-   * Mocks the progress status for a copy blob operation.
-   * This method simulates a copy operation that is pending and not yet 
completed.
-   * It intercepts the `copyBlob` method and modifies its response to return a 
"COPY_STATUS_PENDING"
-   * status for the copy operation.
-   *
-   * @param spiedClient The {@link AbfsBlobClient} instance that is being 
spied on.
-   * @throws AzureBlobFileSystemException if the mock setup fails.
-   */
-  private void addMockForProgressStatusOnCopyOperation(final AbfsBlobClient 
spiedClient)
-      throws AzureBlobFileSystemException {
-    Mockito.doAnswer(answer -> {
-          AbfsRestOperation op = Mockito.spy(
-              (AbfsRestOperation) answer.callRealMethod());
-          AbfsHttpOperation httpOp = Mockito.spy(op.getResult());
-          Mockito.doReturn(COPY_STATUS_PENDING).when(httpOp).getResponseHeader(
-              HttpHeaderConfigurations.X_MS_COPY_STATUS);
-          Mockito.doReturn(httpOp).when(op).getResult();
-          return op;
-        })
-        .when(spiedClient)
-        .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class),
-            Mockito.nullable(String.class), Mockito.any(TracingContext.class));
-  }
-
   /**
    * Verifies the behavior of a blob copy operation that takes time to 
complete.
    * The test ensures the following:
@@ -1091,54 +840,14 @@ public void testCopyBlobTakeTime() throws Exception {
   }
 
   /**
-   * Mocks the final status of a blob copy operation.
-   * This method ensures that when checking the status of a copy operation in 
progress,
-   * it returns the specified final status (e.g., success, failure, aborted).
-   *
-   * @param spiedClient The mocked Azure Blob client to apply the mock 
behavior.
-   * @param requiredCopyFinalStatus The final status of the copy operation to 
be returned
-   *                                (e.g., COPY_STATUS_FAILED, 
COPY_STATUS_ABORTED).
-   */
-  private void addMockForCopyOperationFinalStatus(final AbfsBlobClient 
spiedClient,
-      final String requiredCopyFinalStatus) {
-    AbfsClientTestUtil.mockGetRenameBlobHandler(spiedClient,
-        blobRenameHandler -> {
-          Mockito.doAnswer(onHandleCopyInProgress -> {
-                Path handlePath = onHandleCopyInProgress.getArgument(0);
-                TracingContext tracingContext = 
onHandleCopyInProgress.getArgument(
-                    1);
-                Mockito.doAnswer(onStatusCheck -> {
-                      AbfsRestOperation op = Mockito.spy(
-                          (AbfsRestOperation) onStatusCheck.callRealMethod());
-                      AbfsHttpOperation httpOp = Mockito.spy(op.getResult());
-                      Mockito.doReturn(requiredCopyFinalStatus)
-                          .when(httpOp)
-                          .getResponseHeader(
-                              HttpHeaderConfigurations.X_MS_COPY_STATUS);
-                      Mockito.doReturn(httpOp).when(op).getResult();
-                      return op;
-                    })
-                    .when(spiedClient)
-                    .getPathStatus(handlePath.toUri().getPath(),
-                        tracingContext, null, false);
-                return onHandleCopyInProgress.callRealMethod();
-              })
-              .when(blobRenameHandler)
-              .handleCopyInProgress(Mockito.any(Path.class),
-                  Mockito.any(TracingContext.class), 
Mockito.any(String.class));
-          return null;
-        });
-  }
-
-  /**
-   * Verifies the behavior when a blob copy operation takes time and 
eventually fails.
-   * The test ensures the following:
-   * <ul>
-   *   <li>A file is created and a copy operation is initiated.</li>
-   *   <li>The copy operation is mocked to eventually fail.</li>
-   *   <li>The rename operation triggers an exception due to the failed 
copy.</li>
-   *   <li>The test checks that the appropriate 'COPY_FAILED' error code and 
status code are returned.</li>
-   * </ul>
+   * Verifies the behavior when a blob copy operation takes time and 
eventually fails.
+   * The test ensures the following:
+   * <ul>
+   *   <li>A file is created and a copy operation is initiated.</li>
+   *   <li>The copy operation is mocked to eventually fail.</li>
+   *   <li>The rename operation triggers an exception due to the failed 
copy.</li>
+   *   <li>The test checks that the appropriate 'COPY_FAILED' error code and 
status code are returned.</li>
+   * </ul>
    *
    * @throws Exception if an error occurs during the test execution
    */
@@ -1734,65 +1443,6 @@ public void 
testRenameSrcDirDeleteEmitDeletionCountInClientRequestId()
     fs.rename(new Path(dirPathStr), new Path("/dst/"));
   }
 
-  /**
-   * Helper method to configure the AzureBlobFileSystem and rename directories.
-   *
-   * @param currentFs The current AzureBlobFileSystem to use for renaming.
-   * @param producerQueueSize Maximum size of the producer queue.
-   * @param consumerMaxLag Maximum lag allowed for the consumer.
-   * @param maxThread Maximum threads for the rename operation.
-   * @param src The source path of the directory to rename.
-   * @param dst The destination path of the renamed directory.
-   * @throws IOException If an I/O error occurs during the operation.
-   */
-  private void renameDir(AzureBlobFileSystem currentFs, String 
producerQueueSize,
-      String consumerMaxLag, String maxThread, Path src, Path dst)
-      throws IOException {
-    Configuration config = createConfig(producerQueueSize, consumerMaxLag, 
maxThread);
-    try (AzureBlobFileSystem fs = (AzureBlobFileSystem) 
FileSystem.newInstance(currentFs.getUri(), config)) {
-      fs.rename(src, dst);
-      validateRename(fs, src, dst, false, true, false);
-    }
-  }
-
-  /**
-   * Helper method to create the configuration for the AzureBlobFileSystem.
-   *
-   * @param producerQueueSize Maximum size of the producer queue.
-   * @param consumerMaxLag Maximum lag allowed for the consumer.
-   * @param maxThread Maximum threads for the rename operation.
-   * @return The configuration object.
-   */
-  private Configuration createConfig(String producerQueueSize, String 
consumerMaxLag, String maxThread) {
-    Configuration config = new Configuration(this.getRawConfiguration());
-    config.set(FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, producerQueueSize);
-    config.set(FS_AZURE_CONSUMER_MAX_LAG, consumerMaxLag);
-    config.set(FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD, maxThread);
-    return config;
-  }
-
-  /**
-   * Helper method to validate that the rename was successful and that the 
destination exists.
-   *
-   * @param fs The AzureBlobFileSystem instance to check the existence on.
-   * @param dst The destination path.
-   * @param src The source path.
-   * @throws IOException If an I/O error occurs during the validation.
-   */
-  private void validateRename(AzureBlobFileSystem fs, Path src, Path dst,
-      boolean isSrcExist, boolean isDstExist, boolean isJsonExist)
-      throws IOException {
-    Assertions.assertThat(fs.exists(new Path(src.getParent(), src.getName() + 
SUFFIX)))
-        .describedAs("Renamed Pending Json file should exist.")
-        .isEqualTo(isJsonExist);
-    Assertions.assertThat(fs.exists(src))
-        .describedAs("Renamed Source directory should not exist.")
-        .isEqualTo(isSrcExist);
-    Assertions.assertThat(fs.exists(dst))
-        .describedAs("Renamed Destination directory should exist.")
-        .isEqualTo(isDstExist);
-  }
-
   /**
    * Test the renaming of a directory with different parallelism 
configurations.
    */
@@ -1804,7 +1454,7 @@ public void testRenameDirWithDifferentParallelismConfig() 
throws Exception {
       Path dst = new Path("/hbase/A1/A3");
 
       // Create sample files in the source directory
-      createFiles(currentFs, src, TOTAL_FILES);
+     createFiles(currentFs, src, TOTAL_FILES);
 
       // Test renaming with different configurations
       renameDir(currentFs, "10", "5", "2", src, dst);
@@ -1823,138 +1473,6 @@ public void 
testRenameDirWithDifferentParallelismConfig() throws Exception {
     }
   }
 
-  /**
-   * Helper method to create files in the given directory.
-   *
-   * @param fs The AzureBlobFileSystem instance to use for file creation.
-   * @param src The source path (directory).
-   * @param numFiles The number of files to create.
-   * @throws ExecutionException, InterruptedException If an error occurs 
during file creation.
-   */
-  private void createFiles(AzureBlobFileSystem fs, Path src, int numFiles)
-      throws ExecutionException, InterruptedException {
-    ExecutorService executorService = 
Executors.newFixedThreadPool(TOTAL_THREADS_IN_POOL);
-    List<Future> futures = new ArrayList<>();
-    for (int i = 0; i < numFiles; i++) {
-      final int iter = i;
-      Future future = executorService.submit(() ->
-          fs.create(new Path(src, "file" + iter + ".txt")));
-      futures.add(future);
-    }
-    for (Future future : futures) {
-      future.get();
-    }
-    executorService.shutdown();
-  }
-
-  /**
-   * Tests renaming a directory with a failure during the copy operation.
-   * Simulates an error when copying on the 6th call.
-   */
-  @Test
-  public void testRenameCopyFailureInBetween() throws Exception {
-    try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(
-        createConfig("5", "3", "2")))) {
-      assumeBlobServiceType();
-      AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
-      fs.getAbfsStore().setClient(client);
-      Path src = new Path("/hbase/A1/A2");
-      Path dst = new Path("/hbase/A1/A3");
-
-      // Create sample files in the source directory
-      createFiles(fs, src, TOTAL_FILES);
-
-      // Track the number of copy operations
-      AtomicInteger copyCall = new AtomicInteger(0);
-      Mockito.doAnswer(copyRequest -> {
-        if (copyCall.get() == FAILED_CALL) {
-          throw new AbfsRestOperationException(
-              BLOB_ALREADY_EXISTS.getStatusCode(),
-              BLOB_ALREADY_EXISTS.getErrorCode(),
-              BLOB_ALREADY_EXISTS.getErrorMessage(),
-              new Exception());
-        }
-        copyCall.incrementAndGet();
-        return copyRequest.callRealMethod();
-      }).when(client).copyBlob(Mockito.any(Path.class),
-          Mockito.any(Path.class), Mockito.nullable(String.class),
-          Mockito.any(TracingContext.class));
-
-      fs.rename(src, dst);
-      // Validate copy operation count
-      Assertions.assertThat(copyCall.get())
-          .describedAs("Copy operation count should be less than 10.")
-          .isLessThan(TOTAL_FILES);
-
-      // Validate that rename redo operation was triggered
-      copyCall.set(0);
-
-      // Assertions to validate renamed destination and source
-      validateRename(fs, src, dst, false, true, true);
-
-      Assertions.assertThat(copyCall.get())
-          .describedAs("Copy operation count should be greater than 0.")
-          .isGreaterThan(0);
-
-      // Validate final state of destination and source
-      validateRename(fs, src, dst, false, true, false);
-    }
-  }
-
-  /**
-   * Tests renaming a directory with a failure during the delete operation.
-   * Simulates an error on the 6th delete operation and verifies the behavior.
-   */
-  @Test
-  public void testRenameDeleteFailureInBetween() throws Exception {
-    try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(
-        createConfig("5", "3", "2")))) {
-      assumeBlobServiceType();
-      AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
-      fs.getAbfsStore().setClient(client);
-      Path src = new Path("/hbase/A1/A2");
-      Path dst = new Path("/hbase/A1/A3");
-
-      // Create sample files in the source directory
-      createFiles(fs, src, TOTAL_FILES);
-
-      // Track the number of delete operations
-      AtomicInteger deleteCall = new AtomicInteger(0);
-      Mockito.doAnswer(deleteRequest -> {
-        if (deleteCall.get() == FAILED_CALL) {
-          throw new AbfsRestOperationException(
-              BLOB_PATH_NOT_FOUND.getStatusCode(),
-              BLOB_PATH_NOT_FOUND.getErrorCode(),
-              BLOB_PATH_NOT_FOUND.getErrorMessage(),
-              new Exception());
-        }
-        deleteCall.incrementAndGet();
-        return deleteRequest.callRealMethod();
-      }).when(client).deleteBlobPath(Mockito.any(Path.class),
-          Mockito.anyString(), Mockito.any(TracingContext.class));
-
-      fs.rename(src, dst);
-
-      // Validate delete operation count
-      Assertions.assertThat(deleteCall.get())
-          .describedAs("Delete operation count should be less than 10.")
-          .isLessThan(TOTAL_FILES);
-
-      // Validate that delete redo operation was triggered
-      deleteCall.set(0);
-      // Assertions to validate renamed destination and source
-      validateRename(fs, src, dst, false, true, true);
-
-      Assertions.assertThat(deleteCall.get())
-          .describedAs("Delete operation count should be greater than 0.")
-          .isGreaterThan(0);
-
-      // Validate final state of destination and source
-      // Validate that delete redo operation was triggered
-      validateRename(fs, src, dst, false, true, false);
-    }
-  }
-
   /**
    * Tests renaming a file or directory when the destination path contains
    * a colon (":"). The test ensures that:
@@ -1980,40 +1498,6 @@ public void testRenameWhenDestinationPathContainsColon() 
throws Exception {
     performRenameAndValidate(fs, src, dst, fileName);
   }
 
-  /**
-   * Performs the rename operation and validates the existence of the 
directories and files.
-   *
-   * @param fs the AzureBlobFileSystem instance
-   * @param src the source path to be renamed
-   * @param dst the destination path for the rename
-   * @param fileName the name of the file to be renamed
-   */
-  private void performRenameAndValidate(AzureBlobFileSystem fs, Path src, Path 
dst, String fileName)
-      throws IOException {
-    // Assert the source directory exists
-    Assertions.assertThat(fs.exists(src))
-        .describedAs("Old directory should exist before rename")
-        .isTrue();
-
-    // Perform rename
-    fs.rename(src, dst);
-
-    // Assert the destination directory and file exist after rename
-    Assertions.assertThat(fs.exists(new Path(dst, fileName)))
-        .describedAs("Rename should be successful")
-        .isTrue();
-
-    // Assert the source directory no longer exists
-    Assertions.assertThat(fs.exists(src))
-        .describedAs("Old directory should not exist")
-        .isFalse();
-
-    // Assert the new destination directory exists
-    Assertions.assertThat(fs.exists(dst))
-        .describedAs("New directory should exist")
-        .isTrue();
-  }
-
   /**
    * Tests the behavior of the atomic rename key for the root folder
    * in Azure Blob File System. The test verifies that the atomic rename key
@@ -2058,413 +1542,81 @@ public void testGetAtomicRenameKeyForNonRootFolder() 
throws Exception {
   }
 
   /**
-   * Validates the atomic rename key for a specific path.
+   * Test case to verify path status when there is no pending rename JSON file.
    *
-   * @param abfsBlobClient the AbfsBlobClient instance
-   * @param path the path to check for atomic rename key
-   * @param expected the expected value (true or false)
+   * This test ensures that when no rename pending JSON file is present, the 
path status is
+   * successfully retrieved, the ETag is present, and no redo rename operation 
is triggered.
+   *
+   * @throws Exception if any error occurs during the test execution
    */
-  private void validateAtomicRenameKey(AbfsBlobClient abfsBlobClient, String 
path, boolean expected) {
-    Assertions.assertThat(abfsBlobClient.isAtomicRenameKey(path))
-        .describedAs("Atomic rename key check for path: " + path)
-        .isEqualTo(expected);
-  }
+  @Test
+  public void testGetPathStatusWithoutPendingJsonFile() throws Exception {
+    try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem())) {
+      assumeBlobServiceType();
 
-  /**
-   * Helper method to create a json file.
-   * @param path parent path
-   * @param renameJson rename json path
-   * @return file system
-   * @throws IOException in case of failure
-   */
-  public AzureBlobFileSystem createJsonFile(Path path, Path renameJson)
-      throws IOException {
-    final AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem());
-    assumeBlobServiceType();
-    AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
-    Mockito.doReturn(store).when(fs).getAbfsStore();
-    AbfsClient client = Mockito.spy(store.getClient());
-    Mockito.doReturn(client).when(store).getClient();
+      Path path = new Path("/hbase/A1/A2");
+      AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
 
-    fs.setWorkingDirectory(new Path(ROOT_PATH));
-    fs.create(new Path(path, "file.txt"));
+      fs.create(new Path(path, "file1.txt"));
+      fs.create(new Path(path, "file2.txt"));
 
-    VersionedFileStatus fileStatus
-        = (VersionedFileStatus) fs.getFileStatus(path);
+      AbfsConfiguration conf = fs.getAbfsStore().getAbfsConfiguration();
 
-    new RenameAtomicity(path, new Path("/hbase/test4"),
-        renameJson, getTestTracingContext(fs, true),
-        fileStatus.getEtag(), client)
-        .preRename();
+      AtomicInteger redoRenameCall = new AtomicInteger(0);
+      Mockito.doAnswer(answer -> {
+        redoRenameCall.incrementAndGet();
+        return answer.callRealMethod();
+      }).when(client).getRedoRenameAtomicity(
+          Mockito.any(Path.class), Mockito.anyInt(),
+          Mockito.any(TracingContext.class));
 
-    Assertions.assertThat(fs.exists(renameJson))
-        .describedAs("Rename Pending Json file should exist.")
-        .isTrue();
+      TracingContext tracingContext = new TracingContext(
+          conf.getClientCorrelationId(), fs.getFileSystemId(),
+          FSOperationType.GET_FILESTATUS, TracingHeaderFormat.ALL_ID_FORMAT,
+          null);
 
-    return fs;
-  }
+      AbfsHttpOperation abfsHttpOperation = client.getPathStatus(
+          path.toUri().getPath(), true,
+          tracingContext, null).getResult();
 
-  /**
-   * Test case to verify crash recovery with a single child folder.
-   *
-   * This test simulates a scenario where a pending rename JSON file exists 
for a single child folder
-   * under the parent directory. It ensures that when listing the files in the 
parent directory,
-   * only the child folder (with the pending rename JSON file) is returned, 
and no additional files are listed.
-   *
-   * @throws Exception if any error occurs during the test execution
-   */
-  @Test
-  public void testListCrashRecoveryWithSingleChildFolder() throws Exception {
-    AzureBlobFileSystem fs = null;
-    try {
-      Path path = new Path("/hbase/A1/A2");
-      Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
-      fs = createJsonFile(path, renameJson);
+      Assertions.assertThat(abfsHttpOperation.getStatusCode())
+          .describedAs("Path should be found.")
+          .isEqualTo(HTTP_OK);
 
-      FileStatus[] fileStatuses = fs.listStatus(new Path("/hbase/A1"));
+      Assertions.assertThat(extractEtagHeader(abfsHttpOperation))
+          .describedAs("Etag should be present.")
+          .isNotNull();
 
-      Assertions.assertThat(fileStatuses.length)
-          .describedAs("List should return 1 file")
-          .isEqualTo(1);
-    } finally {
-      if (fs != null) {
-        fs.close();
-      }
+      Assertions.assertThat(redoRenameCall.get())
+          .describedAs("There should be no redo rename call.")
+          .isEqualTo(0);
     }
   }
 
   /**
-   * Test case to verify crash recovery with multiple child folders.
+   * Test case to verify path status when there is a pending rename JSON 
directory.
    *
-   * This test simulates a scenario where a pending rename JSON file exists, 
and multiple files are
-   * created in the parent directory. It ensures that when listing the files 
in the parent directory,
-   * the correct number of files is returned, including the pending rename 
JSON file.
+   * This test simulates the scenario where a directory is created with a 
rename pending JSON
+   * file (indicated by a specific suffix). It ensures that the path is found, 
the ETag is present,
+   * and no redo rename operation is triggered. It also verifies that the 
rename pending directory
+   * exists.
    *
    * @throws Exception if any error occurs during the test execution
    */
   @Test
-  public void testListCrashRecoveryWithMultipleChildFolder() throws Exception {
-    AzureBlobFileSystem fs = null;
-    try {
+  public void testGetPathStatusWithPendingJsonDir() throws Exception {
+    try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem())) {
+      assumeBlobServiceType();
+
       Path path = new Path("/hbase/A1/A2");
-      Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
-      fs = createJsonFile(path, renameJson);
+      AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
 
-      fs.create(new Path("/hbase/A1/file1.txt"));
-      fs.create(new Path("/hbase/A1/file2.txt"));
+      fs.create(new Path(path, "file1.txt"));
+      fs.create(new Path(path, "file2.txt"));
 
-      FileStatus[] fileStatuses = fs.listStatus(new Path("/hbase/A1"));
+      fs.mkdirs(new Path(path.getParent(), path.getName() + SUFFIX));
 
-      Assertions.assertThat(fileStatuses.length)
-          .describedAs("List should return 3 files")
-          .isEqualTo(3);
-    } finally {
-      if (fs != null) {
-        fs.close();
-      }
-    }
-  }
-
-  /**
-   * Test case to verify crash recovery with a pending rename JSON file.
-   *
-   * This test simulates a scenario where a pending rename JSON file exists in 
the parent directory,
-   * and it ensures that after the deletion of the target directory and 
creation of new files,
-   * the listing operation correctly returns the remaining files without 
considering the pending rename.
-   *
-   * @throws Exception if any error occurs during the test execution
-   */
-  @Test
-  public void testListCrashRecoveryWithPendingJsonFile() throws Exception {
-    AzureBlobFileSystem fs = null;
-    try {
-      Path path = new Path("/hbase/A1/A2");
-      Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
-      fs = createJsonFile(path, renameJson);
-
-      fs.delete(path, true);
-      fs.create(new Path("/hbase/A1/file1.txt"));
-      fs.create(new Path("/hbase/A1/file2.txt"));
-
-      FileStatus[] fileStatuses = fs.listStatus(path.getParent());
-
-      Assertions.assertThat(fileStatuses.length)
-          .describedAs("List should return 2 files")
-          .isEqualTo(2);
-    } finally {
-      if (fs != null) {
-        fs.close();
-      }
-    }
-  }
-
-  /**
-   * Test case to verify crash recovery when no pending rename JSON file 
exists.
-   *
-   * This test simulates a scenario where there is no pending rename JSON file 
in the directory.
-   * It ensures that the listing operation correctly returns all files in the 
parent directory, including
-   * those created after the rename JSON file is deleted.
-   *
-   * @throws Exception if any error occurs during the test execution
-   */
-  @Test
-  public void testListCrashRecoveryWithoutAnyPendingJsonFile() throws 
Exception {
-    AzureBlobFileSystem fs = null;
-    try {
-      Path path = new Path("/hbase/A1/A2");
-      Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
-      fs = createJsonFile(path, renameJson);
-
-      fs.delete(renameJson, true);
-      fs.create(new Path("/hbase/A1/file1.txt"));
-      fs.create(new Path("/hbase/A1/file2.txt"));
-
-      FileStatus[] fileStatuses = fs.listStatus(path.getParent());
-
-      Assertions.assertThat(fileStatuses.length)
-          .describedAs("List should return 3 files")
-          .isEqualTo(3);
-    } finally {
-      if (fs != null) {
-        fs.close();
-      }
-    }
-  }
-
-  /**
-   * Test case to verify crash recovery when a pending rename JSON directory 
exists.
-   *
-   * This test simulates a scenario where a pending rename JSON directory 
exists, ensuring that the
-   * listing operation correctly returns all files in the parent directory 
without triggering a redo
-   * rename operation. It also checks that the directory with the suffix 
"-RenamePending.json" exists.
-   *
-   * @throws Exception if any error occurs during the test execution
-   */
-  @Test
-  public void testListCrashRecoveryWithPendingJsonDir() throws Exception {
-    try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem())) {
-      assumeBlobServiceType();
-      AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
-
-      Path path = new Path("/hbase/A1/A2");
-      Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
-      fs.mkdirs(renameJson);
-
-      fs.create(new Path(path.getParent(), "file1.txt"));
-      fs.create(new Path(path, "file2.txt"));
-
-      AtomicInteger redoRenameCall = new AtomicInteger(0);
-      Mockito.doAnswer(answer -> {
-        redoRenameCall.incrementAndGet();
-        return answer.callRealMethod();
-      }).when(client).getRedoRenameAtomicity(Mockito.any(Path.class),
-          Mockito.anyInt(), Mockito.any(TracingContext.class));
-
-      FileStatus[] fileStatuses = fs.listStatus(path.getParent());
-
-      Assertions.assertThat(fileStatuses.length)
-          .describedAs("List should return 3 files")
-          .isEqualTo(3);
-
-      Assertions.assertThat(redoRenameCall.get())
-          .describedAs("No redo rename call should be made")
-          .isEqualTo(0);
-
-      Assertions.assertThat(
-              Arrays.stream(fileStatuses)
-                  .anyMatch(status -> 
renameJson.toUri().getPath().equals(status.getPath().toUri().getPath())))
-          .describedAs("Directory with suffix -RenamePending.json should 
exist.")
-          .isTrue();
-    }
-  }
-
-  /**
-   * Test case to verify crash recovery during listing with multiple pending 
rename JSON files.
-   *
-   * This test simulates a scenario where multiple pending rename JSON files 
exist, ensuring that
-   * crash recovery properly handles the situation. It verifies that two redo 
rename calls are made
-   * and that the list operation returns the correct number of paths.
-   *
-   * @throws Exception if any error occurs during the test execution
-   */
-  @Test
-  public void testListCrashRecoveryWithMultipleJsonFile() throws Exception {
-    AzureBlobFileSystem fs = null;
-    try {
-      Path path = new Path("/hbase/A1/A2");
-
-      // 1st Json file
-      Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
-      fs = createJsonFile(path, renameJson);
-      AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
-
-      // 2nd Json file
-      Path path2 = new Path("/hbase/A1/A3");
-      fs.create(new Path(path2, "file3.txt"));
-
-      Path renameJson2 = new Path(path2.getParent(), path2.getName() + SUFFIX);
-      VersionedFileStatus fileStatus
-          = (VersionedFileStatus) fs.getFileStatus(path2);
-
-      new RenameAtomicity(path2, new Path("/hbase/test4"),
-          renameJson2, getTestTracingContext(fs, true),
-          fileStatus.getEtag(), client).preRename();
-
-      fs.create(new Path(path, "file2.txt"));
-
-      AtomicInteger redoRenameCall = new AtomicInteger(0);
-      Mockito.doAnswer(answer -> {
-        redoRenameCall.incrementAndGet();
-        return answer.callRealMethod();
-      }).when(client).getRedoRenameAtomicity(Mockito.any(Path.class),
-          Mockito.anyInt(), Mockito.any(TracingContext.class));
-
-      FileStatus[] fileStatuses = fs.listStatus(path.getParent());
-
-      Assertions.assertThat(fileStatuses.length)
-          .describedAs("List should return 2 paths")
-          .isEqualTo(2);
-
-      Assertions.assertThat(redoRenameCall.get())
-          .describedAs("2 redo rename calls should be made")
-          .isEqualTo(2);
-    } finally {
-      if (fs != null) {
-        fs.close();
-      }
-    }
-  }
-
-  /**
-   * Test case to verify path status when a pending rename JSON file exists.
-   *
-   * This test simulates a scenario where a rename operation was pending, and 
ensures that
-   * the path status retrieval triggers a redo rename operation. The test also 
checks that
-   * the correct error code (`PATH_NOT_FOUND`) is returned.
-   *
-   * @throws Exception if any error occurs during the test execution
-   */
-  @Test
-  public void testGetPathStatusWithPendingJsonFile() throws Exception {
-    AzureBlobFileSystem fs = null;
-    try {
-      Path path = new Path("/hbase/A1/A2");
-      Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
-      fs = createJsonFile(path, renameJson);
-
-      AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
-
-      fs.create(new Path("/hbase/A1/file1.txt"));
-      fs.create(new Path("/hbase/A1/file2.txt"));
-
-      AbfsConfiguration conf = fs.getAbfsStore().getAbfsConfiguration();
-
-      AtomicInteger redoRenameCall = new AtomicInteger(0);
-      Mockito.doAnswer(answer -> {
-        redoRenameCall.incrementAndGet();
-        return answer.callRealMethod();
-      }).when(client).getRedoRenameAtomicity(Mockito.any(Path.class),
-          Mockito.anyInt(), Mockito.any(TracingContext.class));
-
-      TracingContext tracingContext = new TracingContext(
-          conf.getClientCorrelationId(), fs.getFileSystemId(),
-          FSOperationType.GET_FILESTATUS, TracingHeaderFormat.ALL_ID_FORMAT, 
null);
-
-      AzureServiceErrorCode azureServiceErrorCode = intercept(
-          AbfsRestOperationException.class, () -> client.getPathStatus(
-              path.toUri().getPath(), true,
-              tracingContext, null)).getErrorCode();
-
-      Assertions.assertThat(azureServiceErrorCode.getErrorCode())
-          .describedAs("Path had to be recovered from atomic rename 
operation.")
-          .isEqualTo(PATH_NOT_FOUND.getErrorCode());
-
-      Assertions.assertThat(redoRenameCall.get())
-          .describedAs("There should be one redo rename call")
-          .isEqualTo(1);
-    } finally {
-      if (fs != null) {
-        fs.close();
-      }
-    }
-  }
-
-  /**
-   * Test case to verify path status when there is no pending rename JSON file.
-   *
-   * This test ensures that when no rename pending JSON file is present, the 
path status is
-   * successfully retrieved, the ETag is present, and no redo rename operation 
is triggered.
-   *
-   * @throws Exception if any error occurs during the test execution
-   */
-  @Test
-  public void testGetPathStatusWithoutPendingJsonFile() throws Exception {
-    try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem())) {
-      assumeBlobServiceType();
-
-      Path path = new Path("/hbase/A1/A2");
-      AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
-
-      fs.create(new Path(path, "file1.txt"));
-      fs.create(new Path(path, "file2.txt"));
-
-      AbfsConfiguration conf = fs.getAbfsStore().getAbfsConfiguration();
-
-      AtomicInteger redoRenameCall = new AtomicInteger(0);
-      Mockito.doAnswer(answer -> {
-        redoRenameCall.incrementAndGet();
-        return answer.callRealMethod();
-      }).when(client).getRedoRenameAtomicity(
-          Mockito.any(Path.class), Mockito.anyInt(),
-          Mockito.any(TracingContext.class));
-
-      TracingContext tracingContext = new TracingContext(
-          conf.getClientCorrelationId(), fs.getFileSystemId(),
-          FSOperationType.GET_FILESTATUS, TracingHeaderFormat.ALL_ID_FORMAT,
-          null);
-
-      AbfsHttpOperation abfsHttpOperation = client.getPathStatus(
-          path.toUri().getPath(), true,
-          tracingContext, null).getResult();
-
-      Assertions.assertThat(abfsHttpOperation.getStatusCode())
-          .describedAs("Path should be found.")
-          .isEqualTo(HTTP_OK);
-
-      Assertions.assertThat(extractEtagHeader(abfsHttpOperation))
-          .describedAs("Etag should be present.")
-          .isNotNull();
-
-      Assertions.assertThat(redoRenameCall.get())
-          .describedAs("There should be no redo rename call.")
-          .isEqualTo(0);
-    }
-  }
-
-  /**
-   * Test case to verify path status when there is a pending rename JSON 
directory.
-   *
-   * This test simulates the scenario where a directory is created with a 
rename pending JSON
-   * file (indicated by a specific suffix). It ensures that the path is found, 
the ETag is present,
-   * and no redo rename operation is triggered. It also verifies that the 
rename pending directory
-   * exists.
-   *
-   * @throws Exception if any error occurs during the test execution
-   */
-  @Test
-  public void testGetPathStatusWithPendingJsonDir() throws Exception {
-    try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem())) {
-      assumeBlobServiceType();
-
-      Path path = new Path("/hbase/A1/A2");
-      AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
-
-      fs.create(new Path(path, "file1.txt"));
-      fs.create(new Path(path, "file2.txt"));
-
-      fs.mkdirs(new Path(path.getParent(), path.getName() + SUFFIX));
-
-      AbfsConfiguration conf = fs.getAbfsStore().getAbfsConfiguration();
+      AbfsConfiguration conf = fs.getAbfsStore().getAbfsConfiguration();
 
       AtomicInteger redoRenameCall = new AtomicInteger(0);
       Mockito.doAnswer(answer -> {
@@ -2499,71 +1651,6 @@ public void testGetPathStatusWithPendingJsonDir() throws 
Exception {
     }
   }
 
-  /**
-   * Test case to verify the behavior when the ETag of a file changes during a 
rename operation.
-   *
-   * This test simulates a scenario where the ETag of a file changes after the 
creation of a
-   * rename pending JSON file. The steps include:
-   * - Creating a rename pending JSON file with an old ETag.
-   * - Deleting the original directory for an ETag change.
-   * - Creating new files in the directory.
-   * - Verifying that the copy blob call is not triggered.
-   * - Verifying that the rename atomicity operation is called once.
-   *
-   * The test ensures that the system correctly handles the ETag change during 
the rename process.
-   *
-   * @throws Exception if any error occurs during the test execution
-   */
-  @Test
-  public void testETagChangedDuringRename() throws Exception {
-    AzureBlobFileSystem fs = null;
-    try {
-      assumeBlobServiceType();
-      Path path = new Path("/hbase/A1/A2");
-      Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
-      // Create rename pending json file with old etag
-      fs = createJsonFile(path, renameJson);
-      AbfsBlobClient abfsBlobClient = (AbfsBlobClient) addSpyHooksOnClient(fs);
-      fs.getAbfsStore().setClient(abfsBlobClient);
-
-      // Delete the directory to change etag
-      fs.delete(path, true);
-
-      fs.create(new Path(path, "file1.txt"));
-      fs.create(new Path(path, "file2.txt"));
-      AtomicInteger numberOfCopyBlobCalls = new AtomicInteger(0);
-      Mockito.doAnswer(copyBlob -> {
-            numberOfCopyBlobCalls.incrementAndGet();
-            return copyBlob.callRealMethod();
-          })
-          .when(abfsBlobClient)
-          .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class),
-              Mockito.nullable(String.class),
-              Mockito.any(TracingContext.class));
-
-      AtomicInteger numberOfRedoRenameAtomicityCalls = new AtomicInteger(0);
-      Mockito.doAnswer(redoRenameAtomicity -> {
-            numberOfRedoRenameAtomicityCalls.incrementAndGet();
-            return redoRenameAtomicity.callRealMethod();
-          })
-          .when(abfsBlobClient)
-          .getRedoRenameAtomicity(Mockito.any(Path.class), Mockito.anyInt(),
-              Mockito.any(TracingContext.class));
-      // Call list status to trigger rename redo
-      fs.listStatus(path.getParent());
-      Assertions.assertThat(numberOfRedoRenameAtomicityCalls.get())
-          .describedAs("There should be one call to getRedoRenameAtomicity")
-          .isEqualTo(1);
-      Assertions.assertThat(numberOfCopyBlobCalls.get())
-          .describedAs("There should be no copy blob call")
-          .isEqualTo(0);
-    } finally {
-      if (fs != null) {
-        fs.close();
-      }
-    }
-  }
-
   /**
    * Test to verify the idempotency of the `rename` operation in Azure Blob 
File System when retrying
    * after a failure. The test simulates a "path not found" error (HTTP 404) 
on the first attempt,
@@ -2588,7 +1675,7 @@ public void testRenamePathRetryIdempotency() throws 
Exception {
       Path destFilePath = new Path(sourceDir, "file2");
 
       final List<AbfsHttpHeader> headers = new ArrayList<>();
-      mockRetriedRequest(abfsClient, headers);
+      mockRetriedRequest(abfsClient, headers, 0);
 
       AbfsRestOperation getPathRestOp = Mockito.mock(AbfsRestOperation.class);
       AbfsHttpOperation op = Mockito.mock(AbfsHttpOperation.class);
@@ -2612,7 +1699,9 @@ public void testRenamePathRetryIdempotency() throws 
Exception {
           Mockito.nullable(String.class), Mockito.nullable(Boolean.class),
           Mockito.nullable(TracingContext.class),
           Mockito.nullable(ContextEncryptionAdapter.class));
-      fs.rename(sourceFilePath, destFilePath);
+      Assertions.assertThat(fs.rename(sourceFilePath, destFilePath))
+          .describedAs("Rename should succeed.")
+          .isTrue();
     }
   }
 
@@ -2672,7 +1761,7 @@ public void 
testFailureInGetPathStatusDuringRenameRecovery() throws Exception {
       fs.getAbfsStore().setClient(abfsDfsClient);
       final String[] clientTransactionId = new String[1];
       mockAddClientTransactionIdToHeader(abfsDfsClient, clientTransactionId);
-      mockRetriedRequest(abfsDfsClient, new ArrayList<>());
+      mockRetriedRequest(abfsDfsClient, new ArrayList<>(), 1);
       int[] flag = new int[1];
       Mockito.doAnswer(getPathStatus -> {
         if (flag[0] == 1) {
@@ -2703,41 +1792,569 @@ public void 
testFailureInGetPathStatusDuringRenameRecovery() throws Exception {
   }
 
   /**
-   * Mocks the retry behavior for an AbfsDfsClient request. The method 
intercepts
-   * the Abfs operation and simulates an HTTP conflict (HTTP 404) error on the
-   * first invocation. It creates a mock HTTP operation with a PUT method and
-   * specific status codes and error messages.
+   * Tests renaming a directory when the destination parent directory does not 
exist.
+   * The test verifies that the rename operation fails as expected.
    *
-   * @param abfsDfsClient The AbfsDfsClient to mock operations for.
-   * @param headers The list of HTTP headers to which request headers will be 
added.
+   * @throws Exception if an error occurs during the test execution
+   */
+  @Test
+  public void testRenameWithDestParentNotExist() throws Exception {
+    try (AzureBlobFileSystem fs = this.getFileSystem()) {
+      Path src = new Path("/A1/A2");
+      Path dst = new Path("/A3/A4");
+      fs.create(new Path(src, "file.txt"));
+
+      Assertions.assertThat(fs.rename(src, dst))
+          .describedAs("Rename should fail as destination parent not exist.")
+          .isFalse();
+    }
+  }
+
+  /**
+   * Tests renaming a directory when the destination parent directory is the 
root.
+   * The test verifies that the rename operation succeeds as expected.
    *
-   * @throws Exception If an error occurs during mock creation or operation 
execution.
+   * @throws Exception if an error occurs during the test execution
+   */
+  @Test
+  public void testRenameWithDestParentAsRoot() throws Exception {
+    try (AzureBlobFileSystem fs = this.getFileSystem()) {
+      Path src = new Path("/A1/A2");
+      Path dst = new Path("/A3");
+      fs.create(new Path(src, "file.txt"));
+
+      Assertions.assertThat(fs.rename(src, dst))
+          .describedAs("Rename should succeed.")
+          .isTrue();
+      Assertions.assertThat(fs.exists(new Path(dst, "file.txt")))
+          .describedAs("File should exist in destination directory.")
+          .isTrue();
+    }
+  }
+
+  /**
+   * Tests renaming a file when the destination is root.
+   * The test verifies that the rename operation succeeds as expected.
+   *
+   * @throws Exception if an error occurs during the test execution
    */
-   private void mockRetriedRequest(AbfsDfsClient abfsDfsClient,
-       final List<AbfsHttpHeader> headers) throws Exception {
-     TestAbfsClient.mockAbfsOperationCreation(abfsDfsClient,
-         new MockIntercept<AbfsRestOperation>() {
-           private int count = 0;
-
-           @Override
-           public void answer(final AbfsRestOperation mockedObj,
-               final InvocationOnMock answer)
-               throws AbfsRestOperationException {
-             if (count == 0) {
-               count = 1;
-               AbfsHttpOperation op = Mockito.mock(AbfsHttpOperation.class);
-               Mockito.doReturn(HTTP_METHOD_PUT).when(op).getMethod();
-               
Mockito.doReturn(EMPTY_STRING).when(op).getStorageErrorMessage();
-               Mockito.doReturn(SOURCE_PATH_NOT_FOUND.getErrorCode()).when(op)
-                   .getStorageErrorCode();
-               Mockito.doReturn(true).when(mockedObj).hasResult();
-               Mockito.doReturn(op).when(mockedObj).getResult();
-               Mockito.doReturn(HTTP_NOT_FOUND).when(op).getStatusCode();
-               headers.addAll(mockedObj.getRequestHeaders());
-               throw new AbfsRestOperationException(HTTP_NOT_FOUND,
-                   SOURCE_PATH_NOT_FOUND.getErrorCode(), EMPTY_STRING, null, 
op);
-             }
-           }
-         }, 1);
-   }
+  @Test
+  public void testFileRenameWithDestAsRoot() throws Exception {
+    try (AzureBlobFileSystem fs = this.getFileSystem()) {
+      Path src = new Path("/A1/A2/file.txt");
+      Path dst = new Path("/");
+      fs.create(src);
+
+      Assertions.assertThat(fs.rename(src, dst))
+          .describedAs("Rename should succeed.")
+          .isTrue();
+      Assertions.assertThat(fs.exists(new Path(dst, "file.txt")))
+          .describedAs("File should exist in root.")
+          .isTrue();
+    }
+  }
+
+  /**
+   * Tests renaming a directory when the destination is root.
+   * The test verifies that the rename operation succeeds as expected.
+   *
+   * @throws Exception if an error occurs during the test execution
+   */
+  @Test
+  public void testDirRenameWithDestAsRoot() throws Exception {
+    try (AzureBlobFileSystem fs = this.getFileSystem()) {
+      Path src = new Path("/A1/A2");
+      Path dst = new Path("/");
+      fs.create(new Path(src, "file.txt"));
+
+      Assertions.assertThat(fs.rename(src, dst))
+          .describedAs("Rename should succeed.")
+          .isTrue();
+      Assertions.assertThat(fs.exists(new Path(dst, src.getName())))
+          .describedAs("A2 directory should exist in root.")
+          .isTrue();
+    }
+  }
+
+  /**
+   * Tests the rename operation with multiple directories in the source path.
+   * This test verifies that the rename operation correctly handles
+   * multiple directories and files, ensuring that the source directory
+   * is renamed to the destination path.
+   *
+   * @throws Exception if an error occurs during the test execution
+   */
+  @Test
+  public void testRenameWithMultipleDirsInSource() throws Exception {
+    try (AzureBlobFileSystem fs = this.getFileSystem()) {
+      assumeBlobServiceType();
+      fs.mkdirs(new Path("/testDir/dir1"));
+      for (int i = 0; i < 10; i++) {
+        fs.create(new Path("/testDir/dir1/file" + i));
+      }
+      fs.mkdirs(new Path("/testDir/dir2"));
+      fs.create(new Path("/testDir/dir2/file2"));
+      createAzCopyFolder(new Path("/testDir/dir3"));
+      Assertions.assertThat(fs.rename(new Path("/testDir"),
+              new Path("/testDir2")))
+          .describedAs("Rename should succeed.")
+          .isTrue();
+      Assertions.assertThat(fs.exists(new Path("/testDir")))
+          .describedAs("Old directory should not exist.")
+          .isFalse();
+      Assertions.assertThat(fs.exists(new Path("/testDir2")))
+          .describedAs("New directory should exist.")
+          .isTrue();
+    }
+  }
+
+  /**
+   * Tests the rename operation with multiple implicit directories in the 
source path.
+   * This test verifies that the rename operation correctly handles
+   * multiple directories and files, ensuring that the source directory
+   * is renamed to the destination path.
+   *
+   * @throws Exception if an error occurs during the test execution
+   */
+  @Test
+  public void testRenameWithMultipleImplicitDirsInSource() throws Exception {
+    try (AzureBlobFileSystem fs = this.getFileSystem()) {
+      assumeBlobServiceType();
+      createAzCopyFolder(new Path("/testDir/dir1"));
+      for (int i = 0; i < 10; i++) {
+        createAzCopyFile(new Path("/testDir/dir1/file" + i));
+      }
+      createAzCopyFolder(new Path("/testDir/dir2"));
+      createAzCopyFile(new Path("/testDir/dir2/file2"));
+      createAzCopyFolder(new Path("/testDir/dir3"));
+      Assertions.assertThat(fs.rename(new Path("/testDir"),
+              new Path("/testDir2")))
+          .describedAs("Rename should succeed.")
+          .isTrue();
+      Assertions.assertThat(fs.exists(new Path("/testDir")))
+          .describedAs("Old directory should not exist.")
+          .isFalse();
+      Assertions.assertThat(fs.exists(new Path("/testDir2")))
+          .describedAs("New directory should exist.")
+          .isTrue();
+    }
+  }
+
+  /**
+   * Tests renaming a directory with an explicit directory in the source path.
+   * This test verifies that the rename operation correctly handles
+   * the explicit directory and files, ensuring that the source directory
+   * is renamed to the destination path.
+   *
+   * @throws Exception if an error occurs during the test execution
+   */
+  @Test
+  public void testRenameWithExplicitDirInSource() throws Exception {
+    try (AzureBlobFileSystem fs = this.getFileSystem()) {
+      assumeBlobServiceType();
+      fs.create(new Path("/testDir/dir3/file2"));
+      fs.create(new Path("/testDir/dir3/file1"));
+      Assertions.assertThat(fs.rename(new Path("/testDir"),
+              new Path("/testDir2")))
+          .describedAs("Rename should succeed.")
+          .isTrue();
+      Assertions.assertThat(fs.exists(new Path("/testDir")))
+          .describedAs("Old directory should not exist.")
+          .isFalse();
+      Assertions.assertThat(fs.exists(new Path("/testDir2")))
+          .describedAs("New directory should exist.")
+          .isTrue();
+    }
+  }
+
+  /**
+   * Spies on the AzureBlobFileSystem's store and client to enable mocking and 
verification
+   * of client interactions in tests. It replaces the actual store and client 
with spied versions.
+   *
+   * @param fs the AzureBlobFileSystem instance; must itself be a Mockito spy or mock
+   * @return the spied AbfsClient for interaction verification
+   */
+  private AbfsClient addSpyHooksOnClient(final AzureBlobFileSystem fs) {
+    AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+    Mockito.doReturn(store).when(fs).getAbfsStore();
+    AbfsClient client = Mockito.spy(store.getClient());
+    Mockito.doReturn(client).when(store).getClient();
+    return client;
+  }
+
+  /**
+   * Simulates a rename failure, performs a recovery action, and verifies that 
the "RenamePendingJson"
+   * file is deleted. It checks that the rename operation is successfully 
completed after recovery.
+   *
+   * @param fs the AzureBlobFileSystem instance
+   * @param client the AbfsBlobClient instance
+   * @param srcPath the source path for the rename operation
+   * @param recoveryCallable the recovery action to perform
+   * @throws Exception if an error occurs during recovery or verification
+   */
+  private void crashRenameAndRecover(final AzureBlobFileSystem fs,
+      AbfsBlobClient client,
+      final String srcPath,
+      final FunctionRaisingIOE<AzureBlobFileSystem, Void> recoveryCallable)
+      throws Exception {
+    crashRename(fs, client, srcPath);
+    AzureBlobFileSystem fs2 = Mockito.spy(getFileSystem());
+    fs2.setWorkingDirectory(new Path(ROOT_PATH));
+    client = (AbfsBlobClient) addSpyHooksOnClient(fs2);
+    int[] renameJsonDeleteCounter = new int[1];
+    Mockito.doAnswer(answer -> {
+          if ((ROOT_PATH + srcPath + SUFFIX)
+              .equalsIgnoreCase(((Path) 
answer.getArgument(0)).toUri().getPath())) {
+            renameJsonDeleteCounter[0] = 1;
+          }
+          return answer.callRealMethod();
+        })
+        .when(client)
+        .deleteBlobPath(Mockito.any(Path.class), 
Mockito.nullable(String.class),
+            Mockito.any(TracingContext.class));
+    recoveryCallable.apply(fs2);
+    Assertions.assertThat(renameJsonDeleteCounter[0])
+        .describedAs("RenamePendingJson should be deleted")
+        .isEqualTo(1);
+    //List would complete the rename orchestration.
+    assertFalse(fs2.exists(new Path("hbase/test1/test2")));
+    assertFalse(fs2.exists(new Path("hbase/test1/test2/test3")));
+    assertTrue(fs2.exists(new Path("hbase/test4/test2/test3")));
+    assertFalse(fs2.exists(new Path("hbase/test1/test2/test3/file")));
+    assertTrue(fs2.exists(new Path("hbase/test4/test2/test3/file")));
+    assertFalse(fs2.exists(new Path("hbase/test1/test2/test3/file1")));
+    assertTrue(fs2.exists(new Path("hbase/test4/test2/test3/file1")));
+  }
+
+  /**
+   * Simulates a rename failure by triggering an `AbfsRestOperationException` 
during the rename process.
+   * It intercepts the exception and ensures that all leases acquired during 
the atomic rename are released.
+   *
+   * @param fs the AzureBlobFileSystem instance used for the rename operation
+   * @param client the AbfsBlobClient instance used for mocking the rename 
failure
+   * @param srcPath the source path for the rename operation
+   * @throws Exception if an error occurs during the simulated failure or 
lease release
+   */
+  private void crashRename(final AzureBlobFileSystem fs,
+      final AbfsBlobClient client,
+      final String srcPath) throws Exception {
+    BlobRenameHandler[] blobRenameHandlers = new BlobRenameHandler[1];
+    AbfsClientTestUtil.mockGetRenameBlobHandler(client,
+        blobRenameHandler -> {
+          blobRenameHandlers[0] = blobRenameHandler;
+          return null;
+        });
+    //Fail rename orchestration on path hbase/test1/test2/test3/file1
+    Mockito.doThrow(new AbfsRestOperationException(HTTP_FORBIDDEN, "", "",
+            new Exception()))
+        .when(client)
+        .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class),
+            Mockito.nullable(String.class),
+            Mockito.any(TracingContext.class));
+    LambdaTestUtils.intercept(AccessDeniedException.class, () -> {
+      fs.rename(new Path(srcPath),
+          new Path("hbase/test4"));
+    });
+    //Release all the leases taken by atomic rename orchestration
+    List<AbfsLease> leases = new 
ArrayList<>(blobRenameHandlers[0].getLeases());
+    for (AbfsLease lease : leases) {
+      lease.free();
+    }
+  }
+
+  /**
+   * A helper method to set up the test environment and execute the common 
logic for handling
+   * failed rename operations and recovery in HBase. This method performs the 
necessary setup
+   * (creating directories and files) and then triggers the 
`crashRenameAndRecover` method
+   * with a provided recovery action.
+   * This method is used by different tests that require different recovery 
actions, such as
+   * performing `listStatus` or checking the existence of a path after a 
failed rename.
+   *
+   * @param fs the AzureBlobFileSystem instance to be used in the test
+   * @param client the AbfsBlobClient instance to be used in the test
+   * @param srcPath the source path for the rename operation
+   * @param failedCopyPath the path that simulates a failed copy during rename
+   * @param recoveryAction the specific recovery action to be performed after 
the rename failure
+   *                       (e.g., listing directory status or checking path 
existence)
+   * @throws Exception if any error occurs during setup or execution of the 
recovery action
+   */
+  private void setupAndTestHBaseFailedRenameRecovery(
+      final AzureBlobFileSystem fs,
+      final AbfsBlobClient client,
+      final String srcPath,
+      final String failedCopyPath,
+      final FunctionRaisingIOE<AzureBlobFileSystem, Void> recoveryAction)
+      throws Exception {
+    fs.setWorkingDirectory(new Path("/"));
+    fs.mkdirs(new Path(srcPath));
+    fs.mkdirs(new Path(srcPath, "test3"));
+    fs.create(new Path(srcPath + "/test3/file"));
+    fs.create(new Path(failedCopyPath));
+    fs.mkdirs(new Path("hbase/test4/"));
+    fs.create(new Path("hbase/test4/file1"));
+    crashRenameAndRecover(fs, client, srcPath, recoveryAction);
+  }
+
+  /**
+   * Tests renaming a directory in AzureBlobFileSystem when the creation of 
the "RenamePendingJson"
+   * file fails on the first attempt. It ensures the renaming operation is 
retried.
+   * The test verifies that the creation of the "RenamePendingJson" file is 
attempted twice:
+   * once on failure and once on retry.
+   *
+   * @param fs the AzureBlobFileSystem instance for the test
+   * @throws Exception if an error occurs during the test
+   */
+  private void testRenamePreRenameFailureResolution(final AzureBlobFileSystem 
fs)
+      throws Exception {
+    AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
+    Path src = new Path("hbase/test1/test2");
+    Path dest = new Path("hbase/test4");
+    fs.mkdirs(src);
+    fs.mkdirs(new Path(src, "test3"));
+    final int[] renamePendingJsonWriteCounter = new int[1];
+    /*
+     * Fail the creation of RenamePendingJson file on the first attempt.
+     */
+    Answer renamePendingJsonCreateAns = createAnswer -> {
+      Path path = createAnswer.getArgument(0);
+      Mockito.doAnswer(clientFlushAns -> {
+            if (renamePendingJsonWriteCounter[0]++ == 0) {
+              fs.delete(path, true);
+            }
+            return clientFlushAns.callRealMethod();
+          })
+          .when(client)
+          .flush(Mockito.any(byte[].class), Mockito.anyString(),
+              Mockito.anyBoolean(), Mockito.nullable(String.class),
+              Mockito.nullable(String.class), Mockito.anyString(),
+              Mockito.nullable(ContextEncryptionAdapter.class),
+              Mockito.any(TracingContext.class));
+      return createAnswer.callRealMethod();
+    };
+    RenameAtomicityTestUtils.addCreatePathMock(client,
+        renamePendingJsonCreateAns);
+    fs.rename(src, dest);
+    Assertions.assertThat(renamePendingJsonWriteCounter[0])
+        .describedAs("Creation of RenamePendingJson should be attempted twice")
+        .isEqualTo(2);
+  }
+
+  /**
+   * Tests the behavior of the redo operation when an invalid 
"RenamePendingJson" file exists.
+   * It verifies that the file is deleted and that no copy operation is 
performed.
+   * The test simulates a scenario where the "RenamePendingJson" file is 
partially written and
+   * ensures that the `redo` method correctly deletes the file and does not 
trigger a copy operation.
+   *
+   * @param fs the AzureBlobFileSystem instance for the test
+   * @throws Exception if an error occurs during the test
+   */
+  private void testAtomicityRedoInvalidFile(final AzureBlobFileSystem fs)
+      throws Exception {
+    AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
+    Path path = new Path("/hbase/test1/test2");
+    fs.mkdirs(new Path(path, "test3"));
+    Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
+    OutputStream os = fs.create(renameJson);
+    os.write("{".getBytes(StandardCharsets.UTF_8));
+    os.close();
+    int[] renameJsonDeleteCounter = new int[1];
+    Mockito.doAnswer(deleteAnswer -> {
+          Path ansPath = deleteAnswer.getArgument(0);
+          if (renameJson.toUri()
+              .getPath()
+              .equalsIgnoreCase(ansPath.toUri().getPath())) {
+            renameJsonDeleteCounter[0]++;
+          }
+          return deleteAnswer.callRealMethod();
+        })
+        .when(client)
+        .deleteBlobPath(Mockito.any(Path.class), 
Mockito.nullable(String.class),
+            Mockito.any(TracingContext.class));
+    new RenameAtomicity(renameJson, 1,
+        getTestTracingContext(fs, true), null, client).redo();
+    Assertions.assertThat(renameJsonDeleteCounter[0])
+        .describedAs("RenamePendingJson should be deleted")
+        .isEqualTo(1);
+    Mockito.verify(client, Mockito.times(0)).copyBlob(Mockito.any(Path.class),
+        Mockito.any(Path.class), Mockito.nullable(String.class),
+        Mockito.any(TracingContext.class));
+  }
+
+  /**
+   * Mocks the progress status for a copy blob operation.
+   * This method simulates a copy operation that is pending and not yet 
completed.
+   * It intercepts the `copyBlob` method and modifies its response to return a 
"COPY_STATUS_PENDING"
+   * status for the copy operation.
+   *
+   * @param spiedClient The {@link AbfsBlobClient} instance that is being 
spied on.
+   * @throws AzureBlobFileSystemException if the mock setup fails.
+   */
+  private void addMockForProgressStatusOnCopyOperation(final AbfsBlobClient 
spiedClient)
+      throws AzureBlobFileSystemException {
+    Mockito.doAnswer(answer -> {
+          AbfsRestOperation op = Mockito.spy(
+              (AbfsRestOperation) answer.callRealMethod());
+          AbfsHttpOperation httpOp = Mockito.spy(op.getResult());
+          Mockito.doReturn(COPY_STATUS_PENDING).when(httpOp).getResponseHeader(
+              HttpHeaderConfigurations.X_MS_COPY_STATUS);
+          Mockito.doReturn(httpOp).when(op).getResult();
+          return op;
+        })
+        .when(spiedClient)
+        .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class),
+            Mockito.nullable(String.class), Mockito.any(TracingContext.class));
+  }
+
+  /**
+   * Mocks the final status of a blob copy operation.
+   * This method ensures that when checking the status of a copy operation in 
progress,
+   * it returns the specified final status (e.g., success, failure, aborted).
+   *
+   * @param spiedClient The mocked Azure Blob client to apply the mock 
behavior.
+   * @param requiredCopyFinalStatus The final status of the copy operation to 
be returned
+   *                                (e.g., COPY_STATUS_FAILED, 
COPY_STATUS_ABORTED).
+   */
+  private void addMockForCopyOperationFinalStatus(final AbfsBlobClient 
spiedClient,
+      final String requiredCopyFinalStatus) {
+    AbfsClientTestUtil.mockGetRenameBlobHandler(spiedClient,
+        blobRenameHandler -> {
+          Mockito.doAnswer(onHandleCopyInProgress -> {
+                Path handlePath = onHandleCopyInProgress.getArgument(0);
+                TracingContext tracingContext = 
onHandleCopyInProgress.getArgument(
+                    1);
+                Mockito.doAnswer(onStatusCheck -> {
+                      AbfsRestOperation op = Mockito.spy(
+                          (AbfsRestOperation) onStatusCheck.callRealMethod());
+                      AbfsHttpOperation httpOp = Mockito.spy(op.getResult());
+                      Mockito.doReturn(requiredCopyFinalStatus)
+                          .when(httpOp)
+                          .getResponseHeader(
+                              HttpHeaderConfigurations.X_MS_COPY_STATUS);
+                      Mockito.doReturn(httpOp).when(op).getResult();
+                      return op;
+                    })
+                    .when(spiedClient)
+                    .getPathStatus(handlePath.toUri().getPath(),
+                        tracingContext, null, false);
+                return onHandleCopyInProgress.callRealMethod();
+              })
+              .when(blobRenameHandler)
+              .handleCopyInProgress(Mockito.any(Path.class),
+                  Mockito.any(TracingContext.class), 
Mockito.any(String.class));
+          return null;
+        });
+  }
+
+  /**
+   * Helper method to configure the AzureBlobFileSystem and rename directories.
+   *
+   * @param currentFs The current AzureBlobFileSystem to use for renaming.
+   * @param producerQueueSize Maximum size of the producer queue.
+   * @param consumerMaxLag Maximum lag allowed for the consumer.
+   * @param maxThread Maximum threads for the rename operation.
+   * @param src The source path of the directory to rename.
+   * @param dst The destination path of the renamed directory.
+   * @throws Exception If an error occurs during the operation.
+   */
+  private void renameDir(AzureBlobFileSystem currentFs, String 
producerQueueSize,
+      String consumerMaxLag, String maxThread, Path src, Path dst)
+      throws Exception {
+    Configuration config = new Configuration(this.getRawConfiguration());
+    config.set(FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, producerQueueSize);
+    config.set(FS_AZURE_CONSUMER_MAX_LAG, consumerMaxLag);
+    config.set(FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD, maxThread);
+    try (AzureBlobFileSystem fs = (AzureBlobFileSystem) 
FileSystem.newInstance(currentFs.getUri(), config)) {
+      Assertions.assertThat(fs.rename(src, dst))
+          .describedAs("Rename should succeed.")
+          .isTrue();
+    }
+  }
+
+  /**
+   * Performs the rename operation and validates the existence of the 
directories and files.
+   *
+   * @param fs the AzureBlobFileSystem instance
+   * @param src the source path to be renamed
+   * @param dst the destination path for the rename
+   * @param fileName the name of the file expected under dst after the rename
+   */
+  private void performRenameAndValidate(AzureBlobFileSystem fs, Path src, Path 
dst, String fileName)
+      throws IOException {
+    // Assert the source directory exists
+    Assertions.assertThat(fs.exists(src))
+        .describedAs("Old directory should exist before rename")
+        .isTrue();
+
+    // Perform rename
+    Assertions.assertThat(fs.rename(src, dst))
+        .describedAs("Rename should succeed.")
+        .isTrue();
+
+    // Assert the destination directory and file exist after rename
+    Assertions.assertThat(fs.exists(new Path(dst, fileName)))
+        .describedAs("Rename should be successful")
+        .isTrue();
+
+    // Assert the source directory no longer exists
+    Assertions.assertThat(fs.exists(src))
+        .describedAs("Old directory should not exist")
+        .isFalse();
+
+    // Assert the new destination directory exists
+    Assertions.assertThat(fs.exists(dst))
+        .describedAs("New directory should exist")
+        .isTrue();
+  }
+
+  /**
+   * Validates the atomic rename key for a specific path.
+   *
+   * @param abfsBlobClient the AbfsBlobClient instance
+   * @param path the path to check for atomic rename key
+   * @param expected the expected value (true or false)
+   */
+  private void validateAtomicRenameKey(AbfsBlobClient abfsBlobClient, String 
path, boolean expected) {
+    Assertions.assertThat(abfsBlobClient.isAtomicRenameKey(path))
+        .describedAs("Atomic rename key check for path: " + path)
+        .isEqualTo(expected);
+  }
+
+  /**
+   * Mocks the retry behavior for an AbfsDfsClient request. The method 
intercepts
+   * the Abfs operation and simulates an HTTP not-found (HTTP 404) error on the
+   * first invocation. It creates a mock HTTP operation with a PUT method and
+   * specific status codes and error messages.
+   *
+   * @param abfsDfsClient The AbfsDfsClient to mock operations for.
+   * @param headers The list of HTTP headers to which request headers will be 
added.
+   * @param failedCall Forwarded as-is to TestAbfsClient.mockAbfsOperationCreation.
+   * @throws Exception If an error occurs during mock creation or operation 
execution.
+   */
+  private void mockRetriedRequest(AbfsDfsClient abfsDfsClient,
+      final List<AbfsHttpHeader> headers, int failedCall) throws Exception {
+    TestAbfsClient.mockAbfsOperationCreation(abfsDfsClient,
+        new MockIntercept<AbfsRestOperation>() {
+          private int count = 0;
+
+          @Override
+          public void answer(final AbfsRestOperation mockedObj,
+              final InvocationOnMock answer)
+              throws AbfsRestOperationException {
+            if (count == 0) {
+              count = 1;
+              AbfsHttpOperation op = Mockito.mock(AbfsHttpOperation.class);
+              Mockito.doReturn(HTTP_METHOD_PUT).when(op).getMethod();
+              Mockito.doReturn(EMPTY_STRING).when(op).getStorageErrorMessage();
+              Mockito.doReturn(SOURCE_PATH_NOT_FOUND.getErrorCode()).when(op)
+                  .getStorageErrorCode();
+              Mockito.doReturn(true).when(mockedObj).hasResult();
+              Mockito.doReturn(op).when(mockedObj).getResult();
+              Mockito.doReturn(HTTP_NOT_FOUND).when(op).getStatusCode();
+              headers.addAll(mockedObj.getRequestHeaders());
+              throw new AbfsRestOperationException(HTTP_NOT_FOUND,
+                  SOURCE_PATH_NOT_FOUND.getErrorCode(), EMPTY_STRING, null, 
op);
+            }
+          }
+        }, failedCall);
+  }
 }
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRenameRecovery.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRenameRecovery.java
new file mode 100644
index 00000000000..e4d9f826000
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRenameRecovery.java
@@ -0,0 +1,770 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
+import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
+import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.RenameAtomicity;
+import org.apache.hadoop.fs.azurebfs.services.VersionedFileStatus;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
+import org.apache.hadoop.test.LambdaTestUtils;
+
+import static java.net.HttpURLConnection.HTTP_OK;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CONSUMER_MAX_LAG;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_PRODUCER_QUEUE_MAX_SIZE;
+import static 
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.BLOB_ALREADY_EXISTS;
+import static 
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.BLOB_PATH_NOT_FOUND;
+import static 
org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.PATH_NOT_FOUND;
+import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicity.SUFFIX;
+import static org.apache.hadoop.fs.azurebfs.utils.AbfsTestUtils.createFiles;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Test rename recovery operation.
+ */
+public class ITestAzureBlobFileSystemRenameRecovery extends
+    AbstractAbfsIntegrationTest {
+
+  /**
+   * Value of the per-test call counter at which the stubbed copy/delete
+   * call starts throwing instead of delegating to the real client.
+   */
+  private static final int FAILED_CALL = 15;
+
+  /**
+   * Count passed to createFiles when populating the source directory; also
+   * the upper bound asserted on the call counter after a crashed rename.
+   */
+  private static final int TOTAL_FILES = 25;
+
+  /**
+   * Creates the test suite.
+   *
+   * @throws Exception propagated from the superclass constructor
+   */
+  public ITestAzureBlobFileSystemRenameRecovery() throws Exception {
+    super();
+  }
+
+  /**
+   * Tests renaming a directory with a failure during the copy operation.
+   * The copyBlob stub installed by {@code renameCrashInBetween} throws once
+   * {@link #FAILED_CALL} copy calls have been let through; the interrupted
+   * rename is then recovered and the final state is validated.
+   *
+   * @throws Exception if any error occurs during the test execution
+   */
+  @Test
+  public void testRenameCopyFailureInBetween() throws Exception {
+    try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) {
+      assumeBlobServiceType();
+      AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
+      fs.getAbfsStore().setClient(client);
+      Path src = new Path("/hbase/A1/A2");
+      Path dst = new Path("/hbase/A1/A3");
+
+      // Create sample files in the source directory
+      createFiles(fs, src, TOTAL_FILES);
+
+      // Track the number of copy operations
+      AtomicInteger copyCall = new AtomicInteger(0);
+      renameCrashInBetween(fs, src, dst, client, copyCall);
+    }
+  }
+
+  /**
+   * Tests renaming a directory with a failure during the delete operation.
+   * The deleteBlobPath stub throws BLOB_PATH_NOT_FOUND once
+   * {@link #FAILED_CALL} delete calls have been let through; the crashed
+   * state, the recovery and the final state are then validated.
+   *
+   * @throws Exception if any error occurs during the test execution
+   */
+  @Test
+  public void testRenameDeleteFailureInBetween() throws Exception {
+    try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) {
+      assumeBlobServiceType();
+      AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
+      fs.getAbfsStore().setClient(client);
+      Path src = new Path("/hbase/A1/A2");
+      Path dst = new Path("/hbase/A1/A3");
+
+      // Create sample files in the source directory
+      createFiles(fs, src, TOTAL_FILES);
+
+      // Track the number of delete operations. The counter is only advanced
+      // for calls that are let through, so the stub keeps failing until the
+      // counter is reset.
+      AtomicInteger deleteCall = new AtomicInteger(0);
+      Mockito.doAnswer(deleteRequest -> {
+        if (deleteCall.get() == FAILED_CALL) {
+          throw new AbfsRestOperationException(
+              BLOB_PATH_NOT_FOUND.getStatusCode(),
+              BLOB_PATH_NOT_FOUND.getErrorCode(),
+              BLOB_PATH_NOT_FOUND.getErrorMessage(),
+              new Exception());
+        }
+        deleteCall.incrementAndGet();
+        return deleteRequest.callRealMethod();
+      }).when(client).deleteBlobPath(Mockito.any(Path.class),
+          Mockito.anyString(), Mockito.any(TracingContext.class));
+
+      renameOperationWithRecovery(fs, src, dst, deleteCall);
+    }
+  }
+
+  /**
+   * Tests renaming a directory with a failure during the copy operation when
+   * the destination directory already exists before the rename is issued.
+   * The copyBlob stub throws once {@link #FAILED_CALL} copy calls have been
+   * let through; recovery is then expected to succeed even though the
+   * destination path was already present.
+   *
+   * @throws Exception if any error occurs during the test execution
+   */
+  @Test
+  public void testRenameRecoveryWhenDestAlreadyExist() throws Exception {
+    try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) {
+      assumeBlobServiceType();
+      AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
+      fs.getAbfsStore().setClient(client);
+      Path src = new Path("/hbase/A1/A2");
+      Path dst = new Path("/hbase/A1/A3");
+
+      // Create sample files in the source directory
+      createFiles(fs, src, TOTAL_FILES);
+      // Create the destination directory
+      fs.mkdirs(dst);
+
+      // Track the number of copy operations
+      AtomicInteger copyCall = new AtomicInteger(0);
+      // Before the rename: source and destination exist, no pending JSON
+      validateRename(fs, src, dst, true, true, false);
+      renameCrashInBetween(fs, src, dst, client, copyCall);
+    }
+  }
+
+  /**
+   * Tests rename recovery after a failure during the copy operation.
+   * The copyBlob stub throws once {@link #FAILED_CALL} copy calls have been
+   * let through; after the crash the destination directory is asserted to
+   * exist, and recovery is expected to succeed regardless.
+   *
+   * NOTE(review): the body is identical to testRenameCopyFailureInBetween
+   * and never creates the destination (or a marker) explicitly before the
+   * rename; confirm whether an explicit setup step was intended.
+   *
+   * @throws Exception if any error occurs during the test execution
+   */
+  @Test
+  public void testRenameRecoveryWithMarkerPresentInDest() throws Exception {
+    try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) {
+      assumeBlobServiceType();
+      AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
+      fs.getAbfsStore().setClient(client);
+      Path src = new Path("/hbase/A1/A2");
+      Path dst = new Path("/hbase/A1/A3");
+
+      // Create sample files in the source directory
+      createFiles(fs, src, TOTAL_FILES);
+
+      // Track the number of copy operations
+      AtomicInteger copyCall = new AtomicInteger(0);
+      renameCrashInBetween(fs, src, dst, client, copyCall);
+    }
+  }
+
+  /**
+   * Test to check behaviour of rename on an atomic rename directory when the
+   * copy phase crashes part-way (leaving a rename pending json file behind)
+   * and recovery is triggered afterwards.
+   *
+   * NOTE(review): the body is identical to testRenameCopyFailureInBetween
+   * and does not create a rename pending json file before calling rename,
+   * despite the test name; confirm whether that setup step is missing.
+   *
+   * @throws Exception in case of failure
+   */
+  @Test
+  public void testRenameWhenAlreadyRenamePendingJsonFilePresent()
+      throws Exception {
+    try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem(getConfig()))) {
+      assumeBlobServiceType();
+      AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
+      fs.getAbfsStore().setClient(client);
+      Path src = new Path("/hbase/A1/A2");
+      Path dst = new Path("/hbase/A1/A3");
+
+      // Create sample files in the source directory
+      createFiles(fs, src, TOTAL_FILES);
+
+      // Track the number of copy operations
+      AtomicInteger copyCall = new AtomicInteger(0);
+      renameCrashInBetween(fs, src, dst, client, copyCall);
+    }
+  }
+
+  /**
+   * Test case to verify crash recovery while listing a parent directory
+   * whose only child has a pending rename.
+   *
+   * A rename pending JSON file is created for the single child folder
+   * /hbase/A1/A2. Listing /hbase/A1 is then expected to return no entries,
+   * with the pending JSON file gone and the source folder absent from the
+   * listing.
+   *
+   * @throws Exception if any error occurs during the test execution
+   */
+  @Test
+  public void testListCrashRecoveryWithSingleChildFolder() throws Exception {
+    Path path = new Path("/hbase/A1/A2");
+    Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
+    AzureBlobFileSystem fs = createJsonFile(path, renameJson);
+
+    FileStatus[] fileStatuses = fs.listStatus(new Path("/hbase/A1"));
+
+    // Asserting on the array itself reports its contents on failure.
+    Assertions.assertThat(fileStatuses)
+        .describedAs("List should return 0 file")
+        .isEmpty();
+    assertPendingJsonFile(fs, renameJson, fileStatuses, path, false);
+  }
+
+  /**
+   * Verifies listing of a parent directory that holds a pending rename JSON
+   * file next to other entries.
+   *
+   * After the pending JSON for /hbase/A1/A2 has been created, two more files
+   * are created directly under /hbase/A1. The listing of /hbase/A1 must then
+   * contain exactly two entries, neither of them the pending JSON file nor
+   * the source folder.
+   *
+   * @throws Exception if any error occurs during the test execution
+   */
+  @Test
+  public void testListCrashRecoveryWithMultipleChildFolder() throws Exception {
+    final Path path = new Path("/hbase/A1/A2");
+    final Path parent = path.getParent();
+    final Path renameJson = new Path(parent, path.getName() + SUFFIX);
+    final AzureBlobFileSystem fs = createJsonFile(path, renameJson);
+
+    fs.create(new Path(parent, "file1.txt"));
+    fs.create(new Path(parent, "file2.txt"));
+
+    final FileStatus[] fileStatuses = fs.listStatus(parent);
+
+    Assertions.assertThat(fileStatuses.length)
+        .describedAs("List should return 2 files")
+        .isEqualTo(2);
+    assertPendingJsonFile(fs, renameJson, fileStatuses, path, false);
+  }
+
+  /**
+   * Verifies listing when the pending rename JSON file is still present but
+   * its source directory has been deleted.
+   *
+   * The source directory is deleted after the pending JSON was written, and
+   * two files are created under the parent. The listing of the parent must
+   * contain exactly those two entries, with the pending JSON file gone and
+   * the source directory absent.
+   *
+   * @throws Exception if any error occurs during the test execution
+   */
+  @Test
+  public void testListCrashRecoveryWithPendingJsonFile() throws Exception {
+    final Path path = new Path("/hbase/A1/A2");
+    final Path parent = path.getParent();
+    final Path renameJson = new Path(parent, path.getName() + SUFFIX);
+    final AzureBlobFileSystem fs = createJsonFile(path, renameJson);
+
+    fs.delete(path, true);
+    fs.create(new Path(parent, "file1.txt"));
+    fs.create(new Path(parent, "file2.txt"));
+
+    final FileStatus[] fileStatuses = fs.listStatus(parent);
+
+    Assertions.assertThat(fileStatuses.length)
+        .describedAs("List should return 2 files")
+        .isEqualTo(2);
+    assertPendingJsonFile(fs, renameJson, fileStatuses, path, false);
+  }
+
+  /**
+   * Verifies listing when no pending rename JSON file exists.
+   *
+   * The pending JSON file is deleted right after it was created, and two
+   * files are created under the parent. The listing of the parent must then
+   * contain three entries, including the untouched source directory.
+   *
+   * @throws Exception if any error occurs during the test execution
+   */
+  @Test
+  public void testListCrashRecoveryWithoutAnyPendingJsonFile()
+      throws Exception {
+    final Path path = new Path("/hbase/A1/A2");
+    final Path parent = path.getParent();
+    final Path renameJson = new Path(parent, path.getName() + SUFFIX);
+    final AzureBlobFileSystem fs = createJsonFile(path, renameJson);
+
+    fs.delete(renameJson, true);
+    fs.create(new Path(parent, "file1.txt"));
+    fs.create(new Path(parent, "file2.txt"));
+
+    final FileStatus[] fileStatuses = fs.listStatus(parent);
+
+    Assertions.assertThat(fileStatuses.length)
+        .describedAs("List should return 3 files")
+        .isEqualTo(3);
+    // With the pending json file removed no recovery takes place, so the
+    // source directory is expected to still be listed.
+    assertPendingJsonFile(fs, renameJson, fileStatuses, path, true);
+  }
+
+  /**
+   * Test case to verify listing when a directory (not a file) carries the
+   * rename pending JSON suffix.
+   *
+   * A directory named with the "-RenamePending.json" suffix is created next
+   * to the source folder. Listing the parent must return all three entries,
+   * must not request a redo rename, and must include the suffixed directory.
+   *
+   * @throws Exception if any error occurs during the test execution
+   */
+  @Test
+  public void testListCrashRecoveryWithPendingJsonDir() throws Exception {
+    try (AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem())) {
+      assumeBlobServiceType();
+      AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
+
+      Path path = new Path("/hbase/A1/A2");
+      Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
+      fs.mkdirs(renameJson);
+
+      fs.create(new Path(path.getParent(), "file1.txt"));
+      fs.create(new Path(path, "file2.txt"));
+
+      // Count how often recovery of a pending rename is requested.
+      AtomicInteger redoRenameCall = new AtomicInteger(0);
+      Mockito.doAnswer(answer -> {
+        redoRenameCall.incrementAndGet();
+        return answer.callRealMethod();
+      }).when(client).getRedoRenameAtomicity(Mockito.any(Path.class),
+          Mockito.anyInt(), Mockito.any(TracingContext.class));
+
+      FileStatus[] fileStatuses = fs.listStatus(path.getParent());
+
+      Assertions.assertThat(fileStatuses)
+          .describedAs("List should return 3 files")
+          .hasSize(3);
+
+      Assertions.assertThat(redoRenameCall.get())
+          .describedAs("No redo rename call should be made")
+          .isEqualTo(0);
+
+      Assertions.assertThat(
+              Arrays.stream(fileStatuses)
+                  .anyMatch(status -> renameJson.toUri().getPath()
+                      .equals(status.getPath().toUri().getPath())))
+          .describedAs(
+              "Directory with suffix -RenamePending.json should exist.")
+          .isTrue();
+    }
+  }
+
+  /**
+   * Test case to verify crash recovery during listing with multiple pending
+   * rename JSON files.
+   *
+   * Two pending rename JSON files are prepared under /hbase/A1 (for A2 and
+   * A3, both targeting /hbase/test4). Listing /hbase/A1 is expected to make
+   * two redo rename calls and to return no entries; afterwards both source
+   * directories and both pending JSON files must be gone.
+   *
+   * @throws Exception if any error occurs during the test execution
+   */
+  @Test
+  public void testListCrashRecoveryWithMultipleJsonFile() throws Exception {
+    Path path = new Path("/hbase/A1/A2");
+
+    // 1st Json file: pending rename of /hbase/A1/A2
+    Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
+    AzureBlobFileSystem fs = createJsonFile(path, renameJson);
+    AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
+
+    // 2nd Json file: pending rename of /hbase/A1/A3
+    Path path2 = new Path("/hbase/A1/A3");
+    fs.create(new Path(path2, "file3.txt"));
+
+    Path renameJson2 = new Path(path2.getParent(), path2.getName() + SUFFIX);
+    VersionedFileStatus fileStatus
+        = (VersionedFileStatus) fs.getFileStatus(path2);
+
+    new RenameAtomicity(path2, new Path("/hbase/test4"),
+        renameJson2, getTestTracingContext(fs, true),
+        fileStatus.getEtag(), client).preRename();
+
+    // Added to the first source directory after its pending JSON was written.
+    fs.create(new Path(path, "file2.txt"));
+
+    // Count how often recovery of a pending rename is requested.
+    AtomicInteger redoRenameCall = new AtomicInteger(0);
+    Mockito.doAnswer(answer -> {
+      redoRenameCall.incrementAndGet();
+      return answer.callRealMethod();
+    }).when(client).getRedoRenameAtomicity(Mockito.any(Path.class),
+        Mockito.anyInt(), Mockito.any(TracingContext.class));
+
+    FileStatus[] fileStatuses = fs.listStatus(path.getParent());
+
+    Assertions.assertThat(fileStatuses.length)
+        .describedAs("List should return 0 paths")
+        .isEqualTo(0);
+
+    Assertions.assertThat(redoRenameCall.get())
+        .describedAs("2 redo rename calls should be made")
+        .isEqualTo(2);
+    assertPathStatus(fs, path, false,
+        "Source directory should not exist.");
+    assertPathStatus(fs, new Path("/hbase/test4/file.txt"), true,
+        "File in destination directory should exist.");
+    assertPathStatus(fs, path2, false,
+        "Source directory should not exist");
+    // NOTE(review): file2.txt was created under the first source (A2);
+    // file3.txt, created under path2, is never checked. Confirm whether
+    // this assertion was meant to cover the second rename.
+    assertPathStatus(fs, new Path("/hbase/test4/file2.txt"), true,
+        "File in destination directory should exist.");
+    assertPathStatus(fs, renameJson, false,
+        "Rename Pending Json file should not exist.");
+    assertPathStatus(fs, renameJson2, false,
+        "Rename Pending Json file should not exist.");
+  }
+
+  /**
+   * Verifies getPathStatus on a source path that has a pending rename JSON
+   * file.
+   *
+   * The call is expected to fail with an {@code AbfsRestOperationException}
+   * whose error code is {@code PATH_NOT_FOUND}, to request exactly one redo
+   * rename, and to leave no pending JSON file behind.
+   *
+   * @throws Exception if any error occurs during the test execution
+   */
+  @Test
+  public void testGetPathStatusWithPendingJsonFile() throws Exception {
+    final Path path = new Path("/hbase/A1/A2");
+    final Path parent = path.getParent();
+    final Path renameJson = new Path(parent, path.getName() + SUFFIX);
+    final AzureBlobFileSystem fs = createJsonFile(path, renameJson);
+
+    final AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
+
+    fs.create(new Path(parent, "file1.txt"));
+    fs.create(new Path(parent, "file2.txt"));
+
+    final AbfsConfiguration conf = fs.getAbfsStore().getAbfsConfiguration();
+
+    // Count how often recovery of a pending rename is requested.
+    final AtomicInteger redoRenameCall = new AtomicInteger(0);
+    Mockito.doAnswer(invocation -> {
+      redoRenameCall.incrementAndGet();
+      return invocation.callRealMethod();
+    }).when(client).getRedoRenameAtomicity(Mockito.any(Path.class),
+        Mockito.anyInt(), Mockito.any(TracingContext.class));
+
+    final TracingContext tracingContext = new TracingContext(
+        conf.getClientCorrelationId(), fs.getFileSystemId(),
+        FSOperationType.GET_FILESTATUS, TracingHeaderFormat.ALL_ID_FORMAT,
+        null);
+
+    final AbfsRestOperationException failure = intercept(
+        AbfsRestOperationException.class, () -> client.getPathStatus(
+            path.toUri().getPath(), true,
+            tracingContext, null));
+    final AzureServiceErrorCode azureServiceErrorCode = failure.getErrorCode();
+
+    Assertions.assertThat(azureServiceErrorCode.getErrorCode())
+        .describedAs("Path had to be recovered from atomic rename operation.")
+        .isEqualTo(PATH_NOT_FOUND.getErrorCode());
+
+    Assertions.assertThat(redoRenameCall.get())
+        .describedAs("There should be one redo rename call")
+        .isEqualTo(1);
+
+    Assertions.assertThat(fs.exists(renameJson))
+        .describedAs("Rename Pending Json file should not exist.")
+        .isFalse();
+  }
+
+  /**
+   * Test case to verify the behavior when the ETag of a directory changes
+   * after its rename pending JSON file was written.
+   *
+   * The steps are:
+   * - Creating a rename pending JSON file with an old ETag.
+   * - Deleting the original directory for an ETag change.
+   * - Creating new files in the directory.
+   * - Listing the parent directory to trigger the rename redo.
+   * - Verifying that the rename atomicity operation is requested once.
+   * - Verifying that the copy blob call is not triggered.
+   * - Verifying that the rename pending JSON file no longer exists.
+   *
+   * @throws Exception if any error occurs during the test execution
+   */
+  @Test
+  public void testETagChangedDuringRename() throws Exception {
+    assumeBlobServiceType();
+    Path path = new Path("/hbase/A1/A2");
+    Path renameJson = new Path(path.getParent(), path.getName() + SUFFIX);
+    // Create rename pending json file with old etag
+    AzureBlobFileSystem fs = createJsonFile(path, renameJson);
+    AbfsBlobClient abfsBlobClient = (AbfsBlobClient) addSpyHooksOnClient(fs);
+    fs.getAbfsStore().setClient(abfsBlobClient);
+
+    // Delete the directory to change etag
+    fs.delete(path, true);
+
+    // Re-create the directory contents under the same path
+    fs.create(new Path(path, "file1.txt"));
+    fs.create(new Path(path, "file2.txt"));
+    // Count copyBlob invocations while still delegating to the real method
+    AtomicInteger numberOfCopyBlobCalls = new AtomicInteger(0);
+    Mockito.doAnswer(copyBlob -> {
+          numberOfCopyBlobCalls.incrementAndGet();
+          return copyBlob.callRealMethod();
+        })
+        .when(abfsBlobClient)
+        .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class),
+            Mockito.nullable(String.class),
+            Mockito.any(TracingContext.class));
+
+    // Count getRedoRenameAtomicity invocations in the same way
+    AtomicInteger numberOfRedoRenameAtomicityCalls = new AtomicInteger(0);
+    Mockito.doAnswer(redoRenameAtomicity -> {
+          numberOfRedoRenameAtomicityCalls.incrementAndGet();
+          return redoRenameAtomicity.callRealMethod();
+        })
+        .when(abfsBlobClient)
+        .getRedoRenameAtomicity(Mockito.any(Path.class), Mockito.anyInt(),
+            Mockito.any(TracingContext.class));
+    // Call list status to trigger rename redo
+    fs.listStatus(path.getParent());
+    Assertions.assertThat(numberOfRedoRenameAtomicityCalls.get())
+        .describedAs("There should be one call to getRedoRenameAtomicity")
+        .isEqualTo(1);
+    Assertions.assertThat(numberOfCopyBlobCalls.get())
+        .describedAs("There should be no copy blob call")
+        .isEqualTo(0);
+    Assertions.assertThat(fs.exists(renameJson))
+        .describedAs("Rename Pending Json file should not exist.")
+        .isFalse();
+  }
+
+  /**
+   * Triggers rename recovery by calling getPathStatus on the source path.
+   * This simulates a scenario where the rename operation was interrupted,
+   * and the system needs to recover the state of the source path.
+   *
+   * @param fs The AzureBlobFileSystem instance.
+   * @param src The source path to trigger recovery on.
+   * @throws Exception If an error occurs during the recovery process.
+   */
+  private void triggerRenameRecovery(AzureBlobFileSystem fs, Path src) throws 
Exception {
+    // Trigger rename recovery
+    TracingContext tracingContext = new TracingContext(
+        getConfiguration().getClientCorrelationId(), fs.getFileSystemId(),
+        FSOperationType.GET_FILESTATUS, TracingHeaderFormat.ALL_ID_FORMAT, 
null);
+    AzureServiceErrorCode errorCode = LambdaTestUtils.intercept(
+        AbfsRestOperationException.class, () -> {
+          fs.getAbfsStore().getClient().getPathStatus(src.toUri().getPath(), 
true,
+              tracingContext, null);
+        }).getErrorCode();
+    Assertions.assertThat(errorCode)
+        .describedAs("Path had to be recovered from atomic rename operation.")
+        .isEqualTo(PATH_NOT_FOUND);
+  }
+
+  /**
+   * Simulates a failure during the rename operation by throwing an exception
+   * when the copyBlob method is called. This is used to test the behavior of
+   * the rename recovery operation when a blob already exists at the 
destination.
+   *
+   * @param fs The AzureBlobFileSystem instance.
+   * @param src The source path to rename.
+   * @param dst The destination path for the rename operation.
+   * @param client The AbfsBlobClient instance.
+   * @param copyCall The AtomicInteger to track the number of copy calls.
+   * @throws AzureBlobFileSystemException If an error occurs during the 
operation.
+   */
+  private void renameCrashInBetween(AzureBlobFileSystem fs, Path src, Path dst,
+      AbfsBlobClient client, AtomicInteger copyCall)
+      throws Exception {
+    Mockito.doAnswer(copyRequest -> {
+      if (copyCall.get() == FAILED_CALL) {
+        throw new AbfsRestOperationException(
+            BLOB_ALREADY_EXISTS.getStatusCode(),
+            BLOB_ALREADY_EXISTS.getErrorCode(),
+            BLOB_ALREADY_EXISTS.getErrorMessage(),
+            new Exception());
+      }
+      copyCall.incrementAndGet();
+      return copyRequest.callRealMethod();
+    }).when(client).copyBlob(Mockito.any(Path.class),
+        Mockito.any(Path.class), Mockito.nullable(String.class),
+        Mockito.any(TracingContext.class));
+    renameOperationWithRecovery(fs, src, dst, copyCall);
+  }
+
+  /**
+   * Builds a copy of the raw test configuration with the producer queue
+   * size, consumer lag and blob directory rename thread settings overridden.
+   *
+   * @return The configuration object.
+   */
+  private Configuration getConfig() {
+    final Configuration renameConf
+        = new Configuration(this.getRawConfiguration());
+    renameConf.set(FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD, "2");
+    renameConf.set(FS_AZURE_CONSUMER_MAX_LAG, "3");
+    renameConf.set(FS_AZURE_PRODUCER_QUEUE_MAX_SIZE, "5");
+    return renameConf;
+  }
+
+  /**
+   * Spies on the AzureBlobFileSystem's store and client to enable mocking and 
verification
+   * of client interactions in tests. It replaces the actual store and client 
with mocked versions.
+   *
+   * @param fs the AzureBlobFileSystem instance
+   * @return the spied AbfsClient for interaction verification
+   */
+  private AbfsClient addSpyHooksOnClient(final AzureBlobFileSystem fs) {
+    AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+    Mockito.doReturn(store).when(fs).getAbfsStore();
+    AbfsClient client = Mockito.spy(store.getClient());
+    Mockito.doReturn(client).when(store).getClient();
+    return client;
+  }
+
+  /**
+   * Helper method to validate that the rename was successful and that the 
destination exists.
+   *
+   * @param fs The AzureBlobFileSystem instance to check the existence on.
+   * @param dst The destination path.
+   * @param src The source path.
+   * @throws IOException If an I/O error occurs during the validation.
+   */
+  private void validateRename(AzureBlobFileSystem fs, Path src, Path dst,
+      boolean isSrcExist, boolean isDstExist, boolean isJsonExist) throws 
Exception {
+    // Validate pending JSON file status
+    assertPathStatus(fs,
+        new Path(src.getParent(), src.getName() + SUFFIX), isJsonExist,
+        "Pending JSON file");
+
+    // Validate source directory status
+    assertPathStatus(fs, src, isSrcExist, "Source directory");
+
+    // Validate destination directory status
+    assertPathStatus(fs, dst, isDstExist, "Destination directory");
+  }
+
+  /**
+   * Helper method to assert the status of a path in the AzureBlobFileSystem.
+   *
+   * @param fs The AzureBlobFileSystem instance to check the existence on.
+   * @param path The path to check.
+   * @param shouldExist Whether the path should exist or not.
+   * @param description A description for the assertion.
+   * @throws Exception If an error occurs during the assertion.
+   */
+  private void assertPathStatus(AzureBlobFileSystem fs, Path path,
+      boolean shouldExist, String description) throws Exception{
+    TracingContext tracingContext = getTestTracingContext(fs, true);
+    AbfsBlobClient client = ((AbfsBlobClient) fs.getAbfsClient());
+    if (shouldExist) {
+      int actualStatus = client.getPathStatus(
+              path.toUri().getPath(), tracingContext,
+              null, true)
+          .getResult().getStatusCode();
+      Assertions.assertThat(actualStatus)
+          .describedAs("%s should exists", description)
+          .isEqualTo(HTTP_OK);
+    } else {
+      AzureServiceErrorCode errorCode = LambdaTestUtils.intercept(
+          AbfsRestOperationException.class, () -> {
+            client.getPathStatus(path.toUri().getPath(), true,
+                tracingContext, null);
+          }).getErrorCode();
+      Assertions.assertThat(errorCode)
+          .describedAs("%s should not exists", description)
+          .isEqualTo(BLOB_PATH_NOT_FOUND);
+    }
+  }
+
+  /**
+   * Helper method to prepare a source directory with a rename pending json
+   * file. It spies the file system, its store and client, creates
+   * {@code file.txt} under {@code path}, and writes the pending json for a
+   * rename of {@code path} to /hbase/test4 via RenameAtomicity.preRename().
+   *
+   * @param path source directory of the pending rename
+   * @param renameJson rename json path
+   * @return the spied file system
+   * @throws IOException in case of failure
+   */
+  private AzureBlobFileSystem createJsonFile(Path path, Path renameJson)
+      throws IOException {
+    final AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem());
+    assumeBlobServiceType();
+    // Reuse the shared helper rather than repeating the store/client spying.
+    AbfsClient client = addSpyHooksOnClient(fs);
+
+    fs.setWorkingDirectory(new Path(ROOT_PATH));
+    fs.create(new Path(path, "file.txt"));
+
+    // The directory's current etag is recorded in the pending json.
+    VersionedFileStatus fileStatus
+        = (VersionedFileStatus) fs.getFileStatus(path);
+
+    new RenameAtomicity(path, new Path("/hbase/test4"),
+        renameJson, getTestTracingContext(fs, true),
+        fileStatus.getEtag(), client)
+        .preRename();
+
+    Assertions.assertThat(fs.exists(renameJson))
+        .describedAs("Rename Pending Json file should exist.")
+        .isTrue();
+
+    return fs;
+  }
+
+  /**
+   * Helper method to run a rename that is expected to crash part-way, then
+   * trigger recovery and validate the outcome.
+   *
+   * The rename must return false with fewer than TOTAL_FILES counted calls,
+   * leaving source, destination and pending JSON file all present. After the
+   * counter is reset and recovery is triggered, at least one more call must
+   * be counted, the source and pending JSON file must be gone and the
+   * destination must exist.
+   *
+   * @param fs The AzureBlobFileSystem instance to use for the rename
+   * operation.
+   * @param src The source path (directory).
+   * @param dst The destination path (directory).
+   * @param countCall The AtomicInteger to track the number of operations.
+   * @throws Exception If an error occurs during the rename operation.
+   */
+  private void renameOperationWithRecovery(AzureBlobFileSystem fs, Path src,
+      Path dst, AtomicInteger countCall) throws Exception {
+    Assertions.assertThat(fs.rename(src, dst))
+        .describedAs("Rename should crash in between.")
+        .isFalse();
+
+    // Validate operation count; the message reports the bound really used.
+    Assertions.assertThat(countCall.get())
+        .describedAs("Operation count should be less than %d.", TOTAL_FILES)
+        .isLessThan(TOTAL_FILES);
+
+    // Assertions to validate renamed destination and source
+    validateRename(fs, src, dst, true, true, true);
+
+    // Validate that rename redo operation was triggered
+    countCall.set(0);
+    triggerRenameRecovery(fs, src);
+
+    Assertions.assertThat(countCall.get())
+        .describedAs("Operation count should be greater than 0.")
+        .isGreaterThan(0);
+
+    // Validate final state of destination and source
+    validateRename(fs, src, dst, false, true, false);
+  }
+
+  /**
+   * Helper method to assert that the pending JSON file does not exist, that
+   * the list of file statuses does not contain the rename pending JSON file,
+   * and that the source path is listed exactly when expected.
+   *
+   * @param fs The AzureBlobFileSystem instance.
+   * @param renameJson The path of the rename pending JSON file.
+   * @param fileStatuses The array of FileStatus objects to check.
+   * @param srcPath The source path to check.
+   * @param isSrcPathExist Whether the source path is expected to appear in
+   * the given file statuses.
+   * @throws Exception If an error occurs during the assertion.
+   */
+  private void assertPendingJsonFile(AzureBlobFileSystem fs,
+      Path renameJson, FileStatus[] fileStatuses,
+      Path srcPath, boolean isSrcPathExist) throws Exception {
+    Assertions.assertThat(fs.exists(renameJson))
+        .describedAs("Rename Pending Json file should not exist.")
+        .isFalse();
+
+    Assertions.assertThat(
+            Arrays.stream(fileStatuses)
+                .anyMatch(status ->
+                    renameJson.toUri().getPath()
+                        .equals(status.getPath().toUri().getPath())))
+        .describedAs("List status should not contains any file with suffix "
+            + "-RenamePending.json.")
+        .isFalse();
+
+    // The description follows the expectation, so a failure message does
+    // not claim "should not" when the source is expected to be listed.
+    Assertions.assertThat(
+            Arrays.stream(fileStatuses)
+                .anyMatch(status ->
+                    srcPath.toUri().getPath()
+                        .equals(status.getPath().toUri().getPath())))
+        .describedAs("List status should %scontain source path.",
+            isSrcPathExist ? "" : "not ")
+        .isEqualTo(isSrcPathExist);
+  }
+}
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java
index a6652fe85f1..9af8f0f5a6f 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java
@@ -352,7 +352,7 @@ public static void mockGetRenameBlobHandler(AbfsBlobClient 
blobClient,
               Mockito.doAnswer(answer1 -> {
                 functionRaisingIOE.apply(blobRenameHandler);
                 return answer1.callRealMethod();
-              }).when(blobRenameHandler).execute();
+              }).when(blobRenameHandler).execute(Mockito.anyBoolean());
               return blobRenameHandler;
             })
             .when(blobClient)
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/AbfsTestUtils.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/AbfsTestUtils.java
index 9faa54e7ba1..d6b11865130 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/AbfsTestUtils.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/AbfsTestUtils.java
@@ -17,6 +17,13 @@
  */
 package org.apache.hadoop.fs.azurebfs.utils;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
 import com.microsoft.azure.storage.CloudStorageAccount;
 import com.microsoft.azure.storage.blob.CloudBlobClient;
 import com.microsoft.azure.storage.blob.CloudBlobContainer;
@@ -25,10 +32,13 @@
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount;
 import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
 import org.apache.hadoop.fs.azurebfs.services.AuthType;
 
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FILE;
 import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_SCHEME;
 import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_SECURE_SCHEME;
 import static 
org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONTAINER_PREFIX;
@@ -40,6 +50,8 @@ public final class AbfsTestUtils extends 
AbstractAbfsIntegrationTest {
     private static final Logger LOG =
             LoggerFactory.getLogger(AbfsTestUtils.class);
 
+  private static final int TOTAL_THREADS_IN_POOL = 5;
+
   public AbfsTestUtils() throws Exception {
     super();
   }
@@ -95,4 +107,29 @@ public static void disableFilesystemCaching(Configuration 
conf) {
         conf.setBoolean(String.format("fs.%s.impl.disable.cache", 
ABFS_SCHEME), true);
         conf.setBoolean(String.format("fs.%s.impl.disable.cache", 
ABFS_SECURE_SCHEME), true);
     }
+
+  /**
+   * Helper method to create files in the given directory.
+   * Files are created concurrently on a fixed-size thread pool, and the
+   * output stream of each file is closed right after the file is created.
+   *
+   * @param fs The AzureBlobFileSystem instance to use for file creation.
+   * @param path The source path (directory).
+   * @param numFiles The number of files to create.
+   * @throws ExecutionException If the creation of any file fails.
+   * @throws InterruptedException If the calling thread is interrupted while
+   *                              waiting for file creation to complete.
+   */
+  public static void createFiles(AzureBlobFileSystem fs, Path path,
+      int numFiles) throws ExecutionException, InterruptedException {
+    ExecutorService executorService =
+        Executors.newFixedThreadPool(TOTAL_THREADS_IN_POOL);
+    try {
+      List<Future<?>> futures = new ArrayList<>();
+      for (int i = 0; i < numFiles; i++) {
+        final int iter = i;
+        futures.add(executorService.submit(() -> {
+          // Close the stream returned by create() so it is not leaked.
+          fs.create(new Path(path, FILE + iter + ".txt")).close();
+          return null;
+        }));
+      }
+      // Wait for every creation; the first failure surfaces to the caller.
+      for (Future<?> future : futures) {
+        future.get();
+      }
+    } finally {
+      // Always release the pool threads, even when a task fails.
+      executorService.shutdown();
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to