This is an automated email from the ASF dual-hosted git repository.

anujmodi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bc5a5b8543a HADOOP-19658. ABFS:Create and rename idempotency for FNS 
Blob (#7914)
bc5a5b8543a is described below

commit bc5a5b8543a2cc35d3c2e8ccb52329de62e78639
Author: Anmol Asrani <anmol.asrani...@gmail.com>
AuthorDate: Wed Sep 3 04:12:35 2025 +0000

    HADOOP-19658. ABFS:Create and rename idempotency for FNS Blob (#7914)
    
    Contributed by Anmol Asrani
---
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      |  14 ++
 .../fs/azurebfs/constants/ConfigurationKeys.java   |   2 +
 .../constants/FileSystemConfigurations.java        |   2 +
 .../fs/azurebfs/services/AbfsBlobClient.java       |  32 ++-
 .../hadoop/fs/azurebfs/services/AbfsIoUtils.java   |  12 +-
 .../fs/azurebfs/ITestAbfsNetworkStatistics.java    | 264 +++++++++++----------
 .../azurebfs/ITestAzureBlobFileSystemCreate.java   | 110 ++++++++-
 .../fs/azurebfs/ITestAzureBlobFileSystemLease.java |   2 +-
 .../azurebfs/ITestAzureBlobFileSystemRename.java   |  82 +++++++
 .../fs/azurebfs/ITestWasbAbfsCompatibility.java    |   6 +-
 .../fs/azurebfs/services/AbfsClientTestUtil.java   | 117 +++++++++
 11 files changed, 508 insertions(+), 135 deletions(-)

diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index f570f82c5be..7ae8f76b187 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -492,6 +492,10 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID)
   private boolean enableClientTransactionId;
 
+  @BooleanConfigurationValidatorAnnotation(ConfigurationKey = 
FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY,
+      DefaultValue = DEFAULT_FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY)
+  private boolean enableCreateIdempotency;
+
   private String clientProvidedEncryptionKey;
   private String clientProvidedEncryptionKeySHA;
 
@@ -1047,6 +1051,12 @@ public String getAzureAtomicRenameDirs() {
   }
 
   public boolean isConditionalCreateOverwriteEnabled() {
+    // If either the configured FS service type or the ingress service type is 
BLOB,
+    // conditional create-overwrite is not used.
+    if (getIsCreateIdempotencyEnabled() && (getFsConfiguredServiceType() == 
AbfsServiceType.BLOB
+        || getIngressServiceType() == AbfsServiceType.BLOB)) {
+      return false;
+    }
     return this.enableConditionalCreateOverwrite;
   }
 
@@ -1178,6 +1188,10 @@ public boolean getIsClientTransactionIdEnabled() {
     return enableClientTransactionId;
   }
 
+  public boolean getIsCreateIdempotencyEnabled() {
+    return enableCreateIdempotency;
+  }
+
   /**
    * Enum config to allow user to pick format of x-ms-client-request-id header
    * @return tracingContextFormat config if valid, else default ALL_ID_FORMAT
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index ef39fb11b2d..7d73f1a3fe7 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -430,6 +430,8 @@ public static String containerProperty(String property, 
String fsName, String ac
   public static final String FS_AZURE_BLOB_DIR_DELETE_MAX_THREAD = 
"fs.azure.blob.dir.delete.max.thread";
   /**Flag to enable/disable sending client transactional ID during 
create/rename operations: {@value}*/
   public static final String FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID = 
"fs.azure.enable.client.transaction.id";
+  /**Flag to enable/disable create idempotency during create operation: 
{@value}*/
+  public static final String FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY = 
"fs.azure.enable.create.blob.idempotency";
 
   private ConfigurationKeys() {}
 }
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index d53e936fd5c..640a658b955 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -240,5 +240,7 @@ public final class FileSystemConfigurations {
 
   public static final boolean DEFAULT_FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID = 
true;
 
+  public static final boolean DEFAULT_FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY 
= true;
+
   private FileSystemConfigurations() {}
 }
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
index bb46a97835f..77aca70990f 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
@@ -509,9 +509,34 @@ public AbfsRestOperation createPath(final String path,
       final TracingContext tracingContext) throws AzureBlobFileSystemException 
{
     AbfsRestOperation op;
     if (isFileCreation) {
-      // Create a file with the specified parameters
-      op = createFile(path, overwrite, permissions, isAppendBlob, eTag,
-          contextEncryptionAdapter, tracingContext);
+      if (getAbfsConfiguration().getIsCreateIdempotencyEnabled()) {
+        AbfsRestOperation statusOp = null;
+        try {
+          // Check if the file already exists by calling GetPathStatus
+          statusOp = getPathStatus(path, tracingContext, null, false);
+        } catch (AbfsRestOperationException ex) {
+          // If the path does not exist, continue with file creation
+          // For other errors, rethrow the exception
+          if (ex.getStatusCode() != HTTP_NOT_FOUND) {
+            throw ex;
+          }
+        }
+        // If the file exists and overwrite is not allowed, throw conflict
+        if (statusOp != null && statusOp.hasResult() && !overwrite) {
+          throw new AbfsRestOperationException(
+              HTTP_CONFLICT,
+              AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(),
+              PATH_EXISTS,
+              null);
+        } else {
+          // Proceed with file creation (force overwrite = true)
+          op = createFile(path, true, permissions, isAppendBlob, eTag,
+              contextEncryptionAdapter, tracingContext);
+        }
+      } else {
+        op = createFile(path, overwrite, permissions, isAppendBlob, eTag,
+            contextEncryptionAdapter, tracingContext);
+      }
     } else {
       // Create a directory with the specified parameters
       op = createDirectory(path, permissions, isAppendBlob, eTag,
@@ -584,7 +609,6 @@ public AbfsRestOperation createPathRestOp(final String path,
     if (eTag != null && !eTag.isEmpty()) {
       requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_MATCH, 
eTag));
     }
-
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = getAbfsRestOperation(
         AbfsRestOperationType.PutBlob,
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsIoUtils.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsIoUtils.java
index 44fa2d8d8bd..22fd9e15b6b 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsIoUtils.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsIoUtils.java
@@ -18,8 +18,10 @@
 
 package org.apache.hadoop.fs.azurebfs.services;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,7 +56,15 @@ public static void dumpHeadersToDebugLog(final String origin,
         if (key == null) {
           key = "HTTP Response";
         }
-        String values = StringUtils.join(";", entry.getValue());
+        List<String> valuesList = entry.getValue();
+        if (valuesList == null) {
+          valuesList = Collections.emptyList();
+        } else {
+          valuesList = valuesList.stream()
+              .map(v -> v == null ? "" : v) // replace null with empty string
+              .collect(Collectors.toList());
+        }
+        String values = StringUtils.join(";", valuesList);
         if (key.contains("Cookie")) {
           values = "*cookie info*";
         }
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
index e29bfc5f624..5eb0c3d9910 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
@@ -25,9 +25,11 @@
 import org.slf4j.LoggerFactory;
 import org.junit.jupiter.api.Test;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsDfsClient;
@@ -41,6 +43,8 @@
 import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.GET_RESPONSES;
 import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.SEND_REQUESTS;
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY;
 
 public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
 
@@ -74,134 +78,69 @@ private int countDirectory(String path) {
   public void testAbfsHttpSendStatistics() throws IOException {
     describe("Test to check correct values of statistics after Abfs http send "
         + "request is done.");
+    Configuration conf = getRawConfiguration();
+    conf.setBoolean(FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY, false);
+    FileSystem fileSystem = FileSystem.newInstance(conf);
+    try (AzureBlobFileSystem fs = (AzureBlobFileSystem) fileSystem) {
+         Map<String, Long> metricMap;
+         Path sendRequestPath = path(getMethodName());
+         String path = sendRequestPath.toString();
+         int directory = countDirectory(path);
+         String testNetworkStatsString = "http_send";
+
+         metricMap = getInstrumentationMap(fs);
+      long expectedConnectionsMade = metricMap.get(
+          CONNECTIONS_MADE.getStatName());
+      long expectedRequestsSent = metricMap.get(SEND_REQUESTS.getStatName());
+      long expectedBytesSent = 0;
+      AbfsClient client = fs.getAbfsStore()
+          .getClientHandler()
+          .getIngressClient();
 
-    AzureBlobFileSystem fs = getFileSystem();
-    Map<String, Long> metricMap;
-    Path sendRequestPath = path(getMethodName());
-    String path = sendRequestPath.toString();
-    int directory = countDirectory(path);
-    String testNetworkStatsString = "http_send";
-
-    metricMap = getInstrumentationMap(fs);
-    long expectedConnectionsMade = 
metricMap.get(CONNECTIONS_MADE.getStatName());
-    long expectedRequestsSent = metricMap.get(SEND_REQUESTS.getStatName());
-    long expectedBytesSent = 0;
-    AbfsClient client = 
fs.getAbfsStore().getClientHandler().getIngressClient();
-
-    // --------------------------------------------------------------------
-     // Operation: Creating AbfsOutputStream
-    try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
-        sendRequestPath)) {
-       // Network stats calculation: For Creating AbfsOutputStream:
-       // 1 create request = 1 connection made and 1 send request
-      if (client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs)) {
-        expectedRequestsSent += (directory);
-        // Per directory, we have 2 calls :- 1 PutBlob and 1 ListBlobs call.
-        expectedConnectionsMade += ((directory * 2));
-      } else {
-        expectedRequestsSent++;
-        expectedConnectionsMade++;
-      }
-      // --------------------------------------------------------------------
-
-      // Operation: Write small data
-      // Network stats calculation: No additions.
-      // Data written is less than the buffer size and hence will not
-      // trigger any append request to store
-      out.write(testNetworkStatsString.getBytes());
-      // --------------------------------------------------------------------
-
-       // Operation: HFlush
-       // Flushes all outstanding data (i.e. the current unfinished packet)
-       // from the client into the service on all DataNode replicas.
-      out.hflush();
-      /*
-       * Network stats calculation:
-       * 3 possibilities here:
-       * A. As there is pending data to be written to store, this will result 
in:
-       *    1 append + 1 flush = 2 connections and 2 send requests
-       *
-       * B. If config "fs.azure.enable.small.write.optimization" is enabled, 
append
-       *   and flush call will be merged for small data in buffer in this test.
-       *   In which case it will be:
-       *   1 append+flush request = 1 connection and 1 send request
-       *
-       * C. If the path is configured for append Blob files to be used, hflush
-       *   is a no-op. So in this case:
-       *   1 append = 1 connection and 1 send request
-       */
-      if 
(fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())
-          || 
(fs.getAbfsStore().getAbfsConfiguration().isSmallWriteOptimizationEnabled())) {
-        expectedConnectionsMade++;
-        expectedRequestsSent++;
-      } else {
-        expectedConnectionsMade += 2;
-        expectedRequestsSent += 2;
-      }
-      expectedBytesSent += testNetworkStatsString.getBytes().length;
       // --------------------------------------------------------------------
-
-      // Assertions
-      metricMap = getInstrumentationMap(fs);
-      assertAbfsStatistics(CONNECTIONS_MADE,
-          expectedConnectionsMade, metricMap);
-      assertAbfsStatistics(SEND_REQUESTS, expectedRequestsSent,
-          metricMap);
-      assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
-          expectedBytesSent, metricMap);
-    }
-
-    // --------------------------------------------------------------------
-    // Operation: AbfsOutputStream close.
-    // Network Stats calculation: 1 flush (with close) is send.
-    // 1 flush request = 1 connection and 1 send request
-    // Flush with no data is a no-op for blob endpoint, hence update only for 
dfs endpoint.
-    if (client instanceof AbfsDfsClient) {
-      expectedConnectionsMade++;
-      expectedRequestsSent++;
-    }
-    // --------------------------------------------------------------------
-
-    // Operation: Re-create the file / create overwrite scenario
-    try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
-        sendRequestPath)) {
-      /*
-       * Network Stats calculation: create overwrite
-       * There are 2 possibilities here.
-       * A. create overwrite results in 1 server call
-       *    create with overwrite=true = 1 connection and 1 send request
-       *
-       * B. If config "fs.azure.enable.conditional.create.overwrite" is 
enabled,
-       *    create overwrite=false (will fail in this case as file is indeed 
present)
-       *    + getFileStatus to fetch the file ETag
-       *    + create overwrite=true
-       *    = 3 connections and 2 send requests in case of Dfs Client
-       *    = 1 ListBlob + 2 GPS + 2 PutBlob
-       */
-      if 
(fs.getAbfsStore().getAbfsConfiguration().isConditionalCreateOverwriteEnabled())
 {
+      // Operation: Creating AbfsOutputStream
+      try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
+          sendRequestPath)) {
+        // Network stats calculation: For Creating AbfsOutputStream:
+        // 1 create request = 1 connection made and 1 send request
         if (client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs)) {
-          expectedRequestsSent += 2;
-          expectedConnectionsMade += 5;
+          expectedRequestsSent += (directory);
+          // Per directory, we have 2 calls :- 1 PutBlob and 1 ListBlobs call.
+          expectedConnectionsMade += ((directory * 2));
         } else {
-          expectedConnectionsMade += 3;
-          expectedRequestsSent += 2;
+          expectedRequestsSent++;
+          expectedConnectionsMade++;
         }
