This is an automated email from the ASF dual-hosted git repository.

anujmodi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5cd20b9e30a HADOOP-19472. [ABFS] Remove write aggressiveness 
optimization (#8141)
5cd20b9e30a is described below

commit 5cd20b9e30a6220fbda8af80be48c851aaf57867
Author: Anmol Asrani <[email protected]>
AuthorDate: Tue Jan 27 15:56:34 2026 +0530

    HADOOP-19472. [ABFS] Remove write aggressiveness optimization (#8141)
    
    Contributed by Anmol Asrani
---
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      | 104 +--
 .../hadoop/fs/azurebfs/AbfsCountersImpl.java       |  23 -
 .../fs/azurebfs/AzureBlobFileSystemStore.java      |  34 +-
 .../fs/azurebfs/WriteThreadPoolSizeManager.java    | 568 ------------
 .../fs/azurebfs/constants/AbfsHttpConstants.java   |   2 -
 .../fs/azurebfs/constants/ConfigurationKeys.java   |  37 -
 .../constants/FileSystemConfigurations.java        | 136 ---
 .../fs/azurebfs/constants/MetricsConstants.java    |  33 +
 .../AbfsWriteResourceUtilizationMetricsEnum.java   | 107 ---
 .../hadoop/fs/azurebfs/services/AbfsClient.java    |   5 -
 .../hadoop/fs/azurebfs/services/AbfsCounters.java  |   4 -
 .../fs/azurebfs/services/AbfsOutputStream.java     |  26 -
 .../azurebfs/services/AbfsOutputStreamContext.java |  14 -
 .../AbfsWriteResourceUtilizationMetrics.java       | 116 ---
 .../azurebfs/services/AzureBlobIngressHandler.java |   9 -
 .../azurebfs/services/AzureDFSIngressHandler.java  |   9 -
 .../AzureDfsToBlobIngressFallbackHandler.java      |   9 -
 .../fs/azurebfs/services/AzureIngressHandler.java  |  11 -
 .../fs/azurebfs/services/ReadBufferManagerV2.java  |  10 +-
 .../services/ResourceUtilizationStats.java         |  22 +-
 .../azurebfs/TestWriteThreadPoolSizeManager.java   | 963 ---------------------
 .../azurebfs/services/ITestAbfsOutputStream.java   |   2 +-
 .../fs/azurebfs/services/TestAbfsOutputStream.java |  11 +-
 23 files changed, 65 insertions(+), 2190 deletions(-)

diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 6c5929d2b90..6c95bd37689 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -541,62 +541,6 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES)
   private int maxApacheHttpClientIoExceptionsRetries;
 
-  @BooleanConfigurationValidatorAnnotation(ConfigurationKey = 
FS_AZURE_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT,
-      DefaultValue = DEFAULT_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT)
-  private boolean dynamicWriteThreadPoolEnablement;
-
-  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = 
FS_AZURE_WRITE_THREADPOOL_KEEP_ALIVE_TIME_MILLIS,
-      DefaultValue = DEFAULT_WRITE_THREADPOOL_KEEP_ALIVE_TIME_MILLIS)
-  private int writeThreadPoolKeepAliveTime;
-
-  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = 
FS_AZURE_WRITE_CPU_MONITORING_INTERVAL_MILLIS,
-      MinValue = MIN_WRITE_CPU_MONITORING_INTERVAL_MILLIS,
-      MaxValue = MAX_WRITE_CPU_MONITORING_INTERVAL_MILLIS,
-      DefaultValue = DEFAULT_WRITE_CPU_MONITORING_INTERVAL_MILLIS)
-  private int writeCpuMonitoringInterval;
-
-  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = 
FS_AZURE_WRITE_HIGH_CPU_THRESHOLD_PERCENT,
-      MinValue = MIN_WRITE_HIGH_CPU_THRESHOLD_PERCENT,
-      MaxValue = MAX_WRITE_HIGH_CPU_THRESHOLD_PERCENT,
-      DefaultValue = DEFAULT_WRITE_HIGH_CPU_THRESHOLD_PERCENT)
-  private int writeHighCpuThreshold;
-
-  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = 
FS_AZURE_WRITE_MEDIUM_CPU_THRESHOLD_PERCENT,
-      MinValue = MIN_WRITE_MEDIUM_CPU_THRESHOLD_PERCENT,
-      MaxValue = MAX_WRITE_MEDIUM_CPU_THRESHOLD_PERCENT,
-      DefaultValue = DEFAULT_WRITE_MEDIUM_CPU_THRESHOLD_PERCENT)
-  private int writeMediumCpuThreshold;
-
-  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = 
FS_AZURE_WRITE_LOW_CPU_THRESHOLD_PERCENT,
-      MaxValue = MAX_WRITE_LOW_CPU_THRESHOLD_PERCENT,
-      DefaultValue = DEFAULT_WRITE_LOW_CPU_THRESHOLD_PERCENT)
-  private int writeLowCpuThreshold;
-
-  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = 
FS_AZURE_WRITE_LOW_TIER_MEMORY_MULTIPLIER,
-      MinValue = MIN_WRITE_LOW_TIER_MEMORY_MULTIPLIER,
-      DefaultValue = DEFAULT_WRITE_LOW_TIER_MEMORY_MULTIPLIER)
-  private int lowTierMemoryMultiplier;
-
-  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = 
FS_AZURE_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER,
-      MinValue = MIN_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER,
-      DefaultValue = DEFAULT_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER)
-  private int mediumTierMemoryMultiplier;
-
-  @IntegerConfigurationValidatorAnnotation(ConfigurationKey = 
FS_AZURE_WRITE_HIGH_TIER_MEMORY_MULTIPLIER,
-      MinValue = MIN_WRITE_HIGH_TIER_MEMORY_MULTIPLIER,
-      DefaultValue = DEFAULT_WRITE_HIGH_TIER_MEMORY_MULTIPLIER)
-  private int highTierMemoryMultiplier;
-
-  @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
-      FS_AZURE_WRITE_HIGH_MEMORY_USAGE_THRESHOLD_PERCENT,
-      DefaultValue = DEFAULT_WRITE_HIGH_MEMORY_USAGE_THRESHOLD_PERCENT)
-  private int writeHighMemoryUsageThresholdPercent;
-
-  @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
-      FS_AZURE_WRITE_LOW_MEMORY_USAGE_THRESHOLD_PERCENT,
-      DefaultValue = DEFAULT_WRITE_LOW_MEMORY_USAGE_THRESHOLD_PERCENT)
-  private int writeLowMemoryUsageThresholdPercent;
-
   @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
       FS_AZURE_APACHE_HTTP_CLIENT_MAX_CACHE_SIZE, DefaultValue = 
DEFAULT_APACHE_HTTP_CLIENT_MAX_CACHE_SIZE,
       MinValue = MIN_APACHE_HTTP_CLIENT_MAX_CACHE_SIZE, MaxValue = 
MAX_APACHE_HTTP_CLIENT_MAX_CACHE_SIZE)
@@ -1955,60 +1899,16 @@ public ExponentialRetryPolicy 
getOauthTokenFetchRetryPolicy() {
         oauthTokenFetchRetryDeltaBackoff);
   }
 
