This is an automated email from the ASF dual-hosted git repository.
anujmodi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5cd20b9e30a HADOOP-19472. [ABFS] Remove write aggressiveness optimization (#8141)
5cd20b9e30a is described below
commit 5cd20b9e30a6220fbda8af80be48c851aaf57867
Author: Anmol Asrani <[email protected]>
AuthorDate: Tue Jan 27 15:56:34 2026 +0530
HADOOP-19472. [ABFS] Remove write aggressiveness optimization (#8141)
Contributed by Anmol Asrani
---
.../hadoop/fs/azurebfs/AbfsConfiguration.java | 104 +--
.../hadoop/fs/azurebfs/AbfsCountersImpl.java | 23 -
.../fs/azurebfs/AzureBlobFileSystemStore.java | 34 +-
.../fs/azurebfs/WriteThreadPoolSizeManager.java | 568 ------------
.../fs/azurebfs/constants/AbfsHttpConstants.java | 2 -
.../fs/azurebfs/constants/ConfigurationKeys.java | 37 -
.../constants/FileSystemConfigurations.java | 136 ---
.../fs/azurebfs/constants/MetricsConstants.java | 33 +
.../AbfsWriteResourceUtilizationMetricsEnum.java | 107 ---
.../hadoop/fs/azurebfs/services/AbfsClient.java | 5 -
.../hadoop/fs/azurebfs/services/AbfsCounters.java | 4 -
.../fs/azurebfs/services/AbfsOutputStream.java | 26 -
.../azurebfs/services/AbfsOutputStreamContext.java | 14 -
.../AbfsWriteResourceUtilizationMetrics.java | 116 ---
.../azurebfs/services/AzureBlobIngressHandler.java | 9 -
.../azurebfs/services/AzureDFSIngressHandler.java | 9 -
.../AzureDfsToBlobIngressFallbackHandler.java | 9 -
.../fs/azurebfs/services/AzureIngressHandler.java | 11 -
.../fs/azurebfs/services/ReadBufferManagerV2.java | 10 +-
.../services/ResourceUtilizationStats.java | 22 +-
.../azurebfs/TestWriteThreadPoolSizeManager.java | 963 ---------------------
.../azurebfs/services/ITestAbfsOutputStream.java | 2 +-
.../fs/azurebfs/services/TestAbfsOutputStream.java | 11 +-
23 files changed, 65 insertions(+), 2190 deletions(-)
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 6c5929d2b90..6c95bd37689 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -541,62 +541,6 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES)
private int maxApacheHttpClientIoExceptionsRetries;
- @BooleanConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT,
- DefaultValue = DEFAULT_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT)
- private boolean dynamicWriteThreadPoolEnablement;
-
- @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_WRITE_THREADPOOL_KEEP_ALIVE_TIME_MILLIS,
- DefaultValue = DEFAULT_WRITE_THREADPOOL_KEEP_ALIVE_TIME_MILLIS)
- private int writeThreadPoolKeepAliveTime;
-
- @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_WRITE_CPU_MONITORING_INTERVAL_MILLIS,
- MinValue = MIN_WRITE_CPU_MONITORING_INTERVAL_MILLIS,
- MaxValue = MAX_WRITE_CPU_MONITORING_INTERVAL_MILLIS,
- DefaultValue = DEFAULT_WRITE_CPU_MONITORING_INTERVAL_MILLIS)
- private int writeCpuMonitoringInterval;
-
- @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_WRITE_HIGH_CPU_THRESHOLD_PERCENT,
- MinValue = MIN_WRITE_HIGH_CPU_THRESHOLD_PERCENT,
- MaxValue = MAX_WRITE_HIGH_CPU_THRESHOLD_PERCENT,
- DefaultValue = DEFAULT_WRITE_HIGH_CPU_THRESHOLD_PERCENT)
- private int writeHighCpuThreshold;
-
- @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_WRITE_MEDIUM_CPU_THRESHOLD_PERCENT,
- MinValue = MIN_WRITE_MEDIUM_CPU_THRESHOLD_PERCENT,
- MaxValue = MAX_WRITE_MEDIUM_CPU_THRESHOLD_PERCENT,
- DefaultValue = DEFAULT_WRITE_MEDIUM_CPU_THRESHOLD_PERCENT)
- private int writeMediumCpuThreshold;
-
- @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_WRITE_LOW_CPU_THRESHOLD_PERCENT,
- MaxValue = MAX_WRITE_LOW_CPU_THRESHOLD_PERCENT,
- DefaultValue = DEFAULT_WRITE_LOW_CPU_THRESHOLD_PERCENT)
- private int writeLowCpuThreshold;
-
- @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_WRITE_LOW_TIER_MEMORY_MULTIPLIER,
- MinValue = MIN_WRITE_LOW_TIER_MEMORY_MULTIPLIER,
- DefaultValue = DEFAULT_WRITE_LOW_TIER_MEMORY_MULTIPLIER)
- private int lowTierMemoryMultiplier;
-
- @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER,
- MinValue = MIN_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER,
- DefaultValue = DEFAULT_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER)
- private int mediumTierMemoryMultiplier;
-
- @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_WRITE_HIGH_TIER_MEMORY_MULTIPLIER,
- MinValue = MIN_WRITE_HIGH_TIER_MEMORY_MULTIPLIER,
- DefaultValue = DEFAULT_WRITE_HIGH_TIER_MEMORY_MULTIPLIER)
- private int highTierMemoryMultiplier;
-
- @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
- FS_AZURE_WRITE_HIGH_MEMORY_USAGE_THRESHOLD_PERCENT,
- DefaultValue = DEFAULT_WRITE_HIGH_MEMORY_USAGE_THRESHOLD_PERCENT)
- private int writeHighMemoryUsageThresholdPercent;
-
- @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
- FS_AZURE_WRITE_LOW_MEMORY_USAGE_THRESHOLD_PERCENT,
- DefaultValue = DEFAULT_WRITE_LOW_MEMORY_USAGE_THRESHOLD_PERCENT)
- private int writeLowMemoryUsageThresholdPercent;
-
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_APACHE_HTTP_CLIENT_MAX_CACHE_SIZE, DefaultValue =
DEFAULT_APACHE_HTTP_CLIENT_MAX_CACHE_SIZE,
MinValue = MIN_APACHE_HTTP_CLIENT_MAX_CACHE_SIZE, MaxValue =
MAX_APACHE_HTTP_CLIENT_MAX_CACHE_SIZE)
@@ -1955,60 +1899,16 @@ public ExponentialRetryPolicy getOauthTokenFetchRetryPolicy() {
oauthTokenFetchRetryDeltaBackoff);
}
- public int getWriteConcurrentRequestCount() {
+ public int getWriteMaxConcurrentRequestCount() {
if (this.writeMaxConcurrentRequestCount < 1) {
return 4 * Runtime.getRuntime().availableProcessors();
}
return this.writeMaxConcurrentRequestCount;
}
- public int getWriteThreadPoolKeepAliveTime() {
- return writeThreadPoolKeepAliveTime;
- }
-
- public int getWriteCpuMonitoringInterval() {
- return writeCpuMonitoringInterval;
- }
-
- public boolean isDynamicWriteThreadPoolEnablement() {
- return dynamicWriteThreadPoolEnablement;
- }
-
- public int getWriteLowCpuThreshold() {
- return writeLowCpuThreshold;
- }
-
- public int getWriteMediumCpuThreshold() {
- return writeMediumCpuThreshold;
- }
-
- public int getWriteHighCpuThreshold() {
- return writeHighCpuThreshold;
- }
-
- public int getLowTierMemoryMultiplier() {
- return lowTierMemoryMultiplier;
- }
-
- public int getMediumTierMemoryMultiplier() {
- return mediumTierMemoryMultiplier;
- }
-
- public int getHighTierMemoryMultiplier() {
- return highTierMemoryMultiplier;
- }
-
- public int getWriteHighMemoryUsageThresholdPercent() {
- return writeHighMemoryUsageThresholdPercent;
- }
-
- public int getWriteLowMemoryUsageThresholdPercent() {
- return writeLowMemoryUsageThresholdPercent;
- }
-
public int getMaxWriteRequestsToQueue() {
if (this.maxWriteRequestsToQueue < 1) {
- return 2 * getWriteConcurrentRequestCount();
+ return 2 * getWriteMaxConcurrentRequestCount();
}
return this.maxWriteRequestsToQueue;
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java
index 363ed34025a..91f83302dd4 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java
@@ -28,7 +28,6 @@
import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
import org.apache.hadoop.fs.azurebfs.services.AbfsReadFooterMetrics;
import
org.apache.hadoop.fs.azurebfs.services.AbfsReadResourceUtilizationMetrics;
-import
org.apache.hadoop.fs.azurebfs.services.AbfsWriteResourceUtilizationMetrics;
import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.IOStatistics;
@@ -109,8 +108,6 @@ public class AbfsCountersImpl implements AbfsCounters {
private AbfsReadFooterMetrics abfsReadFooterMetrics = null;
- private AbfsWriteResourceUtilizationMetrics
abfsWriteResourceUtilizationMetrics = null;
-
private AbfsReadResourceUtilizationMetrics
abfsReadResourceUtilizationMetrics = null;
private AtomicLong lastExecutionTime = null;
@@ -188,18 +185,6 @@ public void initializeReadResourceUtilizationMetrics() {
abfsReadResourceUtilizationMetrics = new
AbfsReadResourceUtilizationMetrics();
}
- /**
- * Initializes the metrics collector for the write thread pool.
- * <p>
- * This method creates a new instance of {@link
AbfsWriteResourceUtilizationMetrics}
- * to track performance statistics and operational metrics related to
- * write operations executed by the thread pool.
- * </p>
- */
- public void initializeWriteResourceUtilizationMetrics() {
- abfsWriteResourceUtilizationMetrics = new
AbfsWriteResourceUtilizationMetrics();
- }
-
@Override
public void initializeMetrics(final MetricFormat metricFormat,
@@ -315,14 +300,6 @@ public AbfsReadFooterMetrics getAbfsReadFooterMetrics() {
return abfsReadFooterMetrics != null ? abfsReadFooterMetrics : null;
}
- /**
- * Returns the write thread pool metrics instance, or {@code null} if
uninitialized.
- */
- @Override
- public AbfsWriteResourceUtilizationMetrics
getAbfsWriteResourceUtilizationMetrics() {
- return abfsWriteResourceUtilizationMetrics != null ?
abfsWriteResourceUtilizationMetrics : null;
- }
-
/**
* Returns the read thread pool metrics instance, or {@code null} if
uninitialized.
*/
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 6ec5c51eb1d..055cee06256 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -45,7 +45,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.UUID;
import java.util.WeakHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -210,7 +209,6 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
private int blockOutputActiveBlocks;
/** Bounded ThreadPool for this instance. */
private ExecutorService boundedThreadPool;
- private WriteThreadPoolSizeManager writeThreadPoolSizeManager;
/** ABFS instance reference to be held by the store to avoid GC close. */
private BackReference fsBackRef;
@@ -286,18 +284,11 @@ public AzureBlobFileSystemStore(
}
this.blockFactory = abfsStoreBuilder.blockFactory;
this.blockOutputActiveBlocks = abfsStoreBuilder.blockOutputActiveBlocks;
- if (abfsConfiguration.isDynamicWriteThreadPoolEnablement()) {
- this.writeThreadPoolSizeManager = WriteThreadPoolSizeManager.getInstance(
- getClient().getFileSystem() + "-" + UUID.randomUUID(),
- abfsConfiguration, getClient().getAbfsCounters());
- this.boundedThreadPool = writeThreadPoolSizeManager.getExecutorService();
- } else {
- this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
- abfsConfiguration.getWriteConcurrentRequestCount(),
- abfsConfiguration.getMaxWriteRequestsToQueue(),
- 10L, TimeUnit.SECONDS,
- "abfs-bounded");
- }
+ this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
+ abfsConfiguration.getWriteMaxConcurrentRequestCount(),
+ abfsConfiguration.getMaxWriteRequestsToQueue(),
+ 10L, TimeUnit.SECONDS,
+ "abfs-bounded");
}
/**
@@ -336,19 +327,17 @@ public void close() throws IOException {
}
try {
Futures.allAsList(futures).get();
- if (!abfsConfiguration.isDynamicWriteThreadPoolEnablement()) {
- // shutdown the threadPool and set it to null.
- HadoopExecutors.shutdown(boundedThreadPool, LOG,
- 30, TimeUnit.SECONDS);
- boundedThreadPool = null;
- }
+ // shutdown the threadPool and set it to null.
+ HadoopExecutors.shutdown(boundedThreadPool, LOG,
+ 30, TimeUnit.SECONDS);
+ boundedThreadPool = null;
} catch (InterruptedException e) {
LOG.error("Interrupted freeing leases", e);
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
LOG.error("Error freeing leases", e);
} finally {
- IOUtils.cleanupWithLogger(LOG, writeThreadPoolSizeManager,
getClientHandler());
+ IOUtils.cleanupWithLogger(LOG, getClientHandler());
}
}
@@ -817,7 +806,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(
.disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled())
.withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
.withAppendBlob(isAppendBlob)
-
.withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteConcurrentRequestCount())
+
.withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteMaxConcurrentRequestCount())
.withMaxWriteRequestsToQueue(abfsConfiguration.getMaxWriteRequestsToQueue())
.withLease(lease)
.withEncryptionAdapter(contextEncryptionAdapter)
@@ -829,7 +818,6 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(
.withPath(path)
.withExecutorService(new
SemaphoredDelegatingExecutor(boundedThreadPool,
blockOutputActiveBlocks, true))
- .withWriteThreadPoolManager(writeThreadPoolSizeManager)
.withTracingContext(tracingContext)
.withAbfsBackRef(fsBackRef)
.withIngressServiceType(clientHandler.getIngressServiceType())
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/WriteThreadPoolSizeManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/WriteThreadPoolSizeManager.java
deleted file mode 100644
index de11037780b..00000000000
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/WriteThreadPoolSizeManager.java
+++ /dev/null
@@ -1,568 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azurebfs;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.classification.VisibleForTesting;
-import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
-import
org.apache.hadoop.fs.azurebfs.services.AbfsWriteResourceUtilizationMetrics;
-import org.apache.hadoop.fs.azurebfs.services.ResourceUtilizationStats;
-import org.apache.hadoop.fs.azurebfs.utils.ResourceUtilizationUtils;
-
-import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
-import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.LOW_HEAP_SPACE_FACTOR;
-import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.MEDIUM_HEAP_SPACE_FACTOR;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_CPU_REDUCTION_FACTOR;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_HIGH_MEMORY_DECREASE_FACTOR;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_POOL_SIZE_INCREASE_FACTOR;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MEDIUM_CPU_LOW_MEMORY_REDUCTION_FACTOR;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MEDIUM_CPU_REDUCTION_FACTOR;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_DOWN;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_NO_DOWN_AT_MIN;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_NO_UP_AT_MAX;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_UP;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.THIRTY_SECONDS;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO;
-
-/**
- * Manages a thread pool for writing operations, adjusting the pool size based
on CPU utilization.
- */
-public final class WriteThreadPoolSizeManager implements Closeable {
-
- /* Maximum allowed size for the thread pool. */
- private final int maxThreadPoolSize;
- /* Executor for periodically monitoring CPU usage. */
- private final ScheduledExecutorService cpuMonitorExecutor;
- /* Thread pool whose size is dynamically managed. */
- private volatile ExecutorService boundedThreadPool;
- /* Lock to ensure thread-safe updates to the thread pool. */
- private final Lock lock = new ReentrantLock();
- /* New computed max size for the thread pool after adjustment. */
- private volatile int newMaxPoolSize;
- /* Logger instance for logging events from WriteThreadPoolSizeManager. */
- private static final Logger LOG = LoggerFactory.getLogger(
- WriteThreadPoolSizeManager.class);
- /* Map to maintain a WriteThreadPoolSizeManager instance per filesystem. */
- private static final ConcurrentHashMap<String, WriteThreadPoolSizeManager>
- POOL_SIZE_MANAGER_MAP = new ConcurrentHashMap<>();
- /* Name of the filesystem associated with this manager. */
- private final String filesystemName;
- /* Initial size for the thread pool when created. */
- private final int initialPoolSize;
- /* The configuration instance. */
- private final AbfsConfiguration abfsConfiguration;
- /* Metrics collector for monitoring the performance of the ABFS write thread
pool. */
- private final AbfsWriteResourceUtilizationMetrics writeThreadPoolMetrics;
- /* Flag indicating if CPU monitoring has started. */
- private volatile boolean isMonitoringStarted = false;
- /* Tracks the last scale direction applied, or empty if none. */
- private volatile String lastScaleDirection = EMPTY_STRING;
- /* Maximum CPU utilization observed during the monitoring interval. */
- private volatile long maxJvmCpuUtilization = 0L;
- /** High memory usage threshold used to trigger thread pool downscaling. */
- private final long highMemoryThreshold;
- /** Low memory usage threshold used to allow thread pool upscaling. */
- private final long lowMemoryThreshold;
-
- /**
- * Private constructor to initialize the write thread pool and CPU monitor
executor
- * based on system resources and ABFS configuration.
- *
- * @param filesystemName Name of the ABFS filesystem.
- * @param abfsConfiguration Configuration containing pool size parameters.
- * @param abfsCounters ABFS counters instance used for
metrics.
- */
- private WriteThreadPoolSizeManager(String filesystemName,
- AbfsConfiguration abfsConfiguration, AbfsCounters abfsCounters) {
- /* Retrieves and assigns the write thread pool metrics from the ABFS
client counters. */
- this.writeThreadPoolMetrics =
abfsCounters.getAbfsWriteResourceUtilizationMetrics();
- this.filesystemName = filesystemName;
- this.abfsConfiguration = abfsConfiguration;
- int availableProcessors = Runtime.getRuntime().availableProcessors();
- /* Compute the max pool size */
- int computedMaxPoolSize = getComputedMaxPoolSize(availableProcessors,
ResourceUtilizationUtils.getAvailableMaxHeapMemory());
-
- /* Get the initial pool size from config, fallback to at least 1 */
- this.initialPoolSize = Math.max(1,
- abfsConfiguration.getWriteConcurrentRequestCount());
-
- /* Set the upper bound for the thread pool size */
- this.maxThreadPoolSize = Math.max(computedMaxPoolSize, initialPoolSize);
- AtomicInteger threadCount = new AtomicInteger(1);
- this.boundedThreadPool = Executors.newFixedThreadPool(
- initialPoolSize,
- r -> {
- Thread t = new Thread(r);
- t.setName("abfs-boundedwrite-" + threadCount.getAndIncrement());
- return t;
- }
- );
- ThreadPoolExecutor executor = (ThreadPoolExecutor) this.boundedThreadPool;
- int keepAlive = Math.max(1,
abfsConfiguration.getWriteThreadPoolKeepAliveTime());
- executor.setKeepAliveTime(keepAlive, TimeUnit.SECONDS);
- executor.allowCoreThreadTimeOut(true);
- /* Create a scheduled executor for CPU monitoring and pool adjustment */
- this.cpuMonitorExecutor = Executors.newScheduledThreadPool(1);
- highMemoryThreshold =
abfsConfiguration.getWriteHighMemoryUsageThresholdPercent();
- lowMemoryThreshold =
abfsConfiguration.getWriteLowMemoryUsageThresholdPercent();
- }
-
- /** Returns the internal {@link AbfsConfiguration}. */
- private AbfsConfiguration getAbfsConfiguration() {
- return abfsConfiguration;
- }
-
- /**
- * Computes the maximum thread pool size based on the available processors
- * and the initial available heap memory. The calculation uses a tiered
- * multiplier derived from the memory-to-core ratio — systems with higher
- * memory per core allow for a larger thread pool.
- *
- * @param availableProcessors the number of available CPU cores.
- * @param initialAvailableHeapMemory the initial available heap memory, in
bytes or GB (depending on implementation).
- * @return the computed maximum thread pool size.
- */
- private int getComputedMaxPoolSize(final int availableProcessors, long
initialAvailableHeapMemory) {
- int maxpoolSize = getMemoryTierMaxThreads(initialAvailableHeapMemory,
availableProcessors);
- LOG.debug("Computed max thread pool size: {} | Available processors: {} |
Heap memory (GB): {}",
- maxpoolSize, availableProcessors, initialAvailableHeapMemory);
- return maxpoolSize;
- }
-
- /**
- * Determines the maximum thread count based on available heap memory and
CPU cores.
- * Calculates the thread count as {@code availableProcessors × multiplier},
where the
- * multiplier is selected according to the heap memory tier (low, medium, or
high).
- *
- * @param availableHeapGB the available heap memory in gigabytes.
- * @param availableProcessors the number of available CPU cores.
- * @return the maximum thread count based on memory tier and processor count.
- */
- private int getMemoryTierMaxThreads(long availableHeapGB, int
availableProcessors) {
- int multiplier;
- if (availableHeapGB <= LOW_HEAP_SPACE_FACTOR) {
- multiplier = abfsConfiguration.getLowTierMemoryMultiplier();
- } else if (availableHeapGB <= MEDIUM_HEAP_SPACE_FACTOR) {
- multiplier = abfsConfiguration.getMediumTierMemoryMultiplier();
- } else {
- multiplier = abfsConfiguration.getHighTierMemoryMultiplier();
- }
- return availableProcessors * multiplier;
- }
-
- /**
- * Returns the singleton {@link WriteThreadPoolSizeManager} instance for the
specified filesystem.
- * If an active instance already exists in the manager map for the given
filesystem, it is returned.
- * Otherwise, a new instance is created, registered in the map, and returned.
- *
- * @param filesystemName the name of the filesystem.
- * @param abfsConfiguration the {@link AbfsConfiguration} associated with
the filesystem.
- * @param abfsCounters the {@link AbfsCounters} used to
initialize the manager.
- * @return the singleton {@link WriteThreadPoolSizeManager} instance for
the given filesystem.
- */
- public static synchronized WriteThreadPoolSizeManager getInstance(
- String filesystemName, AbfsConfiguration abfsConfiguration, AbfsCounters
abfsCounters) {
- /* Check if an instance already exists in the map for the given filesystem
*/
- WriteThreadPoolSizeManager existingInstance = POOL_SIZE_MANAGER_MAP.get(
- filesystemName);
-
- /* If an existing instance is found, return it */
- if (existingInstance != null && existingInstance.boundedThreadPool != null
- && !existingInstance.boundedThreadPool.isShutdown()) {
- return existingInstance;
- }
-
- /* Otherwise, create a new instance, put it in the map, and return it */
- LOG.debug(
- "Creating new WriteThreadPoolSizeManager instance for filesystem: {}",
- filesystemName);
- WriteThreadPoolSizeManager newInstance = new WriteThreadPoolSizeManager(
- filesystemName, abfsConfiguration, abfsCounters);
- POOL_SIZE_MANAGER_MAP.put(filesystemName, newInstance);
- return newInstance;
- }
-
- /**
- * Adjusts the thread pool size to the specified maximum pool size.
- *
- * @param newMaxPoolSize the new maximum pool size.
- */
- private void adjustThreadPoolSize(int newMaxPoolSize) {
- synchronized (this) {
- ThreadPoolExecutor threadPoolExecutor
- = ((ThreadPoolExecutor) boundedThreadPool);
- int currentCorePoolSize = threadPoolExecutor.getCorePoolSize();
-
- if (newMaxPoolSize >= currentCorePoolSize) {
- threadPoolExecutor.setMaximumPoolSize(newMaxPoolSize);
- threadPoolExecutor.setCorePoolSize(newMaxPoolSize);
- } else {
- threadPoolExecutor.setCorePoolSize(newMaxPoolSize);
- threadPoolExecutor.setMaximumPoolSize(newMaxPoolSize);
- }
- LOG.debug("ThreadPool Info - New max pool size: {}, Current pool size:
{}, Active threads: {}",
- newMaxPoolSize, threadPoolExecutor.getPoolSize(),
threadPoolExecutor.getActiveCount());
- }
- }
-
- /**
- * Starts monitoring the CPU utilization and adjusts the thread pool size
accordingly.
- */
- public synchronized void startCPUMonitoring() {
- if (!isMonitoringStarted()) {
- isMonitoringStarted = true;
- cpuMonitorExecutor.scheduleAtFixedRate(() -> {
- long cpuUtilization = ResourceUtilizationUtils.getJvmCpuLoad();
- LOG.debug("Current CPU Utilization is this: {}", cpuUtilization);
- try {
- adjustThreadPoolSizeBasedOnCPU(cpuUtilization);
- } catch (InterruptedException e) {
- throw new RuntimeException(String.format(
- "Thread pool size adjustment interrupted for filesystem %s",
- filesystemName), e);
- }
- }, 0, getAbfsConfiguration().getWriteCpuMonitoringInterval(),
- TimeUnit.MILLISECONDS);
- }
- }
-
- /**
- * Dynamically adjusts the thread pool size based on current CPU utilization
- * and available heap memory relative to the initially available heap.
- *
- * @param cpuUtilization Current system CPU utilization (0.0 to 1.0)
- * @throws InterruptedException if the resizing operation is interrupted
while acquiring the lock
- */
- public void adjustThreadPoolSizeBasedOnCPU(long cpuUtilization) throws
InterruptedException {
- lock.lock();
- try {
- ThreadPoolExecutor executor = (ThreadPoolExecutor)
this.boundedThreadPool;
- int currentPoolSize = executor.getMaximumPoolSize();
- long memoryLoad = ResourceUtilizationUtils.getMemoryLoad();
- long usedHeapMemory = ResourceUtilizationUtils.getUsedHeapMemory();
- long availableMemory = ResourceUtilizationUtils.getAvailableHeapMemory();
- long committedMemory = ResourceUtilizationUtils.getCommittedHeapMemory();
- LOG.debug("The memory load is {} and CPU utilization is {}", memoryLoad,
cpuUtilization);
- if (cpuUtilization > (abfsConfiguration.getWriteHighCpuThreshold())) {
- newMaxPoolSize = calculateReducedPoolSizeHighCPU(currentPoolSize,
memoryLoad);
- if (currentPoolSize == initialPoolSize && newMaxPoolSize ==
initialPoolSize) {
- lastScaleDirection = SCALE_DIRECTION_NO_DOWN_AT_MIN;
- }
- } else if (cpuUtilization >
(abfsConfiguration.getWriteMediumCpuThreshold())) {
- newMaxPoolSize = calculateReducedPoolSizeMediumCPU(currentPoolSize,
memoryLoad);
- if (currentPoolSize == initialPoolSize && newMaxPoolSize ==
initialPoolSize) {
- lastScaleDirection = SCALE_DIRECTION_NO_DOWN_AT_MIN;
- }
- } else if (cpuUtilization <
(abfsConfiguration.getWriteLowCpuThreshold())) {
- newMaxPoolSize = calculateIncreasedPoolSizeLowCPU(currentPoolSize,
memoryLoad);
- if (currentPoolSize == maxThreadPoolSize && newMaxPoolSize ==
maxThreadPoolSize) {
- lastScaleDirection = SCALE_DIRECTION_NO_UP_AT_MAX;
- }
- } else {
- newMaxPoolSize = currentPoolSize;
- LOG.debug("CPU load normal ({}). No change: current={}",
cpuUtilization, currentPoolSize);
- }
- boolean willResize = newMaxPoolSize != currentPoolSize;
- if (!willResize && !lastScaleDirection.equals(EMPTY_STRING)) {
- WriteThreadPoolStats stats = getCurrentStats(cpuUtilization,
memoryLoad,
- usedHeapMemory, availableMemory, committedMemory);
- // Update the write thread pool metrics with the latest statistics
snapshot.
- writeThreadPoolMetrics.update(stats);
- }
- // Case 1: CPU increased — push metrics ONLY if not resizing
- if (cpuUtilization > maxJvmCpuUtilization) {
- maxJvmCpuUtilization = cpuUtilization;
- if (!willResize) {
- try {
- // Capture the latest thread pool statistics (pool size, CPU,
memory, etc.).
- WriteThreadPoolStats stats = getCurrentStats(cpuUtilization,
memoryLoad,
- usedHeapMemory, availableMemory, committedMemory);
- // Update the write thread pool metrics with the latest statistics
snapshot.
- writeThreadPoolMetrics.update(stats);
- } catch (Exception e) {
- LOG.debug("Error updating write thread pool metrics", e);
- }
- }
- }
- // Case 2: Resize — always push metrics
- if (willResize) {
- LOG.debug("Resizing thread pool from {} to {}", currentPoolSize,
newMaxPoolSize);
- // Record scale direction
- lastScaleDirection = (newMaxPoolSize > currentPoolSize) ?
SCALE_DIRECTION_UP: SCALE_DIRECTION_DOWN;
- adjustThreadPoolSize(newMaxPoolSize);
- try {
- // Capture the latest thread pool statistics (pool size, CPU,
memory, etc.).
- WriteThreadPoolStats stats = getCurrentStats(cpuUtilization,
memoryLoad,
- usedHeapMemory, availableMemory, committedMemory);
- // Update the write thread pool metrics with the latest statistics
snapshot.
- writeThreadPoolMetrics.update(stats);
- } catch (Exception e) {
- LOG.debug("Error updating write thread pool metrics after
resizing.", e);
- }
- }
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * Calculates a reduced thread pool size when high CPU utilization is
detected.
- * The reduction strategy depends on available heap memory:
- * if heap usage is high (low free memory), the pool size is reduced
aggressively;
- * otherwise, it is reduced moderately to prevent resource contention.
- *
- * @param currentPoolSize the current size of the thread pool.
- * @param memoryLoad the current JVM heap load (0.0–1.0)
- * @return the adjusted (reduced) pool size based on CPU and memory
conditions.
- */
- private int calculateReducedPoolSizeHighCPU(int currentPoolSize, double
memoryLoad) {
- LOG.debug("The high cpu memory load is {}", memoryLoad);
- if (memoryLoad > highMemoryThreshold) {
- LOG.debug("High CPU & high memory load ({}). Aggressive reduction:
current={}, new={}",
- memoryLoad, currentPoolSize, currentPoolSize /
HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR);
- return Math.max(initialPoolSize, currentPoolSize /
HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR);
- }
- int reduced = Math.max(initialPoolSize, currentPoolSize - currentPoolSize
/ HIGH_CPU_REDUCTION_FACTOR);
- LOG.debug("High CPU ({}). Reducing pool size moderately: current={},
new={}",
- abfsConfiguration.getWriteHighCpuThreshold(), currentPoolSize,
reduced);
- return reduced;
- }
-
- /**
- * Calculates a reduced thread pool size when medium CPU utilization is
detected.
- * The reduction is based on available heap memory: if memory is low, the
pool size
- * is reduced more aggressively; otherwise, a moderate reduction is applied
to
- * maintain balanced performance.
- *
- * @param currentPoolSize the current size of the thread pool.
- * @param memoryLoad the current JVM heap load (0.0–1.0)
- * @return the adjusted (reduced) pool size based on medium CPU and memory
conditions.
- */
- private int calculateReducedPoolSizeMediumCPU(int currentPoolSize, double
memoryLoad) {
- LOG.debug("The medium cpu memory load is {}", memoryLoad);
- if (memoryLoad > highMemoryThreshold) {
- int reduced = Math.max(initialPoolSize, currentPoolSize -
currentPoolSize / MEDIUM_CPU_LOW_MEMORY_REDUCTION_FACTOR);
- LOG.debug("Medium CPU & high memory load ({}). Reducing: current={},
new={}",
- memoryLoad, currentPoolSize, reduced);
- return reduced;
- }
- int reduced = Math.max(initialPoolSize, currentPoolSize - currentPoolSize
/ MEDIUM_CPU_REDUCTION_FACTOR);
- LOG.debug("Medium CPU ({}). Moderate reduction: current={}, new={}",
- abfsConfiguration.getWriteMediumCpuThreshold(), currentPoolSize,
reduced);
- return reduced;
- }
-
- /**
- * Calculates an adjusted thread pool size when low CPU utilization is
detected.
- * If sufficient heap memory is available, the pool size is increased to
improve throughput.
- * Otherwise, it is slightly decreased to conserve memory resources.
- *
- * @param currentPoolSize the current size of the thread pool.
- * @param memoryLoad the current JVM heap load (0.0–1.0)
- * @return the adjusted (increased or decreased) pool size based on CPU and
memory conditions.
- */
- private int calculateIncreasedPoolSizeLowCPU(int currentPoolSize, double
memoryLoad) {
- LOG.debug("The low cpu memory load is {}", memoryLoad);
- if (memoryLoad <= lowMemoryThreshold) {
- int increased = Math.min(maxThreadPoolSize, (int) (currentPoolSize *
LOW_CPU_POOL_SIZE_INCREASE_FACTOR));
- LOG.debug("Low CPU & low memory load ({}). Increasing: current={},
new={}",
- memoryLoad, currentPoolSize, increased);
- return increased;
- } else {
- // Decrease by 10%
- int decreased = Math.max(1, (int) (currentPoolSize *
LOW_CPU_HIGH_MEMORY_DECREASE_FACTOR));
- LOG.debug("Low CPU but insufficient heap. Decreasing: current={},
new={}", currentPoolSize, decreased);
- return decreased;
- }
- }
-
- /**
- * Returns the executor service for the thread pool.
- *
- * @return the executor service.
- */
- public ExecutorService getExecutorService() {
- return boundedThreadPool;
- }
-
- /**
- * Returns the scheduled executor responsible for CPU monitoring and dynamic
pool adjustment.
- *
- * @return the {@link ScheduledExecutorService} used for CPU monitoring.
- */
- public ScheduledExecutorService getCpuMonitorExecutor() {
- return cpuMonitorExecutor;
- }
-
- /**
- * Checks if monitoring has started.
- *
- * @return true if monitoring has started, false otherwise.
- */
- public synchronized boolean isMonitoringStarted() {
- return isMonitoringStarted;
- }
-
- /**
- * Returns the maximum JVM CPU utilization observed during the current
- * monitoring interval or since the last reset.
- *
- * @return the highest JVM CPU utilization percentage recorded
- */
- @VisibleForTesting
- public long getMaxJvmCpuUtilization() {
- return maxJvmCpuUtilization;
- }
-
- /**
- * Closes this manager by shutting down executors and cleaning up resources.
- * Removes the instance from the active manager map.
- *
- * @throws IOException if an error occurs during shutdown.
- */
- @Override
- public void close() throws IOException {
- synchronized (this) {
- try {
- // Shutdown CPU monitor
- if (cpuMonitorExecutor != null && !cpuMonitorExecutor.isShutdown()) {
- cpuMonitorExecutor.shutdown();
- }
- // Gracefully shutdown the bounded thread pool
- if (boundedThreadPool != null && !boundedThreadPool.isShutdown()) {
- boundedThreadPool.shutdown();
- if (!boundedThreadPool.awaitTermination(THIRTY_SECONDS,
TimeUnit.SECONDS)) {
- LOG.warn("Bounded thread pool did not terminate in time, forcing
shutdownNow for filesystem: {}", filesystemName);
- boundedThreadPool.shutdownNow();
- }
- boundedThreadPool = null;
- }
- // Remove from the map
- POOL_SIZE_MANAGER_MAP.remove(filesystemName);
- LOG.debug("Closed and removed instance for filesystem: {}",
filesystemName);
- } catch (Exception e) {
- LOG.warn("Failed to properly close instance for filesystem: {}",
filesystemName, e);
- }
- }
- }
-
- /**
- * Represents current statistics of the write thread pool and system.
- */
- public static class WriteThreadPoolStats extends ResourceUtilizationStats {
-
- /**
- * Constructs a {@link WriteThreadPoolStats} instance containing thread
pool
- * metrics and JVM/system resource utilization details.
- *
- * @param currentPoolSize the current number of threads in the pool
- * @param maxPoolSize the maximum number of threads permitted in the pool
- * @param activeThreads the number of threads actively executing tasks
- * @param idleThreads the number of idle threads in the pool
- * @param jvmCpuLoad the current JVM CPU load (0.0–1.0)
- * @param systemCpuUtilization the current system-wide CPU utilization
(0.0–1.0)
- * @param availableHeapGB the available heap memory in gigabytes
- * @param committedHeapGB the committed heap memory in gigabytes
- * @param usedHeapGB the available heap memory in gigabytes
- * @param maxHeapGB the committed heap memory in gigabytes
- * @param memoryLoad the JVM memory load (used / max)
- * @param lastScaleDirection the last scaling action performed: "I"
(increase),
- * "D" (decrease), or empty if no scaling occurred
- * @param maxCpuUtilization the peak JVM CPU utilization observed during
this interval
- * @param jvmProcessId the process ID of the JVM
- */
- public WriteThreadPoolStats(int currentPoolSize,
- int maxPoolSize, int activeThreads, int idleThreads,
- long jvmCpuLoad, long systemCpuUtilization, long availableHeapGB,
- long committedHeapGB, long usedHeapGB, long maxHeapGB, long
memoryLoad, String lastScaleDirection,
- long maxCpuUtilization, long jvmProcessId) {
- super(currentPoolSize, maxPoolSize, activeThreads, idleThreads,
- jvmCpuLoad, systemCpuUtilization, availableHeapGB,
- committedHeapGB, usedHeapGB, maxHeapGB, memoryLoad,
lastScaleDirection,
- maxCpuUtilization, jvmProcessId);
- }
- }
-
- /**
- * Returns a snapshot of the current write thread pool and JVM/system
resource
- * statistics.
- *
- * <p>The snapshot includes thread pool size and activity, JVM and system CPU
- * utilization, and JVM heap memory metrics. These values are used for
monitoring
- * and for making dynamic scaling decisions for the write thread pool.</p>
- *
- * @param jvmCpuUtilization current JVM CPU utilization
- * @param memoryLoad current JVM memory load ratio (used / max)
- * @param usedMemory current used JVM heap memory
- * @param availableMemory current available JVM heap memory
- * @param committedMemory current committed JVM heap memory
- *
- * @return a {@link WriteThreadPoolStats} instance containing the current
metrics
- */
- synchronized WriteThreadPoolStats getCurrentStats(long jvmCpuUtilization,
- long memoryLoad, long usedMemory, long availableMemory, long
committedMemory) {
-
- if (boundedThreadPool == null) {
- return new WriteThreadPoolStats(
- ZERO, ZERO, ZERO, ZERO, ZERO, ZERO, ZERO, ZERO, ZERO,
- ZERO, ZERO, EMPTY_STRING, ZERO, ZERO);
- }
-
- ThreadPoolExecutor exec = (ThreadPoolExecutor) this.boundedThreadPool;
-
- String currentScaleDirection = lastScaleDirection;
- lastScaleDirection = EMPTY_STRING;
-
- int poolSize = exec.getPoolSize();
- int activeThreads = exec.getActiveCount();
- int idleThreads = poolSize - activeThreads;
-
- return new WriteThreadPoolStats(
- poolSize, // Current thread count
- exec.getMaximumPoolSize(), // Max allowed threads
- activeThreads, // Busy threads
- idleThreads, // Idle threads
- jvmCpuUtilization, // JVM CPU usage (ratio)
- ResourceUtilizationUtils.getSystemCpuLoad(), // System CPU usage
(ratio)
- availableMemory, // Free heap (GB)
- committedMemory, // Committed heap (GB)
- usedMemory, // Used heap (GB)
- ResourceUtilizationUtils.getMaxHeapMemory(), // Max heap (GB)
- memoryLoad, // used/max
- currentScaleDirection, // "I", "D", or ""
- getMaxJvmCpuUtilization(), // Peak JVM CPU usage so far
- JvmUniqueIdProvider.getJvmId() // JVM PID
- );
- }
-}
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
index 918997ab43b..e5db0653f3c 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
@@ -183,8 +183,6 @@ public final class AbfsHttpConstants {
public static final char CHAR_EQUALS = '=';
public static final char CHAR_STAR = '*';
public static final char CHAR_PLUS = '+';
- public static final int LOW_HEAP_SPACE_FACTOR = 4;
- public static final double MEDIUM_HEAP_SPACE_FACTOR = 8;
public static final int SPLIT_NO_LIMIT = -1;
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 115568c7853..e3b3c2a4675 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -567,43 +567,6 @@ public static String containerProperty(String property,
String fsName, String ac
/**Maximum number of thread per blob-delete orchestration: {@value}*/
public static final String FS_AZURE_BLOB_DIR_DELETE_MAX_THREAD =
"fs.azure.blob.dir.delete.max.thread";
- /** Configuration key for the keep-alive time (ms) for the write thread
pool. Value: {@value}. */
- public static final String FS_AZURE_WRITE_THREADPOOL_KEEP_ALIVE_TIME_MILLIS
= "fs.azure.write.threadpool.keep.alive.time.millis";
-
- /** Configuration key for the CPU monitoring interval (ms) during write
operations. Value: {@value}. */
- public static final String FS_AZURE_WRITE_CPU_MONITORING_INTERVAL_MILLIS =
"fs.azure.write.cpu.monitoring.interval.millis";
-
- /** Configuration key to enable or disable dynamic write thread pool
adjustment. Value: {@value}. */
- public static final String FS_AZURE_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT =
"fs.azure.write.dynamic.threadpool.enablement";
-
- /** Configuration key for the high CPU utilization threshold (%) for write
scaling. Value: {@value}. */
- public static final String FS_AZURE_WRITE_HIGH_CPU_THRESHOLD_PERCENT =
"fs.azure.write.high.cpu.threshold.percent";
-
- /** Configuration key for the medium CPU utilization threshold (%) for write
scaling. Value: {@value}. */
- public static final String FS_AZURE_WRITE_MEDIUM_CPU_THRESHOLD_PERCENT =
"fs.azure.write.medium.cpu.threshold.percent";
-
- /** Configuration key for the low CPU utilization threshold (%) for write
scaling. Value: {@value}. */
- public static final String FS_AZURE_WRITE_LOW_CPU_THRESHOLD_PERCENT =
"fs.azure.write.low.cpu.threshold.percent";
-
- /** Configuration key for the low-tier memory multiplier for write
workloads. Value: {@value}. */
- public static final String FS_AZURE_WRITE_LOW_TIER_MEMORY_MULTIPLIER =
"fs.azure.write.low.tier.memory.multiplier";
-
- /** Configuration key for the medium-tier memory multiplier for write
workloads. Value: {@value}. */
- public static final String FS_AZURE_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER =
"fs.azure.write.medium.tier.memory.multiplier";
-
- /** Configuration key for the high-tier memory multiplier for write
workloads. Value: {@value}. */
- public static final String FS_AZURE_WRITE_HIGH_TIER_MEMORY_MULTIPLIER =
"fs.azure.write.high.tier.memory.multiplier";
-
- /**
- * Threshold percentage for high memory usage to scale up/down the buffer
pool size in write code.
- */
- public static final String
FS_AZURE_WRITE_HIGH_MEMORY_USAGE_THRESHOLD_PERCENT =
"fs.azure.write.high.memory.usage.threshold.percent";
-
- /**
- * Threshold percentage for low memory usage to scale up/down the buffer
pool size in write code.
- */
- public static final String FS_AZURE_WRITE_LOW_MEMORY_USAGE_THRESHOLD_PERCENT
= "fs.azure.write.low.memory.usage.threshold.percent";
-
/**Flag to enable/disable sending client transactional ID during
create/rename operations: {@value}*/
public static final String FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID =
"fs.azure.enable.client.transaction.id";
/**Flag to enable/disable create idempotency during create operation:
{@value}*/
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index a7717c124db..feafbe4f3a5 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -43,19 +43,6 @@ public final class FileSystemConfigurations {
* Number of bytes in a gigabyte.
*/
public static final long BYTES_PER_GIGABYTE = 1024L * 1024 * 1024;
- /**
- * Factor by which the pool size is increased when CPU utilization is low.
- */
- public static final double LOW_CPU_POOL_SIZE_INCREASE_FACTOR = 1.5;
- public static final double LOW_CPU_HIGH_MEMORY_DECREASE_FACTOR = 0.9;
- public static final int HIGH_CPU_REDUCTION_FACTOR = 3;
- public static final int HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR = 2;
- public static final int MEDIUM_CPU_REDUCTION_FACTOR = 5;
- public static final int MEDIUM_CPU_LOW_MEMORY_REDUCTION_FACTOR = 3;
- public static final int HIGH_MEDIUM_HEAP_FACTOR = 2;
- public static final double LOW_CPU_HEAP_FACTOR = 0.8;
-
-
// Retry parameter defaults.
public static final int DEFAULT_MIN_BACKOFF_INTERVAL = 500; // 500ms
@@ -254,28 +241,6 @@ public final class FileSystemConfigurations {
public static final int HUNDRED = 100;
public static final double HUNDRED_D = 100.0;
public static final long THOUSAND = 1000L;
- // Indicates a successful scale-up operation
- public static final int SCALE_UP = 1;
- // Indicates a successful scale-down operation
- public static final int SCALE_DOWN = -1;
- // Indicates a down-scale was requested but already at minimum
- public static final int NO_SCALE_DOWN_AT_MIN = -2;
- // Indicates an up-scale was requested but already at maximum
- public static final int NO_SCALE_UP_AT_MAX = 2;
- // Indicates no scaling action was taken
- public static final int SCALE_NONE = 0;
- // Indicates no action is needed based on current metrics
- public static final int NO_ACTION_NEEDED = 3;
- // Indicates a successful scale-up operation
- public static final String SCALE_DIRECTION_UP = "I";
- // Indicates a successful scale-down operation
- public static final String SCALE_DIRECTION_DOWN = "D";
- // Indicates a down-scale was requested but pool is already at minimum
- public static final String SCALE_DIRECTION_NO_DOWN_AT_MIN = "-D";
- // Indicates an up-scale was requested but pool is already at maximum
- public static final String SCALE_DIRECTION_NO_UP_AT_MAX = "+F";
- // Indicates no scaling action is needed based on current metrics
- public static final String SCALE_DIRECTION_NO_ACTION_NEEDED = "NA";
public static final HttpOperationType DEFAULT_NETWORKING_LIBRARY
= HttpOperationType.APACHE_HTTP_CLIENT;
@@ -320,107 +285,6 @@ public final class FileSystemConfigurations {
public static final int DEFAULT_FS_AZURE_BLOB_DELETE_THREAD =
DEFAULT_FS_AZURE_LISTING_ACTION_THREADS;
- /**
- * Whether dynamic write thread pool adjustment is enabled by default.
- */
- public static final boolean DEFAULT_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT =
false;
-
- /**
- * Default keep-alive time (in milliseconds) for write thread pool threads.
- */
- public static final int DEFAULT_WRITE_THREADPOOL_KEEP_ALIVE_TIME_MILLIS =
30_000;
-
- /**
- * Minimum interval (in milliseconds) for CPU monitoring during write
operations.
- */
- public static final int MIN_WRITE_CPU_MONITORING_INTERVAL_MILLIS = 10_000;
-
- /**
- * Maximum interval (in milliseconds) for CPU monitoring during write
operations.
- */
- public static final int MAX_WRITE_CPU_MONITORING_INTERVAL_MILLIS = 60_000;
-
- /**
- * Default interval (in milliseconds) for CPU monitoring during write
operations.
- */
- public static final int DEFAULT_WRITE_CPU_MONITORING_INTERVAL_MILLIS =
15_000;
-
- /**
- * Minimum CPU utilization percentage considered as high threshold for write
scaling.
- */
- public static final int MIN_WRITE_HIGH_CPU_THRESHOLD_PERCENT = 65;
-
- /**
- * Maximum CPU utilization percentage considered as high threshold for write
scaling.
- */
- public static final int MAX_WRITE_HIGH_CPU_THRESHOLD_PERCENT = 90;
-
- /**
- * Default CPU utilization percentage considered as high threshold for write
scaling.
- */
- public static final int DEFAULT_WRITE_HIGH_CPU_THRESHOLD_PERCENT = 80;
-
- /**
- * Minimum CPU utilization percentage considered as medium threshold for
write scaling.
- */
- public static final int MIN_WRITE_MEDIUM_CPU_THRESHOLD_PERCENT = 45;
-
- /**
- * Maximum CPU utilization percentage considered as medium threshold for
write scaling.
- */
- public static final int MAX_WRITE_MEDIUM_CPU_THRESHOLD_PERCENT = 65;
-
- /**
- * Default CPU utilization percentage considered as medium threshold for
write scaling.
- */
- public static final int DEFAULT_WRITE_MEDIUM_CPU_THRESHOLD_PERCENT = 60;
-
- /**
- * Maximum CPU utilization percentage considered as low threshold for write
scaling.
- */
- public static final int MAX_WRITE_LOW_CPU_THRESHOLD_PERCENT = 40;
-
- /**
- * Default CPU utilization percentage considered as low threshold for write
scaling.
- */
- public static final int DEFAULT_WRITE_LOW_CPU_THRESHOLD_PERCENT = 35;
-
- /**
- * Minimum multiplier applied to available memory for low-tier write
workloads.
- */
- public static final int MIN_WRITE_LOW_TIER_MEMORY_MULTIPLIER = 3;
-
- /**
- * Default multiplier applied to available memory for low-tier write
workloads.
- */
- public static final int DEFAULT_WRITE_LOW_TIER_MEMORY_MULTIPLIER = 4;
-
- /**
- * Minimum multiplier applied to available memory for medium-tier write
workloads.
- */
- public static final int MIN_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER = 6;
-
- /**
- * Default multiplier applied to available memory for medium-tier write
workloads.
- */
- public static final int DEFAULT_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER = 8;
-
- /**
- * Minimum multiplier applied to available memory for high-tier write
workloads.
- */
- public static final int MIN_WRITE_HIGH_TIER_MEMORY_MULTIPLIER = 12;
-
- /**
- * Default multiplier applied to available memory for high-tier write
workloads.
- */
- public static final int DEFAULT_WRITE_HIGH_TIER_MEMORY_MULTIPLIER = 16;
-
- /** Percentage threshold of heap usage at which memory pressure is
considered high. */
- public static final int DEFAULT_WRITE_HIGH_MEMORY_USAGE_THRESHOLD_PERCENT =
60;
-
- /** Percentage threshold of heap usage at which memory pressure is
considered low. */
- public static final int DEFAULT_WRITE_LOW_MEMORY_USAGE_THRESHOLD_PERCENT =
30;
-
public static final boolean DEFAULT_FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID =
true;
public static final boolean DEFAULT_FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY
= true;
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/MetricsConstants.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/MetricsConstants.java
index 71d314a7d5e..3312e2e491a 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/MetricsConstants.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/MetricsConstants.java
@@ -114,6 +114,39 @@ public final class MetricsConstants {
*/
public static final String READ_LENGTH = "$RL=";
+ // Indicates a successful scale-up operation
+ public static final int SCALE_UP = 1;
+
+ // Indicates a successful scale-down operation
+ public static final int SCALE_DOWN = -1;
+
+ // Indicates a down-scale was requested but already at minimum
+ public static final int NO_SCALE_DOWN_AT_MIN = -2;
+
+ // Indicates an up-scale was requested but already at maximum
+ public static final int NO_SCALE_UP_AT_MAX = 2;
+
+ // Indicates no scaling action was taken
+ public static final int SCALE_NONE = 0;
+
+ // Indicates no action is needed based on current metrics
+ public static final int NO_ACTION_NEEDED = 3;
+
+ // Indicates a successful scale-up operation
+ public static final String SCALE_DIRECTION_UP = "I";
+
+ // Indicates a successful scale-down operation
+ public static final String SCALE_DIRECTION_DOWN = "D";
+
+ // Indicates a down-scale was requested but pool is already at minimum
+ public static final String SCALE_DIRECTION_NO_DOWN_AT_MIN = "-D";
+
+ // Indicates an up-scale was requested but pool is already at maximum
+ public static final String SCALE_DIRECTION_NO_UP_AT_MAX = "+F";
+
+ // Indicates no scaling action is needed based on current metrics
+ public static final String SCALE_DIRECTION_NO_ACTION_NEEDED = "NA";
+
// Private constructor to prevent instantiation
private MetricsConstants() {
throw new AssertionError("Cannot instantiate MetricsConstants");
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/AbfsWriteResourceUtilizationMetricsEnum.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/AbfsWriteResourceUtilizationMetricsEnum.java
deleted file mode 100644
index 1f92626eeb4..00000000000
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/AbfsWriteResourceUtilizationMetricsEnum.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azurebfs.enums;
-
-/**
- * Enum representing the set of metrics tracked for the ABFS write thread pool.
- * Each metric entry defines a short name identifier and its corresponding
- * {@link StatisticTypeEnum}, which specifies the type of measurement (e.g.,
gauge).
- * These metrics are used for monitoring and analyzing the performance and
- * resource utilization of the write thread pool.
- */
-public enum AbfsWriteResourceUtilizationMetricsEnum implements
- AbfsResourceUtilizationMetricsEnum {
-
- /** Current number of threads in the write thread pool. */
- CURRENT_POOL_SIZE("CP", StatisticTypeEnum.TYPE_GAUGE),
-
- /** Maximum configured size of the write thread pool. */
- MAX_POOL_SIZE("MP", StatisticTypeEnum.TYPE_GAUGE),
-
- /** Number of threads currently executing write operations. */
- ACTIVE_THREADS("AT", StatisticTypeEnum.TYPE_GAUGE),
-
- /** Number of threads currently idle. */
- IDLE_THREADS("IT", StatisticTypeEnum.TYPE_GAUGE),
-
- /** Recent JVM CPU load value as reported by the JVM (0.0 to 1.0). */
- JVM_CPU_UTILIZATION("JC", StatisticTypeEnum.TYPE_GAUGE),
-
- /** Overall system-wide CPU utilization percentage during write operations.
*/
- SYSTEM_CPU_UTILIZATION("SC", StatisticTypeEnum.TYPE_GAUGE),
-
- /** Available heap memory (in GB) measured during write operations. */
- AVAILABLE_MEMORY("AM", StatisticTypeEnum.TYPE_GAUGE),
-
- /** Committed heap memory (in GB) measured during write operations. */
- COMMITTED_MEMORY("CM", StatisticTypeEnum.TYPE_GAUGE),
-
- /** Used heap memory (in GB) measured during write operations. */
- USED_MEMORY("UM", StatisticTypeEnum.TYPE_GAUGE),
-
- /** Maximum heap memory (in GB) measured during write operations. */
- MAX_HEAP_MEMORY("MM", StatisticTypeEnum.TYPE_GAUGE),
-
- /** Available heap memory (in GB) measured during write operations. */
- MEMORY_LOAD("ML", StatisticTypeEnum.TYPE_GAUGE),
-
- /** Direction of the last scaling decision (e.g., scale-up or scale-down). */
- LAST_SCALE_DIRECTION("SD", StatisticTypeEnum.TYPE_GAUGE),
-
- /** Maximum CPU utilization recorded during the monitoring interval. */
- MAX_CPU_UTILIZATION("MC", StatisticTypeEnum.TYPE_GAUGE),
-
- /** The process ID (PID) of the running JVM, useful for correlating metrics
with system-level process information. */
- JVM_PROCESS_ID("JI", StatisticTypeEnum.TYPE_GAUGE);
-
- private final String name;
- private final StatisticTypeEnum statisticType;
-
- /**
- * Constructs a metric definition for the ABFS write thread pool.
- *
- * @param name the short name identifier for the metric.
- * @param type the {@link StatisticTypeEnum} describing the metric type.
- */
- AbfsWriteResourceUtilizationMetricsEnum(String name, StatisticTypeEnum type)
{
- this.name = name;
- this.statisticType = type;
- }
-
- /**
- * Returns the short name identifier of the metric.
- *
- * @return the metric name.
- */
- @Override
- public String getName() {
- return name;
- }
-
- /**
- * Returns the {@link StatisticTypeEnum} associated with this metric.
- *
- * @return the metric's statistic type.
- */
- @Override
- public StatisticTypeEnum getStatisticType() {
- return statisticType;
- }
-}
-
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index db3e163ca4d..a781e12c86a 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -276,11 +276,6 @@ private AbfsClient(final URL baseUrl,
// register the client to Aggregated Metrics Manager
abfsMetricsManager.getAggregateMetricsManager()
.registerClient(accountName, this);
-
- // Initialize write thread pool metrics if dynamic write thread pool
scaling is enabled.
- if (abfsConfiguration.isDynamicWriteThreadPoolEnablement()) {
- abfsCounters.initializeWriteResourceUtilizationMetrics();
- }
// Initialize read thread pool metrics if ReadAheadV2 and its dynamic
scaling feature are enabled.
if (abfsConfiguration.isReadAheadV2Enabled() &&
abfsConfiguration.isReadAheadV2DynamicScalingEnabled()) {
abfsCounters.initializeReadResourceUtilizationMetrics();
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsCounters.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsCounters.java
index 8ada20abf43..ab42bc94bd6 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsCounters.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsCounters.java
@@ -89,10 +89,6 @@ void initializeMetrics(MetricFormat metricFormat,
AbfsReadResourceUtilizationMetrics getAbfsReadResourceUtilizationMetrics();
- void initializeWriteResourceUtilizationMetrics();
-
- AbfsWriteResourceUtilizationMetrics getAbfsWriteResourceUtilizationMetrics();
-
AtomicLong getLastExecutionTime();
}
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index 883af56c589..cb48fea42fc 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -31,7 +31,6 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.hadoop.fs.azurebfs.WriteThreadPoolSizeManager;
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
import
org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
import
org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidIngressServiceException;
@@ -168,13 +167,6 @@ public class AbfsOutputStream extends OutputStream
implements Syncable,
*/
private MessageDigest fullBlobContentMd5 = null;
- /**
- * Instance of {@link WriteThreadPoolSizeManager} used by this class
- * to dynamically adjust the write thread pool size based on
- * system resource utilization.
- */
- private final WriteThreadPoolSizeManager writeThreadPoolSizeManager;
-
public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext)
throws IOException {
this.statistics = abfsOutputStreamContext.getStatistics();
@@ -225,9 +217,6 @@ public AbfsOutputStream(AbfsOutputStreamContext
abfsOutputStreamContext)
this.serviceTypeAtInit = abfsOutputStreamContext.getIngressServiceType();
this.currentExecutingServiceType =
abfsOutputStreamContext.getIngressServiceType();
this.clientHandler = abfsOutputStreamContext.getClientHandler();
- this.writeThreadPoolSizeManager =
abfsOutputStreamContext.getWriteThreadPoolSizeManager();
- // Initialize CPU monitoring if the pool size manager is present
- initializeMonitoringIfNeeded();
createIngressHandler(serviceTypeAtInit,
abfsOutputStreamContext.getBlockFactory(), bufferSize, false, null);
try {
@@ -253,21 +242,6 @@ public AzureIngressHandler getIngressHandler() {
private volatile boolean switchCompleted = false;
- /**
- * Starts CPU monitoring in the thread pool size manager if it
- * is initialized and not already monitoring.
- */
- private void initializeMonitoringIfNeeded() {
- if (writeThreadPoolSizeManager != null &&
!writeThreadPoolSizeManager.isMonitoringStarted()) {
- synchronized (this) {
- // Re-check to avoid a race between threads
- if (!writeThreadPoolSizeManager.isMonitoringStarted()) {
- writeThreadPoolSizeManager.startCPUMonitoring();
- }
- }
- }
- }
-
/**
* Creates or retrieves an existing Azure ingress handler based on the
service type and provided parameters.
* <p>
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
index 68a2ba04d20..ceae24b0ee6 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
@@ -21,7 +21,6 @@
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.azurebfs.WriteThreadPoolSizeManager;
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
@@ -80,9 +79,6 @@ public class AbfsOutputStreamContext extends
AbfsStreamContext {
private AbfsClientHandler clientHandler;
- /** Reference to the thread pool manager. */
- private WriteThreadPoolSizeManager writeThreadPoolSizeManager;
-
public AbfsOutputStreamContext(final long
sasTokenRenewPeriodForStreamsInSeconds) {
super(sasTokenRenewPeriodForStreamsInSeconds);
}
@@ -232,12 +228,6 @@ public AbfsOutputStreamContext withEncryptionAdapter(
return this;
}
- public AbfsOutputStreamContext withWriteThreadPoolManager(
- final WriteThreadPoolSizeManager writeThreadPoolSizeManager) {
- this.writeThreadPoolSizeManager = writeThreadPoolSizeManager;
- return this;
- }
-
public int getWriteBufferSize() {
return writeBufferSize;
}
@@ -338,10 +328,6 @@ public AbfsClientHandler getClientHandler() {
return clientHandler;
}
- public WriteThreadPoolSizeManager getWriteThreadPoolSizeManager() {
- return writeThreadPoolSizeManager;
- }
-
/**
* Checks if small write is supported based on the current configuration.
*
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsWriteResourceUtilizationMetrics.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsWriteResourceUtilizationMetrics.java
deleted file mode 100644
index 6c44c20b31b..00000000000
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsWriteResourceUtilizationMetrics.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azurebfs.services;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-import
org.apache.hadoop.fs.azurebfs.enums.AbfsWriteResourceUtilizationMetricsEnum;
-import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
-import org.apache.hadoop.fs.azurebfs.WriteThreadPoolSizeManager;
-
-/**
- * Metrics container for the ABFS write thread pool.
- * <p>
- * This class records pool size, CPU utilization, memory usage,
- * scaling direction, and other runtime indicators reported by
- * {@link WriteThreadPoolSizeManager.WriteThreadPoolStats}.
- * </p>
- */
-public class AbfsWriteResourceUtilizationMetrics
- extends
-
AbstractAbfsResourceUtilizationMetrics<AbfsWriteResourceUtilizationMetricsEnum>
{
-
- /**
- * A version counter incremented each time a metric update occurs.
- * Used to detect whether metrics have changed since the last serialization.
- */
- private final AtomicLong updateVersion = new AtomicLong(0);
-
- /**
- * The last version number that was serialized and pushed out.
- */
- private final AtomicLong lastPushedVersion = new AtomicLong(0);
-
- /**
- * Creates a metrics set for write operations, pre-initializing
- * all metric keys defined in {@link
AbfsWriteResourceUtilizationMetricsEnum}.
- */
- public AbfsWriteResourceUtilizationMetrics() {
- super(AbfsWriteResourceUtilizationMetricsEnum.values(),
FSOperationType.WRITE.toString());
- }
-
- @Override
- protected boolean isUpdated() {
- return updateVersion.get() > lastPushedVersion.get();
- }
-
- protected void markUpdated() {
- updateVersion.incrementAndGet();
- }
-
- @Override
- protected long getUpdateVersion() {
- return updateVersion.get();
- }
-
- @Override
- protected long getLastPushedVersion() {
- return lastPushedVersion.get();
- }
-
- /**
- * Marks the current metrics version as pushed.
- * Must be called only after the metrics string is actually emitted.
- */
- @Override
- public synchronized void markPushed() {
- lastPushedVersion.set(updateVersion.get());
- }
-
- /**
- * Updates all write-thread-pool metrics using the latest stats snapshot.
- * Each field in {@link WriteThreadPoolSizeManager.WriteThreadPoolStats}
- * is mapped to a corresponding metric.
- *
- * @param stats the latest thread-pool statistics; ignored if {@code null}
- */
- public synchronized void
update(WriteThreadPoolSizeManager.WriteThreadPoolStats stats) {
- if (stats == null) {
- return;
- }
-
- setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.CURRENT_POOL_SIZE,
stats.getCurrentPoolSize());
- setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.MAX_POOL_SIZE,
stats.getMaxPoolSize());
- setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.ACTIVE_THREADS,
stats.getActiveThreads());
- setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.IDLE_THREADS,
stats.getIdleThreads());
-
setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.JVM_CPU_UTILIZATION,
stats.getJvmCpuLoad());
-
setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.SYSTEM_CPU_UTILIZATION,
stats.getSystemCpuUtilization());
- setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.AVAILABLE_MEMORY,
stats.getMemoryUtilization());
- setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.COMMITTED_MEMORY,
stats.getCommittedHeapGB());
- setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.USED_MEMORY,
stats.getUsedHeapGB());
- setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.MAX_HEAP_MEMORY,
stats.getMaxHeapGB());
- setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.MEMORY_LOAD,
stats.getMemoryLoad());
-
setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.LAST_SCALE_DIRECTION,
- stats.getLastScaleDirectionNumeric(stats.getLastScaleDirection()));
-
setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.MAX_CPU_UTILIZATION,
stats.getMaxCpuUtilization());
- setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.JVM_PROCESS_ID,
stats.getJvmProcessId());
-
- markUpdated();
- }
-}
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java
index 6877d5a03a9..8610adde30b 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java
@@ -125,15 +125,6 @@ protected AbfsRestOperation remoteWrite(AbfsBlock
blockToUpload,
TracingContext tracingContextAppend = new TracingContext(tracingContext);
tracingContextAppend.setIngressHandler(BLOB_APPEND + " T " + threadIdStr);
tracingContextAppend.setPosition(String.valueOf(blockToUpload.getOffset()));
- // Fetches write thread pool metrics from the ABFS client and adds them
to the tracing context.
- AbfsWriteResourceUtilizationMetrics writeResourceUtilizationMetrics =
getWriteResourceUtilizationMetrics();
- if (writeResourceUtilizationMetrics != null) {
- String writeMetrics = writeResourceUtilizationMetrics.toString();
- tracingContextAppend.setResourceUtilizationMetricResults(writeMetrics);
- if (!writeMetrics.isEmpty()) {
- writeResourceUtilizationMetrics.markPushed();
- }
- }
try {
LOG.trace("Starting remote write for block with ID {} and offset {}",
blockToUpload.getBlockId(), blockToUpload.getOffset());
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSIngressHandler.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSIngressHandler.java
index bc5e6b7c521..96a8f07cc3d 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSIngressHandler.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSIngressHandler.java
@@ -117,15 +117,6 @@ protected AbfsRestOperation remoteWrite(AbfsBlock
blockToUpload,
AppendRequestParameters reqParams,
TracingContext tracingContext) throws IOException {
TracingContext tracingContextAppend = new TracingContext(tracingContext);
- // Fetches write thread pool metrics from the ABFS client and adds them to
the tracing context.
- AbfsWriteResourceUtilizationMetrics writeResourceUtilizationMetrics =
getWriteResourceUtilizationMetrics();
- if (writeResourceUtilizationMetrics != null) {
- String writeMetrics = writeResourceUtilizationMetrics.toString();
- tracingContextAppend.setResourceUtilizationMetricResults(writeMetrics);
- if (!writeMetrics.isEmpty()) {
- writeResourceUtilizationMetrics.markPushed();
- }
- }
String threadIdStr = String.valueOf(Thread.currentThread().getId());
if (tracingContextAppend.getIngressHandler().equals(EMPTY_STRING)) {
tracingContextAppend.setIngressHandler(DFS_APPEND + " T " + threadIdStr);
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDfsToBlobIngressFallbackHandler.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDfsToBlobIngressFallbackHandler.java
index 8db94cc3e4c..cfa01315f2a 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDfsToBlobIngressFallbackHandler.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDfsToBlobIngressFallbackHandler.java
@@ -110,15 +110,6 @@ protected AbfsRestOperation remoteWrite(AbfsBlock
blockToUpload,
TracingContext tracingContext) throws IOException {
AbfsRestOperation op;
TracingContext tracingContextAppend = new TracingContext(tracingContext);
- // Fetches write thread pool metrics from the ABFS client and adds them to
the tracing context.
- AbfsWriteResourceUtilizationMetrics writeResourceUtilizationMetrics =
getWriteResourceUtilizationMetrics();
- if (writeResourceUtilizationMetrics != null) {
- String writeMetrics = writeResourceUtilizationMetrics.toString();
- tracingContextAppend.setResourceUtilizationMetricResults(writeMetrics);
- if (!writeMetrics.isEmpty()) {
- writeResourceUtilizationMetrics.markPushed();
- }
- }
String threadIdStr = String.valueOf(Thread.currentThread().getId());
tracingContextAppend.setIngressHandler(FALLBACK_APPEND + " T " +
threadIdStr);
tracingContextAppend.setPosition(String.valueOf(blockToUpload.getOffset()));
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureIngressHandler.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureIngressHandler.java
index 48f7058ffb7..81007e1c3dd 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureIngressHandler.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureIngressHandler.java
@@ -236,15 +236,4 @@ protected String computeFullBlobMd5() {
}
return fullBlobMd5;
}
-
- /**
- * Helper that returns the write thread-pool metrics from the client's
counters, if available.
- *
- * @return the {@link AbfsWriteResourceUtilizationMetrics} instance or
{@code null} when not present
- */
- protected AbfsWriteResourceUtilizationMetrics
getWriteResourceUtilizationMetrics() {
- return getAbfsOutputStream().getClient()
- .getAbfsCounters()
- .getAbfsWriteResourceUtilizationMetrics();
- }
}
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
index 5cbe4893b12..c271a5e354a 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
@@ -46,12 +46,12 @@
import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED_D;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_DOWN;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_NO_ACTION_NEEDED;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_NO_DOWN_AT_MIN;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_NO_UP_AT_MAX;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_UP;
import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO;
+import static
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.SCALE_DIRECTION_DOWN;
+import static
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.SCALE_DIRECTION_NO_ACTION_NEEDED;
+import static
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.SCALE_DIRECTION_NO_DOWN_AT_MIN;
+import static
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.SCALE_DIRECTION_NO_UP_AT_MAX;
+import static
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.SCALE_DIRECTION_UP;
/**
* The Improved Read Buffer Manager for Rest AbfsClient.
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ResourceUtilizationStats.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ResourceUtilizationStats.java
index 6ddd2e9f6f2..b871afa2802 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ResourceUtilizationStats.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ResourceUtilizationStats.java
@@ -19,17 +19,17 @@
package org.apache.hadoop.fs.azurebfs.services;
import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.NO_ACTION_NEEDED;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.NO_SCALE_DOWN_AT_MIN;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.NO_SCALE_UP_AT_MAX;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_DOWN;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_NO_ACTION_NEEDED;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_NO_DOWN_AT_MIN;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_NO_UP_AT_MAX;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_UP;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DOWN;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_NONE;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_UP;
+import static
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.NO_ACTION_NEEDED;
+import static
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.NO_SCALE_DOWN_AT_MIN;
+import static
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.NO_SCALE_UP_AT_MAX;
+import static
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.SCALE_DIRECTION_DOWN;
+import static
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.SCALE_DIRECTION_NO_ACTION_NEEDED;
+import static
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.SCALE_DIRECTION_NO_DOWN_AT_MIN;
+import static
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.SCALE_DIRECTION_NO_UP_AT_MAX;
+import static
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.SCALE_DIRECTION_UP;
+import static
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.SCALE_DOWN;
+import static
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.SCALE_NONE;
+import static
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.SCALE_UP;
/**
* Represents current statistics of the thread pool and system.
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestWriteThreadPoolSizeManager.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestWriteThreadPoolSizeManager.java
deleted file mode 100644
index 735b3f276ba..00000000000
---
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestWriteThreadPoolSizeManager.java
+++ /dev/null
@@ -1,963 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azurebfs;
-
-import org.assertj.core.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicIntegerArray;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
-import
org.apache.hadoop.fs.azurebfs.services.AbfsWriteResourceUtilizationMetrics;
-import org.apache.hadoop.fs.azurebfs.utils.ResourceUtilizationUtils;
-
-import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_WRITE_MAX_CONCURRENT_REQUESTS;
-import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_WRITE_CPU_MONITORING_INTERVAL_MILLIS;
-import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT;
-import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_WRITE_LOW_CPU_THRESHOLD_PERCENT;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-class TestWriteThreadPoolSizeManager extends AbstractAbfsIntegrationTest {
-
- private AbfsConfiguration mockConfig;
- private static final long HIGH_CPU_UTILIZATION_THRESHOLD = 95;
- private static final long LOW_CPU_UTILIZATION_THRESHOLD = 5;
- private static final int LOW_MEMORY_USAGE_THRESHOLD_PERCENT = 100;
- private static final int THREAD_SLEEP_DURATION_MS = 200;
- private static final String TEST_FILE_PATH = "testFilePath";
- private static final String TEST_DIR_PATH = "testDirPath";
- private static final int TEST_FILE_LENGTH = 1024 * 1024 * 8;
- private static final int CONCURRENT_REQUEST_COUNT = 15;
- private static final int THREAD_POOL_KEEP_ALIVE_TIME = 10;
- private static final int LOW_TIER_MEMORY_MULTIPLIER = 4;
- private static final int MEDIUM_TIER_MEMORY_MULTIPLIER = 6;
- private static final int HIGH_TIER_MEMORY_MULTIPLIER = 8;
- private static final int HIGH_CPU_THRESHOLD = 15;
- private static final int MEDIUM_CPU_THRESHOLD = 10;
- private static final int LOW_CPU_THRESHOLD = 5;
- private static final int CPU_MONITORING_INTERVAL = 15;
- private static final int WAIT_DURATION_MS = 3000;
- private static final int LATCH_TIMEOUT_SECONDS = 60;
- private static final int RESIZE_WAIT_TIME_MS = 6_000;
- private static final long HIGH_CPU_USAGE_RATIO = 95;
- private static final long LOW_CPU_USAGE_RATIO = 5;
- private static final int SLEEP_DURATION_MS = 150;
- private static final int AWAIT_TIMEOUT_SECONDS = 45;
- private static final int RESIZER_JOIN_TIMEOUT_MS = 2_000;
- private static final int WAIT_TIMEOUT_MS = 5000;
- private static final int SLEEP_DURATION_30S_MS = 30000;
- private static final int SMALL_PAUSE_MS = 50;
- private static final int BURST_LOAD = 50;
- private static final long LOAD_SLEEP_DURATION_MS = 2000;
-
- TestWriteThreadPoolSizeManager() throws Exception {
- super.setup();
- }
-
- /**
- * Common setup to prepare a mock configuration for each test.
- */
- @BeforeEach
- public void setUp() {
- mockConfig = mock(AbfsConfiguration.class);
-
when(mockConfig.getWriteConcurrentRequestCount()).thenReturn(CONCURRENT_REQUEST_COUNT);
-
when(mockConfig.getWriteThreadPoolKeepAliveTime()).thenReturn(THREAD_POOL_KEEP_ALIVE_TIME);
-
when(mockConfig.getLowTierMemoryMultiplier()).thenReturn(LOW_TIER_MEMORY_MULTIPLIER);
-
when(mockConfig.getMediumTierMemoryMultiplier()).thenReturn(MEDIUM_TIER_MEMORY_MULTIPLIER);
-
when(mockConfig.getHighTierMemoryMultiplier()).thenReturn(HIGH_TIER_MEMORY_MULTIPLIER);
- when(mockConfig.getWriteHighCpuThreshold()).thenReturn(HIGH_CPU_THRESHOLD);
-
when(mockConfig.getWriteMediumCpuThreshold()).thenReturn(MEDIUM_CPU_THRESHOLD);
- when(mockConfig.getWriteLowCpuThreshold()).thenReturn(LOW_CPU_THRESHOLD);
-
when(mockConfig.getWriteCpuMonitoringInterval()).thenReturn(CPU_MONITORING_INTERVAL);
-
when(mockConfig.getWriteLowMemoryUsageThresholdPercent()).thenReturn(LOW_MEMORY_USAGE_THRESHOLD_PERCENT);
- }
-
- /**
- * Verifies that {@link WriteThreadPoolSizeManager#getInstance(String,
AbfsConfiguration, AbfsCounters)}
- * returns the same singleton instance for the same filesystem name, and a
different instance
- * for a different filesystem name.
- */
- @Test
- void testGetInstanceReturnsSingleton() throws IOException {
- WriteThreadPoolSizeManager instance1
- = WriteThreadPoolSizeManager.getInstance("testfs", mockConfig,
- getFileSystem().getAbfsClient().getAbfsCounters());
- WriteThreadPoolSizeManager instance2
- = WriteThreadPoolSizeManager.getInstance("testfs", mockConfig,
- getFileSystem().getAbfsClient().getAbfsCounters());
- WriteThreadPoolSizeManager instance3 =
- WriteThreadPoolSizeManager.getInstance("newFs", mockConfig,
- getFileSystem().getAbfsClient().getAbfsCounters());
- Assertions.assertThat(instance1)
- .as("Expected the same singleton instance for the same key")
- .isSameAs(instance2);
- Assertions.assertThat(instance1)
- .as("Expected the same singleton instance for the same key")
- .isNotSameAs(instance3);
- }
-
- /**
- * Tests that high CPU usage results in thread pool downscaling.
- */
- @Test
- void testAdjustThreadPoolSizeBasedOnHighCPU() throws InterruptedException,
IOException {
- // Initialize filesystem and thread pool manager
- Configuration conf = getRawConfiguration();
- conf.setBoolean(FS_AZURE_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT, true);
- FileSystem fileSystem = FileSystem.newInstance(conf);
- try (AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem) {
- // Get the executor service (ThreadPoolExecutor)
- WriteThreadPoolSizeManager instance
- = WriteThreadPoolSizeManager.getInstance(abfs.getFileSystemId(),
- getAbfsStore(abfs).getAbfsConfiguration(),
- abfs.getAbfsClient().getAbfsCounters());
- ExecutorService executor = instance.getExecutorService();
- ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
-
- // Simulate high CPU usage (e.g., 95% CPU utilization)
- int initialMaxSize = threadPoolExecutor.getMaximumPoolSize();
- instance.adjustThreadPoolSizeBasedOnCPU(
- HIGH_CPU_UTILIZATION_THRESHOLD); // High CPU
-
- // Get the new maximum pool size after adjustment
- int newMaxSize = threadPoolExecutor.getMaximumPoolSize();
-
- // Assert that the pool size has decreased or is equal to initial
PoolSize based on high CPU usage
- Assertions.assertThat(newMaxSize)
- .as("Expected pool size to decrease under high CPU usage")
- .isLessThanOrEqualTo(initialMaxSize);
- instance.close();
- }
- }
-
- /**
- * Tests that low CPU usage results in thread pool upscaling or remains the
same.
- */
- @Test
- void testAdjustThreadPoolSizeBasedOnLowCPU()
- throws InterruptedException, IOException {
- Configuration conf = getRawConfiguration();
- conf.setBoolean(FS_AZURE_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT, true);
- FileSystem fileSystem = FileSystem.newInstance(conf);
- try (AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem) {
- WriteThreadPoolSizeManager instance
- = WriteThreadPoolSizeManager.getInstance(abfs.getFileSystemId(),
- mockConfig,
- abfs.getAbfsClient().getAbfsCounters());
- ExecutorService executor = instance.getExecutorService();
- int initialSize = ((ThreadPoolExecutor) executor).getMaximumPoolSize();
- instance.adjustThreadPoolSizeBasedOnCPU(
- LOW_CPU_UTILIZATION_THRESHOLD); // Low CPU
- int newSize = ((ThreadPoolExecutor) executor).getMaximumPoolSize();
- Assertions.assertThat(newSize)
- .as("Expected pool size to increase or stay the same under low CPU
usage")
- .isGreaterThanOrEqualTo(initialSize);
- instance.close();
- }
- }
-
-
- /**
- * Confirms that the thread pool executor is initialized and not shut down.
- */
- @Test
- void testExecutorServiceIsNotNull() throws IOException {
- WriteThreadPoolSizeManager instance
- = WriteThreadPoolSizeManager.getInstance("testfsExec", mockConfig,
- getFileSystem().getAbfsClient().getAbfsCounters());
- ExecutorService executor = instance.getExecutorService();
- Assertions.assertThat(executor).as("Executor service should be
initialized")
- .isNotNull();
- Assertions.assertThat(executor.isShutdown())
- .as("Executor service should not be shut down")
- .isFalse();
- instance.close();
- }
-
-
- /**
- * Ensures that calling {@link WriteThreadPoolSizeManager#close()} cleans up
resources.
- */
- @Test
- void testCloseCleansUp() throws Exception {
- WriteThreadPoolSizeManager instance
- = WriteThreadPoolSizeManager.getInstance("testfsClose", mockConfig,
- getFileSystem().getAbfsClient().getAbfsCounters());
- ExecutorService executor = instance.getExecutorService();
- instance.close();
- Assertions.assertThat(executor.isShutdown() || executor.isTerminated())
- .as("Executor service should be shut down or terminated after close()")
- .isTrue();
- }
-
- /**
- * Test that the CPU monitoring task is scheduled properly when
startCPUMonitoring() is called.
- * This test checks the following:
- * 1. That the CPU monitoring task gets scheduled by verifying that the CPU
monitor executor is not null.
- * 2. Ensures that the thread pool executor has at least one thread running,
confirming that the task is being executed.
- * @throws InterruptedException if the test is interrupted during the sleep
time
- */
- @Test
- void testStartCPUMonitoringSchedulesTask()
- throws InterruptedException, IOException {
- // Create a new instance of WriteThreadPoolSizeManager using a mock
configuration
- WriteThreadPoolSizeManager instance
- = WriteThreadPoolSizeManager.getInstance("testScheduler", mockConfig,
- getFileSystem().getAbfsClient().getAbfsCounters());
-
- // Call startCPUMonitoring to schedule the monitoring task
- instance.startCPUMonitoring();
-
- // Wait for a short period to allow the task to run and be scheduled
- Thread.sleep(THREAD_SLEEP_DURATION_MS);
-
- // Retrieve the CPU monitor executor (ScheduledThreadPoolExecutor) from
the instance
- ScheduledThreadPoolExecutor monitor
- = (ScheduledThreadPoolExecutor) instance.getCpuMonitorExecutor();
-
- // Assert that the monitor executor is not null, ensuring that it was
properly initialized
- Assertions.assertThat(monitor)
- .as("CPU Monitor Executor should not be null")
- .isNotNull();
-
- // Assert that the thread pool size is greater than 0, confirming that the
task has been scheduled and threads are active
- Assertions.assertThat(monitor.getPoolSize())
- .as("Thread pool size should be greater than 0, indicating that the
task is running")
- .isGreaterThan(ZERO);
- instance.close();
- }
-
- /**
- * Verifies that ABFS write tasks can complete successfully even when the
system
- * is under artificial CPU stress. The test also ensures that the write
thread
- * pool resizes dynamically under load without leading to starvation,
overload,
- * or leftover work in the queue.
- */
- @Test
- void testABFSWritesUnderCPUStress() throws Exception {
- // Initialize the filesystem and thread pool manager
- AzureBlobFileSystem fs = getFileSystem();
- WriteThreadPoolSizeManager instance =
- WriteThreadPoolSizeManager.getInstance(getFileSystemName(),
- getConfiguration(),
getFileSystem().getAbfsClient().getAbfsCounters());
- ThreadPoolExecutor executor =
- (ThreadPoolExecutor) instance.getExecutorService();
-
- // Start CPU monitoring so pool size adjustments happen in response to load
- instance.startCPUMonitoring();
-
- // Launch a background thread that generates CPU stress for ~3 seconds.
- // This simulates contention on the system and should cause the pool to
adjust.
- Thread stressThread = new Thread(() -> {
- long end = System.currentTimeMillis() + WAIT_DURATION_MS;
- while (System.currentTimeMillis() < end) {
- // Busy-work loop: repeatedly compute random powers to waste CPU cycles
- double waste = Math.pow(Math.random(), Math.random());
- }
- });
- stressThread.start();
-
- // Prepare the ABFS write workload with multiple concurrent tasks
- int taskCount = 10;
- CountDownLatch latch = new CountDownLatch(taskCount);
- Path testFile = new Path(TEST_FILE_PATH);
- final byte[] b = new byte[TEST_FILE_LENGTH];
- new Random().nextBytes(b);
-
- // Submit 10 tasks, each writing to its own file to simulate parallel load
- for (int i = 0; i < taskCount; i++) {
- int finalI = i;
- executor.submit(() -> {
- try (FSDataOutputStream out = fs.create(
- new Path(testFile + "_" + finalI), true)) {
- for (int j = 0; j < 5; j++) {
- out.write(b); // perform multiple writes to add sustained pressure
- }
- out.hflush(); // flush to force actual I/O
- } catch (IOException e) {
- // Any failure here indicates pool misbehavior or I/O issues
- Assertions.fail("Write task failed with exception", e);
- } finally {
- // Mark this task as complete
- latch.countDown();
- }
- });
- }
-
- // Wait for all tasks to finish (up to 60s timeout to guard against
deadlock/starvation)
- boolean finished = latch.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
- // Record the pool size after CPU stress to confirm resizing took place
- int resizedPoolSize = executor.getMaximumPoolSize();
-
- // 1. All tasks must finish within timeout → proves no starvation or
deadlock
- Assertions.assertThat(finished)
- .as("All ABFS write tasks should complete without starvation")
- .isTrue();
-
- // 2. Pool size must fall within valid bounds → proves resizing occurred
- Assertions.assertThat(resizedPoolSize)
- .as("Thread pool size should dynamically adjust under CPU stress")
- .isBetween(1,
getAbfsStore(fs).getAbfsConfiguration().getWriteConcurrentRequestCount());
-
- // 3. Task queue must be empty → proves no backlog remains after workload
- Assertions.assertThat(executor.getQueue().size())
- .as("No backlog should remain in task queue after completion")
- .isEqualTo(0);
-
- // Cleanup resources
- instance.close();
- }
-
-
- /**
- * Ensures that dynamic thread pool resizing during an active ABFS write
workload
- * does not cause deadlocks, task loss, or task duplication. The test also
verifies
- * that the pool resizes while work is in progress and that the executor
queue
- * eventually drains cleanly.
- */
- @Test
- void testDynamicResizeNoDeadlocksNoTaskLoss() throws Exception {
- // Initialize filesystem and thread pool manager
- AzureBlobFileSystem fs = getFileSystem();
- WriteThreadPoolSizeManager mgr =
- WriteThreadPoolSizeManager.getInstance(getFileSystemName(), mockConfig,
- getFileSystem().getAbfsClient().getAbfsCounters());
- ThreadPoolExecutor executor = (ThreadPoolExecutor) mgr.getExecutorService();
-
- // Enable monitoring (may not be required if adjust() is triggered internally)
- mgr.startCPUMonitoring();
-
- // Test configuration: enough tasks and writes to stress the pool
- final int taskCount = 10;
- final int writesPerTask = 5;
- final byte[] b = new byte[TEST_FILE_LENGTH];
- new Random().nextBytes(b);
- final Path base = new Path(TEST_DIR_PATH);
- fs.mkdirs(base);
-
- // Barrier ensures all tasks start together, so resizing happens mid-flight
- final CyclicBarrier startBarrier = new CyclicBarrier(taskCount + 1);
- final CountDownLatch done = new CountDownLatch(taskCount);
-
- // Track execution results
- final AtomicIntegerArray completed = new AtomicIntegerArray(taskCount); // mark tasks once
- final AtomicInteger duplicates = new AtomicInteger(0); // guard against double-completion
- final AtomicInteger rejected = new AtomicInteger(0); // count unexpected rejections
-
- // Submit ABFS write tasks
- for (int i = 0; i < taskCount; i++) {
- final int id = i;
- try {
- executor.submit(() -> {
- try {
- // Hold until all tasks are enqueued, then start together
- startBarrier.await(10, TimeUnit.SECONDS);
-
- // Each task writes to its own file, flushing intermittently
- Path subPath = new Path(base, "part-" + id);
- try (FSDataOutputStream out = fs.create(subPath)) {
- for (int w = 0; w < writesPerTask; w++) {
- out.write(b);
- if ((w & 1) == 1) {
- out.hflush(); // force some syncs to increase contention
- }
- }
- out.hflush();
- }
-
- // Mark task as completed once; duplicates flag if it happens again
- if (!completed.compareAndSet(id, 0, 1)) {
- duplicates.incrementAndGet();
- }
- } catch (Exception e) {
- Assertions.fail("ABFS write task " + id + " failed", e);
- } finally {
- done.countDown();
- }
- });
- } catch (RejectedExecutionException rex) {
- rejected.incrementAndGet();
- }
- }
-
- // Thread that simulates fluctuating CPU load while tasks are running
- final AtomicInteger observedMinMax = new AtomicInteger(executor.getMaximumPoolSize());
- final AtomicInteger observedMaxMax = new AtomicInteger(executor.getMaximumPoolSize());
-
- Thread resizer = new Thread(() -> {
- try {
- // Release worker tasks
- startBarrier.await(10, TimeUnit.SECONDS);
-
- long end = System.currentTimeMillis() + RESIZE_WAIT_TIME_MS; // keep resizing for ~6s
- boolean high = true;
- while (System.currentTimeMillis() < end) {
- // Alternate between high load (shrink) and low load (expand)
- if (high) {
- mgr.adjustThreadPoolSizeBasedOnCPU(HIGH_CPU_USAGE_RATIO);
- } else {
- mgr.adjustThreadPoolSizeBasedOnCPU(LOW_CPU_USAGE_RATIO);
- }
- high = !high;
-
- // Track observed pool size bounds to prove resizing occurred
- int cur = executor.getMaximumPoolSize();
- observedMinMax.updateAndGet(prev -> Math.min(prev, cur));
- observedMaxMax.updateAndGet(prev -> Math.max(prev, cur));
-
- Thread.sleep(SLEEP_DURATION_MS);
- }
- } catch (Exception ignore) {
- // No-op: this is best-effort simulation
- }
- }, "resizer-thread");
-
- resizer.start();
-
- // Wait for all tasks to finish (ensures no deadlock)
- boolean finished = done.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
- // Join resizer thread
- resizer.join(RESIZER_JOIN_TIMEOUT_MS);
-
- // 1. All tasks must complete in time → proves there are no deadlocks
- Assertions.assertThat(finished)
- .as("All tasks must complete within timeout (no deadlock)")
- .isTrue();
-
- // 2. Every task should complete exactly once → proves no task loss
- int completedCount = 0;
- for (int i = 0; i < taskCount; i++) {
- completedCount += completed.get(i);
- }
- Assertions.assertThat(completedCount)
- .as("Every task should complete exactly once (no task loss)")
- .isEqualTo(taskCount);
-
- // 3. No task should mark itself as done more than once → proves no duplication
- Assertions.assertThat(duplicates.get())
- .as("No task should report completion more than once (no duplication)")
- .isZero();
-
- // 4. The executor should not reject tasks while resizing is happening
- Assertions.assertThat(rejected.get())
- .as("Tasks should not be rejected during active resizing")
- .isZero();
-
- // 5. Executor queue should eventually empty once all tasks finish
- Assertions.assertThat(executor.getQueue().size())
- .as("Executor queue should drain after workload")
- .isEqualTo(0);
-
- // 6. Executor should still be running after workload until explicitly closed
- Assertions.assertThat(executor.isShutdown())
- .as("Executor should remain running until manager.close()")
- .isFalse();
-
- // 7. Verify that resizing actually occurred (pool max both grew and shrank)
- int minObserved = observedMinMax.get();
- int maxObserved = observedMaxMax.get();
-
- Assertions.assertThat(maxObserved)
- .as("Pool maximum size should have increased or fluctuated above baseline")
- .isGreaterThan(0);
-
- Assertions.assertThat(minObserved)
- .as("Pool maximum size should have dropped during resizing")
- .isLessThanOrEqualTo(maxObserved);
-
- // Cleanup
- for (int i = 0; i < taskCount; i++) {
- Path p = new Path(base, "part-" + i);
- try {
- fs.delete(p, false);
- } catch (IOException ignore) {
- // Ignored: delete failures are non-fatal for test cleanup
- }
- }
- try {
- fs.delete(base, true);
- } catch (IOException ignore) {
- // Ignored: cleanup failures are non-fatal in tests
- }
- mgr.close();
- }
-
-
-
- /**
- * Verifies that when the system experiences high CPU usage,
- * the WriteThreadPoolSizeManager detects the load and reduces
- * the maximum thread pool size accordingly.
- */
- @Test
- void testThreadPoolScalesDownOnHighCpuLoad() throws Exception {
- // Initialize filesystem and thread pool manager
- try (FileSystem fileSystem = FileSystem.newInstance(getRawConfiguration())) {
- AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem;
- WriteThreadPoolSizeManager instance =
- WriteThreadPoolSizeManager.getInstance(abfs.getFileSystemId(),
- getConfiguration(), getFileSystem().getAbfsClient().getAbfsCounters());
- ThreadPoolExecutor executor =
- (ThreadPoolExecutor) instance.getExecutorService();
-
- // Start monitoring CPU load
- instance.startCPUMonitoring();
-
- // Capture baseline pool size for comparison later
- int initialMax = executor.getMaximumPoolSize();
-
- // Define a CPU-bound task: tight loop of math ops for ~5s
- Runnable cpuBurn = () -> {
- long end = System.currentTimeMillis() + WAIT_TIMEOUT_MS;
- while (System.currentTimeMillis() < end) {
- double waste = Math.sin(Math.random()) * Math.cos(Math.random());
- }
- };
-
- // Launch two CPU hogs in parallel
- Thread cpuHog1 = new Thread(cpuBurn, "cpu-hog-thread-1");
- Thread cpuHog2 = new Thread(cpuBurn, "cpu-hog-thread-2");
- cpuHog1.start();
- cpuHog2.start();
-
- // Submit multiple write tasks while CPU is under stress
- int taskCount = 10;
- CountDownLatch latch = new CountDownLatch(taskCount);
- Path base = new Path(TEST_DIR_PATH);
- abfs.mkdirs(base);
- final byte[] buffer = new byte[TEST_FILE_LENGTH];
- new Random().nextBytes(buffer);
-
- for (int i = 0; i < taskCount; i++) {
- final Path part = new Path(base, "part-" + i);
- executor.submit(() -> {
- try (FSDataOutputStream out = abfs.create(part, true)) {
- for (int j = 0; j < 5; j++) {
- out.write(buffer);
- out.hflush();
- }
- } catch (IOException e) {
- Assertions.fail("Write task failed under CPU stress", e);
- } finally {
- latch.countDown();
- }
- });
- }
-
- // Ensure all tasks complete (avoid deadlock/starvation)
- boolean finished = latch.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
- // Wait for CPU hogs to end and give monitor time to react
- cpuHog1.join();
- cpuHog2.join();
- Thread.sleep(SLEEP_DURATION_30S_MS);
-
- int resizedMax = executor.getMaximumPoolSize();
-
- // Verify outcomes:
- // 1. All write tasks succeeded despite CPU pressure
- Assertions.assertThat(finished)
- .as("All ABFS write tasks must complete despite CPU stress")
- .isTrue();
-
- // 2. Thread pool scaled down as expected
- Assertions.assertThat(resizedMax)
- .as("Thread pool should scale down under high CPU load")
- .isLessThanOrEqualTo(initialMax);
-
- // 3. No leftover tasks in the queue
- Assertions.assertThat(executor.getQueue().size())
- .as("No backlog should remain in the queue after workload")
- .isEqualTo(0);
-
- // Cleanup test data
- for (int i = 0; i < taskCount; i++) {
- try {
- abfs.delete(new Path(base, "part-" + i), false);
- } catch (IOException ignore) {
- // Ignored: cleanup failures are non-fatal in tests
- }
- }
- try {
- abfs.delete(base, true);
- } catch (IOException ignore) {
- // Ignored: cleanup failures are non-fatal in tests
- }
- instance.close();
- }
- }
-
-
- /**
- * Verifies that when two parallel high memory–consuming workloads run,
- * the WriteThreadPoolSizeManager detects the memory pressure and
- * scales down the maximum thread pool size.
- */
- @Test
- void testScalesDownOnParallelHighMemoryLoad() throws Exception {
- // Initialize filesystem and thread pool manager
- try (FileSystem fileSystem = FileSystem.newInstance(getRawConfiguration())) {
- AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem;
- WriteThreadPoolSizeManager instance =
- WriteThreadPoolSizeManager.getInstance(abfs.getFileSystemId(),
- getConfiguration(), getFileSystem().getAbfsClient().getAbfsCounters());
- ThreadPoolExecutor executor =
- (ThreadPoolExecutor) instance.getExecutorService();
-
- // Begin monitoring resource usage (CPU + memory)
- instance.startCPUMonitoring();
-
- // Capture the initial thread pool size for later comparison
- int initialMax = executor.getMaximumPoolSize();
-
- // Define a workload that continuously allocates memory (~5 MB chunks)
- // for ~5 seconds to simulate memory pressure in the JVM.
- Runnable memoryBurn = () -> {
- List<byte[]> allocations = new ArrayList<>();
- long end = System.currentTimeMillis() + WAIT_TIMEOUT_MS;
- while (System.currentTimeMillis() < end) {
- try {
- allocations.add(new byte[5 * 1024 * 1024]); // allocate 5 MB
- Thread.sleep(SMALL_PAUSE_MS); // small pause to avoid instant OOM
- } catch (OutOfMemoryError oom) {
- // Clear allocations if JVM runs out of memory and continue
- allocations.clear();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- };
-
- // Start two threads running the memory hog workload in parallel
- Thread memHog1 = new Thread(memoryBurn, "mem-hog-thread-1");
- Thread memHog2 = new Thread(memoryBurn, "mem-hog-thread-2");
- memHog1.start();
- memHog2.start();
-
- // Submit several write tasks to ABFS while memory is under stress
- int taskCount = 10;
- CountDownLatch latch = new CountDownLatch(taskCount);
- Path base = new Path(TEST_DIR_PATH);
- abfs.mkdirs(base);
- final byte[] buffer = new byte[TEST_FILE_LENGTH];
- new Random().nextBytes(buffer);
-
- for (int i = 0; i < taskCount; i++) {
- final Path part = new Path(base, "part-" + i);
- executor.submit(() -> {
- try (FSDataOutputStream out = abfs.create(part, true)) {
- for (int j = 0; j < 5; j++) {
- out.write(buffer);
- out.hflush();
- }
- } catch (IOException e) {
- Assertions.fail("Write task failed under memory stress", e);
- } finally {
- latch.countDown();
- }
- });
- }
-
- // Ensure all tasks finish within a timeout
- boolean finished = latch.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
-
- // Wait for memory hog threads to finish
- memHog1.join();
- memHog2.join();
-
- // Give monitoring thread time to detect memory pressure and react
- Thread.sleep(SLEEP_DURATION_30S_MS);
-
- int resizedMax = executor.getMaximumPoolSize();
-
- // Validate that:
- // 1. All ABFS writes succeeded despite memory stress
- Assertions.assertThat(finished)
- .as("All ABFS write tasks must complete despite parallel memory stress")
- .isTrue();
-
- // 2. The thread pool scaled down under memory pressure
- Assertions.assertThat(resizedMax)
- .as("Thread pool should scale down under parallel high memory load")
- .isLessThanOrEqualTo(initialMax);
-
- // 3. No tasks remain queued after workload completion
- Assertions.assertThat(executor.getQueue().size())
- .as("No backlog should remain in the queue after workload")
- .isEqualTo(0);
-
- // Clean up temporary test files
- for (int i = 0; i < taskCount; i++) {
- try {
- abfs.delete(new Path(base, "part-" + i), false);
- } catch (IOException ignore) {
- // Ignored: cleanup failures are non-fatal in tests
- }
- }
- try {
- abfs.delete(base, true);
- } catch (IOException ignore) {
- // Ignored: cleanup failures are non-fatal in tests
- }
- instance.close();
- }
- }
-
- /**
- * Test that after a long idle period, the thread pool
- * can quickly scale up in response to a sudden burst of load
- * without performance degradation.
- */
- @Test
- void testThreadPoolScalesUpAfterIdleBurstLoad() throws Exception {
- // Initialize filesystem and thread pool manager
- try (FileSystem fileSystem = FileSystem.newInstance(
- getRawConfiguration())) {
- AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem;
- WriteThreadPoolSizeManager instance = WriteThreadPoolSizeManager.getInstance(abfs.getFileSystemId(),
- abfs.getAbfsStore().getAbfsConfiguration(), getFileSystem().getAbfsClient().getAbfsCounters());
- ThreadPoolExecutor executor =
- (ThreadPoolExecutor) instance.getExecutorService();
-
- // --- Step 1: Simulate idle period ---
- // Let the executor sit idle with no work for a few seconds
- Thread.sleep(WAIT_TIMEOUT_MS);
- int poolSizeAfterIdle = executor.getPoolSize();
-
- // Verify that after idling, the pool is at or close to its minimum size
- Assertions.assertThat(poolSizeAfterIdle)
- .as("Pool size should remain minimal after idle")
- .isLessThanOrEqualTo(executor.getCorePoolSize());
-
- // --- Step 2: Submit a sudden burst of tasks ---
- // Launch many short, CPU-heavy tasks at once to simulate burst load
- int burstLoad = BURST_LOAD;
- CountDownLatch latch = new CountDownLatch(burstLoad);
- for (int i = 0; i < burstLoad; i++) {
- executor.submit(() -> {
- // Busy loop for ~200ms to simulate CPU work
- long end = System.currentTimeMillis() + THREAD_SLEEP_DURATION_MS;
- while (System.currentTimeMillis() < end) {
- Math.sqrt(Math.random()); // burn CPU cycles
- }
- latch.countDown();
- });
- }
-
- // --- Step 3: Give pool time to react ---
- // Wait briefly so the pool’s scaling logic has a chance to expand
- Thread.sleep(LOAD_SLEEP_DURATION_MS);
- int poolSizeDuringBurst = executor.getPoolSize();
-
- // Verify that the pool scaled up compared to idle
- Assertions.assertThat(poolSizeDuringBurst)
- .as("Pool size should increase after burst load")
- .isGreaterThanOrEqualTo(poolSizeAfterIdle);
-
-// --- Step 4: Verify completion ---
-// Ensure all tasks complete successfully in a reasonable time,
-// proving there was no degradation or deadlock under burst load
- Assertions.assertThat(
- latch.await(LATCH_TIMEOUT_SECONDS / 2, TimeUnit.SECONDS))
- .as("All burst tasks should finish in reasonable time")
- .isTrue();
- instance.close();
- }
- }
-
- /**
- * Verifies that when the system experiences low CPU usage,
- * the WriteThreadPoolSizeManager maintains the thread pool size
- * without scaling down and updates the corresponding
- * write thread pool metrics accordingly.
- */
- @Test
- void testThreadPoolOnLowCpuLoadAndMetricsUpdate()
- throws Exception {
- // Initialize filesystem and thread pool manager
- Configuration conf = getRawConfiguration();
- conf.setBoolean(FS_AZURE_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT, true);
- conf.setInt(AZURE_WRITE_MAX_CONCURRENT_REQUESTS, 2);
- conf.setInt(FS_AZURE_WRITE_LOW_CPU_THRESHOLD_PERCENT, 10);
- conf.setInt(FS_AZURE_WRITE_CPU_MONITORING_INTERVAL_MILLIS, 1_000);
- FileSystem fileSystem = FileSystem.newInstance(conf);
- try (AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem) {
- WriteThreadPoolSizeManager instance =
- WriteThreadPoolSizeManager.getInstance("fs1",
- abfs.getAbfsStore().getAbfsConfiguration(),
- abfs.getAbfsClient().getAbfsCounters());
- instance.startCPUMonitoring();
-
- // --- Capture initial metrics and stats ---
- AbfsWriteResourceUtilizationMetrics metrics =
- abfs.getAbfsClient()
- .getAbfsCounters()
- .getAbfsWriteResourceUtilizationMetrics();
-
- WriteThreadPoolSizeManager.WriteThreadPoolStats statsBefore =
- instance.getCurrentStats(ResourceUtilizationUtils.getJvmCpuLoad(),
- ResourceUtilizationUtils.getMemoryLoad(),
- ResourceUtilizationUtils.getUsedHeapMemory(),
- ResourceUtilizationUtils.getAvailableHeapMemory(),
- ResourceUtilizationUtils.getCommittedHeapMemory());
-
- ThreadPoolExecutor executor =
- (ThreadPoolExecutor) instance.getExecutorService();
-
- // No CPU hogs this time — simulate light CPU load
- // Submit lightweight ABFS tasks that barely use CPU
- int taskCount = 10;
- CountDownLatch latch = new CountDownLatch(taskCount);
-
- for (int i = 0; i < taskCount; i++) {
- executor.submit(() -> {
- try {
- // Light operations — minimal CPU load
- for (int j = 0; j < 3; j++) {
- Thread.sleep(HUNDRED); // simulate idle/light wait
- }
- } catch (Exception e) {
- Assertions.fail("Light task failed unexpectedly", e);
- } finally {
- latch.countDown();
- }
- });
- }
-
- // Wait for all tasks to finish
- boolean finished = latch.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
- Assertions.assertThat(finished)
- .as("All lightweight tasks should complete normally")
- .isTrue();
-
- // Allow some time for monitoring and metrics update
- Thread.sleep(SLEEP_DURATION_30S_MS);
-
- WriteThreadPoolSizeManager.WriteThreadPoolStats statsAfter =
- instance.getCurrentStats(ResourceUtilizationUtils.getJvmCpuLoad(),
- ResourceUtilizationUtils.getMemoryLoad(),
- ResourceUtilizationUtils.getUsedHeapMemory(),
- ResourceUtilizationUtils.getAvailableHeapMemory(),
- ResourceUtilizationUtils.getCommittedHeapMemory());
-
- //--- Validate that metrics and stats changed ---
- Assertions.assertThat(statsAfter)
- .as("Thread pool stats should update after CPU load")
- .isNotEqualTo(statsBefore);
-
- String metricsOutput = metrics.toString();
-
- if (!metricsOutput.isEmpty()) {
- // Assertions for metrics correctness
- Assertions.assertThat(metricsOutput)
- .as("Metrics output should not be empty")
- .isNotEmpty();
-
- Assertions.assertThat(metricsOutput)
- .as("Metrics must include CPU utilization data")
- .contains("SC=");
-
- Assertions.assertThat(metricsOutput)
- .as("Metrics must include memory utilization data")
- .contains("AM=");
-
- Assertions.assertThat(metricsOutput)
- .as("Metrics must include current thread pool size")
- .contains("CP=");
- }
- instance.close();
- }
- }
-
- /**
- * Verifies that the JVM identifier is initialized once and remains
- * constant across multiple invocations within the same JVM process.
- */
- @Test
- public void testJvmIdIsSingletonWithinJvm() {
- int firstId = JvmUniqueIdProvider.getJvmId();
- int secondId = JvmUniqueIdProvider.getJvmId();
- int thirdId = JvmUniqueIdProvider.getJvmId();
-
- assertEquals(firstId, secondId,
- "Subsequent calls to getJvmId() should return the same value");
- assertEquals(secondId, thirdId,
- "JVM-scoped identifier must remain constant for the lifetime of the JVM");
- }
-
- /**
- * Verifies that the JVM identifier is safely shared across multiple threads
- * and that concurrent access returns the same value.
- *
- * <p>This test ensures that static initialization of the identifier is
- * thread-safe and occurs only once per JVM.</p>
- */
- @Test
- public void testJvmIdIsSameAcrossThreads()
- throws ExecutionException, InterruptedException {
-
- ExecutorService executor = Executors.newFixedThreadPool(4);
-
- try {
- Callable<Integer> task = JvmUniqueIdProvider::getJvmId;
- Future<Integer> f1 = executor.submit(task);
- Future<Integer> f2 = executor.submit(task);
- Future<Integer> f3 = executor.submit(task);
- Future<Integer> f4 = executor.submit(task);
-
- int expectedId = f1.get();
- assertEquals(expectedId, f2.get(),
- "JVM ID should be identical when accessed from different threads");
- assertEquals(expectedId, f3.get(),
- "JVM ID should be identical when accessed concurrently");
- assertEquals(expectedId, f4.get(),
- "JVM ID should be initialized once and shared across all threads");
- } finally {
- executor.shutdown();
- executor.awaitTermination(5, TimeUnit.SECONDS);
- }
- }
-}
-
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
index 152c6a43f4c..fbf085d1ec0 100644
---
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
@@ -110,7 +110,7 @@ public void testMaxRequestsAndQueueCapacityDefaults()
throws Exception {
AbfsOutputStream stream = (AbfsOutputStream) out.getWrappedStream();
int maxConcurrentRequests
- = getConfiguration().getWriteConcurrentRequestCount();
+ = getConfiguration().getWriteMaxConcurrentRequestCount();
if (stream.isAppendBlobStream()) {
maxConcurrentRequests = 1;
}
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
index 2129f5e20fc..ec080369e19 100644
---
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
@@ -98,7 +98,7 @@ private AbfsOutputStreamContext
populateAbfsOutputStreamContext(
.disableOutputStreamFlush(disableOutputStreamFlush)
.withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
.withAppendBlob(isAppendBlob)
- .withWriteMaxConcurrentRequestCount(abfsConf.getWriteConcurrentRequestCount())
+ .withWriteMaxConcurrentRequestCount(abfsConf.getWriteMaxConcurrentRequestCount())
.withMaxWriteRequestsToQueue(abfsConf.getMaxWriteRequestsToQueue())
.withClientHandler(clientHandler)
.withPath(path)
@@ -125,7 +125,6 @@ public void verifyShortWriteRequest() throws Exception {
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.getAbfsConfiguration()).thenReturn(abfsConf);
when(client.getAbfsCounters()).thenReturn(abfsCounters);
- when(client.getAbfsCounters().getAbfsWriteResourceUtilizationMetrics()).thenReturn(new AbfsWriteResourceUtilizationMetrics());
when(client.append(anyString(), any(byte[].class),
any(AppendRequestParameters.class), any(),
any(), any(TracingContext.class)))
@@ -197,7 +196,6 @@ public void verifyWriteRequest() throws Exception {
"test-fs-id", FSOperationType.WRITE,
TracingHeaderFormat.ALL_ID_FORMAT, null);
when(client.getAbfsCounters()).thenReturn(abfsCounters);
- when(client.getAbfsCounters().getAbfsWriteResourceUtilizationMetrics()).thenReturn(new AbfsWriteResourceUtilizationMetrics());
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), any(byte[].class),
any(AppendRequestParameters.class), any(), any(),
any(TracingContext.class))).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(),
any(), isNull(), any(), any(TracingContext.class), anyString())).thenReturn(op);
@@ -284,7 +282,6 @@ public void verifyWriteRequestOfBufferSizeAndClose() throws
Exception {
when(clientHandler.getClient(any())).thenReturn(client);
when(clientHandler.getDfsClient()).thenReturn(client);
when(client.getAbfsCounters()).thenReturn(abfsCounters);
- when(client.getAbfsCounters().getAbfsWriteResourceUtilizationMetrics()).thenReturn(new AbfsWriteResourceUtilizationMetrics());
AbfsOutputStream out = Mockito.spy(Mockito.spy(new AbfsOutputStream(
@@ -367,7 +364,6 @@ public void verifyWriteRequestOfBufferSize() throws
Exception {
when(clientHandler.getClient(any())).thenReturn(client);
when(clientHandler.getDfsClient()).thenReturn(client);
when(client.getAbfsCounters()).thenReturn(abfsCounters);
- when(client.getAbfsCounters().getAbfsWriteResourceUtilizationMetrics()).thenReturn(new AbfsWriteResourceUtilizationMetrics());
AbfsOutputStream out = Mockito.spy(new AbfsOutputStream(
populateAbfsOutputStreamContext(
@@ -431,7 +427,6 @@ public void verifyWriteRequestOfBufferSizeWithAppendBlob()
throws Exception {
when(clientHandler.getClient(any())).thenReturn(client);
when(clientHandler.getDfsClient()).thenReturn(client);
when(client.getAbfsCounters()).thenReturn(abfsCounters);
- when(client.getAbfsCounters().getAbfsWriteResourceUtilizationMetrics()).thenReturn(new AbfsWriteResourceUtilizationMetrics());
AbfsOutputStream out = Mockito.spy(new AbfsOutputStream(
populateAbfsOutputStreamContext(
BUFFER_SIZE,
@@ -492,7 +487,6 @@ public void verifyWriteRequestOfBufferSizeAndHFlush()
throws Exception {
abfsConf.getClientCorrelationId(), "test-fs-id",
FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(), null);
when(client.getAbfsCounters()).thenReturn(abfsCounters);
- when(client.getAbfsCounters().getAbfsWriteResourceUtilizationMetrics()).thenReturn(new AbfsWriteResourceUtilizationMetrics());
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), any(byte[].class),
any(AppendRequestParameters.class), any(), any(),
any(TracingContext.class)))
@@ -569,7 +563,6 @@ public void verifyWriteRequestOfBufferSizeAndFlush() throws
Exception {
abfsConf = new AbfsConfiguration(conf, accountName1);
when(client.getAbfsConfiguration()).thenReturn(abfsConf);
when(client.getAbfsCounters()).thenReturn(abfsCounters);
- when(client.getAbfsCounters().getAbfsWriteResourceUtilizationMetrics()).thenReturn(new AbfsWriteResourceUtilizationMetrics());
AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1,
abfsConf);
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), any(byte[].class),
@@ -627,7 +620,7 @@ private ExecutorService createExecutorService(
AbfsConfiguration abfsConf) {
ExecutorService executorService =
new
SemaphoredDelegatingExecutor(BlockingThreadPoolExecutorService.newInstance(
- abfsConf.getWriteConcurrentRequestCount(),
+ abfsConf.getWriteMaxConcurrentRequestCount(),
abfsConf.getMaxWriteRequestsToQueue(),
10L, TimeUnit.SECONDS,
"abfs-test-bounded"),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]