-      } else {
-        expectedConnectionsMade += 1;
-        expectedRequestsSent += 1;
-      }
-      // --------------------------------------------------------------------
+        // --------------------------------------------------------------------
 
-      // Operation: Multiple small appends + hflush
-      for (int i = 0; i < WRITE_OPERATION_LOOP_COUNT; i++) {
+        // Operation: Write small data
+        // Network stats calculation: No additions.
+        // Data written is less than the buffer size and hence will not
+        // trigger any append request to store
         out.write(testNetworkStatsString.getBytes());
-        // Network stats calculation: no-op. Small write
+        // --------------------------------------------------------------------
+
+        // Operation: HFlush
+        // Flushes all outstanding data (i.e. the current unfinished packet)
+        // from the client into the service on all DataNode replicas.
         out.hflush();
-        // Network stats calculation: Hflush
-        // refer to previous comments for hFlush network stats calcualtion
-        // possibilities
+        /*
+         * Network stats calculation:
+         * 3 possibilities here:
+         * A. As there is pending data to be written to store, this will 
result in:
+         *    1 append + 1 flush = 2 connections and 2 send requests
+         *
+         * B. If config "fs.azure.enable.small.write.optimization" is enabled, 
append
+         *   and flush call will be merged for small data in buffer in this 
test.
+         *   In which case it will be:
+         *   1 append+flush request = 1 connection and 1 send request
+         *
+         * C. If the path is configured for append Blob files to be used, 
hflush
+         *   is a no-op. So in this case:
+         *   1 append = 1 connection and 1 send request
+         */
         if 
(fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())
-            || (this.getConfiguration().isSmallWriteOptimizationEnabled())) {
+            || 
(fs.getAbfsStore().getAbfsConfiguration().isSmallWriteOptimizationEnabled())) {
           expectedConnectionsMade++;
           expectedRequestsSent++;
         } else {
@@ -209,16 +148,91 @@ public void testAbfsHttpSendStatistics() throws 
IOException {
           expectedRequestsSent += 2;
         }
         expectedBytesSent += testNetworkStatsString.getBytes().length;
+        // --------------------------------------------------------------------
+
+        // Assertions
+        metricMap = getInstrumentationMap(fs);
+        assertAbfsStatistics(CONNECTIONS_MADE,
+            expectedConnectionsMade, metricMap);
+        assertAbfsStatistics(SEND_REQUESTS, expectedRequestsSent,
+            metricMap);
+        assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
+            expectedBytesSent, metricMap);
+      }
+
+      // --------------------------------------------------------------------
+      // Operation: AbfsOutputStream close.
+      // Network Stats calculation: 1 flush (with close) is send.
+      // 1 flush request = 1 connection and 1 send request
+      // Flush with no data is a no-op for blob endpoint, hence update only 
for dfs endpoint.
+      if (client instanceof AbfsDfsClient) {
+        expectedConnectionsMade++;
+        expectedRequestsSent++;
       }
       // --------------------------------------------------------------------
 
