bhattmanish98 commented on code in PR #7669:
URL: https://github.com/apache/hadoop/pull/7669#discussion_r2464543406
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java:
##########
@@ -459,10 +459,43 @@ public static String containerProperty(String property,
String fsName, String ac
public static final String FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD =
"fs.azure.blob.dir.rename.max.thread";
/**Maximum number of thread per blob-delete orchestration: {@value}*/
public static final String FS_AZURE_BLOB_DIR_DELETE_MAX_THREAD =
"fs.azure.blob.dir.delete.max.thread";
+ /**
+ * Configuration key for the keep-alive time for the write thread pool.
+ * This value specifies the amount of time that threads in the write thread
pool
+ * will remain idle before being terminated.
+ * Value: {@value}.
+ */
+ public static final String FS_AZURE_WRITE_THREADPOOL_KEEP_ALIVE_TIME =
"fs.azure.write.threadpool.keep.alive.time";
Review Comment:
Is this time in milliseconds? If so, can we add a `MILLIS` suffix to the constant name and a `.millis` suffix to the config key, so the unit is explicit?
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java:
##########
@@ -459,10 +459,43 @@ public static String containerProperty(String property,
String fsName, String ac
public static final String FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD =
"fs.azure.blob.dir.rename.max.thread";
/**Maximum number of thread per blob-delete orchestration: {@value}*/
public static final String FS_AZURE_BLOB_DIR_DELETE_MAX_THREAD =
"fs.azure.blob.dir.delete.max.thread";
+ /**
+ * Configuration key for the keep-alive time for the write thread pool.
+ * This value specifies the amount of time that threads in the write thread
pool
+ * will remain idle before being terminated.
+ * Value: {@value}.
+ */
+ public static final String FS_AZURE_WRITE_THREADPOOL_KEEP_ALIVE_TIME =
"fs.azure.write.threadpool.keep.alive.time";
+
+ public static final String FS_AZURE_WRITE_CPU_MONITORING_INTERVAL =
"fs.azure.write.cpu.monitoring.interval";
+
+ public static final String FS_AZURE_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT =
"fs.azure.write.dynamic.threadpool.enablement";
+
+ public static final String FS_AZURE_WRITE_HIGH_CPU_THRESHOLD =
"fs.azure.write.high.cpu.threshold";
+
+ public static final String FS_AZURE_WRITE_MEDIUM_CPU_THRESHOLD =
"fs.azure.write.medium.cpu.threshold";
+
+ public static final String FS_AZURE_WRITE_LOW_CPU_THRESHOLD =
"fs.azure.write.low.cpu.threshold";
+
+ public static final String FS_AZURE_WRITE_LOW_TIER_MEMORY_MULTIPLIER =
"fs.azure.write.low.tier.memory.multiplier";
+
+ public static final String FS_AZURE_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER =
"fs.azure.write.medium.tier.memory.multiplier";
+
+ public static final String FS_AZURE_WRITE_HIGH_TIER_MEMORY_MULTIPLIER =
"fs.azure.write.high.tier.memory.multiplier";
+
+
+
/**Flag to enable/disable sending client transactional ID during
create/rename operations: {@value}*/
public static final String FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID =
"fs.azure.enable.client.transaction.id";
/**Flag to enable/disable create idempotency during create operation:
{@value}*/
public static final String FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY =
"fs.azure.enable.create.blob.idempotency";
+ /**
Review Comment:
Since this config is related to the newly added ones above, should we group them together?
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/WriteThreadPoolSizeManager.java:
##########
@@ -0,0 +1,388 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
+
+import com.sun.management.OperatingSystemMXBean;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.LOW_HEAP_SPACE_FACTOR;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.MEDIUM_HEAP_SPACE_FACTOR;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BYTES_PER_GIGABYTE;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_CPU_REDUCTION_FACTOR;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_MEDIUM_HEAP_FACTOR;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED_D;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_HEAP_FACTOR;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_HIGH_MEMORY_DECREASE_FACTOR;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_POOL_SIZE_INCREASE_FACTOR;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MEDIUM_CPU_LOW_MEMORY_REDUCTION_FACTOR;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MEDIUM_CPU_REDUCTION_FACTOR;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.THIRTY_SECONDS;
+
+/**
+ * Manages a thread pool for writing operations, adjusting the pool size based
on CPU utilization.
+ */
+public final class WriteThreadPoolSizeManager implements Closeable {
+
+ /* Maximum allowed size for the thread pool. */
+ private final int maxThreadPoolSize;
+ /* Executor for periodically monitoring CPU usage. */
+ private final ScheduledExecutorService cpuMonitorExecutor;
+ /* Thread pool whose size is dynamically managed. */
+ private volatile ExecutorService boundedThreadPool;
+ /* Lock to ensure thread-safe updates to the thread pool. */
+ private final Lock lock = new ReentrantLock();
+ /* New computed max size for the thread pool after adjustment. */
+ private volatile int newMaxPoolSize;
+ /* Logger instance for logging events from WriteThreadPoolSizeManager. */
+ private static final Logger LOG = LoggerFactory.getLogger(
+ WriteThreadPoolSizeManager.class);
+ /* Map to maintain a WriteThreadPoolSizeManager instance per filesystem. */
+ private static final ConcurrentHashMap<String, WriteThreadPoolSizeManager>
+ POOL_SIZE_MANAGER_MAP = new ConcurrentHashMap<>();
+ /* Name of the filesystem associated with this manager. */
+ private final String filesystemName;
+ /* Initial size for the thread pool when created. */
+ private final int initialPoolSize;
+ /* Initially available heap memory. */
+ private final long initialAvailableHeapMemory;
+ /* The configuration instance. */
+ private final AbfsConfiguration abfsConfiguration;
+
+ /**
+ * Private constructor to initialize the write thread pool and CPU monitor
executor
+ * based on system resources and ABFS configuration.
+ *
+ * @param filesystemName Name of the ABFS filesystem.
+ * @param abfsConfiguration Configuration containing pool size parameters.
+ */
+ private WriteThreadPoolSizeManager(String filesystemName,
+ AbfsConfiguration abfsConfiguration) {
+ this.filesystemName = filesystemName;
+ this.abfsConfiguration = abfsConfiguration;
+ int availableProcessors = Runtime.getRuntime().availableProcessors();
+ /* Get the heap space available when the instance is created */
+ this.initialAvailableHeapMemory = getAvailableHeapMemory();
+ /* Compute the max pool size */
+ int computedMaxPoolSize = getComputedMaxPoolSize(availableProcessors,
initialAvailableHeapMemory);
+
+ /* Get the initial pool size from config, fallback to at least 1 */
+ this.initialPoolSize = Math.max(1,
+ abfsConfiguration.getWriteConcurrentRequestCount());
+
+ /* Set the upper bound for the thread pool size */
+ this.maxThreadPoolSize = Math.max(computedMaxPoolSize, initialPoolSize);
+ AtomicInteger threadCount = new AtomicInteger(1);
+ this.boundedThreadPool = Executors.newFixedThreadPool(
+ initialPoolSize,
+ r -> {
+ Thread t = new Thread(r);
+ t.setName("abfs-boundedwrite-" + threadCount.getAndIncrement());
+ return t;
+ }
+ );
+ ThreadPoolExecutor executor = (ThreadPoolExecutor) this.boundedThreadPool;
+ executor.setKeepAliveTime(
+ abfsConfiguration.getWriteThreadPoolKeepAliveTime(), TimeUnit.SECONDS);
+ executor.allowCoreThreadTimeOut(true);
+ /* Create a scheduled executor for CPU monitoring and pool adjustment */
+ this.cpuMonitorExecutor = Executors.newScheduledThreadPool(1);
+ }
+
+ /** Returns the internal {@link AbfsConfiguration}. */
+ private AbfsConfiguration getAbfsConfiguration() {
+ return abfsConfiguration;
+ }
+
+ /**
+ * Calculates the max thread pool size using a multiplier based on
+ * memory per core. Higher memory per core results in a larger multiplier.
+ *
+ * @param availableProcessors Number of CPU cores.
Review Comment:
The `@param initialAvailableHeapMemory` tag is missing from this Javadoc.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java:
##########
@@ -277,11 +278,19 @@ public AzureBlobFileSystemStore(
}
this.blockFactory = abfsStoreBuilder.blockFactory;
this.blockOutputActiveBlocks = abfsStoreBuilder.blockOutputActiveBlocks;
- this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
- abfsConfiguration.getWriteMaxConcurrentRequestCount(),
- abfsConfiguration.getMaxWriteRequestsToQueue(),
- 10L, TimeUnit.SECONDS,
- "abfs-bounded");
+ if (abfsConfiguration.isDynamicWriteThreadPoolEnablement()) {
+ this.poolSizeManager = WriteThreadPoolSizeManager.getInstance(
+ getClient().getFileSystem() + "-" + UUID.randomUUID(),
+ abfsConfiguration);
+ poolSizeManager.startCPUMonitoring();
Review Comment:
`WriteThreadPoolSizeManager.getInstance(...)` can return null, in which case this line will throw a NullPointerException.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]