This is an automated email from the ASF dual-hosted git repository.

anujmodi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7dcf76ded59 HADOOP-19472: [ABFS] Improve write workload performance 
for ABFS by efficient concurrency utilization (#7669)
7dcf76ded59 is described below

commit 7dcf76ded594cf3323662523dcc4df4816802499
Author: Anmol Asrani <[email protected]>
AuthorDate: Thu Oct 30 16:10:26 2025 +0530

    HADOOP-19472: [ABFS] Improve write workload performance for ABFS by 
efficient concurrency utilization (#7669)
    
    Contributed by Anmol Asrani.
---
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      |  87 ++-
 .../fs/azurebfs/AzureBlobFileSystemStore.java      |  34 +-
 .../fs/azurebfs/WriteThreadPoolSizeManager.java    | 397 +++++++++++
 .../fs/azurebfs/constants/AbfsHttpConstants.java   |   2 +
 .../fs/azurebfs/constants/ConfigurationKeys.java   |  28 +
 .../constants/FileSystemConfigurations.java        | 123 +++-
 .../contracts/exceptions/AbfsDriverException.java  |  20 +
 .../security/AbfsDelegationTokenIdentifier.java    |  19 +-
 .../security/AbfsDelegationTokenManager.java       |  15 +
 .../hadoop/fs/azurebfs/services/AbfsDfsClient.java |  22 +
 .../azurebfs/services/AbfsInputStreamContext.java  | 119 ++++
 .../azurebfs/TestWriteThreadPoolSizeManager.java   | 770 +++++++++++++++++++++
 .../azurebfs/services/ITestAbfsOutputStream.java   |   2 +-
 .../fs/azurebfs/services/TestAbfsOutputStream.java |   4 +-
 14 files changed, 1621 insertions(+), 21 deletions(-)

diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 7c355671cf8..c57a0ea2a7f 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -483,6 +483,53 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES)
   private int maxApacheHttpClientIoExceptionsRetries;
 
+  @BooleanConfigurationValidatorAnnotation(ConfigurationKey = 
FS_AZURE_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT,
+      DefaultValue = DEFAULT_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT)
+  private boolean dynamicWriteThreadPoolEnablement;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = 
FS_AZURE_WRITE_THREADPOOL_KEEP_ALIVE_TIME_MILLIS,
+      DefaultValue = DEFAULT_WRITE_THREADPOOL_KEEP_ALIVE_TIME_MILLIS)
+  private int writeThreadPoolKeepAliveTime;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = 
FS_AZURE_WRITE_CPU_MONITORING_INTERVAL_MILLIS,
+      MinValue = MIN_WRITE_CPU_MONITORING_INTERVAL_MILLIS,
+      MaxValue = MAX_WRITE_CPU_MONITORING_INTERVAL_MILLIS,
+      DefaultValue = DEFAULT_WRITE_CPU_MONITORING_INTERVAL_MILLIS)
+  private int writeCpuMonitoringInterval;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = 
FS_AZURE_WRITE_HIGH_CPU_THRESHOLD_PERCENT,
+      MinValue = MIN_WRITE_HIGH_CPU_THRESHOLD_PERCENT,
+      MaxValue = MAX_WRITE_HIGH_CPU_THRESHOLD_PERCENT,
+      DefaultValue = DEFAULT_WRITE_HIGH_CPU_THRESHOLD_PERCENT)
+  private int writeHighCpuThreshold;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = 
FS_AZURE_WRITE_MEDIUM_CPU_THRESHOLD_PERCENT,
+      MinValue = MIN_WRITE_MEDIUM_CPU_THRESHOLD_PERCENT,
+      MaxValue = MAX_WRITE_MEDIUM_CPU_THRESHOLD_PERCENT,
+      DefaultValue = DEFAULT_WRITE_MEDIUM_CPU_THRESHOLD_PERCENT)
+  private int writeMediumCpuThreshold;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = 
FS_AZURE_WRITE_LOW_CPU_THRESHOLD_PERCENT,
+      MinValue = MIN_WRITE_LOW_CPU_THRESHOLD_PERCENT,
+      MaxValue = MAX_WRITE_LOW_CPU_THRESHOLD_PERCENT,
+      DefaultValue = DEFAULT_WRITE_LOW_CPU_THRESHOLD_PERCENT)
+  private int writeLowCpuThreshold;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = 
FS_AZURE_WRITE_LOW_TIER_MEMORY_MULTIPLIER,
+      MinValue = MIN_WRITE_LOW_TIER_MEMORY_MULTIPLIER,
+      DefaultValue = DEFAULT_WRITE_LOW_TIER_MEMORY_MULTIPLIER)
+  private int lowTierMemoryMultiplier;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = 
FS_AZURE_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER,
+      MinValue = MIN_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER,
+      DefaultValue = DEFAULT_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER)
+  private int mediumTierMemoryMultiplier;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = 
FS_AZURE_WRITE_HIGH_TIER_MEMORY_MULTIPLIER,
+      MinValue = MIN_WRITE_HIGH_TIER_MEMORY_MULTIPLIER,
+      DefaultValue = DEFAULT_WRITE_HIGH_TIER_MEMORY_MULTIPLIER)
+  private int highTierMemoryMultiplier;
+
   @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
       FS_AZURE_APACHE_HTTP_CLIENT_MAX_CACHE_SIZE, DefaultValue = 
DEFAULT_APACHE_HTTP_CLIENT_MAX_CACHE_SIZE,
       MinValue = MIN_APACHE_HTTP_CLIENT_MAX_CACHE_SIZE, MaxValue = 
MAX_APACHE_HTTP_CLIENT_MAX_CACHE_SIZE)
@@ -1641,16 +1688,52 @@ public ExponentialRetryPolicy 
getOauthTokenFetchRetryPolicy() {
         oauthTokenFetchRetryDeltaBackoff);
   }
 