-      // Assertions
-      metricMap = fs.getInstrumentationMap();
-      assertAbfsStatistics(CONNECTIONS_MADE, expectedConnectionsMade, 
metricMap);
-      assertAbfsStatistics(SEND_REQUESTS, expectedRequestsSent, metricMap);
-      assertAbfsStatistics(AbfsStatistic.BYTES_SENT, expectedBytesSent, 
metricMap);
+      // Operation: Re-create the file / create overwrite scenario
+      try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
+          sendRequestPath)) {
+        /*
+         * Network Stats calculation: create overwrite
+         * There are 2 possibilities here.
+         * A. create overwrite results in 1 server call
+         *    create with overwrite=true = 1 connection and 1 send request
+         *
+         * B. If config "fs.azure.enable.conditional.create.overwrite" is 
enabled,
+         *    create overwrite=false (will fail in this case as file is indeed 
present)
+         *    + getFileStatus to fetch the file ETag
+         *    + create overwrite=true
+         *    = 3 connections and 2 send requests in case of Dfs Client
+         *    = 1 ListBlob + 2 GPS + 2 PutBlob
+         */
+        if (fs.getAbfsStore()
+            .getAbfsConfiguration()
+            .isConditionalCreateOverwriteEnabled()) {
+          if (client instanceof AbfsBlobClient && !getIsNamespaceEnabled(fs)) {
+            expectedRequestsSent += 2;
+            expectedConnectionsMade += 5;
+          } else {
+            expectedConnectionsMade += 3;
+            expectedRequestsSent += 2;
+          }
+        } else {
+          expectedConnectionsMade += 1;
+          expectedRequestsSent += 1;
+        }
+        // --------------------------------------------------------------------
+
+        // Operation: Multiple small appends + hflush
+        for (int i = 0; i < WRITE_OPERATION_LOOP_COUNT; i++) {
+          out.write(testNetworkStatsString.getBytes());
+          // Network stats calculation: no-op. Small write
+          out.hflush();
+          // Network stats calculation: Hflush
+          // refer to previous comments for hFlush network stats calcualtion
+          // possibilities
+          if (fs.getAbfsStore()
+              .isAppendBlobKey(fs.makeQualified(sendRequestPath).toString())
+              || (this.getConfiguration().isSmallWriteOptimizationEnabled())) {
+            expectedConnectionsMade++;
+            expectedRequestsSent++;
+          } else {
+            expectedConnectionsMade += 2;
+            expectedRequestsSent += 2;
+          }
+          expectedBytesSent += testNetworkStatsString.getBytes().length;
+        }
+        // --------------------------------------------------------------------
+
+        // Assertions
+        metricMap = fs.getInstrumentationMap();
+        assertAbfsStatistics(CONNECTIONS_MADE, expectedConnectionsMade,
+            metricMap);
+        assertAbfsStatistics(SEND_REQUESTS, expectedRequestsSent, metricMap);
+        assertAbfsStatistics(AbfsStatistic.BYTES_SENT, expectedBytesSent,
+            metricMap);
+      }
     }