-  public int getWriteConcurrentRequestCount() {
+  public int getWriteMaxConcurrentRequestCount() {
     if (this.writeMaxConcurrentRequestCount < 1) {
       return 4 * Runtime.getRuntime().availableProcessors();
     }
     return this.writeMaxConcurrentRequestCount;
   }
 
-  public int getWriteThreadPoolKeepAliveTime() {
-    return writeThreadPoolKeepAliveTime;
-  }
-
-  public int getWriteCpuMonitoringInterval() {
-    return writeCpuMonitoringInterval;
-  }
-
-  public boolean isDynamicWriteThreadPoolEnablement() {
-    return dynamicWriteThreadPoolEnablement;
-  }
-
-  public int getWriteLowCpuThreshold() {
-    return writeLowCpuThreshold;
-  }
-
-  public int getWriteMediumCpuThreshold() {
-    return writeMediumCpuThreshold;
-  }
-
-  public int getWriteHighCpuThreshold() {
-    return writeHighCpuThreshold;
-  }
-
-  public int getLowTierMemoryMultiplier() {
-    return lowTierMemoryMultiplier;
-  }
-
-  public int getMediumTierMemoryMultiplier() {
-    return mediumTierMemoryMultiplier;
-  }
-
-  public int getHighTierMemoryMultiplier() {
-    return highTierMemoryMultiplier;
-  }
-
-  public int getWriteHighMemoryUsageThresholdPercent() {
-    return writeHighMemoryUsageThresholdPercent;
-  }
-
-  public int getWriteLowMemoryUsageThresholdPercent() {
-    return writeLowMemoryUsageThresholdPercent;
-  }
-
   public int getMaxWriteRequestsToQueue() {
     if (this.maxWriteRequestsToQueue < 1) {
-      return 2 * getWriteConcurrentRequestCount();
+      return 2 * getWriteMaxConcurrentRequestCount();
     }
     return this.maxWriteRequestsToQueue;
   }
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java
index 363ed34025a..91f83302dd4 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java
@@ -28,7 +28,6 @@
 import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
 import org.apache.hadoop.fs.azurebfs.services.AbfsReadFooterMetrics;
 import 
org.apache.hadoop.fs.azurebfs.services.AbfsReadResourceUtilizationMetrics;
-import 
org.apache.hadoop.fs.azurebfs.services.AbfsWriteResourceUtilizationMetrics;
 import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
 import org.apache.hadoop.fs.statistics.DurationTracker;
 import org.apache.hadoop.fs.statistics.IOStatistics;
@@ -109,8 +108,6 @@ public class AbfsCountersImpl implements AbfsCounters {
 
   private AbfsReadFooterMetrics abfsReadFooterMetrics = null;
 
-  private AbfsWriteResourceUtilizationMetrics 
abfsWriteResourceUtilizationMetrics = null;
-
   private AbfsReadResourceUtilizationMetrics 
abfsReadResourceUtilizationMetrics = null;
 
   private AtomicLong lastExecutionTime = null;
@@ -188,18 +185,6 @@ public void initializeReadResourceUtilizationMetrics() {
     abfsReadResourceUtilizationMetrics = new 
AbfsReadResourceUtilizationMetrics();
   }
 
-  /**
-   * Initializes the metrics collector for the write thread pool.
-   * <p>
-   * This method creates a new instance of {@link 
AbfsWriteResourceUtilizationMetrics}
-   * to track performance statistics and operational metrics related to
-   * write operations executed by the thread pool.
-   * </p>
-   */
-  public void initializeWriteResourceUtilizationMetrics() {
-    abfsWriteResourceUtilizationMetrics = new 
AbfsWriteResourceUtilizationMetrics();
-  }
-
 
   @Override
   public void initializeMetrics(final MetricFormat metricFormat,
@@ -315,14 +300,6 @@ public AbfsReadFooterMetrics getAbfsReadFooterMetrics() {
     return abfsReadFooterMetrics != null ? abfsReadFooterMetrics : null;
   }
 
-  /**
-   * Returns the write thread pool metrics instance, or {@code null} if 
uninitialized.
-   */
-  @Override
-  public AbfsWriteResourceUtilizationMetrics 
getAbfsWriteResourceUtilizationMetrics() {
-    return abfsWriteResourceUtilizationMetrics != null ? 
abfsWriteResourceUtilizationMetrics : null;
-  }
-
   /**
    * Returns the read thread pool metrics instance, or {@code null} if 
uninitialized.
    */
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 6ec5c51eb1d..055cee06256 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -45,7 +45,6 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.UUID;
 import java.util.WeakHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -210,7 +209,6 @@ public class AzureBlobFileSystemStore implements Closeable, 
ListingSupport {
   private int blockOutputActiveBlocks;
   /** Bounded ThreadPool for this instance. */
   private ExecutorService boundedThreadPool;
-  private WriteThreadPoolSizeManager writeThreadPoolSizeManager;
 
   /** ABFS instance reference to be held by the store to avoid GC close. */
   private BackReference fsBackRef;
@@ -286,18 +284,11 @@ public AzureBlobFileSystemStore(
     }
     this.blockFactory = abfsStoreBuilder.blockFactory;
     this.blockOutputActiveBlocks = abfsStoreBuilder.blockOutputActiveBlocks;
-    if (abfsConfiguration.isDynamicWriteThreadPoolEnablement()) {
-      this.writeThreadPoolSizeManager = WriteThreadPoolSizeManager.getInstance(
-          getClient().getFileSystem() + "-" + UUID.randomUUID(),
-          abfsConfiguration, getClient().getAbfsCounters());
-      this.boundedThreadPool = writeThreadPoolSizeManager.getExecutorService();
-    } else {
-      this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
-          abfsConfiguration.getWriteConcurrentRequestCount(),
-          abfsConfiguration.getMaxWriteRequestsToQueue(),
-          10L, TimeUnit.SECONDS,
-          "abfs-bounded");
-    }
+    this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
+        abfsConfiguration.getWriteMaxConcurrentRequestCount(),
+        abfsConfiguration.getMaxWriteRequestsToQueue(),
+        10L, TimeUnit.SECONDS,
+        "abfs-bounded");
   }
 
   /**
@@ -336,19 +327,17 @@ public void close() throws IOException {
     }
     try {
       Futures.allAsList(futures).get();
-      if (!abfsConfiguration.isDynamicWriteThreadPoolEnablement()) {
-        // shutdown the threadPool and set it to null.
-        HadoopExecutors.shutdown(boundedThreadPool, LOG,
-            30, TimeUnit.SECONDS);
-        boundedThreadPool = null;
-      }
+      // shutdown the threadPool and set it to null.
+      HadoopExecutors.shutdown(boundedThreadPool, LOG,
+          30, TimeUnit.SECONDS);
+      boundedThreadPool = null;
     } catch (InterruptedException e) {
       LOG.error("Interrupted freeing leases", e);
       Thread.currentThread().interrupt();
     } catch (ExecutionException e) {
       LOG.error("Error freeing leases", e);
     } finally {
-      IOUtils.cleanupWithLogger(LOG, writeThreadPoolSizeManager, 
getClientHandler());
+      IOUtils.cleanupWithLogger(LOG, getClientHandler());
     }
   }
 
@@ -817,7 +806,7 @@ private AbfsOutputStreamContext 
populateAbfsOutputStreamContext(
             
.disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled())
             .withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
             .withAppendBlob(isAppendBlob)
-            
.withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteConcurrentRequestCount())
+            
.withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteMaxConcurrentRequestCount())
             
.withMaxWriteRequestsToQueue(abfsConfiguration.getMaxWriteRequestsToQueue())
             .withLease(lease)
             .withEncryptionAdapter(contextEncryptionAdapter)
@@ -829,7 +818,6 @@ private AbfsOutputStreamContext 
populateAbfsOutputStreamContext(
             .withPath(path)
             .withExecutorService(new 
SemaphoredDelegatingExecutor(boundedThreadPool,
                 blockOutputActiveBlocks, true))
-            .withWriteThreadPoolManager(writeThreadPoolSizeManager)
             .withTracingContext(tracingContext)
             .withAbfsBackRef(fsBackRef)
             .withIngressServiceType(clientHandler.getIngressServiceType())
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/WriteThreadPoolSizeManager.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/WriteThreadPoolSizeManager.java
deleted file mode 100644
index de11037780b..00000000000
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/WriteThreadPoolSizeManager.java
+++ /dev/null
@@ -1,568 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azurebfs;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.classification.VisibleForTesting;
-import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
-import 
org.apache.hadoop.fs.azurebfs.services.AbfsWriteResourceUtilizationMetrics;
-import org.apache.hadoop.fs.azurebfs.services.ResourceUtilizationStats;
-import org.apache.hadoop.fs.azurebfs.utils.ResourceUtilizationUtils;
-
-import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
-import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.LOW_HEAP_SPACE_FACTOR;
-import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.MEDIUM_HEAP_SPACE_FACTOR;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_CPU_REDUCTION_FACTOR;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_HIGH_MEMORY_DECREASE_FACTOR;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_POOL_SIZE_INCREASE_FACTOR;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MEDIUM_CPU_LOW_MEMORY_REDUCTION_FACTOR;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MEDIUM_CPU_REDUCTION_FACTOR;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_DOWN;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_NO_DOWN_AT_MIN;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_NO_UP_AT_MAX;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_UP;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.THIRTY_SECONDS;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO;
-
-/**
- * Manages a thread pool for writing operations, adjusting the pool size based 
on CPU utilization.
- */
-public final class WriteThreadPoolSizeManager implements Closeable {
-
-  /* Maximum allowed size for the thread pool. */
-  private final int maxThreadPoolSize;
-  /* Executor for periodically monitoring CPU usage. */
-  private final ScheduledExecutorService cpuMonitorExecutor;
-  /* Thread pool whose size is dynamically managed. */
-  private volatile ExecutorService boundedThreadPool;
-  /* Lock to ensure thread-safe updates to the thread pool. */
-  private final Lock lock = new ReentrantLock();
-  /* New computed max size for the thread pool after adjustment. */
-  private volatile int newMaxPoolSize;
-  /* Logger instance for logging events from WriteThreadPoolSizeManager. */
-  private static final Logger LOG = LoggerFactory.getLogger(
-      WriteThreadPoolSizeManager.class);
-  /* Map to maintain a WriteThreadPoolSizeManager instance per filesystem. */
-  private static final ConcurrentHashMap<String, WriteThreadPoolSizeManager>
-      POOL_SIZE_MANAGER_MAP = new ConcurrentHashMap<>();
-  /* Name of the filesystem associated with this manager. */
-  private final String filesystemName;
-  /* Initial size for the thread pool when created. */
-  private final int initialPoolSize;
-  /* The configuration instance. */
-  private final AbfsConfiguration abfsConfiguration;
-  /* Metrics collector for monitoring the performance of the ABFS write thread 
pool.  */
-  private final AbfsWriteResourceUtilizationMetrics writeThreadPoolMetrics;
-  /* Flag indicating if CPU monitoring has started. */
-  private volatile boolean isMonitoringStarted = false;
-  /* Tracks the last scale direction applied, or empty if none. */
-  private volatile String lastScaleDirection = EMPTY_STRING;
-  /* Maximum CPU utilization observed during the monitoring interval. */
-  private volatile long maxJvmCpuUtilization = 0L;
-  /** High memory usage threshold used to trigger thread pool downscaling. */
-  private final long highMemoryThreshold;
-  /** Low memory usage threshold used to allow thread pool upscaling. */
-  private final long lowMemoryThreshold;
-
-  /**
-   * Private constructor to initialize the write thread pool and CPU monitor 
executor
-   * based on system resources and ABFS configuration.
-   *
-   * @param filesystemName       Name of the ABFS filesystem.
-   * @param abfsConfiguration    Configuration containing pool size parameters.
-   * @param abfsCounters                  ABFS counters instance used for 
metrics.
-   */
-  private WriteThreadPoolSizeManager(String filesystemName,
-      AbfsConfiguration abfsConfiguration, AbfsCounters abfsCounters) {
-    /* Retrieves and assigns the write thread pool metrics from the ABFS 
client counters. */
-    this.writeThreadPoolMetrics = 
abfsCounters.getAbfsWriteResourceUtilizationMetrics();
-    this.filesystemName = filesystemName;
-    this.abfsConfiguration = abfsConfiguration;
-    int availableProcessors = Runtime.getRuntime().availableProcessors();
-    /* Compute the max pool size */
-    int computedMaxPoolSize = getComputedMaxPoolSize(availableProcessors, 
ResourceUtilizationUtils.getAvailableMaxHeapMemory());
-
-    /* Get the initial pool size from config, fallback to at least 1 */
-    this.initialPoolSize = Math.max(1,
-        abfsConfiguration.getWriteConcurrentRequestCount());
-
-    /* Set the upper bound for the thread pool size */
-    this.maxThreadPoolSize = Math.max(computedMaxPoolSize, initialPoolSize);
-    AtomicInteger threadCount = new AtomicInteger(1);
-    this.boundedThreadPool = Executors.newFixedThreadPool(
-        initialPoolSize,
-        r -> {
-          Thread t = new Thread(r);
-          t.setName("abfs-boundedwrite-" + threadCount.getAndIncrement());
-          return t;
-        }
-    );
-    ThreadPoolExecutor executor = (ThreadPoolExecutor) this.boundedThreadPool;
-    int keepAlive = Math.max(1, 
abfsConfiguration.getWriteThreadPoolKeepAliveTime());
-    executor.setKeepAliveTime(keepAlive, TimeUnit.SECONDS);
-    executor.allowCoreThreadTimeOut(true);
-    /* Create a scheduled executor for CPU monitoring and pool adjustment */
-    this.cpuMonitorExecutor = Executors.newScheduledThreadPool(1);
-    highMemoryThreshold = 
abfsConfiguration.getWriteHighMemoryUsageThresholdPercent();
-    lowMemoryThreshold = 
abfsConfiguration.getWriteLowMemoryUsageThresholdPercent();
-  }
-
-  /** Returns the internal {@link AbfsConfiguration}. */
-  private AbfsConfiguration getAbfsConfiguration() {
-    return abfsConfiguration;
-  }
-
-  /**
-   * Computes the maximum thread pool size based on the available processors
-   * and the initial available heap memory. The calculation uses a tiered
-   * multiplier derived from the memory-to-core ratio — systems with higher
-   * memory per core allow for a larger thread pool.
-   *
-   * @param availableProcessors the number of available CPU cores.
-   * @param initialAvailableHeapMemory the initial available heap memory, in 
bytes or GB (depending on implementation).
-   * @return the computed maximum thread pool size.
-   */
-  private int getComputedMaxPoolSize(final int availableProcessors, long 
initialAvailableHeapMemory) {
-    int maxpoolSize = getMemoryTierMaxThreads(initialAvailableHeapMemory, 
availableProcessors);
-    LOG.debug("Computed max thread pool size: {} | Available processors: {} | 
Heap memory (GB): {}",
-        maxpoolSize, availableProcessors, initialAvailableHeapMemory);
-    return maxpoolSize;
-  }
-
-  /**
-   * Determines the maximum thread count based on available heap memory and 
CPU cores.
-   * Calculates the thread count as {@code availableProcessors × multiplier}, 
where the
-   * multiplier is selected according to the heap memory tier (low, medium, or 
high).
-   *
-   * @param availableHeapGB       the available heap memory in gigabytes.
-   * @param availableProcessors   the number of available CPU cores.
-   * @return the maximum thread count based on memory tier and processor count.
-   */
-  private int getMemoryTierMaxThreads(long availableHeapGB, int 
availableProcessors) {
-    int multiplier;
-    if (availableHeapGB <= LOW_HEAP_SPACE_FACTOR) {
-      multiplier = abfsConfiguration.getLowTierMemoryMultiplier();
-    } else if (availableHeapGB <= MEDIUM_HEAP_SPACE_FACTOR) {
-      multiplier = abfsConfiguration.getMediumTierMemoryMultiplier();
-    } else {
-      multiplier = abfsConfiguration.getHighTierMemoryMultiplier();
-    }
-    return availableProcessors * multiplier;
-  }
-
-  /**
-   * Returns the singleton {@link WriteThreadPoolSizeManager} instance for the 
specified filesystem.
-   * If an active instance already exists in the manager map for the given 
filesystem, it is returned.
-   * Otherwise, a new instance is created, registered in the map, and returned.
-   *
-   * @param filesystemName     the name of the filesystem.
-   * @param abfsConfiguration  the {@link AbfsConfiguration} associated with 
the filesystem.
-   * @param abfsCounters                the {@link AbfsCounters} used to 
initialize the manager.
-   * @return  the singleton {@link WriteThreadPoolSizeManager} instance for 
the given filesystem.
-   */
-  public static synchronized WriteThreadPoolSizeManager getInstance(
-      String filesystemName, AbfsConfiguration abfsConfiguration, AbfsCounters 
abfsCounters) {
-    /* Check if an instance already exists in the map for the given filesystem 
*/
-    WriteThreadPoolSizeManager existingInstance = POOL_SIZE_MANAGER_MAP.get(
-        filesystemName);
-
-    /* If an existing instance is found, return it */
-    if (existingInstance != null && existingInstance.boundedThreadPool != null
-        && !existingInstance.boundedThreadPool.isShutdown()) {
-      return existingInstance;
-    }
-
-    /* Otherwise, create a new instance, put it in the map, and return it */
-    LOG.debug(
-        "Creating new WriteThreadPoolSizeManager instance for filesystem: {}",
-        filesystemName);
-    WriteThreadPoolSizeManager newInstance = new WriteThreadPoolSizeManager(
-        filesystemName, abfsConfiguration, abfsCounters);
-    POOL_SIZE_MANAGER_MAP.put(filesystemName, newInstance);
-    return newInstance;
-  }
-
-  /**
-   * Adjusts the thread pool size to the specified maximum pool size.
-   *
-   * @param newMaxPoolSize the new maximum pool size.
-   */
-  private void adjustThreadPoolSize(int newMaxPoolSize) {
-    synchronized (this) {
-      ThreadPoolExecutor threadPoolExecutor
-          = ((ThreadPoolExecutor) boundedThreadPool);
-      int currentCorePoolSize = threadPoolExecutor.getCorePoolSize();
-
-      if (newMaxPoolSize >= currentCorePoolSize) {
-        threadPoolExecutor.setMaximumPoolSize(newMaxPoolSize);
-        threadPoolExecutor.setCorePoolSize(newMaxPoolSize);
-      } else {
-        threadPoolExecutor.setCorePoolSize(newMaxPoolSize);
-        threadPoolExecutor.setMaximumPoolSize(newMaxPoolSize);
-      }
-      LOG.debug("ThreadPool Info - New max pool size: {}, Current pool size: 
{}, Active threads: {}",
-          newMaxPoolSize, threadPoolExecutor.getPoolSize(), 
threadPoolExecutor.getActiveCount());
-    }
-  }
-
-  /**
-   * Starts monitoring the CPU utilization and adjusts the thread pool size 
accordingly.
-   */
-  public synchronized void startCPUMonitoring() {
-    if (!isMonitoringStarted()) {
-      isMonitoringStarted = true;
-      cpuMonitorExecutor.scheduleAtFixedRate(() -> {
-            long cpuUtilization = ResourceUtilizationUtils.getJvmCpuLoad();
-            LOG.debug("Current CPU Utilization is this: {}", cpuUtilization);
-            try {
-              adjustThreadPoolSizeBasedOnCPU(cpuUtilization);
-            } catch (InterruptedException e) {
-              throw new RuntimeException(String.format(
-                  "Thread pool size adjustment interrupted for filesystem %s",
-                  filesystemName), e);
-            }
-          }, 0, getAbfsConfiguration().getWriteCpuMonitoringInterval(),
-          TimeUnit.MILLISECONDS);
-    }
-  }
-
-  /**
-   * Dynamically adjusts the thread pool size based on current CPU utilization
-   * and available heap memory relative to the initially available heap.
-   *
-   * @param cpuUtilization Current system CPU utilization (0.0 to 1.0)
-   *  @throws InterruptedException if the resizing operation is interrupted 
while acquiring the lock
-   */
-  public void adjustThreadPoolSizeBasedOnCPU(long cpuUtilization) throws 
InterruptedException {
-    lock.lock();
-    try {
-      ThreadPoolExecutor executor = (ThreadPoolExecutor) 
this.boundedThreadPool;
-      int currentPoolSize = executor.getMaximumPoolSize();
-      long memoryLoad = ResourceUtilizationUtils.getMemoryLoad();
-      long usedHeapMemory = ResourceUtilizationUtils.getUsedHeapMemory();
-      long availableMemory = ResourceUtilizationUtils.getAvailableHeapMemory();
-      long committedMemory = ResourceUtilizationUtils.getCommittedHeapMemory();
-      LOG.debug("The memory load is {} and CPU utilization is {}", memoryLoad, 
cpuUtilization);
-      if (cpuUtilization > (abfsConfiguration.getWriteHighCpuThreshold())) {
-        newMaxPoolSize = calculateReducedPoolSizeHighCPU(currentPoolSize, 
memoryLoad);
-        if (currentPoolSize == initialPoolSize && newMaxPoolSize == 
initialPoolSize) {
-          lastScaleDirection = SCALE_DIRECTION_NO_DOWN_AT_MIN;
-        }
-      } else if (cpuUtilization > 
(abfsConfiguration.getWriteMediumCpuThreshold())) {
-        newMaxPoolSize = calculateReducedPoolSizeMediumCPU(currentPoolSize, 
memoryLoad);
-        if (currentPoolSize == initialPoolSize && newMaxPoolSize == 
initialPoolSize) {
-          lastScaleDirection = SCALE_DIRECTION_NO_DOWN_AT_MIN;
-        }
-      } else if (cpuUtilization < 
(abfsConfiguration.getWriteLowCpuThreshold())) {
-        newMaxPoolSize = calculateIncreasedPoolSizeLowCPU(currentPoolSize, 
memoryLoad);
-        if (currentPoolSize == maxThreadPoolSize && newMaxPoolSize == 
maxThreadPoolSize) {
-          lastScaleDirection = SCALE_DIRECTION_NO_UP_AT_MAX;
-        }
-      } else {
-        newMaxPoolSize = currentPoolSize;
-        LOG.debug("CPU load normal ({}). No change: current={}", 
cpuUtilization, currentPoolSize);
-      }
-      boolean willResize = newMaxPoolSize != currentPoolSize;
-      if (!willResize && !lastScaleDirection.equals(EMPTY_STRING)) {
-        WriteThreadPoolStats stats = getCurrentStats(cpuUtilization, 
memoryLoad,
-            usedHeapMemory, availableMemory, committedMemory);
-        // Update the write thread pool metrics with the latest statistics 
snapshot.
-        writeThreadPoolMetrics.update(stats);
-      }
-      // Case 1: CPU increased — push metrics ONLY if not resizing
-      if (cpuUtilization > maxJvmCpuUtilization) {
-        maxJvmCpuUtilization = cpuUtilization;
-        if (!willResize) {
-          try {
-            // Capture the latest thread pool statistics (pool size, CPU, 
memory, etc.).
-            WriteThreadPoolStats stats = getCurrentStats(cpuUtilization, 
memoryLoad,
-                usedHeapMemory, availableMemory, committedMemory);
-            // Update the write thread pool metrics with the latest statistics 
snapshot.
-            writeThreadPoolMetrics.update(stats);
-          } catch (Exception e) {
-            LOG.debug("Error updating write thread pool metrics", e);
-          }
-        }
-      }
-      // Case 2: Resize — always push metrics
-      if (willResize) {
-        LOG.debug("Resizing thread pool from {} to {}", currentPoolSize, 
newMaxPoolSize);
-        // Record scale direction
-        lastScaleDirection = (newMaxPoolSize > currentPoolSize) ? 
SCALE_DIRECTION_UP: SCALE_DIRECTION_DOWN;
-        adjustThreadPoolSize(newMaxPoolSize);
-        try {
-          // Capture the latest thread pool statistics (pool size, CPU, 
memory, etc.).
-          WriteThreadPoolStats stats = getCurrentStats(cpuUtilization, 
memoryLoad,
-              usedHeapMemory, availableMemory, committedMemory);
-          // Update the write thread pool metrics with the latest statistics 
snapshot.
-          writeThreadPoolMetrics.update(stats);
-        } catch (Exception e) {
-          LOG.debug("Error updating write thread pool metrics after 
resizing.", e);
-        }
-      }
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  /**
-   * Calculates a reduced thread pool size when high CPU utilization is 
detected.
-   * The reduction strategy depends on available heap memory:
-   * if heap usage is high (low free memory), the pool size is reduced 
aggressively;
-   * otherwise, it is reduced moderately to prevent resource contention.
-   *
-   * @param currentPoolSize  the current size of the thread pool.
-   *  @param memoryLoad      the current JVM heap load (0.0–1.0)
-   * @return the adjusted (reduced) pool size based on CPU and memory 
conditions.
-   */
-  private int calculateReducedPoolSizeHighCPU(int currentPoolSize, double 
memoryLoad) {
-    LOG.debug("The high cpu memory load is {}", memoryLoad);
-    if (memoryLoad > highMemoryThreshold) {
-      LOG.debug("High CPU & high memory load ({}). Aggressive reduction: 
current={}, new={}",
-          memoryLoad, currentPoolSize, currentPoolSize / 
HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR);
-      return Math.max(initialPoolSize, currentPoolSize / 
HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR);
-    }
-    int reduced = Math.max(initialPoolSize, currentPoolSize - currentPoolSize 
/ HIGH_CPU_REDUCTION_FACTOR);
-    LOG.debug("High CPU ({}). Reducing pool size moderately: current={}, 
new={}",
-        abfsConfiguration.getWriteHighCpuThreshold(), currentPoolSize, 
reduced);
-    return reduced;
-  }
-
-  /**
-   * Calculates a reduced thread pool size when medium CPU utilization is 
detected.
-   * The reduction is based on available heap memory: if memory is low, the 
pool size
-   * is reduced more aggressively; otherwise, a moderate reduction is applied 
to
-   * maintain balanced performance.
-   *
-   * @param currentPoolSize  the current size of the thread pool.
-   * @param memoryLoad      the current JVM heap load (0.0–1.0)
-   * @return the adjusted (reduced) pool size based on medium CPU and memory 
conditions.
-   */
-  private int calculateReducedPoolSizeMediumCPU(int currentPoolSize, double 
memoryLoad) {
-    LOG.debug("The medium cpu memory load is {}", memoryLoad);
-    if (memoryLoad > highMemoryThreshold) {
-      int reduced = Math.max(initialPoolSize, currentPoolSize - 
currentPoolSize / MEDIUM_CPU_LOW_MEMORY_REDUCTION_FACTOR);
-      LOG.debug("Medium CPU & high memory load ({}). Reducing: current={}, 
new={}",
-          memoryLoad, currentPoolSize, reduced);
-      return reduced;
-    }
-    int reduced = Math.max(initialPoolSize, currentPoolSize - currentPoolSize 
/ MEDIUM_CPU_REDUCTION_FACTOR);
-    LOG.debug("Medium CPU ({}). Moderate reduction: current={}, new={}",
-        abfsConfiguration.getWriteMediumCpuThreshold(), currentPoolSize, 
reduced);
-    return reduced;
-  }
-
-  /**
-   * Calculates an adjusted thread pool size when low CPU utilization is 
detected.
-   * If sufficient heap memory is available, the pool size is increased to 
improve throughput.
-   * Otherwise, it is slightly decreased to conserve memory resources.
-   *
-   * @param currentPoolSize  the current size of the thread pool.
-   * @param memoryLoad      the current JVM heap load (0.0–1.0)
-   * @return the adjusted (increased or decreased) pool size based on CPU and 
memory conditions.
-   */
-  private int calculateIncreasedPoolSizeLowCPU(int currentPoolSize, double 
memoryLoad) {
-    LOG.debug("The low cpu memory load is {}", memoryLoad);
-    if (memoryLoad <= lowMemoryThreshold) {
-      int increased = Math.min(maxThreadPoolSize, (int) (currentPoolSize * 
LOW_CPU_POOL_SIZE_INCREASE_FACTOR));
-      LOG.debug("Low CPU & low memory load ({}). Increasing: current={}, 
new={}",
-          memoryLoad, currentPoolSize, increased);
-      return increased;
-    } else {
-      // Decrease by 10%
-      int decreased = Math.max(1, (int) (currentPoolSize * 
LOW_CPU_HIGH_MEMORY_DECREASE_FACTOR));
-      LOG.debug("Low CPU but insufficient heap. Decreasing: current={}, 
new={}", currentPoolSize, decreased);
-      return decreased;
-    }
-  }
-
-  /**
-   * Returns the executor service for the thread pool.
-   *
-   * @return the executor service.
-   */
-  public ExecutorService getExecutorService() {
-    return boundedThreadPool;
-  }
-
-  /**
-   * Returns the scheduled executor responsible for CPU monitoring and dynamic 
pool adjustment.
-   *
-   * @return the {@link ScheduledExecutorService} used for CPU monitoring.
-   */
-  public ScheduledExecutorService getCpuMonitorExecutor() {
-    return cpuMonitorExecutor;
-  }
-
-  /**
-   * Checks if monitoring has started.
-   *
-   * @return true if monitoring has started, false otherwise.
-   */
-  public synchronized boolean isMonitoringStarted() {
-    return isMonitoringStarted;
-  }
-
-  /**
-   * Returns the maximum JVM CPU utilization observed during the current
-   * monitoring interval or since the last reset.
-   *
-   * @return the highest JVM CPU utilization percentage recorded
-   */
-  @VisibleForTesting
-  public long getMaxJvmCpuUtilization() {
-    return maxJvmCpuUtilization;
-  }
-
-  /**
-   * Closes this manager by shutting down executors and cleaning up resources.
-   * Removes the instance from the active manager map.
-   *
-   * @throws IOException if an error occurs during shutdown.
-   */
-  @Override
-  public void close() throws IOException {
-    synchronized (this) {
-      try {
-        // Shutdown CPU monitor
-        if (cpuMonitorExecutor != null && !cpuMonitorExecutor.isShutdown()) {
-          cpuMonitorExecutor.shutdown();
-        }
-        // Gracefully shutdown the bounded thread pool
-        if (boundedThreadPool != null && !boundedThreadPool.isShutdown()) {
-          boundedThreadPool.shutdown();
-          if (!boundedThreadPool.awaitTermination(THIRTY_SECONDS, 
TimeUnit.SECONDS)) {
-            LOG.warn("Bounded thread pool did not terminate in time, forcing 
shutdownNow for filesystem: {}", filesystemName);
-            boundedThreadPool.shutdownNow();
-          }
-          boundedThreadPool = null;
-        }
-        // Remove from the map
-        POOL_SIZE_MANAGER_MAP.remove(filesystemName);
-        LOG.debug("Closed and removed instance for filesystem: {}", 
filesystemName);
-      } catch (Exception e) {
-        LOG.warn("Failed to properly close instance for filesystem: {}", 
filesystemName, e);
-      }
-    }
-  }
-
-  /**
-   * Represents current statistics of the write thread pool and system.
-   */
-  public static class WriteThreadPoolStats extends ResourceUtilizationStats {
-
-    /**
-     * Constructs a {@link WriteThreadPoolStats} instance containing thread 
pool
-     * metrics and JVM/system resource utilization details.
-     *
-     * @param currentPoolSize the current number of threads in the pool
-     * @param maxPoolSize the maximum number of threads permitted in the pool
-     * @param activeThreads the number of threads actively executing tasks
-     * @param idleThreads the number of idle threads in the pool
-     * @param jvmCpuLoad the current JVM CPU load (0.0–1.0)
-     * @param systemCpuUtilization the current system-wide CPU utilization 
(0.0–1.0)
-     * @param availableHeapGB the available heap memory in gigabytes
-     * @param committedHeapGB the committed heap memory in gigabytes
-     * @param usedHeapGB the available heap memory in gigabytes
-     * @param maxHeapGB the committed heap memory in gigabytes
-     * @param memoryLoad the JVM memory load (used / max)
-     * @param lastScaleDirection the last scaling action performed: "I" 
(increase),
-     * "D" (decrease), or empty if no scaling occurred
-     * @param maxCpuUtilization the peak JVM CPU utilization observed during 
this interval
-     * @param jvmProcessId the process ID of the JVM
-     */
-    public WriteThreadPoolStats(int currentPoolSize,
-        int maxPoolSize, int activeThreads, int idleThreads,
-        long jvmCpuLoad, long systemCpuUtilization, long availableHeapGB,
-        long committedHeapGB, long usedHeapGB, long maxHeapGB, long 
memoryLoad, String lastScaleDirection,
-        long maxCpuUtilization, long jvmProcessId) {
-      super(currentPoolSize, maxPoolSize, activeThreads, idleThreads,
-          jvmCpuLoad, systemCpuUtilization, availableHeapGB,
-          committedHeapGB, usedHeapGB, maxHeapGB, memoryLoad, 
lastScaleDirection,
-          maxCpuUtilization, jvmProcessId);
-    }
-  }
-
-  /**
-   * Returns a snapshot of the current write thread pool and JVM/system 
resource
-   * statistics.
-   *
-   * <p>The snapshot includes thread pool size and activity, JVM and system CPU
-   * utilization, and JVM heap memory metrics. These values are used for 
monitoring
-   * and for making dynamic scaling decisions for the write thread pool.</p>
-   *
-   * @param jvmCpuUtilization current JVM CPU utilization
-   * @param memoryLoad current JVM memory load ratio (used / max)
-   * @param usedMemory current used JVM heap memory
-   * @param availableMemory current available JVM heap memory
-   * @param committedMemory current committed JVM heap memory
-   *
-   * @return a {@link WriteThreadPoolStats} instance containing the current 
metrics
-   */
-  synchronized WriteThreadPoolStats getCurrentStats(long jvmCpuUtilization,
-      long memoryLoad, long usedMemory, long availableMemory, long 
committedMemory) {
-
-    if (boundedThreadPool == null) {
-      return new WriteThreadPoolStats(
-          ZERO, ZERO, ZERO, ZERO, ZERO, ZERO, ZERO, ZERO, ZERO,
-          ZERO, ZERO, EMPTY_STRING, ZERO, ZERO);
-    }
-
-    ThreadPoolExecutor exec = (ThreadPoolExecutor) this.boundedThreadPool;
-
-    String currentScaleDirection = lastScaleDirection;
-    lastScaleDirection = EMPTY_STRING;
-
-    int poolSize = exec.getPoolSize();
-    int activeThreads = exec.getActiveCount();
-    int idleThreads = poolSize - activeThreads;
-
-    return new WriteThreadPoolStats(
-        poolSize,                      // Current thread count
-        exec.getMaximumPoolSize(),     // Max allowed threads
-        activeThreads,                 // Busy threads
-        idleThreads,                   // Idle threads
-        jvmCpuUtilization,             // JVM CPU usage (ratio)
-        ResourceUtilizationUtils.getSystemCpuLoad(), // System CPU usage 
(ratio)
-        availableMemory, // Free heap (GB)
-        committedMemory, // Committed heap (GB)
-        usedMemory,   // Used heap (GB)
-        ResourceUtilizationUtils.getMaxHeapMemory(),    // Max heap (GB)
-        memoryLoad,                    // used/max
-        currentScaleDirection,         // "I", "D", or ""
-        getMaxJvmCpuUtilization(),              // Peak JVM CPU usage so far
-        JvmUniqueIdProvider.getJvmId()            // JVM PID
-    );
-  }
-}
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
index 918997ab43b..e5db0653f3c 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
@@ -183,8 +183,6 @@ public final class AbfsHttpConstants {
   public static final char CHAR_EQUALS = '=';
   public static final char CHAR_STAR = '*';
   public static final char CHAR_PLUS = '+';
-  public static final int LOW_HEAP_SPACE_FACTOR = 4;
-  public static final double MEDIUM_HEAP_SPACE_FACTOR = 8;
 
   public static final int SPLIT_NO_LIMIT = -1;
 
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 115568c7853..e3b3c2a4675 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -567,43 +567,6 @@ public static String containerProperty(String property, 
String fsName, String ac
   /**Maximum number of thread per blob-delete orchestration: {@value}*/
   public static final String FS_AZURE_BLOB_DIR_DELETE_MAX_THREAD = 
"fs.azure.blob.dir.delete.max.thread";
 
-  /** Configuration key for the keep-alive time (ms) for the write thread 
pool. Value: {@value}. */
-  public static final String FS_AZURE_WRITE_THREADPOOL_KEEP_ALIVE_TIME_MILLIS 
= "fs.azure.write.threadpool.keep.alive.time.millis";
-
-  /** Configuration key for the CPU monitoring interval (ms) during write 
operations. Value: {@value}. */
-  public static final String FS_AZURE_WRITE_CPU_MONITORING_INTERVAL_MILLIS = 
"fs.azure.write.cpu.monitoring.interval.millis";
-
-  /** Configuration key to enable or disable dynamic write thread pool 
adjustment. Value: {@value}. */
-  public static final String FS_AZURE_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT = 
"fs.azure.write.dynamic.threadpool.enablement";
-
-  /** Configuration key for the high CPU utilization threshold (%) for write 
scaling. Value: {@value}. */
-  public static final String FS_AZURE_WRITE_HIGH_CPU_THRESHOLD_PERCENT = 
"fs.azure.write.high.cpu.threshold.percent";
-
-  /** Configuration key for the medium CPU utilization threshold (%) for write 
scaling. Value: {@value}. */
-  public static final String FS_AZURE_WRITE_MEDIUM_CPU_THRESHOLD_PERCENT = 
"fs.azure.write.medium.cpu.threshold.percent";
-
-  /** Configuration key for the low CPU utilization threshold (%) for write 
scaling. Value: {@value}. */
-  public static final String FS_AZURE_WRITE_LOW_CPU_THRESHOLD_PERCENT = 
"fs.azure.write.low.cpu.threshold.percent";
-
-  /** Configuration key for the low-tier memory multiplier for write 
workloads. Value: {@value}. */
-  public static final String FS_AZURE_WRITE_LOW_TIER_MEMORY_MULTIPLIER = 
"fs.azure.write.low.tier.memory.multiplier";
-
-  /** Configuration key for the medium-tier memory multiplier for write 
workloads. Value: {@value}. */
-  public static final String FS_AZURE_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER = 
"fs.azure.write.medium.tier.memory.multiplier";
-
-  /** Configuration key for the high-tier memory multiplier for write 
workloads. Value: {@value}. */
-  public static final String FS_AZURE_WRITE_HIGH_TIER_MEMORY_MULTIPLIER = 
"fs.azure.write.high.tier.memory.multiplier";
-
-  /**
-   * Threshold percentage for high memory usage to scale up/down the buffer 
pool size in write code.
-   */
-  public static final String 
FS_AZURE_WRITE_HIGH_MEMORY_USAGE_THRESHOLD_PERCENT = 
"fs.azure.write.high.memory.usage.threshold.percent";
-
-  /**
-   * Threshold percentage for low memory usage to scale up/down the buffer 
pool size in write code.
-   */
-  public static final String FS_AZURE_WRITE_LOW_MEMORY_USAGE_THRESHOLD_PERCENT 
= "fs.azure.write.low.memory.usage.threshold.percent";
-
   /**Flag to enable/disable sending client transactional ID during 
create/rename operations: {@value}*/
   public static final String FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID = 
"fs.azure.enable.client.transaction.id";
   /**Flag to enable/disable create idempotency during create operation: 
{@value}*/
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index a7717c124db..feafbe4f3a5 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -43,19 +43,6 @@ public final class FileSystemConfigurations {
    * Number of bytes in a gigabyte.
    */
   public static final long BYTES_PER_GIGABYTE = 1024L * 1024 * 1024;
-  /**
-   * Factor by which the pool size is increased when CPU utilization is low.
-   */
-  public static final double LOW_CPU_POOL_SIZE_INCREASE_FACTOR = 1.5;
-  public static final double LOW_CPU_HIGH_MEMORY_DECREASE_FACTOR = 0.9;
-  public static final int HIGH_CPU_REDUCTION_FACTOR = 3;
-  public static final int HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR = 2;
-  public static final int MEDIUM_CPU_REDUCTION_FACTOR = 5;
-  public static final int MEDIUM_CPU_LOW_MEMORY_REDUCTION_FACTOR = 3;
-  public static final int HIGH_MEDIUM_HEAP_FACTOR = 2;
-  public static final double LOW_CPU_HEAP_FACTOR = 0.8;
-
-
 
   // Retry parameter defaults.
   public static final int DEFAULT_MIN_BACKOFF_INTERVAL = 500;  // 500ms
@@ -254,28 +241,6 @@ public final class FileSystemConfigurations {
   public static final int HUNDRED = 100;
   public static final double HUNDRED_D = 100.0;
   public static final long THOUSAND = 1000L;
-  // Indicates a successful scale-up operation
-  public static final int SCALE_UP = 1;
-  // Indicates a successful scale-down operation
-  public static final int SCALE_DOWN = -1;
-  // Indicates a down-scale was requested but already at minimum
-  public static final int NO_SCALE_DOWN_AT_MIN = -2;
-  // Indicates an up-scale was requested but already at maximum
-  public static final int NO_SCALE_UP_AT_MAX = 2;
-  // Indicates no scaling action was taken
-  public static final int SCALE_NONE = 0;
-  // Indicates no action is needed based on current metrics
-  public static final int NO_ACTION_NEEDED = 3;
-  // Indicates a successful scale-up operation
-  public static final String SCALE_DIRECTION_UP = "I";
-  // Indicates a successful scale-down operation
-  public static final String SCALE_DIRECTION_DOWN = "D";
-  // Indicates a down-scale was requested but pool is already at minimum
-  public static final String SCALE_DIRECTION_NO_DOWN_AT_MIN = "-D";
-  // Indicates an up-scale was requested but pool is already at maximum
-  public static final String SCALE_DIRECTION_NO_UP_AT_MAX = "+F";
-  // Indicates no scaling action is needed based on current metrics
-  public static final String SCALE_DIRECTION_NO_ACTION_NEEDED = "NA";
 
   public static final HttpOperationType DEFAULT_NETWORKING_LIBRARY
       = HttpOperationType.APACHE_HTTP_CLIENT;
@@ -320,107 +285,6 @@ public final class FileSystemConfigurations {
 
   public static final int DEFAULT_FS_AZURE_BLOB_DELETE_THREAD = 
DEFAULT_FS_AZURE_LISTING_ACTION_THREADS;
 
-  /**
-   * Whether dynamic write thread pool adjustment is enabled by default.
-   */
-  public static final boolean DEFAULT_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT = 
false;
-
-  /**
-   * Default keep-alive time (in milliseconds) for write thread pool threads.
-   */
-  public static final int DEFAULT_WRITE_THREADPOOL_KEEP_ALIVE_TIME_MILLIS = 
30_000;
-
-  /**
-   * Minimum interval (in milliseconds) for CPU monitoring during write 
operations.
-   */
-  public static final int MIN_WRITE_CPU_MONITORING_INTERVAL_MILLIS = 10_000;
-
-  /**
-   * Maximum interval (in milliseconds) for CPU monitoring during write 
operations.
-   */
-  public static final int MAX_WRITE_CPU_MONITORING_INTERVAL_MILLIS = 60_000;
-
-  /**
-   * Default interval (in milliseconds) for CPU monitoring during write 
operations.
-   */
-  public static final int DEFAULT_WRITE_CPU_MONITORING_INTERVAL_MILLIS = 
15_000;
-
-  /**
-   * Minimum CPU utilization percentage considered as high threshold for write 
scaling.
-   */
-  public static final int MIN_WRITE_HIGH_CPU_THRESHOLD_PERCENT = 65;
-
-  /**
-   * Maximum CPU utilization percentage considered as high threshold for write 
scaling.
-   */
-  public static final int MAX_WRITE_HIGH_CPU_THRESHOLD_PERCENT = 90;
-
-  /**
-   * Default CPU utilization percentage considered as high threshold for write 
scaling.
-   */
-  public static final int DEFAULT_WRITE_HIGH_CPU_THRESHOLD_PERCENT = 80;
-
-  /**
-   * Minimum CPU utilization percentage considered as medium threshold for 
write scaling.
-   */
-  public static final int MIN_WRITE_MEDIUM_CPU_THRESHOLD_PERCENT = 45;
-
-  /**
-   * Maximum CPU utilization percentage considered as medium threshold for 
write scaling.
-   */
-  public static final int MAX_WRITE_MEDIUM_CPU_THRESHOLD_PERCENT = 65;
-
-  /**
-   * Default CPU utilization percentage considered as medium threshold for 
write scaling.
-   */
-  public static final int DEFAULT_WRITE_MEDIUM_CPU_THRESHOLD_PERCENT = 60;
-
-  /**
-   * Maximum CPU utilization percentage considered as low threshold for write 
scaling.
-   */
-  public static final int MAX_WRITE_LOW_CPU_THRESHOLD_PERCENT = 40;
-
-  /**
-   * Default CPU utilization percentage considered as low threshold for write 
scaling.
-   */
-  public static final int DEFAULT_WRITE_LOW_CPU_THRESHOLD_PERCENT = 35;
-
-  /**
-   * Minimum multiplier applied to available memory for low-tier write 
workloads.
-   */
-  public static final int MIN_WRITE_LOW_TIER_MEMORY_MULTIPLIER = 3;
-
-  /**
-   * Default multiplier applied to available memory for low-tier write 
workloads.
-   */
-  public static final int DEFAULT_WRITE_LOW_TIER_MEMORY_MULTIPLIER = 4;
-
-  /**
-   * Minimum multiplier applied to available memory for medium-tier write 
workloads.
-   */
-  public static final int MIN_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER = 6;
-
-  /**
-   * Default multiplier applied to available memory for medium-tier write 
workloads.
-   */
-  public static final int DEFAULT_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER = 8;
-
-  /**
-   * Minimum multiplier applied to available memory for high-tier write 
workloads.
-   */
-  public static final int MIN_WRITE_HIGH_TIER_MEMORY_MULTIPLIER = 12;
-
-  /**
-   * Default multiplier applied to available memory for high-tier write 
workloads.
-   */
-  public static final int DEFAULT_WRITE_HIGH_TIER_MEMORY_MULTIPLIER = 16;
-
-  /** Percentage threshold of heap usage at which memory pressure is 
considered high. */
-  public static final int DEFAULT_WRITE_HIGH_MEMORY_USAGE_THRESHOLD_PERCENT = 
60;
-
-  /** Percentage threshold of heap usage at which memory pressure is 
considered low. */
-  public static final int DEFAULT_WRITE_LOW_MEMORY_USAGE_THRESHOLD_PERCENT = 
30;
-
   public static final boolean DEFAULT_FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID = 
true;
 
   public static final boolean DEFAULT_FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY 
= true;
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/MetricsConstants.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/MetricsConstants.java
index 71d314a7d5e..3312e2e491a 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/MetricsConstants.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/MetricsConstants.java
@@ -114,6 +114,39 @@ public final class MetricsConstants {
      */
     public static final String READ_LENGTH = "$RL=";
 
+  // Indicates a successful scale-up operation
+  public static final int SCALE_UP = 1;
+
+  // Indicates a successful scale-down operation
+  public static final int SCALE_DOWN = -1;
+
+  // Indicates a down-scale was requested but already at minimum
+  public static final int NO_SCALE_DOWN_AT_MIN = -2;
+
+  // Indicates an up-scale was requested but already at maximum
+  public static final int NO_SCALE_UP_AT_MAX = 2;
+
+  // Indicates no scaling action was taken
+  public static final int SCALE_NONE = 0;
+
+  // Indicates no action is needed based on current metrics
+  public static final int NO_ACTION_NEEDED = 3;
+
+  // Indicates a successful scale-up operation
+  public static final String SCALE_DIRECTION_UP = "I";
+
+  // Indicates a successful scale-down operation
+  public static final String SCALE_DIRECTION_DOWN = "D";
+
+  // Indicates a down-scale was requested but pool is already at minimum
+  public static final String SCALE_DIRECTION_NO_DOWN_AT_MIN = "-D";
+
+  // Indicates an up-scale was requested but pool is already at maximum
+  public static final String SCALE_DIRECTION_NO_UP_AT_MAX = "+F";
+
+  // Indicates no scaling action is needed based on current metrics
+  public static final String SCALE_DIRECTION_NO_ACTION_NEEDED = "NA";
+
     // Private constructor to prevent instantiation
     private MetricsConstants() {
         throw new AssertionError("Cannot instantiate MetricsConstants");
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/AbfsWriteResourceUtilizationMetricsEnum.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/AbfsWriteResourceUtilizationMetricsEnum.java
deleted file mode 100644
index 1f92626eeb4..00000000000
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/AbfsWriteResourceUtilizationMetricsEnum.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azurebfs.enums;
-
-/**
- * Enum representing the set of metrics tracked for the ABFS write thread pool.
- * Each metric entry defines a short name identifier and its corresponding
- * {@link StatisticTypeEnum}, which specifies the type of measurement (e.g., 
gauge).
- * These metrics are used for monitoring and analyzing the performance and
- * resource utilization of the write thread pool.
- */
-public enum AbfsWriteResourceUtilizationMetricsEnum implements
-    AbfsResourceUtilizationMetricsEnum {
-
-  /** Current number of threads in the write thread pool. */
-  CURRENT_POOL_SIZE("CP", StatisticTypeEnum.TYPE_GAUGE),
-
-  /** Maximum configured size of the write thread pool. */
-  MAX_POOL_SIZE("MP", StatisticTypeEnum.TYPE_GAUGE),
-
-  /** Number of threads currently executing write operations. */
-  ACTIVE_THREADS("AT", StatisticTypeEnum.TYPE_GAUGE),
-
-  /** Number of threads currently idle. */
-  IDLE_THREADS("IT", StatisticTypeEnum.TYPE_GAUGE),
-
-  /** Recent JVM CPU load value as reported by the JVM (0.0 to 1.0). */
-  JVM_CPU_UTILIZATION("JC", StatisticTypeEnum.TYPE_GAUGE),
-
-  /** Overall system-wide CPU utilization percentage during write operations. 
*/
-  SYSTEM_CPU_UTILIZATION("SC", StatisticTypeEnum.TYPE_GAUGE),
-
-  /** Available heap memory (in GB) measured during write operations. */
-  AVAILABLE_MEMORY("AM", StatisticTypeEnum.TYPE_GAUGE),
-
-  /** Committed heap memory (in GB) measured during write operations. */
-  COMMITTED_MEMORY("CM", StatisticTypeEnum.TYPE_GAUGE),
-
-  /** Used heap memory (in GB) measured during write operations. */
-  USED_MEMORY("UM", StatisticTypeEnum.TYPE_GAUGE),
-
-  /** Maximum heap memory (in GB) measured during write operations. */
-  MAX_HEAP_MEMORY("MM", StatisticTypeEnum.TYPE_GAUGE),
-
-  /** Available heap memory (in GB) measured during write operations. */
-  MEMORY_LOAD("ML", StatisticTypeEnum.TYPE_GAUGE),
-
-  /** Direction of the last scaling decision (e.g., scale-up or scale-down). */
-  LAST_SCALE_DIRECTION("SD", StatisticTypeEnum.TYPE_GAUGE),
-
-  /** Maximum CPU utilization recorded during the monitoring interval. */
-  MAX_CPU_UTILIZATION("MC", StatisticTypeEnum.TYPE_GAUGE),
-
-  /** The process ID (PID) of the running JVM, useful for correlating metrics 
with system-level process information. */
-  JVM_PROCESS_ID("JI", StatisticTypeEnum.TYPE_GAUGE);
-
-  private final String name;
-  private final StatisticTypeEnum statisticType;
-
-  /**
-   * Constructs a metric definition for the ABFS write thread pool.
-   *
-   * @param name  the short name identifier for the metric.
-   * @param type  the {@link StatisticTypeEnum} describing the metric type.
-   */
-  AbfsWriteResourceUtilizationMetricsEnum(String name, StatisticTypeEnum type) 
{
-    this.name = name;
-    this.statisticType = type;
-  }
-
-  /**
-   * Returns the short name identifier of the metric.
-   *
-   * @return the metric name.
-   */
-  @Override
-  public String getName() {
-    return name;
-  }
-
-  /**
-   * Returns the {@link StatisticTypeEnum} associated with this metric.
-   *
-   * @return the metric's statistic type.
-   */
-  @Override
-  public StatisticTypeEnum getStatisticType() {
-    return statisticType;
-  }
-}
-
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index db3e163ca4d..a781e12c86a 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -276,11 +276,6 @@ private AbfsClient(final URL baseUrl,
     // register the client to Aggregated Metrics Manager
     abfsMetricsManager.getAggregateMetricsManager()
         .registerClient(accountName, this);
-
-    // Initialize write thread pool metrics if dynamic write thread pool 
scaling is enabled.
-    if (abfsConfiguration.isDynamicWriteThreadPoolEnablement()) {
-      abfsCounters.initializeWriteResourceUtilizationMetrics();
-    }
     // Initialize read thread pool metrics if ReadAheadV2 and its dynamic 
scaling feature are enabled.
     if (abfsConfiguration.isReadAheadV2Enabled() && 
abfsConfiguration.isReadAheadV2DynamicScalingEnabled()) {
       abfsCounters.initializeReadResourceUtilizationMetrics();
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsCounters.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsCounters.java
index 8ada20abf43..ab42bc94bd6 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsCounters.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsCounters.java
@@ -89,10 +89,6 @@ void initializeMetrics(MetricFormat metricFormat,
 
   AbfsReadResourceUtilizationMetrics getAbfsReadResourceUtilizationMetrics();
 
-  void initializeWriteResourceUtilizationMetrics();
-
-  AbfsWriteResourceUtilizationMetrics getAbfsWriteResourceUtilizationMetrics();
-
   AtomicLong getLastExecutionTime();
 
 }
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index 883af56c589..cb48fea42fc 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -31,7 +31,6 @@
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.hadoop.fs.azurebfs.WriteThreadPoolSizeManager;
 import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
 import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
 import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidIngressServiceException;
@@ -168,13 +167,6 @@ public class AbfsOutputStream extends OutputStream 
implements Syncable,
    */
   private MessageDigest fullBlobContentMd5 = null;
 
-  /**
-   * Instance of {@link WriteThreadPoolSizeManager} used by this class
-   * to dynamically adjust the write thread pool size based on
-   * system resource utilization.
-   */
-  private final WriteThreadPoolSizeManager writeThreadPoolSizeManager;
-
   public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext)
       throws IOException {
     this.statistics = abfsOutputStreamContext.getStatistics();
@@ -225,9 +217,6 @@ public AbfsOutputStream(AbfsOutputStreamContext 
abfsOutputStreamContext)
     this.serviceTypeAtInit = abfsOutputStreamContext.getIngressServiceType();
     this.currentExecutingServiceType = 
abfsOutputStreamContext.getIngressServiceType();
     this.clientHandler = abfsOutputStreamContext.getClientHandler();
-    this.writeThreadPoolSizeManager = 
abfsOutputStreamContext.getWriteThreadPoolSizeManager();
-    // Initialize CPU monitoring if the pool size manager is present
-    initializeMonitoringIfNeeded();
     createIngressHandler(serviceTypeAtInit,
         abfsOutputStreamContext.getBlockFactory(), bufferSize, false, null);
     try {
@@ -253,21 +242,6 @@ public AzureIngressHandler getIngressHandler() {
 
   private volatile boolean switchCompleted = false;
 
-  /**
-   * Starts CPU monitoring in the thread pool size manager if it
-   * is initialized and not already monitoring.
-   */
-  private void initializeMonitoringIfNeeded() {
-    if (writeThreadPoolSizeManager != null && 
!writeThreadPoolSizeManager.isMonitoringStarted()) {
-      synchronized (this) {
-        // Re-check to avoid a race between threads
-        if (!writeThreadPoolSizeManager.isMonitoringStarted()) {
-          writeThreadPoolSizeManager.startCPUMonitoring();
-        }
-      }
-    }
-  }
-
   /**
    * Creates or retrieves an existing Azure ingress handler based on the 
service type and provided parameters.
    * <p>
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
index 68a2ba04d20..ceae24b0ee6 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
@@ -21,7 +21,6 @@
 import java.util.concurrent.ExecutorService;
 
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.azurebfs.WriteThreadPoolSizeManager;
 import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
 import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
@@ -80,9 +79,6 @@ public class AbfsOutputStreamContext extends 
AbfsStreamContext {
 
   private AbfsClientHandler clientHandler;
 
-  /** Reference to the thread pool manager. */
-  private WriteThreadPoolSizeManager writeThreadPoolSizeManager;
-
   public AbfsOutputStreamContext(final long 
sasTokenRenewPeriodForStreamsInSeconds) {
     super(sasTokenRenewPeriodForStreamsInSeconds);
   }
@@ -232,12 +228,6 @@ public AbfsOutputStreamContext withEncryptionAdapter(
     return this;
   }
 
-  public AbfsOutputStreamContext withWriteThreadPoolManager(
-      final WriteThreadPoolSizeManager writeThreadPoolSizeManager) {
-    this.writeThreadPoolSizeManager = writeThreadPoolSizeManager;
-    return this;
-  }
-
   public int getWriteBufferSize() {
     return writeBufferSize;
   }
@@ -338,10 +328,6 @@ public AbfsClientHandler getClientHandler() {
     return clientHandler;
   }
 
-  public WriteThreadPoolSizeManager getWriteThreadPoolSizeManager() {
-    return writeThreadPoolSizeManager;
-  }
-
   /**
    * Checks if small write is supported based on the current configuration.
    *
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsWriteResourceUtilizationMetrics.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsWriteResourceUtilizationMetrics.java
deleted file mode 100644
index 6c44c20b31b..00000000000
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsWriteResourceUtilizationMetrics.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azurebfs.services;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-import 
org.apache.hadoop.fs.azurebfs.enums.AbfsWriteResourceUtilizationMetricsEnum;
-import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
-import org.apache.hadoop.fs.azurebfs.WriteThreadPoolSizeManager;
-
-/**
- * Metrics container for the ABFS write thread pool.
- * <p>
- * This class records pool size, CPU utilization, memory usage,
- * scaling direction, and other runtime indicators reported by
- * {@link WriteThreadPoolSizeManager.WriteThreadPoolStats}.
- * </p>
- */
-public class AbfsWriteResourceUtilizationMetrics
-    extends
-    
AbstractAbfsResourceUtilizationMetrics<AbfsWriteResourceUtilizationMetricsEnum> 
{
-
-  /**
-   * A version counter incremented each time a metric update occurs.
-   * Used to detect whether metrics have changed since the last serialization.
-   */
-  private final AtomicLong updateVersion = new AtomicLong(0);
-
-  /**
-   * The last version number that was serialized and pushed out.
-   */
-  private final AtomicLong lastPushedVersion = new AtomicLong(0);
-
-  /**
-   * Creates a metrics set for write operations, pre-initializing
-   * all metric keys defined in {@link 
AbfsWriteResourceUtilizationMetricsEnum}.
-   */
-  public AbfsWriteResourceUtilizationMetrics() {
-    super(AbfsWriteResourceUtilizationMetricsEnum.values(), 
FSOperationType.WRITE.toString());
-  }
-
-  @Override
-  protected boolean isUpdated() {
-    return updateVersion.get() > lastPushedVersion.get();
-  }
-
-  protected void markUpdated() {
-    updateVersion.incrementAndGet();
-  }
-
-  @Override
-  protected long getUpdateVersion() {
-    return updateVersion.get();
-  }
-
-  @Override
-  protected long getLastPushedVersion() {
-    return lastPushedVersion.get();
-  }
-
-  /**
-   * Marks the current metrics version as pushed.
-   * Must be called only after the metrics string is actually emitted.
-   */
-  @Override
-  public synchronized void markPushed() {
-    lastPushedVersion.set(updateVersion.get());
-  }
-
-  /**
-   * Updates all write-thread-pool metrics using the latest stats snapshot.
-   * Each field in {@link WriteThreadPoolSizeManager.WriteThreadPoolStats}
-   * is mapped to a corresponding metric.
-   *
-   * @param stats the latest thread-pool statistics; ignored if {@code null}
-   */
-  public synchronized void 
update(WriteThreadPoolSizeManager.WriteThreadPoolStats stats) {
-    if (stats == null) {
-      return;
-    }
-
-    setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.CURRENT_POOL_SIZE, 
stats.getCurrentPoolSize());
-    setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.MAX_POOL_SIZE, 
stats.getMaxPoolSize());
-    setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.ACTIVE_THREADS, 
stats.getActiveThreads());
-    setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.IDLE_THREADS, 
stats.getIdleThreads());
-    
setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.JVM_CPU_UTILIZATION, 
stats.getJvmCpuLoad());
-    
setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.SYSTEM_CPU_UTILIZATION, 
stats.getSystemCpuUtilization());
-    setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.AVAILABLE_MEMORY, 
stats.getMemoryUtilization());
-    setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.COMMITTED_MEMORY, 
stats.getCommittedHeapGB());
-    setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.USED_MEMORY, 
stats.getUsedHeapGB());
-    setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.MAX_HEAP_MEMORY, 
stats.getMaxHeapGB());
-    setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.MEMORY_LOAD, 
stats.getMemoryLoad());
-    
setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.LAST_SCALE_DIRECTION,
-        stats.getLastScaleDirectionNumeric(stats.getLastScaleDirection()));
-    
setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.MAX_CPU_UTILIZATION, 
stats.getMaxCpuUtilization());
-    setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.JVM_PROCESS_ID, 
stats.getJvmProcessId());
-
-    markUpdated();
-  }
-}
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java
index 6877d5a03a9..8610adde30b 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java
@@ -125,15 +125,6 @@ protected AbfsRestOperation remoteWrite(AbfsBlock 
blockToUpload,
     TracingContext tracingContextAppend = new TracingContext(tracingContext);
     tracingContextAppend.setIngressHandler(BLOB_APPEND + " T " + threadIdStr);
     
tracingContextAppend.setPosition(String.valueOf(blockToUpload.getOffset()));
-     // Fetches write thread pool metrics from the ABFS client and adds them 
to the tracing context.
-    AbfsWriteResourceUtilizationMetrics writeResourceUtilizationMetrics = 
getWriteResourceUtilizationMetrics();
-    if (writeResourceUtilizationMetrics != null) {
-      String writeMetrics = writeResourceUtilizationMetrics.toString();
-      tracingContextAppend.setResourceUtilizationMetricResults(writeMetrics);
-      if (!writeMetrics.isEmpty()) {
-        writeResourceUtilizationMetrics.markPushed();
-      }
-    }
     try {
       LOG.trace("Starting remote write for block with ID {} and offset {}",
           blockToUpload.getBlockId(), blockToUpload.getOffset());
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSIngressHandler.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSIngressHandler.java
index bc5e6b7c521..96a8f07cc3d 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSIngressHandler.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSIngressHandler.java
@@ -117,15 +117,6 @@ protected AbfsRestOperation remoteWrite(AbfsBlock 
blockToUpload,
       AppendRequestParameters reqParams,
       TracingContext tracingContext) throws IOException {
     TracingContext tracingContextAppend = new TracingContext(tracingContext);
-    // Fetches write thread pool metrics from the ABFS client and adds them to 
the tracing context.
-    AbfsWriteResourceUtilizationMetrics writeResourceUtilizationMetrics = 
getWriteResourceUtilizationMetrics();
-    if (writeResourceUtilizationMetrics != null) {
-      String writeMetrics = writeResourceUtilizationMetrics.toString();
-      tracingContextAppend.setResourceUtilizationMetricResults(writeMetrics);
-      if (!writeMetrics.isEmpty()) {
-        writeResourceUtilizationMetrics.markPushed();
-      }
-    }
     String threadIdStr = String.valueOf(Thread.currentThread().getId());
     if (tracingContextAppend.getIngressHandler().equals(EMPTY_STRING)) {
       tracingContextAppend.setIngressHandler(DFS_APPEND + " T " + threadIdStr);
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDfsToBlobIngressFallbackHandler.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDfsToBlobIngressFallbackHandler.java
index 8db94cc3e4c..cfa01315f2a 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDfsToBlobIngressFallbackHandler.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDfsToBlobIngressFallbackHandler.java
@@ -110,15 +110,6 @@ protected AbfsRestOperation remoteWrite(AbfsBlock 
blockToUpload,
       TracingContext tracingContext) throws IOException {
     AbfsRestOperation op;
     TracingContext tracingContextAppend = new TracingContext(tracingContext);
-    // Fetches write thread pool metrics from the ABFS client and adds them to 
the tracing context.
-    AbfsWriteResourceUtilizationMetrics writeResourceUtilizationMetrics = 
getWriteResourceUtilizationMetrics();
-    if (writeResourceUtilizationMetrics != null) {
-      String writeMetrics = writeResourceUtilizationMetrics.toString();
-      tracingContextAppend.setResourceUtilizationMetricResults(writeMetrics);
-      if (!writeMetrics.isEmpty()) {
-        writeResourceUtilizationMetrics.markPushed();
-      }
-    }
     String threadIdStr = String.valueOf(Thread.currentThread().getId());
     tracingContextAppend.setIngressHandler(FALLBACK_APPEND + " T " + 
threadIdStr);
     
tracingContextAppend.setPosition(String.valueOf(blockToUpload.getOffset()));
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureIngressHandler.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureIngressHandler.java
index 48f7058ffb7..81007e1c3dd 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureIngressHandler.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureIngressHandler.java
@@ -236,15 +236,4 @@ protected String computeFullBlobMd5() {
     }
     return fullBlobMd5;
   }
-
-  /**
-   * Helper that returns the write thread-pool metrics from the client's 
counters, if available.
-   *
-   * @return the {@link AbfsWriteResourceUtilizationMetrics} instance or 
{@code null} when not present
-   */
-  protected AbfsWriteResourceUtilizationMetrics 
getWriteResourceUtilizationMetrics() {
-    return getAbfsOutputStream().getClient()
-        .getAbfsCounters()
-        .getAbfsWriteResourceUtilizationMetrics();
-  }
 }
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
index 5cbe4893b12..c271a5e354a 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
@@ -46,12 +46,12 @@
 
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
 import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED_D;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_DOWN;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_NO_ACTION_NEEDED;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_NO_DOWN_AT_MIN;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_NO_UP_AT_MAX;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_UP;
 import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO;
+import static 
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.SCALE_DIRECTION_DOWN;
+import static 
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.SCALE_DIRECTION_NO_ACTION_NEEDED;
+import static 
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.SCALE_DIRECTION_NO_DOWN_AT_MIN;
+import static 
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.SCALE_DIRECTION_NO_UP_AT_MAX;
+import static 
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.SCALE_DIRECTION_UP;
 
 /**
  * The Improved Read Buffer Manager for Rest AbfsClient.
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ResourceUtilizationStats.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ResourceUtilizationStats.java
index 6ddd2e9f6f2..b871afa2802 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ResourceUtilizationStats.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ResourceUtilizationStats.java
@@ -19,17 +19,17 @@
 package org.apache.hadoop.fs.azurebfs.services;
 
 import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.NO_ACTION_NEEDED;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.NO_SCALE_DOWN_AT_MIN;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.NO_SCALE_UP_AT_MAX;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_DOWN;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_NO_ACTION_NEEDED;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_NO_DOWN_AT_MIN;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_NO_UP_AT_MAX;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_UP;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DOWN;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_NONE;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_UP;
+import static 
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.NO_ACTION_NEEDED;
+import static 
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.NO_SCALE_DOWN_AT_MIN;
+import static 
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.NO_SCALE_UP_AT_MAX;
+import static 
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.SCALE_DIRECTION_DOWN;
+import static 
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.SCALE_DIRECTION_NO_ACTION_NEEDED;
+import static 
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.SCALE_DIRECTION_NO_DOWN_AT_MIN;
+import static 
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.SCALE_DIRECTION_NO_UP_AT_MAX;
+import static 
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.SCALE_DIRECTION_UP;
+import static 
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.SCALE_DOWN;
+import static 
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.SCALE_NONE;
+import static 
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.SCALE_UP;
 
 /**
  * Represents current statistics of the thread pool and system.
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestWriteThreadPoolSizeManager.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestWriteThreadPoolSizeManager.java
deleted file mode 100644
index 735b3f276ba..00000000000
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestWriteThreadPoolSizeManager.java
+++ /dev/null
@@ -1,963 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azurebfs;
-
-import org.assertj.core.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicIntegerArray;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
-import 
org.apache.hadoop.fs.azurebfs.services.AbfsWriteResourceUtilizationMetrics;
-import org.apache.hadoop.fs.azurebfs.utils.ResourceUtilizationUtils;
-
-import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_WRITE_MAX_CONCURRENT_REQUESTS;
-import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_WRITE_CPU_MONITORING_INTERVAL_MILLIS;
-import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT;
-import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_WRITE_LOW_CPU_THRESHOLD_PERCENT;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED;
-import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-class TestWriteThreadPoolSizeManager extends AbstractAbfsIntegrationTest {
-
-  private AbfsConfiguration mockConfig;
-  private static final long HIGH_CPU_UTILIZATION_THRESHOLD = 95;
-  private static final long LOW_CPU_UTILIZATION_THRESHOLD = 5;
-  private static final int LOW_MEMORY_USAGE_THRESHOLD_PERCENT = 100;
-  private static final int THREAD_SLEEP_DURATION_MS = 200;
-  private static final String TEST_FILE_PATH = "testFilePath";
-  private static final String TEST_DIR_PATH = "testDirPath";
-  private static final int TEST_FILE_LENGTH = 1024 * 1024 * 8;
-  private static final int CONCURRENT_REQUEST_COUNT = 15;
-  private static final int THREAD_POOL_KEEP_ALIVE_TIME = 10;
-  private static final int LOW_TIER_MEMORY_MULTIPLIER = 4;
-  private static final int MEDIUM_TIER_MEMORY_MULTIPLIER = 6;
-  private static final int HIGH_TIER_MEMORY_MULTIPLIER = 8;
-  private static final int HIGH_CPU_THRESHOLD = 15;
-  private static final int MEDIUM_CPU_THRESHOLD = 10;
-  private static final int LOW_CPU_THRESHOLD = 5;
-  private static final int CPU_MONITORING_INTERVAL = 15;
-  private static final int WAIT_DURATION_MS = 3000;
-  private static final int LATCH_TIMEOUT_SECONDS = 60;
-  private static final int RESIZE_WAIT_TIME_MS = 6_000;
-  private static final long HIGH_CPU_USAGE_RATIO = 95;
-  private static final long LOW_CPU_USAGE_RATIO = 5;
-  private static final int SLEEP_DURATION_MS = 150;
-  private static final int AWAIT_TIMEOUT_SECONDS = 45;
-  private static final int RESIZER_JOIN_TIMEOUT_MS = 2_000;
-  private static final int WAIT_TIMEOUT_MS = 5000;
-  private static final int SLEEP_DURATION_30S_MS = 30000;
-  private static final int SMALL_PAUSE_MS = 50;
-  private static final int BURST_LOAD = 50;
-  private static final long LOAD_SLEEP_DURATION_MS = 2000;
-
-  TestWriteThreadPoolSizeManager() throws Exception {
-    super.setup();
-  }
-
-  /**
-   * Common setup to prepare a mock configuration for each test.
-   */
-  @BeforeEach
-  public void setUp() {
-    mockConfig = mock(AbfsConfiguration.class);
-    
when(mockConfig.getWriteConcurrentRequestCount()).thenReturn(CONCURRENT_REQUEST_COUNT);
-    
when(mockConfig.getWriteThreadPoolKeepAliveTime()).thenReturn(THREAD_POOL_KEEP_ALIVE_TIME);
-    
when(mockConfig.getLowTierMemoryMultiplier()).thenReturn(LOW_TIER_MEMORY_MULTIPLIER);
-    
when(mockConfig.getMediumTierMemoryMultiplier()).thenReturn(MEDIUM_TIER_MEMORY_MULTIPLIER);
-    
when(mockConfig.getHighTierMemoryMultiplier()).thenReturn(HIGH_TIER_MEMORY_MULTIPLIER);
-    when(mockConfig.getWriteHighCpuThreshold()).thenReturn(HIGH_CPU_THRESHOLD);
-    
when(mockConfig.getWriteMediumCpuThreshold()).thenReturn(MEDIUM_CPU_THRESHOLD);
-    when(mockConfig.getWriteLowCpuThreshold()).thenReturn(LOW_CPU_THRESHOLD);
-    
when(mockConfig.getWriteCpuMonitoringInterval()).thenReturn(CPU_MONITORING_INTERVAL);
-    
when(mockConfig.getWriteLowMemoryUsageThresholdPercent()).thenReturn(LOW_MEMORY_USAGE_THRESHOLD_PERCENT);
-  }
-
-  /**
-   * Verifies that {@link WriteThreadPoolSizeManager#getInstance(String, 
AbfsConfiguration, AbfsCounters)}
-   * returns the same singleton instance for the same filesystem name, and a 
different instance
-   * for a different filesystem name.
-   */
-  @Test
-  void testGetInstanceReturnsSingleton() throws IOException {
-    WriteThreadPoolSizeManager instance1
-        = WriteThreadPoolSizeManager.getInstance("testfs", mockConfig,
-        getFileSystem().getAbfsClient().getAbfsCounters());
-    WriteThreadPoolSizeManager instance2
-        = WriteThreadPoolSizeManager.getInstance("testfs", mockConfig,
-        getFileSystem().getAbfsClient().getAbfsCounters());
-    WriteThreadPoolSizeManager instance3 =
-        WriteThreadPoolSizeManager.getInstance("newFs", mockConfig,
-            getFileSystem().getAbfsClient().getAbfsCounters());
-    Assertions.assertThat(instance1)
-        .as("Expected the same singleton instance for the same key")
-        .isSameAs(instance2);
-    Assertions.assertThat(instance1)
-        .as("Expected the same singleton instance for the same key")
-        .isNotSameAs(instance3);
-  }
-
-  /**
-   * Tests that high CPU usage results in thread pool downscaling.
-   */
-  @Test
-  void testAdjustThreadPoolSizeBasedOnHighCPU() throws InterruptedException, 
IOException {
-    // Initialize filesystem and thread pool manager
-    Configuration conf = getRawConfiguration();
-    conf.setBoolean(FS_AZURE_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT, true);
-    FileSystem fileSystem = FileSystem.newInstance(conf);
-    try (AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem) {
-      // Get the executor service (ThreadPoolExecutor)
-      WriteThreadPoolSizeManager instance
-          = WriteThreadPoolSizeManager.getInstance(abfs.getFileSystemId(),
-          getAbfsStore(abfs).getAbfsConfiguration(),
-          abfs.getAbfsClient().getAbfsCounters());
-      ExecutorService executor = instance.getExecutorService();
-      ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
-
-      // Simulate high CPU usage (e.g., 95% CPU utilization)
-      int initialMaxSize = threadPoolExecutor.getMaximumPoolSize();
-      instance.adjustThreadPoolSizeBasedOnCPU(
-          HIGH_CPU_UTILIZATION_THRESHOLD);  // High CPU
-
-      // Get the new maximum pool size after adjustment
-      int newMaxSize = threadPoolExecutor.getMaximumPoolSize();
-
-      // Assert that the pool size has decreased or is equal to initial 
PoolSize based on high CPU usage
-      Assertions.assertThat(newMaxSize)
-          .as("Expected pool size to decrease under high CPU usage")
-          .isLessThanOrEqualTo(initialMaxSize);
-      instance.close();
-    }
-  }
-
-  /**
-   * Tests that low CPU usage results in thread pool upscaling or remains the 
same.
-   */
-  @Test
-  void testAdjustThreadPoolSizeBasedOnLowCPU()
-      throws InterruptedException, IOException {
-    Configuration conf = getRawConfiguration();
-    conf.setBoolean(FS_AZURE_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT, true);
-    FileSystem fileSystem = FileSystem.newInstance(conf);
-    try (AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem) {
-      WriteThreadPoolSizeManager instance
-          = WriteThreadPoolSizeManager.getInstance(abfs.getFileSystemId(),
-         mockConfig,
-          abfs.getAbfsClient().getAbfsCounters());
-      ExecutorService executor = instance.getExecutorService();
-      int initialSize = ((ThreadPoolExecutor) executor).getMaximumPoolSize();
-      instance.adjustThreadPoolSizeBasedOnCPU(
-          LOW_CPU_UTILIZATION_THRESHOLD); // Low CPU
-      int newSize = ((ThreadPoolExecutor) executor).getMaximumPoolSize();
-      Assertions.assertThat(newSize)
-          .as("Expected pool size to increase or stay the same under low CPU 
usage")
-          .isGreaterThanOrEqualTo(initialSize);
-      instance.close();
-    }
-  }
-
-
-  /**
-   * Confirms that the thread pool executor is initialized and not shut down.
-   */
-  @Test
-  void testExecutorServiceIsNotNull() throws IOException {
-    WriteThreadPoolSizeManager instance
-        = WriteThreadPoolSizeManager.getInstance("testfsExec", mockConfig,
-        getFileSystem().getAbfsClient().getAbfsCounters());
-    ExecutorService executor = instance.getExecutorService();
-    Assertions.assertThat(executor).as("Executor service should be 
initialized")
-        .isNotNull();
-    Assertions.assertThat(executor.isShutdown())
-        .as("Executor service should not be shut down")
-        .isFalse();
-    instance.close();
-  }
-
-
-  /**
-   * Ensures that calling {@link WriteThreadPoolSizeManager#close()} cleans up 
resources.
-   */
-  @Test
-  void testCloseCleansUp() throws Exception {
-    WriteThreadPoolSizeManager instance
-        = WriteThreadPoolSizeManager.getInstance("testfsClose", mockConfig,
-        getFileSystem().getAbfsClient().getAbfsCounters());
-    ExecutorService executor = instance.getExecutorService();
-    instance.close();
-    Assertions.assertThat(executor.isShutdown() || executor.isTerminated())
-        .as("Executor service should be shut down or terminated after close()")
-        .isTrue();
-  }
-
-  /**
-   * Test that the CPU monitoring task is scheduled properly when 
startCPUMonitoring() is called.
-   * This test checks the following:
-   * 1. That the CPU monitoring task gets scheduled by verifying that the CPU 
monitor executor is not null.
-   * 2. Ensures that the thread pool executor has at least one thread running, 
confirming that the task is being executed.
-   * @throws InterruptedException if the test is interrupted during the sleep 
time
-   */
-  @Test
-  void testStartCPUMonitoringSchedulesTask()
-      throws InterruptedException, IOException {
-    // Create a new instance of WriteThreadPoolSizeManager using a mock 
configuration
-    WriteThreadPoolSizeManager instance
-        = WriteThreadPoolSizeManager.getInstance("testScheduler", mockConfig,
-        getFileSystem().getAbfsClient().getAbfsCounters());
-
-    // Call startCPUMonitoring to schedule the monitoring task
-    instance.startCPUMonitoring();
-
-    // Wait for a short period to allow the task to run and be scheduled
-    Thread.sleep(THREAD_SLEEP_DURATION_MS);
-
-    // Retrieve the CPU monitor executor (ScheduledThreadPoolExecutor) from 
the instance
-    ScheduledThreadPoolExecutor monitor
-        = (ScheduledThreadPoolExecutor) instance.getCpuMonitorExecutor();
-
-    // Assert that the monitor executor is not null, ensuring that it was 
properly initialized
-    Assertions.assertThat(monitor)
-        .as("CPU Monitor Executor should not be null")
-        .isNotNull();
-
-    // Assert that the thread pool size is greater than 0, confirming that the 
task has been scheduled and threads are active
-    Assertions.assertThat(monitor.getPoolSize())
-        .as("Thread pool size should be greater than 0, indicating that the 
task is running")
-        .isGreaterThan(ZERO);
-    instance.close();
-  }
-
-  /**
-   * Verifies that ABFS write tasks can complete successfully even when the 
system
-   * is under artificial CPU stress. The test also ensures that the write 
thread
-   * pool resizes dynamically under load without leading to starvation, 
overload,
-   * or leftover work in the queue.
-   */
-  @Test
-  void testABFSWritesUnderCPUStress() throws Exception {
-    // Initialize the filesystem and thread pool manager
-    AzureBlobFileSystem fs = getFileSystem();
-    WriteThreadPoolSizeManager instance =
-        WriteThreadPoolSizeManager.getInstance(getFileSystemName(),
-            getConfiguration(), 
getFileSystem().getAbfsClient().getAbfsCounters());
-    ThreadPoolExecutor executor =
-        (ThreadPoolExecutor) instance.getExecutorService();
-
-    // Start CPU monitoring so pool size adjustments happen in response to load
-    instance.startCPUMonitoring();
-
-    // Launch a background thread that generates CPU stress for ~3 seconds.
-    // This simulates contention on the system and should cause the pool to 
adjust.
-    Thread stressThread = new Thread(() -> {
-      long end = System.currentTimeMillis() + WAIT_DURATION_MS;
-      while (System.currentTimeMillis() < end) {
-        // Busy-work loop: repeatedly compute random powers to waste CPU cycles
-        double waste = Math.pow(Math.random(), Math.random());
-      }
-    });
-    stressThread.start();
-
-    // Prepare the ABFS write workload with multiple concurrent tasks
-    int taskCount = 10;
-    CountDownLatch latch = new CountDownLatch(taskCount);
-    Path testFile = new Path(TEST_FILE_PATH);
-    final byte[] b = new byte[TEST_FILE_LENGTH];
-    new Random().nextBytes(b);
-
-    // Submit 10 tasks, each writing to its own file to simulate parallel load
-    for (int i = 0; i < taskCount; i++) {
-      int finalI = i;
-      executor.submit(() -> {
-        try (FSDataOutputStream out = fs.create(
-            new Path(testFile + "_" + finalI), true)) {
-          for (int j = 0; j < 5; j++) {
-            out.write(b); // perform multiple writes to add sustained pressure
-          }
-          out.hflush();   // flush to force actual I/O
-        } catch (IOException e) {
-          // Any failure here indicates pool misbehavior or I/O issues
-          Assertions.fail("Write task failed with exception", e);
-        } finally {
-          // Mark this task as complete
-          latch.countDown();
-        }
-      });
-    }
-
-    // Wait for all tasks to finish (up to 60s timeout to guard against 
deadlock/starvation)
-    boolean finished = latch.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
-    // Record the pool size after CPU stress to confirm resizing took place
-    int resizedPoolSize = executor.getMaximumPoolSize();
-
-    // 1. All tasks must finish within timeout → proves no starvation or 
deadlock
-    Assertions.assertThat(finished)
-        .as("All ABFS write tasks should complete without starvation")
-        .isTrue();
-
-    // 2. Pool size must fall within valid bounds → proves resizing occurred
-    Assertions.assertThat(resizedPoolSize)
-        .as("Thread pool size should dynamically adjust under CPU stress")
-        .isBetween(1, 
getAbfsStore(fs).getAbfsConfiguration().getWriteConcurrentRequestCount());
-
-    // 3. Task queue must be empty → proves no backlog remains after workload
-    Assertions.assertThat(executor.getQueue().size())
-        .as("No backlog should remain in task queue after completion")
-        .isEqualTo(0);
-
-    // Cleanup resources
-    instance.close();
-  }
-
-
-  /**
-   * Ensures that dynamic thread pool resizing during an active ABFS write 
workload
-   * does not cause deadlocks, task loss, or task duplication. The test also 
verifies
-   * that the pool resizes while work is in progress and that the executor 
queue
-   * eventually drains cleanly.
-   */
-  @Test
-  void testDynamicResizeNoDeadlocksNoTaskLoss() throws Exception {
-    // Initialize filesystem and thread pool manager
-    AzureBlobFileSystem fs = getFileSystem();
-    WriteThreadPoolSizeManager mgr =
-        WriteThreadPoolSizeManager.getInstance(getFileSystemName(), mockConfig,
-            getFileSystem().getAbfsClient().getAbfsCounters());
-    ThreadPoolExecutor executor = (ThreadPoolExecutor) 
mgr.getExecutorService();
-
-    // Enable monitoring (may not be required if adjust() is triggered 
internally)
-    mgr.startCPUMonitoring();
-
-    // Test configuration: enough tasks and writes to stress the pool
-    final int taskCount = 10;
-    final int writesPerTask = 5;
-    final byte[] b = new byte[TEST_FILE_LENGTH];
-    new Random().nextBytes(b);
-    final Path base = new Path(TEST_DIR_PATH);
-    fs.mkdirs(base);
-
-    // Barrier ensures all tasks start together, so resizing happens mid-flight
-    final CyclicBarrier startBarrier = new CyclicBarrier(taskCount + 1);
-    final CountDownLatch done = new CountDownLatch(taskCount);
-
-    // Track execution results
-    final AtomicIntegerArray completed = new AtomicIntegerArray(taskCount); // 
mark tasks once
-    final AtomicInteger duplicates = new AtomicInteger(0);                  // 
guard against double-completion
-    final AtomicInteger rejected = new AtomicInteger(0);                    // 
count unexpected rejections
-
-    // Submit ABFS write tasks
-    for (int i = 0; i < taskCount; i++) {
-      final int id = i;
-      try {
-        executor.submit(() -> {
-          try {
-            // Hold until all tasks are enqueued, then start together
-            startBarrier.await(10, TimeUnit.SECONDS);
-
-            // Each task writes to its own file, flushing intermittently
-            Path subPath = new Path(base, "part-" + id);
-            try (FSDataOutputStream out = fs.create(subPath)) {
-              for (int w = 0; w < writesPerTask; w++) {
-                out.write(b);
-                if ((w & 1) == 1) {
-                  out.hflush(); // force some syncs to increase contention
-                }
-              }
-              out.hflush();
-            }
-
-            // Mark task as completed once; duplicates flag if it happens again
-            if (!completed.compareAndSet(id, 0, 1)) {
-              duplicates.incrementAndGet();
-            }
-          } catch (Exception e) {
-            Assertions.fail("ABFS write task " + id + " failed", e);
-          } finally {
-            done.countDown();
-          }
-        });
-      } catch (RejectedExecutionException rex) {
-        rejected.incrementAndGet();
-      }
-    }
-
-    // Thread that simulates fluctuating CPU load while tasks are running
-    final AtomicInteger observedMinMax = new 
AtomicInteger(executor.getMaximumPoolSize());
-    final AtomicInteger observedMaxMax = new 
AtomicInteger(executor.getMaximumPoolSize());
-
-    Thread resizer = new Thread(() -> {
-      try {
-        // Release worker tasks
-        startBarrier.await(10, TimeUnit.SECONDS);
-
-        long end = System.currentTimeMillis() + RESIZE_WAIT_TIME_MS; // keep 
resizing for ~6s
-        boolean high = true;
-        while (System.currentTimeMillis() < end) {
-          // Alternate between high load (shrink) and low load (expand)
-          if (high) {
-            mgr.adjustThreadPoolSizeBasedOnCPU(HIGH_CPU_USAGE_RATIO);
-          } else {
-            mgr.adjustThreadPoolSizeBasedOnCPU(LOW_CPU_USAGE_RATIO);
-          }
-          high = !high;
-
-          // Track observed pool size bounds to prove resizing occurred
-          int cur = executor.getMaximumPoolSize();
-          observedMinMax.updateAndGet(prev -> Math.min(prev, cur));
-          observedMaxMax.updateAndGet(prev -> Math.max(prev, cur));
-
-          Thread.sleep(SLEEP_DURATION_MS);
-        }
-      } catch (Exception ignore) {
-        // No-op: this is best-effort simulation
-      }
-    }, "resizer-thread");
-
-    resizer.start();
-
-    // Wait for all tasks to finish (ensures no deadlock)
-    boolean finished = done.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
-    // Join resizer thread
-    resizer.join(RESIZER_JOIN_TIMEOUT_MS);
-
-    // 1. All tasks must complete in time → proves there are no deadlocks
-    Assertions.assertThat(finished)
-        .as("All tasks must complete within timeout (no deadlock)")
-        .isTrue();
-
-    // 2. Every task should complete exactly once → proves no task loss
-    int completedCount = 0;
-    for (int i = 0; i < taskCount; i++) {
-      completedCount += completed.get(i);
-    }
-    Assertions.assertThat(completedCount)
-        .as("Every task should complete exactly once (no task loss)")
-        .isEqualTo(taskCount);
-
-    // 3. No task should mark itself as done more than once → proves no 
duplication
-    Assertions.assertThat(duplicates.get())
-        .as("No task should report completion more than once (no duplication)")
-        .isZero();
-
-    // 4. The executor should not reject tasks while resizing is happening
-    Assertions.assertThat(rejected.get())
-        .as("Tasks should not be rejected during active resizing")
-        .isZero();
-
-    // 5. Executor queue should eventually empty once all tasks finish
-    Assertions.assertThat(executor.getQueue().size())
-        .as("Executor queue should drain after workload")
-        .isEqualTo(0);
-
-    // 6. Executor should still be running after workload until explicitly 
closed
-    Assertions.assertThat(executor.isShutdown())
-        .as("Executor should remain running until manager.close()")
-        .isFalse();
-
-    // 7. Verify that resizing actually occurred (pool max both grew and 
shrank)
-    int minObserved = observedMinMax.get();
-    int maxObserved = observedMaxMax.get();
-
-    Assertions.assertThat(maxObserved)
-        .as("Pool maximum size should have increased or fluctuated above 
baseline")
-        .isGreaterThan(0);
-
-    Assertions.assertThat(minObserved)
-        .as("Pool maximum size should have dropped during resizing")
-        .isLessThanOrEqualTo(maxObserved);
-
-    // Cleanup
-    for (int i = 0; i < taskCount; i++) {
-      Path p = new Path(base, "part-" + i);
-      try {
-        fs.delete(p, false);
-      } catch (IOException ignore) {
-        // Ignored: delete failures are non-fatal for test cleanup
-      }
-    }
-    try {
-      fs.delete(base, true);
-    } catch (IOException ignore) {
-      // Ignored: cleanup failures are non-fatal in tests
-    }
-    mgr.close();
-  }
-
-
-
-  /**
-   * Verifies that when the system experiences high CPU usage,
-   * the WriteThreadPoolSizeManager detects the load and reduces
-   * the maximum thread pool size accordingly.
-   */
-  @Test
-  void testThreadPoolScalesDownOnHighCpuLoad() throws Exception {
-    // Initialize filesystem and thread pool manager
-    try (FileSystem fileSystem = 
FileSystem.newInstance(getRawConfiguration())) {
-      AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem;
-      WriteThreadPoolSizeManager instance =
-          WriteThreadPoolSizeManager.getInstance(abfs.getFileSystemId(),
-              getConfiguration(), 
getFileSystem().getAbfsClient().getAbfsCounters());
-      ThreadPoolExecutor executor =
-          (ThreadPoolExecutor) instance.getExecutorService();
-
-      // Start monitoring CPU load
-      instance.startCPUMonitoring();
-
-      // Capture baseline pool size for comparison later
-      int initialMax = executor.getMaximumPoolSize();
-
-      // Define a CPU-bound task: tight loop of math ops for ~5s
-      Runnable cpuBurn = () -> {
-        long end = System.currentTimeMillis() + WAIT_TIMEOUT_MS;
-        while (System.currentTimeMillis() < end) {
-          double waste = Math.sin(Math.random()) * Math.cos(Math.random());
-        }
-      };
-
-      // Launch two CPU hogs in parallel
-      Thread cpuHog1 = new Thread(cpuBurn, "cpu-hog-thread-1");
-      Thread cpuHog2 = new Thread(cpuBurn, "cpu-hog-thread-2");
-      cpuHog1.start();
-      cpuHog2.start();
-
-      // Submit multiple write tasks while CPU is under stress
-      int taskCount = 10;
-      CountDownLatch latch = new CountDownLatch(taskCount);
-      Path base = new Path(TEST_DIR_PATH);
-      abfs.mkdirs(base);
-      final byte[] buffer = new byte[TEST_FILE_LENGTH];
-      new Random().nextBytes(buffer);
-
-      for (int i = 0; i < taskCount; i++) {
-        final Path part = new Path(base, "part-" + i);
-        executor.submit(() -> {
-          try (FSDataOutputStream out = abfs.create(part, true)) {
-            for (int j = 0; j < 5; j++) {
-              out.write(buffer);
-              out.hflush();
-            }
-          } catch (IOException e) {
-            Assertions.fail("Write task failed under CPU stress", e);
-          } finally {
-            latch.countDown();
-          }
-        });
-      }
-
-      // Ensure all tasks complete (avoid deadlock/starvation)
-      boolean finished = latch.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
-      // Wait for CPU hogs to end and give monitor time to react
-      cpuHog1.join();
-      cpuHog2.join();
-      Thread.sleep(SLEEP_DURATION_30S_MS);
-
-      int resizedMax = executor.getMaximumPoolSize();
-
-      // Verify outcomes:
-      // 1. All write tasks succeeded despite CPU pressure
-      Assertions.assertThat(finished)
-          .as("All ABFS write tasks must complete despite CPU stress")
-          .isTrue();
-
-      // 2. Thread pool scaled down as expected
-      Assertions.assertThat(resizedMax)
-          .as("Thread pool should scale down under high CPU load")
-          .isLessThanOrEqualTo(initialMax);
-
-      // 3. No leftover tasks in the queue
-      Assertions.assertThat(executor.getQueue().size())
-          .as("No backlog should remain in the queue after workload")
-          .isEqualTo(0);
-
-      // Cleanup test data
-      for (int i = 0; i < taskCount; i++) {
-        try {
-          abfs.delete(new Path(base, "part-" + i), false);
-        } catch (IOException ignore) {
-          // Ignored: cleanup failures are non-fatal in tests
-        }
-      }
-      try {
-        abfs.delete(base, true);
-      } catch (IOException ignore) {
-        // Ignored: cleanup failures are non-fatal in tests
-      }
-      instance.close();
-    }
-  }
-
-
-  /**
-   * Verifies that when two parallel high memory–consuming workloads run,
-   * the WriteThreadPoolSizeManager detects the memory pressure and
-   * scales down the maximum thread pool size.
-   */
-  @Test
-  void testScalesDownOnParallelHighMemoryLoad() throws Exception {
-    // Initialize filesystem and thread pool manager
-    try (FileSystem fileSystem = 
FileSystem.newInstance(getRawConfiguration())) {
-      AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem;
-      WriteThreadPoolSizeManager instance =
-          WriteThreadPoolSizeManager.getInstance(abfs.getFileSystemId(),
-              getConfiguration(), 
getFileSystem().getAbfsClient().getAbfsCounters());
-      ThreadPoolExecutor executor =
-          (ThreadPoolExecutor) instance.getExecutorService();
-
-      // Begin monitoring resource usage (CPU + memory)
-      instance.startCPUMonitoring();
-
-      // Capture the initial thread pool size for later comparison
-      int initialMax = executor.getMaximumPoolSize();
-
-      // Define a workload that continuously allocates memory (~5 MB chunks)
-      // for ~5 seconds to simulate memory pressure in the JVM.
-      Runnable memoryBurn = () -> {
-        List<byte[]> allocations = new ArrayList<>();
-        long end = System.currentTimeMillis() + WAIT_TIMEOUT_MS;
-        while (System.currentTimeMillis() < end) {
-          try {
-            allocations.add(new byte[5 * 1024 * 1024]); // allocate 5 MB
-            Thread.sleep(SMALL_PAUSE_MS); // small pause to avoid instant OOM
-          } catch (OutOfMemoryError oom) {
-            // Clear allocations if JVM runs out of memory and continue
-            allocations.clear();
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-          }
-        }
-      };
-
-      // Start two threads running the memory hog workload in parallel
-      Thread memHog1 = new Thread(memoryBurn, "mem-hog-thread-1");
-      Thread memHog2 = new Thread(memoryBurn, "mem-hog-thread-2");
-      memHog1.start();
-      memHog2.start();
-
-      // Submit several write tasks to ABFS while memory is under stress
-      int taskCount = 10;
-      CountDownLatch latch = new CountDownLatch(taskCount);
-      Path base = new Path(TEST_DIR_PATH);
-      abfs.mkdirs(base);
-      final byte[] buffer = new byte[TEST_FILE_LENGTH];
-      new Random().nextBytes(buffer);
-
-      for (int i = 0; i < taskCount; i++) {
-        final Path part = new Path(base, "part-" + i);
-        executor.submit(() -> {
-          try (FSDataOutputStream out = abfs.create(part, true)) {
-            for (int j = 0; j < 5; j++) {
-              out.write(buffer);
-              out.hflush();
-            }
-          } catch (IOException e) {
-            Assertions.fail("Write task failed under memory stress", e);
-          } finally {
-            latch.countDown();
-          }
-        });
-      }
-
-      // Ensure all tasks finish within a timeout
-      boolean finished = latch.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
-      // Wait for memory hog threads to finish
-      memHog1.join();
-      memHog2.join();
-
-      // Give monitoring thread time to detect memory pressure and react
-      Thread.sleep(SLEEP_DURATION_30S_MS);
-
-      int resizedMax = executor.getMaximumPoolSize();
-
-      // Validate that:
-      // 1. All ABFS writes succeeded despite memory stress
-      Assertions.assertThat(finished)
-          .as("All ABFS write tasks must complete despite parallel memory 
stress")
-          .isTrue();
-
-      // 2. The thread pool scaled down under memory pressure
-      Assertions.assertThat(resizedMax)
-          .as("Thread pool should scale down under parallel high memory load")
-          .isLessThanOrEqualTo(initialMax);
-
-      // 3. No tasks remain queued after workload completion
-      Assertions.assertThat(executor.getQueue().size())
-          .as("No backlog should remain in the queue after workload")
-          .isEqualTo(0);
-
-      // Clean up temporary test files
-      for (int i = 0; i < taskCount; i++) {
-        try {
-          abfs.delete(new Path(base, "part-" + i), false);
-        } catch (IOException ignore) {
-          // Ignored: cleanup failures are non-fatal in tests
-        }
-      }
-      try {
-        abfs.delete(base, true);
-      } catch (IOException ignore) {
-        // Ignored: cleanup failures are non-fatal in tests
-      }
-      instance.close();
-    }
-  }
-
-  /**
-   * Test that after a long idle period, the thread pool
-   * can quickly scale up in response to a sudden burst of load
-   * without performance degradation.
-   */
-  @Test
-  void testThreadPoolScalesUpAfterIdleBurstLoad() throws Exception {
-    // Initialize filesystem and thread pool manager
-    try (FileSystem fileSystem = FileSystem.newInstance(
-        getRawConfiguration())) {
-      AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem;
-      WriteThreadPoolSizeManager instance = 
WriteThreadPoolSizeManager.getInstance(abfs.getFileSystemId(),
-          abfs.getAbfsStore().getAbfsConfiguration(), 
getFileSystem().getAbfsClient().getAbfsCounters());
-      ThreadPoolExecutor executor =
-          (ThreadPoolExecutor) instance.getExecutorService();
-
-      // --- Step 1: Simulate idle period ---
-      // Let the executor sit idle with no work for a few seconds
-      Thread.sleep(WAIT_TIMEOUT_MS);
-      int poolSizeAfterIdle = executor.getPoolSize();
-
-      // Verify that after idling, the pool is at or close to its minimum size
-      Assertions.assertThat(poolSizeAfterIdle)
-          .as("Pool size should remain minimal after idle")
-          .isLessThanOrEqualTo(executor.getCorePoolSize());
-
-      // --- Step 2: Submit a sudden burst of tasks ---
-      // Launch many short, CPU-heavy tasks at once to simulate burst load
-      int burstLoad = BURST_LOAD;
-      CountDownLatch latch = new CountDownLatch(burstLoad);
-      for (int i = 0; i < burstLoad; i++) {
-        executor.submit(() -> {
-          // Busy loop for ~200ms to simulate CPU work
-          long end = System.currentTimeMillis() + THREAD_SLEEP_DURATION_MS;
-          while (System.currentTimeMillis() < end) {
-            Math.sqrt(Math.random()); // burn CPU cycles
-          }
-          latch.countDown();
-        });
-      }
-
-      // --- Step 3: Give pool time to react ---
-      // Wait briefly so the pool’s scaling logic has a chance to expand
-      Thread.sleep(LOAD_SLEEP_DURATION_MS);
-      int poolSizeDuringBurst = executor.getPoolSize();
-
-      // Verify that the pool scaled up compared to idle
-      Assertions.assertThat(poolSizeDuringBurst)
-          .as("Pool size should increase after burst load")
-          .isGreaterThanOrEqualTo(poolSizeAfterIdle);
-
-// --- Step 4: Verify completion ---
-// Ensure all tasks complete successfully in a reasonable time,
-// proving there was no degradation or deadlock under burst load
-      Assertions.assertThat(
-              latch.await(LATCH_TIMEOUT_SECONDS / 2, TimeUnit.SECONDS))
-          .as("All burst tasks should finish in reasonable time")
-          .isTrue();
-      instance.close();
-    }
-  }
-
-  /**
-   * Verifies that when the system experiences low CPU usage,
-   * the WriteThreadPoolSizeManager maintains the thread pool size
-   * without scaling down and updates the corresponding
-   * write thread pool metrics accordingly.
-   */
-  @Test
-  void testThreadPoolOnLowCpuLoadAndMetricsUpdate()
-      throws Exception {
-    // Initialize filesystem and thread pool manager
-    Configuration conf = getRawConfiguration();
-    conf.setBoolean(FS_AZURE_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT, true);
-    conf.setInt(AZURE_WRITE_MAX_CONCURRENT_REQUESTS, 2);
-    conf.setInt(FS_AZURE_WRITE_LOW_CPU_THRESHOLD_PERCENT, 10);
-    conf.setInt(FS_AZURE_WRITE_CPU_MONITORING_INTERVAL_MILLIS, 1_000);
-    FileSystem fileSystem = FileSystem.newInstance(conf);
-    try (AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem) {
-      WriteThreadPoolSizeManager instance =
-          WriteThreadPoolSizeManager.getInstance("fs1",
-              abfs.getAbfsStore().getAbfsConfiguration(),
-              abfs.getAbfsClient().getAbfsCounters());
-      instance.startCPUMonitoring();
-
-      // --- Capture initial metrics and stats ---
-      AbfsWriteResourceUtilizationMetrics metrics =
-          abfs.getAbfsClient()
-              .getAbfsCounters()
-              .getAbfsWriteResourceUtilizationMetrics();
-
-      WriteThreadPoolSizeManager.WriteThreadPoolStats statsBefore =
-          instance.getCurrentStats(ResourceUtilizationUtils.getJvmCpuLoad(),
-              ResourceUtilizationUtils.getMemoryLoad(),
-              ResourceUtilizationUtils.getUsedHeapMemory(),
-              ResourceUtilizationUtils.getAvailableHeapMemory(),
-              ResourceUtilizationUtils.getCommittedHeapMemory());
-
-      ThreadPoolExecutor executor =
-          (ThreadPoolExecutor) instance.getExecutorService();
-
-      // No CPU hogs this time — simulate light CPU load
-      // Submit lightweight ABFS tasks that barely use CPU
-      int taskCount = 10;
-      CountDownLatch latch = new CountDownLatch(taskCount);
-
-      for (int i = 0; i < taskCount; i++) {
-        executor.submit(() -> {
-          try {
-            // Light operations — minimal CPU load
-            for (int j = 0; j < 3; j++) {
-              Thread.sleep(HUNDRED); // simulate idle/light wait
-            }
-          } catch (Exception e) {
-            Assertions.fail("Light task failed unexpectedly", e);
-          } finally {
-            latch.countDown();
-          }
-        });
-      }
-
-      // Wait for all tasks to finish
-      boolean finished = latch.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
-      Assertions.assertThat(finished)
-          .as("All lightweight tasks should complete normally")
-          .isTrue();
-
-      // Allow some time for monitoring and metrics update
-      Thread.sleep(SLEEP_DURATION_30S_MS);
-
-      WriteThreadPoolSizeManager.WriteThreadPoolStats statsAfter =
-          instance.getCurrentStats(ResourceUtilizationUtils.getJvmCpuLoad(),
-              ResourceUtilizationUtils.getMemoryLoad(),
-              ResourceUtilizationUtils.getUsedHeapMemory(),
-              ResourceUtilizationUtils.getAvailableHeapMemory(),
-              ResourceUtilizationUtils.getCommittedHeapMemory());
-
-      //--- Validate that metrics and stats changed ---
-      Assertions.assertThat(statsAfter)
-          .as("Thread pool stats should update after CPU load")
-          .isNotEqualTo(statsBefore);
-
-      String metricsOutput = metrics.toString();
-
-      if (!metricsOutput.isEmpty()) {
-        // Assertions for metrics correctness
-        Assertions.assertThat(metricsOutput)
-            .as("Metrics output should not be empty")
-            .isNotEmpty();
-
-        Assertions.assertThat(metricsOutput)
-            .as("Metrics must include CPU utilization data")
-            .contains("SC=");
-
-        Assertions.assertThat(metricsOutput)
-            .as("Metrics must include memory utilization data")
-            .contains("AM=");
-
-        Assertions.assertThat(metricsOutput)
-            .as("Metrics must include current thread pool size")
-            .contains("CP=");
-      }
-      instance.close();
-    }
-  }
-
-  /**
-   * Verifies that the JVM identifier is initialized once and remains
-   * constant across multiple invocations within the same JVM process.
-   */
-  @Test
-  public void testJvmIdIsSingletonWithinJvm() {
-    int firstId = JvmUniqueIdProvider.getJvmId();
-    int secondId = JvmUniqueIdProvider.getJvmId();
-    int thirdId = JvmUniqueIdProvider.getJvmId();
-
-    assertEquals(firstId, secondId,
-        "Subsequent calls to getJvmId() should return the same value");
-    assertEquals(secondId, thirdId,
-        "JVM-scoped identifier must remain constant for the lifetime of the 
JVM");
-  }
-
-  /**
-   * Verifies that the JVM identifier is safely shared across multiple threads
-   * and that concurrent access returns the same value.
-   *
-   * <p>This test ensures that static initialization of the identifier is
-   * thread-safe and occurs only once per JVM.</p>
-   */
-  @Test
-  public void testJvmIdIsSameAcrossThreads()
-      throws ExecutionException, InterruptedException {
-
-    ExecutorService executor = Executors.newFixedThreadPool(4);
-
-    try {
-      Callable<Integer> task = JvmUniqueIdProvider::getJvmId;
-      Future<Integer> f1 = executor.submit(task);
-      Future<Integer> f2 = executor.submit(task);
-      Future<Integer> f3 = executor.submit(task);
-      Future<Integer> f4 = executor.submit(task);
-
-      int expectedId = f1.get();
-      assertEquals(expectedId, f2.get(),
-          "JVM ID should be identical when accessed from different threads");
-      assertEquals(expectedId, f3.get(),
-          "JVM ID should be identical when accessed concurrently");
-      assertEquals(expectedId, f4.get(),
-          "JVM ID should be initialized once and shared across all threads");
-    } finally {
-      executor.shutdown();
-      executor.awaitTermination(5, TimeUnit.SECONDS);
-    }
-  }
-}
-
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
index 152c6a43f4c..fbf085d1ec0 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
@@ -110,7 +110,7 @@ public void testMaxRequestsAndQueueCapacityDefaults() 
throws Exception {
     AbfsOutputStream stream = (AbfsOutputStream) out.getWrappedStream();
 
       int maxConcurrentRequests
-          = getConfiguration().getWriteConcurrentRequestCount();
+          = getConfiguration().getWriteMaxConcurrentRequestCount();
       if (stream.isAppendBlobStream()) {
         maxConcurrentRequests = 1;
       }
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
index 2129f5e20fc..ec080369e19 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
@@ -98,7 +98,7 @@ private AbfsOutputStreamContext 
populateAbfsOutputStreamContext(
             .disableOutputStreamFlush(disableOutputStreamFlush)
             .withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
             .withAppendBlob(isAppendBlob)
-            
.withWriteMaxConcurrentRequestCount(abfsConf.getWriteConcurrentRequestCount())
+            
.withWriteMaxConcurrentRequestCount(abfsConf.getWriteMaxConcurrentRequestCount())
             .withMaxWriteRequestsToQueue(abfsConf.getMaxWriteRequestsToQueue())
             .withClientHandler(clientHandler)
             .withPath(path)
@@ -125,7 +125,6 @@ public void verifyShortWriteRequest() throws Exception {
     when(client.getAbfsPerfTracker()).thenReturn(tracker);
     when(client.getAbfsConfiguration()).thenReturn(abfsConf);
     when(client.getAbfsCounters()).thenReturn(abfsCounters);
-    
when(client.getAbfsCounters().getAbfsWriteResourceUtilizationMetrics()).thenReturn(new
 AbfsWriteResourceUtilizationMetrics());
     when(client.append(anyString(), any(byte[].class),
         any(AppendRequestParameters.class), any(),
         any(), any(TracingContext.class)))
@@ -197,7 +196,6 @@ public void verifyWriteRequest() throws Exception {
         "test-fs-id", FSOperationType.WRITE,
         TracingHeaderFormat.ALL_ID_FORMAT, null);
     when(client.getAbfsCounters()).thenReturn(abfsCounters);
-    
when(client.getAbfsCounters().getAbfsWriteResourceUtilizationMetrics()).thenReturn(new
 AbfsWriteResourceUtilizationMetrics());
     when(client.getAbfsPerfTracker()).thenReturn(tracker);
     when(client.append(anyString(), any(byte[].class), 
any(AppendRequestParameters.class), any(), any(), 
any(TracingContext.class))).thenReturn(op);
     when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), 
any(), isNull(), any(), any(TracingContext.class), anyString())).thenReturn(op);
@@ -284,7 +282,6 @@ public void verifyWriteRequestOfBufferSizeAndClose() throws 
Exception {
     when(clientHandler.getClient(any())).thenReturn(client);
     when(clientHandler.getDfsClient()).thenReturn(client);
     when(client.getAbfsCounters()).thenReturn(abfsCounters);
-    
when(client.getAbfsCounters().getAbfsWriteResourceUtilizationMetrics()).thenReturn(new
 AbfsWriteResourceUtilizationMetrics());
 
 
     AbfsOutputStream out = Mockito.spy(Mockito.spy(new AbfsOutputStream(
@@ -367,7 +364,6 @@ public void verifyWriteRequestOfBufferSize() throws 
Exception {
     when(clientHandler.getClient(any())).thenReturn(client);
     when(clientHandler.getDfsClient()).thenReturn(client);
     when(client.getAbfsCounters()).thenReturn(abfsCounters);
-    
when(client.getAbfsCounters().getAbfsWriteResourceUtilizationMetrics()).thenReturn(new
 AbfsWriteResourceUtilizationMetrics());
 
     AbfsOutputStream out = Mockito.spy(new AbfsOutputStream(
         populateAbfsOutputStreamContext(
@@ -431,7 +427,6 @@ public void verifyWriteRequestOfBufferSizeWithAppendBlob() 
throws Exception {
     when(clientHandler.getClient(any())).thenReturn(client);
     when(clientHandler.getDfsClient()).thenReturn(client);
     when(client.getAbfsCounters()).thenReturn(abfsCounters);
-    
when(client.getAbfsCounters().getAbfsWriteResourceUtilizationMetrics()).thenReturn(new
 AbfsWriteResourceUtilizationMetrics());
     AbfsOutputStream out = Mockito.spy(new AbfsOutputStream(
         populateAbfsOutputStreamContext(
             BUFFER_SIZE,
@@ -492,7 +487,6 @@ public void verifyWriteRequestOfBufferSizeAndHFlush() 
throws Exception {
         abfsConf.getClientCorrelationId(), "test-fs-id",
         FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(), null);
     when(client.getAbfsCounters()).thenReturn(abfsCounters);
-    
when(client.getAbfsCounters().getAbfsWriteResourceUtilizationMetrics()).thenReturn(new
 AbfsWriteResourceUtilizationMetrics());
     when(client.getAbfsPerfTracker()).thenReturn(tracker);
     when(client.append(anyString(), any(byte[].class),
         any(AppendRequestParameters.class), any(), any(), 
any(TracingContext.class)))
@@ -569,7 +563,6 @@ public void verifyWriteRequestOfBufferSizeAndFlush() throws 
Exception {
     abfsConf = new AbfsConfiguration(conf, accountName1);
     when(client.getAbfsConfiguration()).thenReturn(abfsConf);
     when(client.getAbfsCounters()).thenReturn(abfsCounters);
-    
when(client.getAbfsCounters().getAbfsWriteResourceUtilizationMetrics()).thenReturn(new
 AbfsWriteResourceUtilizationMetrics());
     AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, 
abfsConf);
     when(client.getAbfsPerfTracker()).thenReturn(tracker);
     when(client.append(anyString(), any(byte[].class),
@@ -627,7 +620,7 @@ private ExecutorService createExecutorService(
       AbfsConfiguration abfsConf) {
     ExecutorService executorService =
         new 
SemaphoredDelegatingExecutor(BlockingThreadPoolExecutorService.newInstance(
-            abfsConf.getWriteConcurrentRequestCount(),
+            abfsConf.getWriteMaxConcurrentRequestCount(),
             abfsConf.getMaxWriteRequestsToQueue(),
             10L, TimeUnit.SECONDS,
             "abfs-test-bounded"),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to