-  public int getWriteMaxConcurrentRequestCount() {
+  public int getWriteConcurrentRequestCount() {
     if (this.writeMaxConcurrentRequestCount < 1) {
       return 4 * Runtime.getRuntime().availableProcessors();
     }
     return this.writeMaxConcurrentRequestCount;
   }
 
+  public int getWriteThreadPoolKeepAliveTime() {
+    return writeThreadPoolKeepAliveTime;
+  }
+
+  public int getWriteCpuMonitoringInterval() {
+    return writeCpuMonitoringInterval;
+  }
+
+  public boolean isDynamicWriteThreadPoolEnablement() {
+    return dynamicWriteThreadPoolEnablement;
+  }
+
+  public int getWriteLowCpuThreshold() {
+    return writeLowCpuThreshold;
+  }
+
+  public int getWriteMediumCpuThreshold() {
+    return writeMediumCpuThreshold;
+  }
+
+  public int getWriteHighCpuThreshold() {
+    return writeHighCpuThreshold;
+  }
+
+  public int getLowTierMemoryMultiplier() {
+    return lowTierMemoryMultiplier;
+  }
+
+  public int getMediumTierMemoryMultiplier() {
+    return mediumTierMemoryMultiplier;
+  }
+
+  public int getHighTierMemoryMultiplier() {
+    return highTierMemoryMultiplier;
+  }
+
   public int getMaxWriteRequestsToQueue() {
     if (this.maxWriteRequestsToQueue < 1) {
-      return 2 * getWriteMaxConcurrentRequestCount();
+      return 2 * getWriteConcurrentRequestCount();
     }
     return this.maxWriteRequestsToQueue;
   }
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index d51559de18e..486e4b3cc9b 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -45,6 +45,7 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.UUID;
 import java.util.WeakHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -203,6 +204,7 @@ public class AzureBlobFileSystemStore implements Closeable, 
ListingSupport {
   private int blockOutputActiveBlocks;
   /** Bounded ThreadPool for this instance. */
   private ExecutorService boundedThreadPool;
+  private WriteThreadPoolSizeManager poolSizeManager;
 
   /** ABFS instance reference to be held by the store to avoid GC close. */
   private BackReference fsBackRef;
@@ -277,11 +279,19 @@ public AzureBlobFileSystemStore(
     }
     this.blockFactory = abfsStoreBuilder.blockFactory;
     this.blockOutputActiveBlocks = abfsStoreBuilder.blockOutputActiveBlocks;
-    this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
-        abfsConfiguration.getWriteMaxConcurrentRequestCount(),
-        abfsConfiguration.getMaxWriteRequestsToQueue(),
-        10L, TimeUnit.SECONDS,
-        "abfs-bounded");
+    if (abfsConfiguration.isDynamicWriteThreadPoolEnablement()) {
+      this.poolSizeManager = WriteThreadPoolSizeManager.getInstance(
+          getClient().getFileSystem() + "-" + UUID.randomUUID(),
+          abfsConfiguration);
+      poolSizeManager.startCPUMonitoring();
+      this.boundedThreadPool = poolSizeManager.getExecutorService();
+    } else {
+      this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
+          abfsConfiguration.getWriteConcurrentRequestCount(),
+          abfsConfiguration.getMaxWriteRequestsToQueue(),
+          10L, TimeUnit.SECONDS,
+          "abfs-bounded");
+    }
   }
 
   /**
@@ -320,17 +330,19 @@ public void close() throws IOException {
     }
     try {
       Futures.allAsList(futures).get();
-      // shutdown the threadPool and set it to null.
-      HadoopExecutors.shutdown(boundedThreadPool, LOG,
-          30, TimeUnit.SECONDS);
-      boundedThreadPool = null;
+      if (!abfsConfiguration.isDynamicWriteThreadPoolEnablement()) {
+        // shutdown the threadPool and set it to null.
+        HadoopExecutors.shutdown(boundedThreadPool, LOG,
+            30, TimeUnit.SECONDS);
+        boundedThreadPool = null;
+      }
     } catch (InterruptedException e) {
       LOG.error("Interrupted freeing leases", e);
       Thread.currentThread().interrupt();
     } catch (ExecutionException e) {
       LOG.error("Error freeing leases", e);
     } finally {
-      IOUtils.cleanupWithLogger(LOG, getClientHandler());
+      IOUtils.cleanupWithLogger(LOG, poolSizeManager, getClientHandler());
     }
   }
 
@@ -797,7 +809,7 @@ private AbfsOutputStreamContext 
populateAbfsOutputStreamContext(
             
.disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled())
             .withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
             .withAppendBlob(isAppendBlob)
-            
.withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteMaxConcurrentRequestCount())
+            
.withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteConcurrentRequestCount())
             
.withMaxWriteRequestsToQueue(abfsConfiguration.getMaxWriteRequestsToQueue())
             .withLease(lease)
             .withEncryptionAdapter(contextEncryptionAdapter)
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/WriteThreadPoolSizeManager.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/WriteThreadPoolSizeManager.java
new file mode 100644
index 00000000000..d7887b67c64
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/WriteThreadPoolSizeManager.java
@@ -0,0 +1,397 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
+
+import com.sun.management.OperatingSystemMXBean;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.LOW_HEAP_SPACE_FACTOR;
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.MEDIUM_HEAP_SPACE_FACTOR;
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BYTES_PER_GIGABYTE;
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR;
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_CPU_REDUCTION_FACTOR;
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_MEDIUM_HEAP_FACTOR;
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED_D;
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_HEAP_FACTOR;
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_HIGH_MEMORY_DECREASE_FACTOR;
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_POOL_SIZE_INCREASE_FACTOR;
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MEDIUM_CPU_LOW_MEMORY_REDUCTION_FACTOR;
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MEDIUM_CPU_REDUCTION_FACTOR;
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.THIRTY_SECONDS;
+
+/**
+ * Manages a thread pool for writing operations, adjusting the pool size based 
on CPU utilization.
+ */
+public final class WriteThreadPoolSizeManager implements Closeable {
+
+  /* Maximum allowed size for the thread pool. */
+  private final int maxThreadPoolSize;
+  /* Executor for periodically monitoring CPU usage. */
+  private final ScheduledExecutorService cpuMonitorExecutor;
+  /* Thread pool whose size is dynamically managed. */
+  private volatile ExecutorService boundedThreadPool;
+  /* Lock to ensure thread-safe updates to the thread pool. */
+  private final Lock lock = new ReentrantLock();
+  /* New computed max size for the thread pool after adjustment. */
+  private volatile int newMaxPoolSize;
+  /* Logger instance for logging events from WriteThreadPoolSizeManager. */
+  private static final Logger LOG = LoggerFactory.getLogger(
+      WriteThreadPoolSizeManager.class);
+  /* Map to maintain a WriteThreadPoolSizeManager instance per filesystem. */
+  private static final ConcurrentHashMap<String, WriteThreadPoolSizeManager>
+      POOL_SIZE_MANAGER_MAP = new ConcurrentHashMap<>();
+  /* Name of the filesystem associated with this manager. */
+  private final String filesystemName;
+  /* Initial size for the thread pool when created. */
+  private final int initialPoolSize;
+  /* Initially available heap memory. */
+  private final long initialAvailableHeapMemory;
+  /* The configuration instance. */
+  private final AbfsConfiguration abfsConfiguration;
+
+  /**
+   * Private constructor to initialize the write thread pool and CPU monitor 
executor
+   * based on system resources and ABFS configuration.
+   *
+   * @param filesystemName       Name of the ABFS filesystem.
+   * @param abfsConfiguration    Configuration containing pool size parameters.
+   */
+  private WriteThreadPoolSizeManager(String filesystemName,
+      AbfsConfiguration abfsConfiguration) {
+    this.filesystemName = filesystemName;
+    this.abfsConfiguration = abfsConfiguration;
+    int availableProcessors = Runtime.getRuntime().availableProcessors();
+    /* Get the heap space available when the instance is created */
+    this.initialAvailableHeapMemory = getAvailableHeapMemory();
+    /* Compute the max pool size */
+    int computedMaxPoolSize = getComputedMaxPoolSize(availableProcessors, 
initialAvailableHeapMemory);
+
+    /* Get the initial pool size from config, fallback to at least 1 */
+    this.initialPoolSize = Math.max(1,
+        abfsConfiguration.getWriteConcurrentRequestCount());
+
+    /* Set the upper bound for the thread pool size */
+    this.maxThreadPoolSize = Math.max(computedMaxPoolSize, initialPoolSize);
+    AtomicInteger threadCount = new AtomicInteger(1);
+    this.boundedThreadPool = Executors.newFixedThreadPool(
+        initialPoolSize,
+        r -> {
+          Thread t = new Thread(r);
+          t.setName("abfs-boundedwrite-" + threadCount.getAndIncrement());
+          return t;
+        }
+    );
+    ThreadPoolExecutor executor = (ThreadPoolExecutor) this.boundedThreadPool;
+    executor.setKeepAliveTime(
+        abfsConfiguration.getWriteThreadPoolKeepAliveTime(), TimeUnit.SECONDS);
+    executor.allowCoreThreadTimeOut(true);
+    /* Create a scheduled executor for CPU monitoring and pool adjustment */
+    this.cpuMonitorExecutor = Executors.newScheduledThreadPool(1);
+  }
+
+  /** Returns the internal {@link AbfsConfiguration}. */
+  private AbfsConfiguration getAbfsConfiguration() {
+    return abfsConfiguration;
+  }
+
+  /**
+   * Computes the maximum thread pool size based on the available processors
+   * and the initial available heap memory. The calculation uses a tiered
+   * multiplier derived from the memory-to-core ratio — systems with higher
+   * memory per core allow for a larger thread pool.
+   *
+   * @param availableProcessors the number of available CPU cores.
+   * @param initialAvailableHeapMemory the initial available heap memory, in 
bytes or GB (depending on implementation).
+   * @return the computed maximum thread pool size.
+   */
+  private int getComputedMaxPoolSize(final int availableProcessors, long 
initialAvailableHeapMemory) {
+    int maxpoolSize = getMemoryTierMaxThreads(initialAvailableHeapMemory, 
availableProcessors);
+    LOG.debug("Computed max thread pool size: {} | Available processors: {} | 
Heap memory (GB): {}",
+        maxpoolSize, availableProcessors, initialAvailableHeapMemory);
+    return maxpoolSize;
+  }
+
+  /**
+   * Calculates the available heap memory in gigabytes.
+   * This method uses {@link Runtime#getRuntime()} to obtain the maximum heap 
memory
+   * allowed for the JVM and subtracts the currently used memory (total - free)
+   * to determine how much heap memory is still available.
+   * The result is rounded up to the nearest gigabyte.
+   *
+   * @return the available heap memory in gigabytes
+   */
+  private long getAvailableHeapMemory() {
+    MemoryMXBean osBean = ManagementFactory.getMemoryMXBean();
+    MemoryUsage memoryUsage = osBean.getHeapMemoryUsage();
+    long availableHeapBytes = memoryUsage.getMax() - memoryUsage.getUsed();
+    return (availableHeapBytes + BYTES_PER_GIGABYTE - 1) / BYTES_PER_GIGABYTE;
+  }
+
+  /**
+   * Returns aggressive thread count = CPU cores × multiplier based on heap 
tier.
+   */
+  private int getMemoryTierMaxThreads(long availableHeapGB, int 
availableProcessors) {
+    int multiplier;
+    if (availableHeapGB <= LOW_HEAP_SPACE_FACTOR) {
+      multiplier = abfsConfiguration.getLowTierMemoryMultiplier();
+    } else if (availableHeapGB <= MEDIUM_HEAP_SPACE_FACTOR) {
+      multiplier = abfsConfiguration.getMediumTierMemoryMultiplier();
+    } else {
+      multiplier = abfsConfiguration.getHighTierMemoryMultiplier();
+    }
+    return availableProcessors * multiplier;
+  }
+
+  /**
+   * Returns the singleton instance of WriteThreadPoolSizeManager for the 
given filesystem.
+   *
+   * @param filesystemName the name of the filesystem.
+   * @param abfsConfiguration the configuration for the ABFS.
+   *
+   * @return the singleton instance.
+   */
+  public static synchronized WriteThreadPoolSizeManager getInstance(
+      String filesystemName, AbfsConfiguration abfsConfiguration) {
+    /* Check if an instance already exists in the map for the given filesystem 
*/
+    WriteThreadPoolSizeManager existingInstance = POOL_SIZE_MANAGER_MAP.get(
+        filesystemName);
+
+    /* If an existing instance is found, return it */
+    if (existingInstance != null && existingInstance.boundedThreadPool != null
+        && !existingInstance.boundedThreadPool.isShutdown()) {
+      return existingInstance;
+    }
+
+    /* Otherwise, create a new instance, put it in the map, and return it */
+    LOG.debug(
+        "Creating new WriteThreadPoolSizeManager instance for filesystem: {}",
+        filesystemName);
+    WriteThreadPoolSizeManager newInstance = new WriteThreadPoolSizeManager(
+        filesystemName, abfsConfiguration);
+    POOL_SIZE_MANAGER_MAP.put(filesystemName, newInstance);
+    return newInstance;
+  }
+
+  /**
+   * Adjusts the thread pool size to the specified maximum pool size.
+   *
+   * @param newMaxPoolSize the new maximum pool size.
+   */
+  private void adjustThreadPoolSize(int newMaxPoolSize) {
+    synchronized (this) {
+      ThreadPoolExecutor threadPoolExecutor
+          = ((ThreadPoolExecutor) boundedThreadPool);
+      int currentCorePoolSize = threadPoolExecutor.getCorePoolSize();
+
+      if (newMaxPoolSize >= currentCorePoolSize) {
+        threadPoolExecutor.setMaximumPoolSize(newMaxPoolSize);
+        threadPoolExecutor.setCorePoolSize(newMaxPoolSize);
+      } else {
+        threadPoolExecutor.setCorePoolSize(newMaxPoolSize);
+        threadPoolExecutor.setMaximumPoolSize(newMaxPoolSize);
+      }
+      LOG.debug("ThreadPool Info - New max pool size: {}, Current pool size: 
{}, Active threads: {}",
+          newMaxPoolSize, threadPoolExecutor.getPoolSize(), 
threadPoolExecutor.getActiveCount());
+    }
+  }
+
+  /**
+   * Starts monitoring the CPU utilization and adjusts the thread pool size 
accordingly.
+   */
+  synchronized void startCPUMonitoring() {
+    cpuMonitorExecutor.scheduleAtFixedRate(() -> {
+      double cpuUtilization = getCpuUtilization();
+      LOG.debug("Current CPU Utilization is this: {}", cpuUtilization);
+      try {
+        adjustThreadPoolSizeBasedOnCPU(cpuUtilization);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(String.format(
+            "Thread pool size adjustment interrupted for filesystem %s",
+            filesystemName), e);
+      }
+    }, 0, getAbfsConfiguration().getWriteCpuMonitoringInterval(), 
TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Gets the current system CPU utilization.
+   *
+   * @return the CPU utilization as a fraction (0.0 to 1.0), or 0.0 if 
unavailable.
+   */
+  private double getCpuUtilization() {
+    OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(
+        OperatingSystemMXBean.class);
+    double cpuLoad = osBean.getSystemCpuLoad();
+    if (cpuLoad < 0) {
+      LOG.warn("System CPU load value unavailable (returned -1.0). Defaulting 
to 0.0.");
+      return 0.0;
+    }
+    return cpuLoad;
+  }
+
+  /**
+   * Dynamically adjusts the thread pool size based on current CPU utilization
+   * and available heap memory relative to the initially available heap.
+   *
+   * @param cpuUtilization Current system CPU utilization (0.0 to 1.0)
+   * @throws InterruptedException if thread locking is interrupted
+   */
+  public void adjustThreadPoolSizeBasedOnCPU(double cpuUtilization) throws 
InterruptedException {
+    lock.lock();
+    try {
+      ThreadPoolExecutor executor = (ThreadPoolExecutor) 
this.boundedThreadPool;
+      int currentPoolSize = executor.getMaximumPoolSize();
+      long currentHeap = getAvailableHeapMemory();
+      long initialHeap = initialAvailableHeapMemory;
+      LOG.debug("Available heap memory: {} GB, Initial heap memory: {} GB", 
currentHeap, initialHeap);
+      LOG.debug("Current CPU Utilization: {}", cpuUtilization);
+
+      if (cpuUtilization > 
(abfsConfiguration.getWriteHighCpuThreshold()/HUNDRED_D)) {
+        newMaxPoolSize = calculateReducedPoolSizeHighCPU(currentPoolSize, 
currentHeap, initialHeap);
+      } else if (cpuUtilization > 
(abfsConfiguration.getWriteMediumCpuThreshold()/HUNDRED_D)) {
+        newMaxPoolSize = calculateReducedPoolSizeMediumCPU(currentPoolSize, 
currentHeap, initialHeap);
+      } else if (cpuUtilization < 
(abfsConfiguration.getWriteLowCpuThreshold()/HUNDRED_D)) {
+        newMaxPoolSize = calculateIncreasedPoolSizeLowCPU(currentPoolSize, 
currentHeap, initialHeap);
+      } else {
+        newMaxPoolSize = currentPoolSize;
+        LOG.debug("CPU load normal ({}). No change: current={}", 
cpuUtilization, currentPoolSize);
+      }
+      if (newMaxPoolSize != currentPoolSize) {
+        LOG.debug("Resizing thread pool from {} to {}", currentPoolSize, 
newMaxPoolSize);
+        adjustThreadPoolSize(newMaxPoolSize);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Calculates reduced pool size under high CPU utilization.
+   */
+  private int calculateReducedPoolSizeHighCPU(int currentPoolSize, long 
currentHeap, long initialHeap) {
+    if (currentHeap <= initialHeap / HIGH_MEDIUM_HEAP_FACTOR) {
+      LOG.debug("High CPU & low heap. Aggressively reducing: current={}, 
new={}",
+          currentPoolSize, currentPoolSize / 
HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR);
+      return Math.max(initialPoolSize, currentPoolSize / 
HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR);
+    }
+    int reduced = Math.max(initialPoolSize, currentPoolSize - currentPoolSize 
/ HIGH_CPU_REDUCTION_FACTOR);
+    LOG.debug("High CPU ({}). Reducing pool size moderately: current={}, 
new={}",
+        abfsConfiguration.getWriteHighCpuThreshold(), currentPoolSize, 
reduced);
+    return reduced;
+  }
+
+  /**
+   * Calculates reduced pool size under medium CPU utilization.
+   */
+  private int calculateReducedPoolSizeMediumCPU(int currentPoolSize, long 
currentHeap, long initialHeap) {
+    if (currentHeap <= initialHeap / HIGH_MEDIUM_HEAP_FACTOR) {
+      int reduced = Math.max(initialPoolSize, currentPoolSize - 
currentPoolSize / MEDIUM_CPU_LOW_MEMORY_REDUCTION_FACTOR);
+      LOG.debug("Medium CPU & low heap. Reducing: current={}, new={}", 
currentPoolSize, reduced);
+      return reduced;
+    }
+    int reduced = Math.max(initialPoolSize, currentPoolSize - currentPoolSize 
/ MEDIUM_CPU_REDUCTION_FACTOR);
+    LOG.debug("Medium CPU ({}). Moderate reduction: current={}, new={}",
+        abfsConfiguration.getWriteMediumCpuThreshold(), currentPoolSize, 
reduced);
+    return reduced;
+  }
+
+  /**
+   * Calculates increased pool size under low CPU utilization.
+   */
+  private int calculateIncreasedPoolSizeLowCPU(int currentPoolSize, long 
currentHeap, long initialHeap) {
+    if (currentHeap >= initialHeap * LOW_CPU_HEAP_FACTOR) {
+      int increased = Math.min(maxThreadPoolSize, (int) (currentPoolSize * 
LOW_CPU_POOL_SIZE_INCREASE_FACTOR));
+      LOG.debug("Low CPU & healthy heap. Increasing: current={}, new={}", 
currentPoolSize, increased);
+      return increased;
+    } else {
+      // Decrease by 10%
+      int decreased = Math.max(1, (int) (currentPoolSize * 
LOW_CPU_HIGH_MEMORY_DECREASE_FACTOR));
+      LOG.debug("Low CPU but insufficient heap ({} GB). Decreasing: 
current={}, new={}", currentHeap, currentPoolSize, decreased);
+      return decreased;
+    }
+  }
+
+
+  /**
+   * Returns the executor service for the thread pool.
+   *
+   * @return the executor service.
+   */
+  public ExecutorService getExecutorService() {
+    return boundedThreadPool;
+  }
+
+  /**
+   * Returns the scheduled executor responsible for CPU monitoring and dynamic 
pool adjustment.
+   *
+   * @return the {@link ScheduledExecutorService} used for CPU monitoring.
+   */
+  public ScheduledExecutorService getCpuMonitorExecutor() {
+    return cpuMonitorExecutor;
+  }
+
+  /**
+   * Closes this manager by shutting down executors and cleaning up resources.
+   * Removes the instance from the active manager map.
+   *
+   * @throws IOException if an error occurs during shutdown.
+   */
+  @Override
+  public void close() throws IOException {
+    synchronized (this) {
+      try {
+        // Shutdown CPU monitor
+        if (cpuMonitorExecutor != null && !cpuMonitorExecutor.isShutdown()) {
+          cpuMonitorExecutor.shutdown();
+        }
+        // Gracefully shutdown the bounded thread pool
+        if (boundedThreadPool != null && !boundedThreadPool.isShutdown()) {
+          boundedThreadPool.shutdown();
+          if (!boundedThreadPool.awaitTermination(THIRTY_SECONDS, 
TimeUnit.SECONDS)) {
+            LOG.warn("Bounded thread pool did not terminate in time, forcing 
shutdownNow for filesystem: {}", filesystemName);
+            boundedThreadPool.shutdownNow();
+          }
+          boundedThreadPool = null;
+        }
+        // Remove from the map
+        POOL_SIZE_MANAGER_MAP.remove(filesystemName);
+        LOG.debug("Closed and removed instance for filesystem: {}", 
filesystemName);
+      } catch (Exception e) {
+        LOG.warn("Failed to properly close instance for filesystem: {}", 
filesystemName, e);
+      }
+    }
+  }
+}
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
index fe4991c9582..a751101cf57 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
@@ -173,6 +173,8 @@ public final class AbfsHttpConstants {
   public static final char CHAR_EQUALS = '=';
   public static final char CHAR_STAR = '*';
   public static final char CHAR_PLUS = '+';
+  public static final int LOW_HEAP_SPACE_FACTOR = 4;
+  public static final double MEDIUM_HEAP_SPACE_FACTOR = 8;
 
   public static final int SPLIT_NO_LIMIT = -1;
 
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 9afb37e35c7..899e96dadc1 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -459,6 +459,34 @@ public static String containerProperty(String property, 
String fsName, String ac
   public static final String FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD = 
"fs.azure.blob.dir.rename.max.thread";
   /**Maximum number of thread per blob-delete orchestration: {@value}*/
   public static final String FS_AZURE_BLOB_DIR_DELETE_MAX_THREAD = 
"fs.azure.blob.dir.delete.max.thread";
+
+  /** Configuration key for the keep-alive time (ms) for the write thread 
pool. Value: {@value}. */
+  public static final String FS_AZURE_WRITE_THREADPOOL_KEEP_ALIVE_TIME_MILLIS 
= "fs.azure.write.threadpool.keep.alive.time.millis";
+
+  /** Configuration key for the CPU monitoring interval (ms) during write 
operations. Value: {@value}. */
+  public static final String FS_AZURE_WRITE_CPU_MONITORING_INTERVAL_MILLIS = 
"fs.azure.write.cpu.monitoring.interval.millis";
+
+  /** Configuration key to enable or disable dynamic write thread pool 
adjustment. Value: {@value}. */
+  public static final String FS_AZURE_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT = 
"fs.azure.write.dynamic.threadpool.enablement";
+
+  /** Configuration key for the high CPU utilization threshold (%) for write 
scaling. Value: {@value}. */
+  public static final String FS_AZURE_WRITE_HIGH_CPU_THRESHOLD_PERCENT = 
"fs.azure.write.high.cpu.threshold.percent";
+
+  /** Configuration key for the medium CPU utilization threshold (%) for write 
scaling. Value: {@value}. */
+  public static final String FS_AZURE_WRITE_MEDIUM_CPU_THRESHOLD_PERCENT = 
"fs.azure.write.medium.cpu.threshold.percent";
+
+  /** Configuration key for the low CPU utilization threshold (%) for write 
scaling. Value: {@value}. */
+  public static final String FS_AZURE_WRITE_LOW_CPU_THRESHOLD_PERCENT = 
"fs.azure.write.low.cpu.threshold.percent";
+
+  /** Configuration key for the low-tier memory multiplier for write 
workloads. Value: {@value}. */
+  public static final String FS_AZURE_WRITE_LOW_TIER_MEMORY_MULTIPLIER = 
"fs.azure.write.low.tier.memory.multiplier";
+
+  /** Configuration key for the medium-tier memory multiplier for write 
workloads. Value: {@value}. */
+  public static final String FS_AZURE_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER = 
"fs.azure.write.medium.tier.memory.multiplier";
+
+  /** Configuration key for the high-tier memory multiplier for write 
workloads. Value: {@value}. */
+  public static final String FS_AZURE_WRITE_HIGH_TIER_MEMORY_MULTIPLIER = 
"fs.azure.write.high.tier.memory.multiplier";
+
   /**Flag to enable/disable sending client transactional ID during 
create/rename operations: {@value}*/
   public static final String FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID = 
"fs.azure.enable.client.transaction.id";
   /**Flag to enable/disable create idempotency during create operation: 
{@value}*/
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index 83f636bf1d1..ea77f9d874a 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -36,7 +36,25 @@ public final class FileSystemConfigurations {
   public static final boolean 
DEFAULT_FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED = true;
   public static final String USER_HOME_DIRECTORY_PREFIX = "/user";
 
-  private static final int SIXTY_SECONDS = 60_000;
+  public static final int SIXTY_SECONDS = 60;
+  public static final int THIRTY_SECONDS = 30;
+  /**
+   * Number of bytes in a gigabyte.
+   */
+  public static final long BYTES_PER_GIGABYTE = 1024L * 1024 * 1024;
+  /**
+   * Factor by which the pool size is increased when CPU utilization is low.
+   */
+  public static final double LOW_CPU_POOL_SIZE_INCREASE_FACTOR = 1.5;
+  public static final double LOW_CPU_HIGH_MEMORY_DECREASE_FACTOR = 0.9;
+  public static final int HIGH_CPU_REDUCTION_FACTOR = 3;
+  public static final int HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR = 2;
+  public static final int MEDIUM_CPU_REDUCTION_FACTOR = 5;
+  public static final int MEDIUM_CPU_LOW_MEMORY_REDUCTION_FACTOR = 3;
+  public static final int HIGH_MEDIUM_HEAP_FACTOR = 2;
+  public static final double LOW_CPU_HEAP_FACTOR = 0.8;
+
+
 
   // Retry parameter defaults.
   public static final int DEFAULT_MIN_BACKOFF_INTERVAL = 500;  // 500ms
@@ -73,7 +91,7 @@ public final class FileSystemConfigurations {
   public static final int ONE_KB = 1024;
   public static final int ONE_MB = ONE_KB * ONE_KB;
 
-  // Default upload and download buffer size
+  /** Default buffer sizes and optimization flags. */
   public static final int DEFAULT_WRITE_BUFFER_SIZE = 8 * ONE_MB;  // 8 MB
   public static final int APPENDBLOB_MAX_WRITE_BUFFER_SIZE = 4 * ONE_MB;  // 4 
MB
   public static final boolean DEFAULT_AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION = 
false;
@@ -217,6 +235,7 @@ public final class FileSystemConfigurations {
 
   public static final int ZERO = 0;
   public static final int HUNDRED = 100;
+  public static final double HUNDRED_D = 100.0;
   public static final long THOUSAND = 1000L;
 
   public static final HttpOperationType DEFAULT_NETWORKING_LIBRARY
@@ -262,6 +281,106 @@ public final class FileSystemConfigurations {
 
   public static final int DEFAULT_FS_AZURE_BLOB_DELETE_THREAD = 
DEFAULT_FS_AZURE_LISTING_ACTION_THREADS;
 
+  /**
+   * Whether dynamic write thread pool adjustment is enabled by default.
+   */
+  public static final boolean DEFAULT_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT = 
false;
+
+  /**
+   * Default keep-alive time (in milliseconds) for write thread pool threads.
+   */
+  public static final int DEFAULT_WRITE_THREADPOOL_KEEP_ALIVE_TIME_MILLIS = 
30_000;
+
+  /**
+   * Minimum interval (in milliseconds) for CPU monitoring during write 
operations.
+   */
+  public static final int MIN_WRITE_CPU_MONITORING_INTERVAL_MILLIS = 10_000;
+
+  /**
+   * Maximum interval (in milliseconds) for CPU monitoring during write 
operations.
+   */
+  public static final int MAX_WRITE_CPU_MONITORING_INTERVAL_MILLIS = 60_000;
+
+  /**
+   * Default interval (in milliseconds) for CPU monitoring during write 
operations.
+   */
+  public static final int DEFAULT_WRITE_CPU_MONITORING_INTERVAL_MILLIS = 
15_000;
+
+  /**
+   * Minimum CPU utilization percentage considered as high threshold for write 
scaling.
+   */
+  public static final int MIN_WRITE_HIGH_CPU_THRESHOLD_PERCENT = 65;
+
+  /**
+   * Maximum CPU utilization percentage considered as high threshold for write 
scaling.
+   */
+  public static final int MAX_WRITE_HIGH_CPU_THRESHOLD_PERCENT = 90;
+
+  /**
+   * Default CPU utilization percentage considered as high threshold for write 
scaling.
+   */
+  public static final int DEFAULT_WRITE_HIGH_CPU_THRESHOLD_PERCENT = 80;
+
+  /**
+   * Minimum CPU utilization percentage considered as medium threshold for 
write scaling.
+   */
+  public static final int MIN_WRITE_MEDIUM_CPU_THRESHOLD_PERCENT = 45;
+
+  /**
+   * Maximum CPU utilization percentage considered as medium threshold for 
write scaling.
+   */
+  public static final int MAX_WRITE_MEDIUM_CPU_THRESHOLD_PERCENT = 65;
+
+  /**
+   * Default CPU utilization percentage considered as medium threshold for 
write scaling.
+   */
+  public static final int DEFAULT_WRITE_MEDIUM_CPU_THRESHOLD_PERCENT = 60;
+
+  /**
+   * Minimum CPU utilization percentage considered as low threshold for write 
scaling.
+   */
+  public static final int MIN_WRITE_LOW_CPU_THRESHOLD_PERCENT = 10;
+
+  /**
+   * Maximum CPU utilization percentage considered as low threshold for write 
scaling.
+   */
+  public static final int MAX_WRITE_LOW_CPU_THRESHOLD_PERCENT = 40;
+
+  /**
+   * Default CPU utilization percentage considered as low threshold for write 
scaling.
+   */
+  public static final int DEFAULT_WRITE_LOW_CPU_THRESHOLD_PERCENT = 35;
+
+  /**
+   * Minimum multiplier applied to available memory for low-tier write 
workloads.
+   */
+  public static final int MIN_WRITE_LOW_TIER_MEMORY_MULTIPLIER = 3;
+
+  /**
+   * Default multiplier applied to available memory for low-tier write 
workloads.
+   */
+  public static final int DEFAULT_WRITE_LOW_TIER_MEMORY_MULTIPLIER = 4;
+
+  /**
+   * Minimum multiplier applied to available memory for medium-tier write 
workloads.
+   */
+  public static final int MIN_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER = 6;
+
+  /**
+   * Default multiplier applied to available memory for medium-tier write 
workloads.
+   */
+  public static final int DEFAULT_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER = 8;
+
+  /**
+   * Minimum multiplier applied to available memory for high-tier write 
workloads.
+   */
+  public static final int MIN_WRITE_HIGH_TIER_MEMORY_MULTIPLIER = 12;
+
+  /**
+   * Default multiplier applied to available memory for high-tier write 
workloads.
+   */
+  public static final int DEFAULT_WRITE_HIGH_TIER_MEMORY_MULTIPLIER = 16;
+
   public static final boolean DEFAULT_FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID = 
true;
 
   public static final boolean DEFAULT_FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY 
= true;
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsDriverException.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsDriverException.java
index 7b2d03d6923..1b201f3349d 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsDriverException.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsDriverException.java
@@ -30,8 +30,14 @@
 @InterfaceStability.Evolving
 public class AbfsDriverException extends AbfsRestOperationException {
 
+  /** Default error message used when no inner exception is provided. */
   private static final String ERROR_MESSAGE = "Runtime Exception Occurred In 
ABFS Driver";
 
+  /**
+   * Constructs an {@code AbfsDriverException} with the specified inner 
exception.
+   *
+   * @param innerException the underlying exception that caused the failure
+   */
   public AbfsDriverException(final Exception innerException) {
     super(
         AzureServiceErrorCode.UNKNOWN.getStatusCode(),
@@ -42,6 +48,13 @@ public AbfsDriverException(final Exception innerException) {
         innerException);
   }
 
+  /**
+   * Constructs an {@code AbfsDriverException} with the specified inner 
exception
+   * and activity ID for correlation.
+   *
+   * @param innerException the underlying exception that caused the failure
+   * @param activityId the request or operation ID for traceability
+   */
   public AbfsDriverException(final Exception innerException, final String 
activityId) {
     super(
         AzureServiceErrorCode.UNKNOWN.getStatusCode(),
@@ -52,6 +65,13 @@ public AbfsDriverException(final Exception innerException, 
final String activity
         null);
   }
 
+  /**
+   * Constructs an {@code AbfsDriverException} with a custom error message and
+   * inner exception.
+   *
+   * @param errorMessage a custom error message describing the failure
+   * @param innerException the underlying exception that caused the failure
+   */
   public AbfsDriverException(final String errorMessage, final Exception 
innerException) {
     super(
         AzureServiceErrorCode.UNKNOWN.getStatusCode(),
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/AbfsDelegationTokenIdentifier.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/AbfsDelegationTokenIdentifier.java
index 7272c13297e..91eca509211 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/AbfsDelegationTokenIdentifier.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/AbfsDelegationTokenIdentifier.java
@@ -37,16 +37,29 @@ public class AbfsDelegationTokenIdentifier extends 
DelegationTokenIdentifier {
    */
   public static final Text TOKEN_KIND = new Text("ABFS delegation");
 
-  public AbfsDelegationTokenIdentifier(){
+  /** Creates an {@code AbfsDelegationTokenIdentifier} with the default ABFS 
token kind. */
+  public AbfsDelegationTokenIdentifier() {
     super(TOKEN_KIND);
   }
 
+  /**
+   * Creates an {@code AbfsDelegationTokenIdentifier} with the specified token 
kind.
+   *
+   * @param kind the token kind to use
+   */
   public AbfsDelegationTokenIdentifier(Text kind) {
     super(kind);
   }
 
-  public AbfsDelegationTokenIdentifier(Text kind, Text owner, Text renewer,
-      Text realUser) {
+  /**
+   * Creates an {@code AbfsDelegationTokenIdentifier} with the specified 
details.
+   *
+   * @param kind the token kind
+   * @param owner the token owner
+   * @param renewer the token renewer
+   * @param realUser the real user on whose behalf the token was issued
+   */
+  public AbfsDelegationTokenIdentifier(Text kind, Text owner, Text renewer, 
Text realUser) {
     super(kind, owner, renewer, realUser);
   }
 
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/AbfsDelegationTokenManager.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/AbfsDelegationTokenManager.java
index 85d0434a687..c976fbd0e0c 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/AbfsDelegationTokenManager.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/AbfsDelegationTokenManager.java
@@ -143,23 +143,38 @@ public Token<DelegationTokenIdentifier> 
getDelegationToken(
     return token;
   }
 
+  /** Renews the given delegation token through the configured token manager.
+   * @param token the delegation token to renew
+   * @return the new expiration time of the token
+   * @throws IOException if renewal fails
+   */
   public long renewDelegationToken(Token<?> token)
       throws IOException {
 
     return tokenManager.renewDelegationToken(token);
   }
 
+  /** Cancels the given delegation token through the configured token manager.
+   * @param token the delegation token to cancel
+   * @throws IOException if cancellation fails
+   */
   public void cancelDelegationToken(Token<?> token)
           throws IOException {
 
     tokenManager.cancelDelegationToken(token);
   }
 
+  /** Returns the current {@link CustomDelegationTokenManager} instance (for 
testing).
+   * @return the token manager instance
+   */
   @VisibleForTesting
   public CustomDelegationTokenManager getTokenManager() {
     return tokenManager;
   }
 
+  /** Returns a string representation of this token manager for debugging 
purposes.
+   * @return a string describing this instance
+   */
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder(
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
index f574f4704ab..cd94ee1c8fd 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
@@ -154,6 +154,17 @@
  */
 public class AbfsDfsClient extends AbfsClient {
 
+  /**
+   * Creates an {@code AbfsDfsClient} instance.
+   *
+   * @param baseUrl the base URL of the DFS endpoint
+   * @param sharedKeyCredentials the shared key credentials
+   * @param abfsConfiguration the ABFS configuration
+   * @param tokenProvider the access token provider for authentication
+   * @param encryptionContextProvider the encryption context provider
+   * @param abfsClientContext the ABFS client context
+   * @throws IOException if client initialization fails
+   */
   public AbfsDfsClient(final URL baseUrl,
       final SharedKeyCredentials sharedKeyCredentials,
       final AbfsConfiguration abfsConfiguration,
@@ -164,6 +175,17 @@ public AbfsDfsClient(final URL baseUrl,
         encryptionContextProvider, abfsClientContext, AbfsServiceType.DFS);
   }
 
+  /**
+   * Creates an {@code AbfsDfsClient} instance.
+   *
+   * @param baseUrl the base URL of the DFS endpoint
+   * @param sharedKeyCredentials the shared key credentials
+   * @param abfsConfiguration the ABFS configuration
+   * @param sasTokenProvider the SAS token provider
+   * @param encryptionContextProvider the encryption context provider
+   * @param abfsClientContext the ABFS client context
+   * @throws IOException if client initialization fails
+   */
   public AbfsDfsClient(final URL baseUrl,
       final SharedKeyCredentials sharedKeyCredentials,
       final AbfsConfiguration abfsConfiguration,
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
index f6272492d60..15ee4809911 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
@@ -64,15 +64,33 @@ public class AbfsInputStreamContext extends 
AbfsStreamContext {
 
   private ContextEncryptionAdapter contextEncryptionAdapter = null;
 
+  /**
+   * Constructs a new {@link AbfsInputStreamContext}.
+   *
+   * @param sasTokenRenewPeriodForStreamsInSeconds SAS token renewal interval 
in seconds.
+   */
   public AbfsInputStreamContext(final long 
sasTokenRenewPeriodForStreamsInSeconds) {
     super(sasTokenRenewPeriodForStreamsInSeconds);
   }
 
+  /**
+   * Sets the read buffer size.
+   *
+   * @param readBufferSize buffer size in bytes.
+   * @return this instance.
+   */
   public AbfsInputStreamContext withReadBufferSize(final int readBufferSize) {
     this.readBufferSize = readBufferSize;
     return this;
   }
 
+  /**
+   * Sets the read-ahead queue depth.
+   * Defaults to the number of available processors if negative.
+   *
+   * @param readAheadQueueDepth queue depth.
+   * @return this instance.
+   */
   public AbfsInputStreamContext withReadAheadQueueDepth(
           final int readAheadQueueDepth) {
     this.readAheadQueueDepth = (readAheadQueueDepth >= 0)
@@ -81,83 +99,169 @@ public AbfsInputStreamContext withReadAheadQueueDepth(
     return this;
   }
 
+  /**
+   * Enables or disables tolerance for out-of-band appends.
+   *
+   * @param tolerateOobAppends whether OOB appends should be tolerated.
+   * @return this instance.
+   */
   public AbfsInputStreamContext withTolerateOobAppends(
           final boolean tolerateOobAppends) {
     this.tolerateOobAppends = tolerateOobAppends;
     return this;
   }
 
+  /**
+   * Enables or disables read-ahead feature.
+   *
+   * @param isReadAheadEnabled whether read-ahead is enabled.
+   * @return this instance.
+   */
   public AbfsInputStreamContext isReadAheadEnabled(
           final boolean isReadAheadEnabled) {
     this.isReadAheadEnabled = isReadAheadEnabled;
     return this;
   }
 
+  /**
+   * Enables or disables read-ahead version 2.
+   *
+   * @param isReadAheadV2Enabled whether read-ahead V2 is enabled.
+   * @return this instance.
+   */
   public AbfsInputStreamContext isReadAheadV2Enabled(
       final boolean isReadAheadV2Enabled) {
     this.isReadAheadV2Enabled = isReadAheadV2Enabled;
     return this;
   }
 
+  /**
+   * Sets the read-ahead range.
+   *
+   * @param readAheadRange range in bytes.
+   * @return this instance.
+   */
   public AbfsInputStreamContext withReadAheadRange(
           final int readAheadRange) {
     this.readAheadRange = readAheadRange;
     return this;
   }
 
+  /**
+   * Sets stream statistics collector.
+   *
+   * @param streamStatistics statistics instance.
+   * @return this instance.
+   */
   public AbfsInputStreamContext withStreamStatistics(
       final AbfsInputStreamStatistics streamStatistics) {
     this.streamStatistics = streamStatistics;
     return this;
   }
 
+  /**
+   * Enables or disables complete read of small files in a single operation.
+   *
+   * @param readSmallFilesCompletely whether small files should be fully read.
+   * @return this instance.
+   */
   public AbfsInputStreamContext withReadSmallFilesCompletely(
       final boolean readSmallFilesCompletely) {
     this.readSmallFilesCompletely = readSmallFilesCompletely;
     return this;
   }
 
+  /**
+   * Enables or disables footer read optimization.
+   *
+   * @param optimizeFooterRead whether footer read optimization is enabled.
+   * @return this instance.
+   */
   public AbfsInputStreamContext withOptimizeFooterRead(
       final boolean optimizeFooterRead) {
     this.optimizeFooterRead = optimizeFooterRead;
     return this;
   }
 
+  /**
+   * Sets the footer read buffer size.
+   *
+   * @param footerReadBufferSize size in bytes.
+   * @return this instance.
+   */
   public AbfsInputStreamContext withFooterReadBufferSize(final int 
footerReadBufferSize) {
     this.footerReadBufferSize = footerReadBufferSize;
     return this;
   }
 
+  /**
+   * Forces use of the configured read buffer size always.
+   *
+   * @param alwaysReadBufferSize whether to always use configured buffer size.
+   * @return this instance.
+   */
   public AbfsInputStreamContext withShouldReadBufferSizeAlways(
       final boolean alwaysReadBufferSize) {
     this.alwaysReadBufferSize = alwaysReadBufferSize;
     return this;
   }
 
+  /**
+   * Sets the read-ahead block size.
+   *
+   * @param readAheadBlockSize block size in bytes.
+   * @return this instance.
+   */
   public AbfsInputStreamContext withReadAheadBlockSize(
       final int readAheadBlockSize) {
     this.readAheadBlockSize = readAheadBlockSize;
     return this;
   }
 
+  /**
+   * Enables or disables buffered positional reads.
+   *
+   * @param bufferedPreadDisabled whether buffered pread is disabled.
+   * @return this instance.
+   */
   public AbfsInputStreamContext withBufferedPreadDisabled(
       final boolean bufferedPreadDisabled) {
     this.bufferedPreadDisabled = bufferedPreadDisabled;
     return this;
   }
 
+  /**
+   * Sets a back reference to the filesystem that created this stream.
+   *
+   * @param fsBackRef filesystem back reference.
+   * @return this instance.
+   */
   public AbfsInputStreamContext withAbfsBackRef(
       final BackReference fsBackRef) {
     this.fsBackRef = fsBackRef;
     return this;
   }
 
+  /**
+   * Sets the context encryption adapter.
+   *
+   * @param contextEncryptionAdapter encryption adapter.
+   * @return this instance.
+   */
     public AbfsInputStreamContext withEncryptionAdapter(
         ContextEncryptionAdapter contextEncryptionAdapter){
       this.contextEncryptionAdapter = contextEncryptionAdapter;
       return this;
     }
 
+  /**
+   * Finalizes and validates the context configuration.
+   * <p>
+   * Ensures read-ahead range is valid and aligns read-ahead block size with
+   * read request size if necessary.
+   *
+   * @return this instance.
+   */
   public AbfsInputStreamContext build() {
     if (readBufferSize > readAheadBlockSize) {
       LOG.debug(
@@ -173,62 +277,77 @@ public AbfsInputStreamContext build() {
     return this;
   }
 
+  /** @return configured read buffer size. */
   public int getReadBufferSize() {
     return readBufferSize;
   }
 
+  /** @return read-ahead queue depth. */
   public int getReadAheadQueueDepth() {
     return readAheadQueueDepth;
   }
 
+  /** @return whether out-of-band appends are tolerated. */
   public boolean isTolerateOobAppends() {
     return tolerateOobAppends;
   }
 
+  /** @return whether read-ahead is enabled. */
   public boolean isReadAheadEnabled() {
     return isReadAheadEnabled;
   }
 
+  /** @return whether read-ahead V2 is enabled. */
   public boolean isReadAheadV2Enabled() {
     return isReadAheadV2Enabled;
   }
 
+  /** @return read-ahead range. */
   public int getReadAheadRange() {
     return readAheadRange;
   }
 
+  /** @return stream statistics collector. */
   public AbfsInputStreamStatistics getStreamStatistics() {
     return streamStatistics;
   }
 
+  /** @return whether small files should be read completely. */
   public boolean readSmallFilesCompletely() {
     return this.readSmallFilesCompletely;
   }
 
+  /** @return whether footer read optimization is enabled. */
   public boolean optimizeFooterRead() {
     return this.optimizeFooterRead;
   }
 
+  /** @return footer read buffer size. */
   public int getFooterReadBufferSize() {
     return footerReadBufferSize;
   }
 
+  /** @return whether the configured buffer size is always used. */
   public boolean shouldReadBufferSizeAlways() {
     return alwaysReadBufferSize;
   }
 
+  /** @return read-ahead block size. */
   public int getReadAheadBlockSize() {
     return readAheadBlockSize;
   }
 
+  /** @return whether buffered pread is disabled. */
   public boolean isBufferedPreadDisabled() {
     return bufferedPreadDisabled;
   }
 
+  /** @return filesystem back reference. */
   public BackReference getFsBackRef() {
     return fsBackRef;
   }
 
+  /** @return context encryption adapter. */
     public ContextEncryptionAdapter getEncryptionAdapter() {
       return contextEncryptionAdapter;
     }
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestWriteThreadPoolSizeManager.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestWriteThreadPoolSizeManager.java
new file mode 100644
index 00000000000..3d4c9aa48e8
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestWriteThreadPoolSizeManager.java
@@ -0,0 +1,770 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerArray;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class TestWriteThreadPoolSizeManager extends AbstractAbfsIntegrationTest {
+
+  private AbfsConfiguration mockConfig;
+  private static final double HIGH_CPU_UTILIZATION_THRESHOLD = 0.95;
+  private static final double LOW_CPU_UTILIZATION_THRESHOLD = 0.05;
+  private static final int THREAD_SLEEP_DURATION_MS = 200;
+  private static final String TEST_FILE_PATH = "testFilePath";
+  private static final String TEST_DIR_PATH = "testDirPath";
+  private static final int TEST_FILE_LENGTH = 1024 * 1024 * 8;
+  private static final int CONCURRENT_REQUEST_COUNT = 15;
+  private static final int THREAD_POOL_KEEP_ALIVE_TIME = 10;
+  private static final int LOW_TIER_MEMORY_MULTIPLIER = 4;
+  private static final int MEDIUM_TIER_MEMORY_MULTIPLIER = 6;
+  private static final int HIGH_TIER_MEMORY_MULTIPLIER = 8;
+  private static final int HIGH_CPU_THRESHOLD = 15;
+  private static final int MEDIUM_CPU_THRESHOLD = 10;
+  private static final int LOW_CPU_THRESHOLD = 5;
+  private static final int CPU_MONITORING_INTERVAL = 15;
+  private static final int WAIT_DURATION_MS = 3000;
+  private static final int LATCH_TIMEOUT_SECONDS = 60;
+  private static final int RESIZE_WAIT_TIME_MS = 6_000;
+  private static final double HIGH_CPU_USAGE_RATIO = 0.95;
+  private static final double LOW_CPU_USAGE_RATIO = 0.05;
+  private static final int SLEEP_DURATION_MS = 150;
+  private static final int AWAIT_TIMEOUT_SECONDS = 45;
+  private static final int RESIZER_JOIN_TIMEOUT_MS = 2_000;
+  private static final int WAIT_TIMEOUT_MS = 5000;
+  private static final int SLEEP_DURATION_30S_MS = 30000;
+  private static final int SMALL_PAUSE_MS = 50;
+  private static final int BURST_LOAD = 50;
+  private static final long LOAD_SLEEP_DURATION_MS = 2000;
+
+  TestWriteThreadPoolSizeManager() throws Exception {
+    super.setup();
+  }
+
+  /**
+   * Common setup to prepare a mock configuration for each test.
+   */
+  @BeforeEach
+  public void setUp() {
+    mockConfig = mock(AbfsConfiguration.class);
+    
when(mockConfig.getWriteConcurrentRequestCount()).thenReturn(CONCURRENT_REQUEST_COUNT);
+    
when(mockConfig.getWriteThreadPoolKeepAliveTime()).thenReturn(THREAD_POOL_KEEP_ALIVE_TIME);
+    
when(mockConfig.getLowTierMemoryMultiplier()).thenReturn(LOW_TIER_MEMORY_MULTIPLIER);
+    
when(mockConfig.getMediumTierMemoryMultiplier()).thenReturn(MEDIUM_TIER_MEMORY_MULTIPLIER);
+    
when(mockConfig.getHighTierMemoryMultiplier()).thenReturn(HIGH_TIER_MEMORY_MULTIPLIER);
+    when(mockConfig.getWriteHighCpuThreshold()).thenReturn(HIGH_CPU_THRESHOLD);
+    
when(mockConfig.getWriteMediumCpuThreshold()).thenReturn(MEDIUM_CPU_THRESHOLD);
+    when(mockConfig.getWriteLowCpuThreshold()).thenReturn(LOW_CPU_THRESHOLD);
+    
when(mockConfig.getWriteCpuMonitoringInterval()).thenReturn(CPU_MONITORING_INTERVAL);
+  }
+
+  /**
+   * Ensures that {@link WriteThreadPoolSizeManager#getInstance(String, 
AbfsConfiguration)} returns a singleton per key.
+   */
+  @Test
+  void testGetInstanceReturnsSingleton() {
+    WriteThreadPoolSizeManager instance1
+        = WriteThreadPoolSizeManager.getInstance("testfs", mockConfig);
+    WriteThreadPoolSizeManager instance2
+        = WriteThreadPoolSizeManager.getInstance("testfs", mockConfig);
+    WriteThreadPoolSizeManager instance3 =
+        WriteThreadPoolSizeManager.getInstance("newFs", mockConfig);
+    Assertions.assertThat(instance1)
+        .as("Expected the same singleton instance for the same key")
+        .isSameAs(instance2);
+    Assertions.assertThat(instance1)
+        .as("Expected the same singleton instance for the same key")
+        .isNotSameAs(instance3);
+  }
+
+  /**
+   /**
+   * Tests that high CPU usage results in thread pool downscaling.
+   */
+  @Test
+  void testAdjustThreadPoolSizeBasedOnHighCPU() throws InterruptedException, 
IOException {
+    // Get the executor service (ThreadPoolExecutor)
+    WriteThreadPoolSizeManager instance
+        = WriteThreadPoolSizeManager.getInstance("testfsHigh",
+        getAbfsStore(getFileSystem()).getAbfsConfiguration());
+    ExecutorService executor = instance.getExecutorService();
+    ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
+
+    // Simulate high CPU usage (e.g., 95% CPU utilization)
+    int initialMaxSize = threadPoolExecutor.getMaximumPoolSize();
+    instance.adjustThreadPoolSizeBasedOnCPU(HIGH_CPU_UTILIZATION_THRESHOLD);  
// High CPU
+
+    // Get the new maximum pool size after adjustment
+    int newMaxSize = threadPoolExecutor.getMaximumPoolSize();
+
+    // Assert that the pool size has decreased or is equal to initial PoolSize 
based on high CPU usage
+    Assertions.assertThat(newMaxSize)
+        .as("Expected pool size to decrease under high CPU usage")
+        .isLessThanOrEqualTo(initialMaxSize);
+    instance.close();
+  }
+
+  /**
+   * Tests that low CPU usage results in thread pool upscaling or remains the 
same.
+   */
+  @Test
+  void testAdjustThreadPoolSizeBasedOnLowCPU()
+      throws InterruptedException, IOException {
+    WriteThreadPoolSizeManager instance
+        = WriteThreadPoolSizeManager.getInstance("testfsLow",
+        getAbfsStore(getFileSystem()).getAbfsConfiguration());
+    ExecutorService executor = instance.getExecutorService();
+    int initialSize = ((ThreadPoolExecutor) executor).getMaximumPoolSize();
+    instance.adjustThreadPoolSizeBasedOnCPU(LOW_CPU_UTILIZATION_THRESHOLD); // 
Low CPU
+    int newSize = ((ThreadPoolExecutor) executor).getMaximumPoolSize();
+    Assertions.assertThat(newSize)
+        .as("Expected pool size to increase or stay the same under low CPU 
usage")
+        .isGreaterThanOrEqualTo(initialSize);
+    instance.close();
+  }
+
+
+  /**
+   * Confirms that the thread pool executor is initialized and not shut down.
+   */
+  @Test
+  void testExecutorServiceIsNotNull() throws IOException {
+    WriteThreadPoolSizeManager instance
+        = WriteThreadPoolSizeManager.getInstance("testfsExec", mockConfig);
+    ExecutorService executor = instance.getExecutorService();
+    Assertions.assertThat(executor).as("Executor service should be 
initialized")
+        .isNotNull();
+    Assertions.assertThat(executor.isShutdown())
+        .as("Executor service should not be shut down")
+        .isFalse();
+    instance.close();
+  }
+
+
+  /**
+   * Ensures that calling {@link WriteThreadPoolSizeManager#close()} cleans up 
resources.
+   */
+  @Test
+  void testCloseCleansUp() throws Exception {
+    WriteThreadPoolSizeManager instance
+        = WriteThreadPoolSizeManager.getInstance("testfsClose", mockConfig);
+    ExecutorService executor = instance.getExecutorService();
+    instance.close();
+    Assertions.assertThat(executor.isShutdown() || executor.isTerminated())
+        .as("Executor service should be shut down or terminated after close()")
+        .isTrue();
+  }
+
+  /**
+   * Test that the CPU monitoring task is scheduled properly when 
startCPUMonitoring() is called.
+   * This test checks the following:
+   * 1. That the CPU monitoring task gets scheduled by verifying that the CPU 
monitor executor is not null.
+   * 2. Ensures that the thread pool executor has at least one thread running, 
confirming that the task is being executed.
+   * @throws InterruptedException if the test is interrupted during the sleep 
time
+   */
+  @Test
+  void testStartCPUMonitoringSchedulesTask()
+      throws InterruptedException, IOException {
+    // Create a new instance of WriteThreadPoolSizeManager using a mock 
configuration
+    WriteThreadPoolSizeManager instance
+        = WriteThreadPoolSizeManager.getInstance("testScheduler", mockConfig);
+
+    // Call startCPUMonitoring to schedule the monitoring task
+    instance.startCPUMonitoring();
+
+    // Wait for a short period to allow the task to run and be scheduled
+    Thread.sleep(THREAD_SLEEP_DURATION_MS);
+
+    // Retrieve the CPU monitor executor (ScheduledThreadPoolExecutor) from 
the instance
+    ScheduledThreadPoolExecutor monitor
+        = (ScheduledThreadPoolExecutor) instance.getCpuMonitorExecutor();
+
+    // Assert that the monitor executor is not null, ensuring that it was 
properly initialized
+    Assertions.assertThat(monitor)
+        .as("CPU Monitor Executor should not be null")
+        .isNotNull();
+
+    // Assert that the thread pool size is greater than 0, confirming that the 
task has been scheduled and threads are active
+    Assertions.assertThat(monitor.getPoolSize())
+        .as("Thread pool size should be greater than 0, indicating that the 
task is running")
+        .isGreaterThan(ZERO);
+    instance.close();
+  }
+
+  /**
+   * Verifies that ABFS write tasks can complete successfully even when the 
system
+   * is under artificial CPU stress. The test also ensures that the write 
thread
+   * pool resizes dynamically under load without leading to starvation, 
overload,
+   * or leftover work in the queue.
+   */
+  @Test
+  void testABFSWritesUnderCPUStress() throws Exception {
+    // Initialize the filesystem and thread pool manager
+    AzureBlobFileSystem fs = getFileSystem();
+    WriteThreadPoolSizeManager instance =
+        WriteThreadPoolSizeManager.getInstance(getFileSystemName(), 
getConfiguration());
+    ThreadPoolExecutor executor =
+        (ThreadPoolExecutor) instance.getExecutorService();
+
+    // Start CPU monitoring so pool size adjustments happen in response to load
+    instance.startCPUMonitoring();
+
+    // Launch a background thread that generates CPU stress for ~3 seconds.
+    // This simulates contention on the system and should cause the pool to 
adjust.
+    Thread stressThread = new Thread(() -> {
+      long end = System.currentTimeMillis() + WAIT_DURATION_MS;
+      while (System.currentTimeMillis() < end) {
+        // Busy-work loop: repeatedly compute random powers to waste CPU cycles
+        double waste = Math.pow(Math.random(), Math.random());
+      }
+    });
+    stressThread.start();
+
+    // Prepare the ABFS write workload with multiple concurrent tasks
+    int taskCount = 10;
+    CountDownLatch latch = new CountDownLatch(taskCount);
+    Path testFile = new Path(TEST_FILE_PATH);
+    final byte[] b = new byte[TEST_FILE_LENGTH];
+    new Random().nextBytes(b);
+
+    // Submit 10 tasks, each writing to its own file to simulate parallel load
+    for (int i = 0; i < taskCount; i++) {
+      int finalI = i;
+      executor.submit(() -> {
+        try (FSDataOutputStream out = fs.create(
+            new Path(testFile + "_" + finalI), true)) {
+          for (int j = 0; j < 5; j++) {
+            out.write(b); // perform multiple writes to add sustained pressure
+          }
+          out.hflush();   // flush to force actual I/O
+        } catch (IOException e) {
+          // Any failure here indicates pool misbehavior or I/O issues
+          Assertions.fail("Write task failed with exception", e);
+        } finally {
+          // Mark this task as complete
+          latch.countDown();
+        }
+      });
+    }
+
+    // Wait for all tasks to finish (up to 60s timeout to guard against 
deadlock/starvation)
+    boolean finished = latch.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+    // Record the pool size after CPU stress to confirm resizing took place
+    int resizedPoolSize = executor.getMaximumPoolSize();
+
+    // 1. All tasks must finish within timeout → proves no starvation or 
deadlock
+    Assertions.assertThat(finished)
+        .as("All ABFS write tasks should complete without starvation")
+        .isTrue();
+
+    // 2. Pool size must fall within valid bounds → proves resizing occurred
+    Assertions.assertThat(resizedPoolSize)
+        .as("Thread pool size should dynamically adjust under CPU stress")
+        .isBetween(1, 
getAbfsStore(fs).getAbfsConfiguration().getWriteConcurrentRequestCount());
+
+    // 3. Task queue must be empty → proves no backlog remains after workload
+    Assertions.assertThat(executor.getQueue().size())
+        .as("No backlog should remain in task queue after completion")
+        .isEqualTo(0);
+
+    // Cleanup resources
+    instance.close();
+  }
+
+
+  /**
+   * Ensures that dynamic thread pool resizing during an active ABFS write 
workload
+   * does not cause deadlocks, task loss, or task duplication. The test also 
verifies
+   * that the pool resizes while work is in progress and that the executor 
queue
+   * eventually drains cleanly.
+   */
+  @Test
+  void testDynamicResizeNoDeadlocksNoTaskLoss() throws Exception {
+    // Initialize filesystem and thread pool manager
+    AzureBlobFileSystem fs = getFileSystem();
+    WriteThreadPoolSizeManager mgr =
+        WriteThreadPoolSizeManager.getInstance(getFileSystemName(), 
mockConfig);
+    ThreadPoolExecutor executor = (ThreadPoolExecutor) 
mgr.getExecutorService();
+
+    // Enable monitoring (may not be required if adjust() is triggered 
internally)
+    mgr.startCPUMonitoring();
+
+    // Test configuration: enough tasks and writes to stress the pool
+    final int taskCount = 10;
+    final int writesPerTask = 5;
+    final byte[] b = new byte[TEST_FILE_LENGTH];
+    new Random().nextBytes(b);
+    final Path base = new Path(TEST_DIR_PATH);
+    fs.mkdirs(base);
+
+    // Barrier ensures all tasks start together, so resizing happens mid-flight
+    final CyclicBarrier startBarrier = new CyclicBarrier(taskCount + 1);
+    final CountDownLatch done = new CountDownLatch(taskCount);
+
+    // Track execution results
+    final AtomicIntegerArray completed = new AtomicIntegerArray(taskCount); // 
mark tasks once
+    final AtomicInteger duplicates = new AtomicInteger(0);                  // 
guard against double-completion
+    final AtomicInteger rejected = new AtomicInteger(0);                    // 
count unexpected rejections
+
+    // Submit ABFS write tasks
+    for (int i = 0; i < taskCount; i++) {
+      final int id = i;
+      try {
+        executor.submit(() -> {
+          try {
+            // Hold until all tasks are enqueued, then start together
+            startBarrier.await(10, TimeUnit.SECONDS);
+
+            // Each task writes to its own file, flushing intermittently
+            Path subPath = new Path(base, "part-" + id);
+            try (FSDataOutputStream out = fs.create(subPath)) {
+              for (int w = 0; w < writesPerTask; w++) {
+                out.write(b);
+                if ((w & 1) == 1) {
+                  out.hflush(); // force some syncs to increase contention
+                }
+              }
+              out.hflush();
+            }
+
+            // Mark task as completed once; duplicates flag if it happens again
+            if (!completed.compareAndSet(id, 0, 1)) {
+              duplicates.incrementAndGet();
+            }
+          } catch (Exception e) {
+            Assertions.fail("ABFS write task " + id + " failed", e);
+          } finally {
+            done.countDown();
+          }
+        });
+      } catch (RejectedExecutionException rex) {
+        rejected.incrementAndGet();
+      }
+    }
+
+    // Thread that simulates fluctuating CPU load while tasks are running
+    final AtomicInteger observedMinMax = new 
AtomicInteger(executor.getMaximumPoolSize());
+    final AtomicInteger observedMaxMax = new 
AtomicInteger(executor.getMaximumPoolSize());
+
+    Thread resizer = new Thread(() -> {
+      try {
+        // Release worker tasks
+        startBarrier.await(10, TimeUnit.SECONDS);
+
+        long end = System.currentTimeMillis() + RESIZE_WAIT_TIME_MS; // keep 
resizing for ~6s
+        boolean high = true;
+        while (System.currentTimeMillis() < end) {
+          // Alternate between high load (shrink) and low load (expand)
+          if (high) {
+            mgr.adjustThreadPoolSizeBasedOnCPU(HIGH_CPU_USAGE_RATIO);
+          } else {
+            mgr.adjustThreadPoolSizeBasedOnCPU(LOW_CPU_USAGE_RATIO);
+          }
+          high = !high;
+
+          // Track observed pool size bounds to prove resizing occurred
+          int cur = executor.getMaximumPoolSize();
+          observedMinMax.updateAndGet(prev -> Math.min(prev, cur));
+          observedMaxMax.updateAndGet(prev -> Math.max(prev, cur));
+
+          Thread.sleep(SLEEP_DURATION_MS);
+        }
+      } catch (Exception ignore) {
+        // No-op: this is best-effort simulation
+      }
+    }, "resizer-thread");
+
+    resizer.start();
+
+    // Wait for all tasks to finish (ensures no deadlock)
+    boolean finished = done.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+    // Join resizer thread
+    resizer.join(RESIZER_JOIN_TIMEOUT_MS);
+
+    // 1. All tasks must complete in time → proves there are no deadlocks
+    Assertions.assertThat(finished)
+        .as("All tasks must complete within timeout (no deadlock)")
+        .isTrue();
+
+    // 2. Every task should complete exactly once → proves no task loss
+    int completedCount = 0;
+    for (int i = 0; i < taskCount; i++) {
+      completedCount += completed.get(i);
+    }
+    Assertions.assertThat(completedCount)
+        .as("Every task should complete exactly once (no task loss)")
+        .isEqualTo(taskCount);
+
+    // 3. No task should mark itself as done more than once → proves no 
duplication
+    Assertions.assertThat(duplicates.get())
+        .as("No task should report completion more than once (no duplication)")
+        .isZero();
+
+    // 4. The executor should not reject tasks while resizing is happening
+    Assertions.assertThat(rejected.get())
+        .as("Tasks should not be rejected during active resizing")
+        .isZero();
+
+    // 5. Executor queue should eventually empty once all tasks finish
+    Assertions.assertThat(executor.getQueue().size())
+        .as("Executor queue should drain after workload")
+        .isEqualTo(0);
+
+    // 6. Executor should still be running after workload until explicitly 
closed
+    Assertions.assertThat(executor.isShutdown())
+        .as("Executor should remain running until manager.close()")
+        .isFalse();
+
+    // 7. Verify that resizing actually occurred (pool max both grew and 
shrank)
+    int minObserved = observedMinMax.get();
+    int maxObserved = observedMaxMax.get();
+
+    Assertions.assertThat(maxObserved)
+        .as("Pool maximum size should have increased or fluctuated above 
baseline")
+        .isGreaterThan(0);
+
+    Assertions.assertThat(minObserved)
+        .as("Pool maximum size should have dropped during resizing")
+        .isLessThanOrEqualTo(maxObserved);
+
+    // Cleanup
+    for (int i = 0; i < taskCount; i++) {
+      Path p = new Path(base, "part-" + i);
+      try {
+        fs.delete(p, false);
+      } catch (IOException ignore) {
+        // Ignored: delete failures are non-fatal for test cleanup
+      }
+    }
+    try {
+      fs.delete(base, true);
+    } catch (IOException ignore) {
+      // Ignored: cleanup failures are non-fatal in tests
+    }
+    mgr.close();
+  }
+
+
+
+  /**
+   * Verifies that when the system experiences high CPU usage,
+   * the WriteThreadPoolSizeManager detects the load and reduces
+   * the maximum thread pool size accordingly.
+   */
+  @Test
+  void testThreadPoolScalesDownOnHighCpuLoad() throws Exception {
+    // Initialize filesystem and thread pool manager
+    try (FileSystem fileSystem = 
FileSystem.newInstance(getRawConfiguration())) {
+      AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem;
+      WriteThreadPoolSizeManager instance =
+          WriteThreadPoolSizeManager.getInstance(abfs.getFileSystemId(), 
getConfiguration());
+      ThreadPoolExecutor executor =
+          (ThreadPoolExecutor) instance.getExecutorService();
+
+      // Start monitoring CPU load
+      instance.startCPUMonitoring();
+
+      // Capture baseline pool size for comparison later
+      int initialMax = executor.getMaximumPoolSize();
+
+      // Define a CPU-bound task: tight loop of math ops for ~5s
+      Runnable cpuBurn = () -> {
+        long end = System.currentTimeMillis() + WAIT_TIMEOUT_MS;
+        while (System.currentTimeMillis() < end) {
+          double waste = Math.sin(Math.random()) * Math.cos(Math.random());
+        }
+      };
+
+      // Launch two CPU hogs in parallel
+      Thread cpuHog1 = new Thread(cpuBurn, "cpu-hog-thread-1");
+      Thread cpuHog2 = new Thread(cpuBurn, "cpu-hog-thread-2");
+      cpuHog1.start();
+      cpuHog2.start();
+
+      // Submit multiple write tasks while CPU is under stress
+      int taskCount = 10;
+      CountDownLatch latch = new CountDownLatch(taskCount);
+      Path base = new Path(TEST_DIR_PATH);
+      abfs.mkdirs(base);
+      final byte[] buffer = new byte[TEST_FILE_LENGTH];
+      new Random().nextBytes(buffer);
+
+      for (int i = 0; i < taskCount; i++) {
+        final Path part = new Path(base, "part-" + i);
+        executor.submit(() -> {
+          try (FSDataOutputStream out = abfs.create(part, true)) {
+            for (int j = 0; j < 5; j++) {
+              out.write(buffer);
+              out.hflush();
+            }
+          } catch (IOException e) {
+            Assertions.fail("Write task failed under CPU stress", e);
+          } finally {
+            latch.countDown();
+          }
+        });
+      }
+
+      // Ensure all tasks complete (avoid deadlock/starvation)
+      boolean finished = latch.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+      // Wait for CPU hogs to end and give monitor time to react
+      cpuHog1.join();
+      cpuHog2.join();
+      Thread.sleep(SLEEP_DURATION_30S_MS);
+
+      int resizedMax = executor.getMaximumPoolSize();
+
+      // Verify outcomes:
+      // 1. All write tasks succeeded despite CPU pressure
+      Assertions.assertThat(finished)
+          .as("All ABFS write tasks must complete despite CPU stress")
+          .isTrue();
+
+      // 2. Thread pool scaled down as expected
+      Assertions.assertThat(resizedMax)
+          .as("Thread pool should scale down under high CPU load")
+          .isLessThanOrEqualTo(initialMax);
+
+      // 3. No leftover tasks in the queue
+      Assertions.assertThat(executor.getQueue().size())
+          .as("No backlog should remain in the queue after workload")
+          .isEqualTo(0);
+
+      // Cleanup test data
+      for (int i = 0; i < taskCount; i++) {
+        try {
+          abfs.delete(new Path(base, "part-" + i), false);
+        } catch (IOException ignore) {
+          // Ignored: cleanup failures are non-fatal in tests
+        }
+      }
+      try {
+        abfs.delete(base, true);
+      } catch (IOException ignore) {
+        // Ignored: cleanup failures are non-fatal in tests
+      }
+      instance.close();
+    }
+  }
+
+
+  /**
+   * Verifies that when two parallel high memory–consuming workloads run,
+   * the WriteThreadPoolSizeManager detects the memory pressure and
+   * scales down the maximum thread pool size.
+   */
+  @Test
+  void testScalesDownOnParallelHighMemoryLoad() throws Exception {
+    // Initialize filesystem and thread pool manager
+    try (FileSystem fileSystem = 
FileSystem.newInstance(getRawConfiguration())) {
+      AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem;
+      WriteThreadPoolSizeManager instance =
+          WriteThreadPoolSizeManager.getInstance(abfs.getFileSystemId(),
+              getConfiguration());
+      ThreadPoolExecutor executor =
+          (ThreadPoolExecutor) instance.getExecutorService();
+
+      // Begin monitoring resource usage (CPU + memory)
+      instance.startCPUMonitoring();
+
+      // Capture the initial thread pool size for later comparison
+      int initialMax = executor.getMaximumPoolSize();
+
+      // Define a workload that continuously allocates memory (~5 MB chunks)
+      // for ~5 seconds to simulate memory pressure in the JVM.
+      Runnable memoryBurn = () -> {
+        List<byte[]> allocations = new ArrayList<>();
+        long end = System.currentTimeMillis() + WAIT_TIMEOUT_MS;
+        while (System.currentTimeMillis() < end) {
+          try {
+            allocations.add(new byte[5 * 1024 * 1024]); // allocate 5 MB
+            Thread.sleep(SMALL_PAUSE_MS); // small pause to avoid instant OOM
+          } catch (OutOfMemoryError oom) {
+            // Clear allocations if JVM runs out of memory and continue
+            allocations.clear();
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+          }
+        }
+      };
+
+      // Start two threads running the memory hog workload in parallel
+      Thread memHog1 = new Thread(memoryBurn, "mem-hog-thread-1");
+      Thread memHog2 = new Thread(memoryBurn, "mem-hog-thread-2");
+      memHog1.start();
+      memHog2.start();
+
+      // Submit several write tasks to ABFS while memory is under stress
+      int taskCount = 10;
+      CountDownLatch latch = new CountDownLatch(taskCount);
+      Path base = new Path(TEST_DIR_PATH);
+      abfs.mkdirs(base);
+      final byte[] buffer = new byte[TEST_FILE_LENGTH];
+      new Random().nextBytes(buffer);
+
+      for (int i = 0; i < taskCount; i++) {
+        final Path part = new Path(base, "part-" + i);
+        executor.submit(() -> {
+          try (FSDataOutputStream out = abfs.create(part, true)) {
+            for (int j = 0; j < 5; j++) {
+              out.write(buffer);
+              out.hflush();
+            }
+          } catch (IOException e) {
+            Assertions.fail("Write task failed under memory stress", e);
+          } finally {
+            latch.countDown();
+          }
+        });
+      }
+
+      // Ensure all tasks finish within a timeout
+      boolean finished = latch.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+      // Wait for memory hog threads to finish
+      memHog1.join();
+      memHog2.join();
+
+      // Give monitoring thread time to detect memory pressure and react
+      Thread.sleep(SLEEP_DURATION_30S_MS);
+
+      int resizedMax = executor.getMaximumPoolSize();
+
+      // Validate that:
+      // 1. All ABFS writes succeeded despite memory stress
+      Assertions.assertThat(finished)
+          .as("All ABFS write tasks must complete despite parallel memory 
stress")
+          .isTrue();
+
+      // 2. The thread pool scaled down under memory pressure
+      Assertions.assertThat(resizedMax)
+          .as("Thread pool should scale down under parallel high memory load")
+          .isLessThanOrEqualTo(initialMax);
+
+      // 3. No tasks remain queued after workload completion
+      Assertions.assertThat(executor.getQueue().size())
+          .as("No backlog should remain in the queue after workload")
+          .isEqualTo(0);
+
+      // Clean up temporary test files
+      for (int i = 0; i < taskCount; i++) {
+        try {
+          abfs.delete(new Path(base, "part-" + i), false);
+        } catch (IOException ignore) {
+          // Ignored: cleanup failures are non-fatal in tests
+        }
+      }
+      try {
+        abfs.delete(base, true);
+      } catch (IOException ignore) {
+        // Ignored: cleanup failures are non-fatal in tests
+      }
+      instance.close();
+    }
+  }
+
+  /**
+   * Test that after a long idle period, the thread pool
+   * can quickly scale up in response to a sudden burst of load
+   * without performance degradation.
+   */
+  @Test
+  void testThreadPoolScalesUpAfterIdleBurstLoad() throws Exception {
+    // Initialize filesystem and thread pool manager
+    try (FileSystem fileSystem = FileSystem.newInstance(
+        getRawConfiguration())) {
+      AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem;
+      WriteThreadPoolSizeManager instance = 
WriteThreadPoolSizeManager.getInstance(abfs.getFileSystemId(),
+              abfs.getAbfsStore().getAbfsConfiguration());
+      ThreadPoolExecutor executor =
+          (ThreadPoolExecutor) instance.getExecutorService();
+
+      // --- Step 1: Simulate idle period ---
+      // Let the executor sit idle with no work for a few seconds
+      Thread.sleep(WAIT_TIMEOUT_MS);
+      int poolSizeAfterIdle = executor.getPoolSize();
+
+      // Verify that after idling, the pool is at or close to its minimum size
+      Assertions.assertThat(poolSizeAfterIdle)
+          .as("Pool size should remain minimal after idle")
+          .isLessThanOrEqualTo(executor.getCorePoolSize());
+
+      // --- Step 2: Submit a sudden burst of tasks ---
+      // Launch many short, CPU-heavy tasks at once to simulate burst load
+      int burstLoad = BURST_LOAD;
+      CountDownLatch latch = new CountDownLatch(burstLoad);
+      for (int i = 0; i < burstLoad; i++) {
+        executor.submit(() -> {
+          // Busy loop for ~200ms to simulate CPU work
+          long end = System.currentTimeMillis() + THREAD_SLEEP_DURATION_MS;
+          while (System.currentTimeMillis() < end) {
+            Math.sqrt(Math.random()); // burn CPU cycles
+          }
+          latch.countDown();
+        });
+      }
+
+      // --- Step 3: Give pool time to react ---
+      // Wait briefly so the pool’s scaling logic has a chance to expand
+      Thread.sleep(LOAD_SLEEP_DURATION_MS);
+      int poolSizeDuringBurst = executor.getPoolSize();
+
+      // Verify that the pool scaled up compared to idle
+      Assertions.assertThat(poolSizeDuringBurst)
+          .as("Pool size should increase after burst load")
+          .isGreaterThanOrEqualTo(poolSizeAfterIdle);
+
+// --- Step 4: Verify completion ---
+// Ensure all tasks complete successfully in a reasonable time,
+// proving there was no degradation or deadlock under burst load
+      Assertions.assertThat(
+              latch.await(LATCH_TIMEOUT_SECONDS / 2, TimeUnit.SECONDS))
+          .as("All burst tasks should finish in reasonable time")
+          .isTrue();
+      instance.close();
+    }
+  }
+}
+
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
index 372dbfc8033..54521c9c971 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
@@ -105,7 +105,7 @@ public void testMaxRequestsAndQueueCapacityDefaults() 
throws Exception {
     AbfsOutputStream stream = (AbfsOutputStream) out.getWrappedStream();
 
       int maxConcurrentRequests
-          = getConfiguration().getWriteMaxConcurrentRequestCount();
+          = getConfiguration().getWriteConcurrentRequestCount();
       if (stream.isAppendBlobStream()) {
         maxConcurrentRequests = 1;
       }
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
index a4eefce0cb8..146eed84e80 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
@@ -96,7 +96,7 @@ private AbfsOutputStreamContext 
populateAbfsOutputStreamContext(
             .disableOutputStreamFlush(disableOutputStreamFlush)
             .withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
             .withAppendBlob(isAppendBlob)
-            
.withWriteMaxConcurrentRequestCount(abfsConf.getWriteMaxConcurrentRequestCount())
+            
.withWriteMaxConcurrentRequestCount(abfsConf.getWriteConcurrentRequestCount())
             .withMaxWriteRequestsToQueue(abfsConf.getMaxWriteRequestsToQueue())
             .withClientHandler(clientHandler)
             .withPath(path)
@@ -613,7 +613,7 @@ private ExecutorService createExecutorService(
       AbfsConfiguration abfsConf) {
     ExecutorService executorService =
         new 
SemaphoredDelegatingExecutor(BlockingThreadPoolExecutorService.newInstance(
-            abfsConf.getWriteMaxConcurrentRequestCount(),
+            abfsConf.getWriteConcurrentRequestCount(),
             abfsConf.getMaxWriteRequestsToQueue(),
             10L, TimeUnit.SECONDS,
             "abfs-test-bounded"),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to