-
   }
 
   /**
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
index c91b8a1f93b..f07b9a5e1dc 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java
@@ -33,6 +33,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -88,6 +89,7 @@
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
 import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID;
 import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE;
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY;
 import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_MKDIR_OVERWRITE;
 import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
 import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_TRANSACTION_ID;
@@ -96,6 +98,7 @@
 import static org.apache.hadoop.fs.azurebfs.services.RenameAtomicity.SUFFIX;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.assertj.core.api.Assumptions.assumeThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -460,10 +463,18 @@ public void testDefaultCreateOverwriteFileTest() throws 
Throwable {
 
   public void testCreateFileOverwrite(boolean enableConditionalCreateOverwrite)
       throws Throwable {
+    if (enableConditionalCreateOverwrite) {
+      assumeHnsEnabled();
+      assumeDfsServiceType();
+      assumeThat(getIngressServiceType())
+          .as("DFS service type is required for this test")
+          .isEqualTo(AbfsServiceType.DFS);
+    }
     try (AzureBlobFileSystem currentFs = getFileSystem()) {
       Configuration config = new Configuration(this.getRawConfiguration());
       config.set("fs.azure.enable.conditional.create.overwrite",
           Boolean.toString(enableConditionalCreateOverwrite));
+      config.set("fs.azure.enable.create.idempotency", "false");
       AzureBlobFileSystemStore store = currentFs.getAbfsStore();
       AbfsClient client = store.getClientHandler().getIngressClient();
 
@@ -595,7 +606,11 @@ public void testCreateFileOverwrite(boolean 
enableConditionalCreateOverwrite)
   @Test
   public void testNegativeScenariosForCreateOverwriteDisabled()
       throws Throwable {
-
+    assumeHnsEnabled();
+    assumeDfsServiceType();
+    assumeThat(getIngressServiceType())
+        .as("DFS service type is required for this test")
+        .isEqualTo(AbfsServiceType.DFS);
     try (AzureBlobFileSystem currentFs = getFileSystem()) {
       Configuration config = new Configuration(this.getRawConfiguration());
       config.set("fs.azure.enable.conditional.create.overwrite",
@@ -1087,6 +1102,7 @@ public void testParallelCreateOverwriteFalse()
       throws Exception {
     Configuration configuration = getRawConfiguration();
     configuration.set(FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE, "false");
+    configuration.set(FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY, "false");
     try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
         configuration)) {
       ExecutorService executorService = Executors.newFixedThreadPool(5);
@@ -2236,6 +2252,98 @@ public void 
testFailureInGetPathStatusDuringCreateRecovery() throws Exception {
     }
   }
 
+  /**
+   * Test to simulate a successful create operation followed by a connection 
reset
+   * on the response, triggering a retry.
+   *
+   * This test verifies that the create operation is retried in the event of a
+   * connection reset during the response phase. The test creates a mock
+   * AzureBlobFileSystem and its associated components to simulate the create
+   * operation and the connection reset. It then verifies that the create
+   * operation is retried once before succeeding.
+   *
+   * @throws Exception if an error occurs during the test execution.
+   */
+  @Test
+  public void testCreateIdempotencyForNonHnsBlob() throws Exception {
+    assumeThat(isAppendBlobEnabled()).as("Not valid for APPEND 
BLOB").isFalse();
+    assumeHnsDisabled();
+    assumeBlobServiceType();
+    // Create a spy of AzureBlobFileSystem
+    try (AzureBlobFileSystem fs = Mockito.spy(
+        (AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration()))) {
+      // Create a spy of AzureBlobFileSystemStore
+      AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+
+      // Create spies for the client handler and blob client
+      AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler());
+      AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient());
+      fs.getAbfsStore().setClient(blobClient);
+      fs.getAbfsStore().setClientHandler(clientHandler);
+      // Set up the spies to return the mocked objects
+      Mockito.doReturn(clientHandler).when(store).getClientHandler();
+      Mockito.doReturn(blobClient).when(clientHandler).getBlobClient();
+      Mockito.doReturn(blobClient).when(clientHandler).getIngressClient();
+
+      AtomicInteger createCount = new AtomicInteger(0);
+
+      Mockito.doAnswer(answer -> {
+        // Set up the mock for the create operation
+        
AbfsClientTestUtil.setMockAbfsRestOperationForCreateOperation(blobClient,
+            (httpOperation) -> {
+              Mockito.doAnswer(invocation -> {
+                // Call the real processResponse method
+                invocation.callRealMethod();
+
+                int currentCount = createCount.incrementAndGet();
+                if (currentCount == 2) {
+                  Mockito.when(httpOperation.getStatusCode())
+                      .thenReturn(
+                          HTTP_INTERNAL_ERROR); // Status code 500 for 
Internal Server Error
+                  Mockito.when(httpOperation.getStorageErrorMessage())
+                      .thenReturn("CONNECTION_RESET"); // Error message
+                  throw new IOException("Connection Reset");
+                }
+                return null;
+              }).when(httpOperation).processResponse(
+                  Mockito.nullable(byte[].class),
+                  Mockito.anyInt(),
+                  Mockito.anyInt()
+              );
+
+              return httpOperation;
+            });
+        return answer.callRealMethod();
+      }).when(blobClient).createPath(
+          Mockito.anyString(),
+          Mockito.anyBoolean(),
+          Mockito.anyBoolean(),
+          Mockito.any(AzureBlobFileSystemStore.Permissions.class),
+          Mockito.anyBoolean(), Mockito.nullable(String.class), 
Mockito.any(ContextEncryptionAdapter.class),
+          any(TracingContext.class)
+      );
+
+      Path path = new Path("/test/file");
+      fs.create(path, false);
+      Mockito.verify(blobClient, Mockito.times(1)).createPath(
+          Mockito.anyString(),
+          Mockito.anyBoolean(),
+          Mockito.anyBoolean(),
+          Mockito.any(AzureBlobFileSystemStore.Permissions.class),
+          Mockito.anyBoolean(), Mockito.nullable(String.class), 
Mockito.any(ContextEncryptionAdapter.class),
+          any(TracingContext.class));
+
+      Mockito.verify(blobClient, Mockito.times(2)).createPathRestOp(
+          Mockito.anyString(),
+          Mockito.anyBoolean(),
+          Mockito.anyBoolean(),
+          Mockito.anyBoolean(),
+          Mockito.nullable(String.class), 
Mockito.any(ContextEncryptionAdapter.class),
+          any(TracingContext.class));
+      assertIsFile(fs, path);
+    }
+  }
+
   /**
    * Mocks and returns an instance of {@link AbfsDfsClient} for the given 
AzureBlobFileSystem.
    * This method sets up the necessary mock behavior for the client handler 
and ingress client.
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java
index e7dcc78ace9..95bd76c023b 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemLease.java
@@ -160,7 +160,7 @@ public void testTwoCreate() throws Exception {
 
     try (FSDataOutputStream out = fs.create(testFilePath)) {
       LambdaTestUtils.intercept(IOException.class,
-          isHNSEnabled ? PARALLEL_ACCESS
+          isHNSEnabled && getIngressServiceType() == AbfsServiceType.DFS ? 
PARALLEL_ACCESS
               : client instanceof AbfsBlobClient
                   ? ERR_NO_LEASE_ID_SPECIFIED_BLOB
                   : ERR_NO_LEASE_ID_SPECIFIED, () -> {
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java
index 1be40c09dbc..73a826c601a 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java
@@ -52,6 +52,7 @@
 import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
 import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.AbfsClientHandler;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil;
 import org.apache.hadoop.fs.azurebfs.services.AbfsDfsClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsHttpHeader;
@@ -74,6 +75,7 @@
 import static java.net.HttpURLConnection.HTTP_CLIENT_TIMEOUT;
 import static java.net.HttpURLConnection.HTTP_CONFLICT;
 import static java.net.HttpURLConnection.HTTP_FORBIDDEN;
+import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
 import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
 import static java.net.HttpURLConnection.HTTP_OK;
 import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_PATH_ATTEMPTS;
@@ -108,6 +110,7 @@
 import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 /**
  * Test rename operation.
@@ -1702,6 +1705,85 @@ public void testRenamePathRetryIdempotency() throws 
Exception {
     }
   }
 
+  /**
+   * Test to simulate a successful copy blob operation followed by a 
connection reset
+   * on the response, triggering a retry.
+   *
+   * This test verifies that the copy blob operation is retried in the event 
of a
+   * connection reset during the response phase. The test creates a mock
+   * AzureBlobFileSystem and its associated components to simulate the copy 
blob
+   * operation and the connection reset. It then verifies that the create
+   * operation is retried once before succeeding.
+   *
+   * @throws Exception if an error occurs during the test execution.
+   */
+  @Test
+  public void testRenameIdempotencyForNonHnsBlob() throws Exception {
+    assumeThat(isAppendBlobEnabled()).as("Not valid for APPEND 
BLOB").isFalse();
+    assumeHnsDisabled();
+    assumeBlobServiceType();
+    // Create a spy of AzureBlobFileSystem
+    try (AzureBlobFileSystem fs = Mockito.spy(
+        (AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration()))) {
+      // Create a spy of AzureBlobFileSystemStore
+      AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+
+      // Create spies for the client handler and blob client
+      AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler());
+      AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient());
+      fs.getAbfsStore().setClient(blobClient);
+      fs.getAbfsStore().setClientHandler(clientHandler);
+      // Set up the spies to return the mocked objects
+      Mockito.doReturn(clientHandler).when(store).getClientHandler();
+      Mockito.doReturn(blobClient).when(clientHandler).getBlobClient();
+      Mockito.doReturn(blobClient).when(clientHandler).getIngressClient();
+
+      AtomicInteger copyBlobCount = new AtomicInteger(0);
+      Path sourceDir = path("/testSrc");
+      assertMkdirs(fs, sourceDir);
+      String filename = "file1";
+      Path sourceFilePath = new Path(sourceDir, filename);
+      touch(sourceFilePath);
+      Path destFilePath = new Path(sourceDir, "file2");
+      Mockito.doAnswer(answer -> {
+        // Set up the mock for the create operation
+        
AbfsClientTestUtil.setMockAbfsRestOperationForCopyBlobOperation(blobClient,  
sourceFilePath, destFilePath,
+            (httpOperation) -> {
+              Mockito.doAnswer(invocation -> {
+                // Call the real processResponse method
+                invocation.callRealMethod();
+
+                int currentCount = copyBlobCount.incrementAndGet();
+                if (currentCount == 1) {
+                  Mockito.when(httpOperation.getStatusCode())
+                      .thenReturn(
+                          HTTP_INTERNAL_ERROR); // Status code 500 for 
Internal Server Error
+                  Mockito.when(httpOperation.getStorageErrorMessage())
+                      .thenReturn("CONNECTION_RESET"); // Error message
+                  throw new IOException("Connection Reset");
+                }
+                return null;
+              }).when(httpOperation).processResponse(
+                  Mockito.nullable(byte[].class),
+                  Mockito.anyInt(),
+                  Mockito.anyInt()
+              );
+
+              return httpOperation;
+            });
+        return answer.callRealMethod();
+      }).when(blobClient).copyBlob(
+          Mockito.any(Path.class),
+          Mockito.any(Path.class),
+          Mockito.nullable(String.class),
+          Mockito.any(TracingContext.class)
+      );
+      Assertions.assertThat(fs.rename(sourceFilePath, destFilePath))
+          .describedAs("Rename should succeed.")
+          .isTrue();
+    }
+  }
+
   /**
    * Test to verify that the client transaction ID is included in the response 
header
    * after renaming a file in Azure Blob Storage.
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java
index ec63c3dcda7..e9f219c20fb 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java
@@ -622,8 +622,8 @@ public void testScenario8() throws Exception {
               .isEqualTo(HTTP_CONFLICT);
         }
         Assertions.assertThat(e.getMessage())
-            .as("Expected error message to contain 'AlreadyExists'")
-            .contains("AlreadyExists");
+            .as("Expected error message to contain 'Exists'")
+            .containsIgnoringCase("Exists");
       }
 
       // Remove file
@@ -2069,4 +2069,4 @@ private static void assertIsFile(Path path, FileStatus 
status) {
         .as("Expected a regular file, but was a symlink: %s %s", path, status)
         .isFalse();
   }
-}
+}
\ No newline at end of file
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java
index b9dcefc35e2..cd1a2af7d6c 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java
@@ -33,6 +33,7 @@
 import java.util.UUID;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
 import org.apache.hadoop.fs.azurebfs.AbfsCountersImpl;
 import org.assertj.core.api.Assertions;
@@ -41,6 +42,7 @@
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
 import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.apache.hadoop.util.functional.FunctionRaisingIOE;
@@ -50,16 +52,22 @@
 import static 
org.apache.hadoop.fs.azurebfs.ITestAzureBlobFileSystemListStatus.TEST_CONTINUATION_TOKEN;
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPLICATION_XML;
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOCKLIST;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.BLOCK_BLOB_TYPE;
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_GET;
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.STAR;
 import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_LENGTH;
 import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_TYPE;
 import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.IF_MATCH;
+import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.IF_NONE_MATCH;
 import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_BLOB_CONTENT_MD5;
+import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_BLOB_TYPE;
 import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_TRANSACTION_ID;
+import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_COPY_SOURCE;
 import static 
org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_CLOSE;
 import static 
org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.QUERY_PARAM_COMP;
+import static 
org.apache.hadoop.fs.azurebfs.services.AbfsRestOperationType.PutBlob;
 import static 
org.apache.hadoop.fs.azurebfs.services.AbfsRestOperationType.PutBlockList;
 import static org.apache.hadoop.fs.azurebfs.services.AuthType.OAuth;
 import static 
org.apache.hadoop.fs.azurebfs.services.RetryPolicyConstants.EXPONENTIAL_RETRY_POLICY_ABBREVIATION;
@@ -192,6 +200,115 @@ public static void 
setMockAbfsRestOperationForFlushOperation(
     addMockBehaviourToRestOpAndHttpOp(abfsRestOperation, functionRaisingIOE);
   }
 
+  /**
+   * Sets up a mocked {@link AbfsRestOperation} for a create (PutBlob) 
operation
+   * in the Azure Blob File System (ABFS).
+   * <p>
+   * This method is intended for use in testing scenarios where the behavior of
+   * a create request needs to be simulated. It configures a mock
+   * {@link AbfsRestOperation} with the appropriate request headers and 
parameters
+   * for a {@code PutBlob} call, and applies the provided {@code 
functionRaisingIOE}
+   * to customize the behavior of the underlying {@link AbfsHttpOperation}.
+   * <p>
+   *
+   * @param spiedClient        the spied instance of {@link AbfsClient} used
+   *                           for making HTTP requests
+   * @param functionRaisingIOE a function that customizes the behavior of the
+   *                           {@link AbfsRestOperation}'s associated
+   *                           {@link AbfsHttpOperation}, enabling the 
simulation
+   *                           of error conditions or special responses
+   * @throws Exception         if an error occurs while setting up the mocked
+   *                           operation
+   */
+  public static void setMockAbfsRestOperationForCreateOperation(
+      final AbfsClient spiedClient,
+      FunctionRaisingIOE<AbfsHttpOperation, AbfsHttpOperation> 
functionRaisingIOE)
+      throws Exception {
+    List<AbfsHttpHeader> requestHeaders = 
ITestAbfsClient.getTestRequestHeaders(
+        spiedClient);
+    requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_TYPE, BLOCK_BLOB_TYPE));
+    requestHeaders.add(new AbfsHttpHeader(CONTENT_LENGTH, 
AbfsHttpConstants.ZERO));
+    requestHeaders.add(new AbfsHttpHeader(CONTENT_TYPE, APPLICATION_XML));
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = 
spiedClient.createDefaultUriQueryBuilder();
+    final URL url = spiedClient.createRequestUrl("/test/file", 
abfsUriQueryBuilder.toString());
+    AbfsRestOperation abfsRestOperation = Mockito.spy(new AbfsRestOperation(
+        PutBlob, spiedClient, HTTP_METHOD_PUT,
+        url,
+        requestHeaders,
+        spiedClient.getAbfsConfiguration()));
+
+    Mockito.doReturn(abfsRestOperation)
+        .when(spiedClient)
+        .getAbfsRestOperation(eq(AbfsRestOperationType.PutBlob),
+            Mockito.anyString(), Mockito.any(URL.class), Mockito.anyList());
+
+    addMockBehaviourToRestOpAndHttpOp(abfsRestOperation, functionRaisingIOE);
+  }
+
+  /**
+   * Sets up a mocked {@link AbfsRestOperation} for a CopyBlob operation
+   * in the Azure Blob File System (ABFS).
+   * <p>
+   * This method is intended for use in testing scenarios where the behavior of
+   * a copyBlob request needs to be simulated. It configures a mock
+   * {@link AbfsRestOperation} with the appropriate request headers and 
parameters
+   * for a {@code CopyBlob} call, and applies the provided {@code 
functionRaisingIOE}
+   * to customize the behavior of the underlying {@link AbfsHttpOperation}.
+   * <p>
+   *
+   * @param spiedClient        the spied instance of {@link AbfsClient} used
+   *                           for making HTTP requests
+   * @param srcPath            the source blob path
+   * @param dstPath            the destination blob path
+   * @param functionRaisingIOE a function that customizes the behavior of the
+   *                           {@link AbfsRestOperation}'s associated
+   *                           {@link AbfsHttpOperation}, enabling the 
simulation
+   *                           of error conditions or special responses
+   * @throws Exception         if an error occurs while setting up the mocked
+   *                           operation
+   */
+  public static void setMockAbfsRestOperationForCopyBlobOperation(
+      final AbfsClient spiedClient,
+      final Path srcPath,
+      final Path dstPath,
+      FunctionRaisingIOE<AbfsHttpOperation, AbfsHttpOperation> 
functionRaisingIOE)
+      throws Exception {
+
+    // Prepare headers
+    List<AbfsHttpHeader> requestHeaders = 
ITestAbfsClient.getTestRequestHeaders(spiedClient);
+
+    // Add CopyBlob specific headers
+    AbfsUriQueryBuilder abfsUriQueryBuilderDst = 
spiedClient.createDefaultUriQueryBuilder();
+    AbfsUriQueryBuilder abfsUriQueryBuilderSrc = new AbfsUriQueryBuilder();
+
+    String dstBlobRelativePath = dstPath.toUri().getPath();
+    String srcBlobRelativePath = srcPath.toUri().getPath();
+
+    final URL url = spiedClient.createRequestUrl(
+        dstBlobRelativePath, abfsUriQueryBuilderDst.toString());
+    final String sourcePathUrl = spiedClient.createRequestUrl(
+        srcBlobRelativePath, abfsUriQueryBuilderSrc.toString()).toString();
+
+    requestHeaders.add(new AbfsHttpHeader(X_MS_COPY_SOURCE, sourcePathUrl));
+    requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, STAR));
+
+    // Spy on the real CopyBlob operation
+    AbfsRestOperation abfsRestOperation = Mockito.spy(new AbfsRestOperation(
+        AbfsRestOperationType.CopyBlob,
+        spiedClient,
+        HTTP_METHOD_PUT,
+        url,
+        requestHeaders,
+        spiedClient.getAbfsConfiguration()));
+
+    Mockito.doReturn(abfsRestOperation)
+        .when(spiedClient)
+        .getAbfsRestOperation(eq(AbfsRestOperationType.CopyBlob),
+            Mockito.nullable(String.class), Mockito.any(URL.class), 
Mockito.anyList());
+
+    addMockBehaviourToRestOpAndHttpOp(abfsRestOperation, functionRaisingIOE);
+  }
+
   /**
    * Adding general mock behaviour to AbfsRestOperation and AbfsHttpOperation
    * to avoid any NPE occurring. These will avoid any network call made and


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to