[ https://issues.apache.org/jira/browse/HADOOP-19472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18033173#comment-18033173 ]

ASF GitHub Bot commented on HADOOP-19472:
-----------------------------------------

bhattmanish98 commented on code in PR #7669:
URL: https://github.com/apache/hadoop/pull/7669#discussion_r2464543406


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java:
##########
@@ -459,10 +459,43 @@ public static String containerProperty(String property, String fsName, String ac
   public static final String FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD = "fs.azure.blob.dir.rename.max.thread";
   /**Maximum number of thread per blob-delete orchestration: {@value}*/
   public static final String FS_AZURE_BLOB_DIR_DELETE_MAX_THREAD = "fs.azure.blob.dir.delete.max.thread";
+  /**
+   * Configuration key for the keep-alive time for the write thread pool.
+   * This value specifies the amount of time that threads in the write thread pool
+   * will remain idle before being terminated.
+   * Value: {@value}.
+   */
+  public static final String FS_AZURE_WRITE_THREADPOOL_KEEP_ALIVE_TIME = "fs.azure.write.threadpool.keep.alive.time";

Review Comment:
   Is this time in millis? If yes, can we add "millis" at the end of both the constant name and the config key?
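One way to act on this suggestion is to encode the unit in both the constant and the key, so callers always know which `TimeUnit` to pass. The sketch below uses milliseconds, as in the question. Note, however, that the constructor quoted later in this PR passes the value to `setKeepAliveTime` with `TimeUnit.SECONDS`, so the suffix should match whatever unit the code actually uses. The `_MILLIS` constant, the `.millis` key, and the 30-second default are hypothetical. A plain `Map` stands in for Hadoop's `Configuration`.

```java
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch: a keep-alive key whose unit is part of its name.
 * The "_MILLIS" constant, the ".millis" key and the default are assumptions
 * illustrating the review suggestion, not names taken from the PR.
 */
public final class KeepAliveKeySketch {

  /** Keep-alive time for idle write threads, in milliseconds: {@value}. */
  public static final String FS_AZURE_WRITE_THREADPOOL_KEEP_ALIVE_TIME_MILLIS =
      "fs.azure.write.threadpool.keep.alive.time.millis";

  /** Assumed default, in milliseconds, used when the key is not set. */
  public static final long DEFAULT_WRITE_THREADPOOL_KEEP_ALIVE_TIME_MILLIS = 30_000L;

  private KeepAliveKeySketch() {
  }

  /**
   * Reads the keep-alive time from a plain map standing in for a Hadoop
   * Configuration. Because the unit is fixed by the key name, the caller
   * always knows which TimeUnit to pass on.
   */
  static long keepAliveMillis(Map<String, String> conf) {
    String raw = conf.get(FS_AZURE_WRITE_THREADPOOL_KEEP_ALIVE_TIME_MILLIS);
    return raw == null
        ? DEFAULT_WRITE_THREADPOOL_KEEP_ALIVE_TIME_MILLIS
        : Long.parseLong(raw.trim());
  }

  public static void main(String[] args) {
    Map<String, String> conf =
        Map.of(FS_AZURE_WRITE_THREADPOOL_KEEP_ALIVE_TIME_MILLIS, "5000");
    long millis = keepAliveMillis(conf);
    // The executor call would then name the unit explicitly, e.g.
    // executor.setKeepAliveTime(millis, TimeUnit.MILLISECONDS).
    System.out.println(millis + " ms = "
        + TimeUnit.MILLISECONDS.toSeconds(millis) + " s");
  }
}
```

Running `main` prints `5000 ms = 5 s`.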



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java:
##########
@@ -459,10 +459,43 @@ public static String containerProperty(String property, String fsName, String ac
   public static final String FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD = "fs.azure.blob.dir.rename.max.thread";
   /**Maximum number of thread per blob-delete orchestration: {@value}*/
   public static final String FS_AZURE_BLOB_DIR_DELETE_MAX_THREAD = "fs.azure.blob.dir.delete.max.thread";
+  /**
+   * Configuration key for the keep-alive time for the write thread pool.
+   * This value specifies the amount of time that threads in the write thread pool
+   * will remain idle before being terminated.
+   * Value: {@value}.
+   */
+  public static final String FS_AZURE_WRITE_THREADPOOL_KEEP_ALIVE_TIME = "fs.azure.write.threadpool.keep.alive.time";
+
+  public static final String FS_AZURE_WRITE_CPU_MONITORING_INTERVAL = "fs.azure.write.cpu.monitoring.interval";
+
+  public static final String FS_AZURE_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT = "fs.azure.write.dynamic.threadpool.enablement";
+
+  public static final String FS_AZURE_WRITE_HIGH_CPU_THRESHOLD = "fs.azure.write.high.cpu.threshold";
+
+  public static final String FS_AZURE_WRITE_MEDIUM_CPU_THRESHOLD = "fs.azure.write.medium.cpu.threshold";
+
+  public static final String FS_AZURE_WRITE_LOW_CPU_THRESHOLD = "fs.azure.write.low.cpu.threshold";
+
+  public static final String FS_AZURE_WRITE_LOW_TIER_MEMORY_MULTIPLIER = "fs.azure.write.low.tier.memory.multiplier";
+
+  public static final String FS_AZURE_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER = "fs.azure.write.medium.tier.memory.multiplier";
+
+  public static final String FS_AZURE_WRITE_HIGH_TIER_MEMORY_MULTIPLIER = "fs.azure.write.high.tier.memory.multiplier";
+
+
+
   /**Flag to enable/disable sending client transactional ID during create/rename operations: {@value}*/
   public static final String FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID = "fs.azure.enable.client.transaction.id";
   /**Flag to enable/disable create idempotency during create operation: {@value}*/
   public static final String FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY = "fs.azure.enable.create.blob.idempotency";
 
+  /**

Review Comment:
   Since this config is related to the newly added ones above, should we group them together?
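One way to act on this suggestion is to keep all the new write thread pool keys contiguous under a single section comment. Each key would carry a one-line Javadoc, as the neighbouring keys do. The key strings below are copied from the diff. The class name, section comment, Javadoc wording, and the `ALL_KEYS` list are hypothetical. The config the comment is anchored on is cut off in the quoted hunk (only its opening `/**` is visible), so it is not reproduced here.

```java
import java.util.List;

/**
 * Hypothetical sketch of keeping the dynamic write thread pool keys from this
 * PR in one contiguous, documented block. The key strings are copied from the
 * diff; the class name, section comment and Javadoc wording are assumptions.
 */
public final class WriteThreadPoolKeysSketch {

  private WriteThreadPoolKeysSketch() {
  }

  /* ---------- Dynamic write thread pool ---------- */

  /** Flag to enable/disable dynamic sizing of the write thread pool: {@value}. */
  public static final String FS_AZURE_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT =
      "fs.azure.write.dynamic.threadpool.enablement";
  /** Idle time before write thread pool threads are terminated: {@value}. */
  public static final String FS_AZURE_WRITE_THREADPOOL_KEEP_ALIVE_TIME =
      "fs.azure.write.threadpool.keep.alive.time";
  /** Interval between CPU usage checks: {@value}. */
  public static final String FS_AZURE_WRITE_CPU_MONITORING_INTERVAL =
      "fs.azure.write.cpu.monitoring.interval";
  /** High CPU usage threshold: {@value}. */
  public static final String FS_AZURE_WRITE_HIGH_CPU_THRESHOLD =
      "fs.azure.write.high.cpu.threshold";
  /** Medium CPU usage threshold: {@value}. */
  public static final String FS_AZURE_WRITE_MEDIUM_CPU_THRESHOLD =
      "fs.azure.write.medium.cpu.threshold";
  /** Low CPU usage threshold: {@value}. */
  public static final String FS_AZURE_WRITE_LOW_CPU_THRESHOLD =
      "fs.azure.write.low.cpu.threshold";
  /** Memory multiplier for the low tier: {@value}. */
  public static final String FS_AZURE_WRITE_LOW_TIER_MEMORY_MULTIPLIER =
      "fs.azure.write.low.tier.memory.multiplier";
  /** Memory multiplier for the medium tier: {@value}. */
  public static final String FS_AZURE_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER =
      "fs.azure.write.medium.tier.memory.multiplier";
  /** Memory multiplier for the high tier: {@value}. */
  public static final String FS_AZURE_WRITE_HIGH_TIER_MEMORY_MULTIPLIER =
      "fs.azure.write.high.tier.memory.multiplier";

  /** All keys in this group, e.g. for validation or generated documentation. */
  public static final List<String> ALL_KEYS = List.of(
      FS_AZURE_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT,
      FS_AZURE_WRITE_THREADPOOL_KEEP_ALIVE_TIME,
      FS_AZURE_WRITE_CPU_MONITORING_INTERVAL,
      FS_AZURE_WRITE_HIGH_CPU_THRESHOLD,
      FS_AZURE_WRITE_MEDIUM_CPU_THRESHOLD,
      FS_AZURE_WRITE_LOW_CPU_THRESHOLD,
      FS_AZURE_WRITE_LOW_TIER_MEMORY_MULTIPLIER,
      FS_AZURE_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER,
      FS_AZURE_WRITE_HIGH_TIER_MEMORY_MULTIPLIER);

  public static void main(String[] args) {
    ALL_KEYS.forEach(System.out::println);
  }
}
```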



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/WriteThreadPoolSizeManager.java:
##########
@@ -0,0 +1,388 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
+
+import com.sun.management.OperatingSystemMXBean;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.LOW_HEAP_SPACE_FACTOR;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.MEDIUM_HEAP_SPACE_FACTOR;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BYTES_PER_GIGABYTE;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_CPU_REDUCTION_FACTOR;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_MEDIUM_HEAP_FACTOR;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED_D;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_HEAP_FACTOR;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_HIGH_MEMORY_DECREASE_FACTOR;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_POOL_SIZE_INCREASE_FACTOR;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MEDIUM_CPU_LOW_MEMORY_REDUCTION_FACTOR;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MEDIUM_CPU_REDUCTION_FACTOR;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.THIRTY_SECONDS;
+
+/**
+ * Manages a thread pool for writing operations, adjusting the pool size based on CPU utilization.
+ */
+public final class WriteThreadPoolSizeManager implements Closeable {
+
+  /* Maximum allowed size for the thread pool. */
+  private final int maxThreadPoolSize;
+  /* Executor for periodically monitoring CPU usage. */
+  private final ScheduledExecutorService cpuMonitorExecutor;
+  /* Thread pool whose size is dynamically managed. */
+  private volatile ExecutorService boundedThreadPool;
+  /* Lock to ensure thread-safe updates to the thread pool. */
+  private final Lock lock = new ReentrantLock();
+  /* New computed max size for the thread pool after adjustment. */
+  private volatile int newMaxPoolSize;
+  /* Logger instance for logging events from WriteThreadPoolSizeManager. */
+  private static final Logger LOG = LoggerFactory.getLogger(
+      WriteThreadPoolSizeManager.class);
+  /* Map to maintain a WriteThreadPoolSizeManager instance per filesystem. */
+  private static final ConcurrentHashMap<String, WriteThreadPoolSizeManager>
+      POOL_SIZE_MANAGER_MAP = new ConcurrentHashMap<>();
+  /* Name of the filesystem associated with this manager. */
+  private final String filesystemName;
+  /* Initial size for the thread pool when created. */
+  private final int initialPoolSize;
+  /* Initially available heap memory. */
+  private final long initialAvailableHeapMemory;
+  /* The configuration instance. */
+  private final AbfsConfiguration abfsConfiguration;
+
+  /**
+   * Private constructor to initialize the write thread pool and CPU monitor executor
+   * based on system resources and ABFS configuration.
+   *
+   * @param filesystemName       Name of the ABFS filesystem.
+   * @param abfsConfiguration    Configuration containing pool size parameters.
+   */
+  private WriteThreadPoolSizeManager(String filesystemName,
+      AbfsConfiguration abfsConfiguration) {
+    this.filesystemName = filesystemName;
+    this.abfsConfiguration = abfsConfiguration;
+    int availableProcessors = Runtime.getRuntime().availableProcessors();
+    /* Get the heap space available when the instance is created */
+    this.initialAvailableHeapMemory = getAvailableHeapMemory();
+    /* Compute the max pool size */
+    int computedMaxPoolSize = getComputedMaxPoolSize(availableProcessors, initialAvailableHeapMemory);
+
+    /* Get the initial pool size from config, fallback to at least 1 */
+    this.initialPoolSize = Math.max(1,
+        abfsConfiguration.getWriteConcurrentRequestCount());
+
+    /* Set the upper bound for the thread pool size */
+    this.maxThreadPoolSize = Math.max(computedMaxPoolSize, initialPoolSize);
+    AtomicInteger threadCount = new AtomicInteger(1);
+    this.boundedThreadPool = Executors.newFixedThreadPool(
+        initialPoolSize,
+        r -> {
+          Thread t = new Thread(r);
+          t.setName("abfs-boundedwrite-" + threadCount.getAndIncrement());
+          return t;
+        }
+    );
+    ThreadPoolExecutor executor = (ThreadPoolExecutor) this.boundedThreadPool;
+    executor.setKeepAliveTime(
+        abfsConfiguration.getWriteThreadPoolKeepAliveTime(), TimeUnit.SECONDS);
+    executor.allowCoreThreadTimeOut(true);
+    /* Create a scheduled executor for CPU monitoring and pool adjustment */
+    this.cpuMonitorExecutor = Executors.newScheduledThreadPool(1);
+  }
+
+  /** Returns the internal {@link AbfsConfiguration}. */
+  private AbfsConfiguration getAbfsConfiguration() {
+    return abfsConfiguration;
+  }
+
+  /**
+   * Calculates the max thread pool size using a multiplier based on
+   * memory per core. Higher memory per core results in a larger multiplier.
+   *
+   * @param availableProcessors Number of CPU cores.

Review Comment:
   @param initialAvailableHeapMemory is missing
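For reference, a completed Javadoc with both parameters might look like the sketch below. The method body is only a hypothetical illustration of "higher memory per core results in a larger multiplier". The tier boundaries (1 GB and 4 GB of heap per core) and the multipliers (4, 8, 16) are assumed values, not the PR's constants. The PR appears to make the multipliers configurable through the `fs.azure.write.*.tier.memory.multiplier` keys.

```java
/**
 * Hypothetical sketch of getComputedMaxPoolSize with a complete Javadoc.
 * Tier boundaries and multipliers are assumed values for illustration.
 */
public final class MaxPoolSizeSketch {

  static final long BYTES_PER_GIGABYTE = 1024L * 1024L * 1024L;
  /* Assumed upper bounds, in GB of heap per core, for the low and medium tiers. */
  private static final double LOW_TIER_MAX_GB_PER_CORE = 1.0;
  private static final double MEDIUM_TIER_MAX_GB_PER_CORE = 4.0;
  /* Assumed per-core multipliers for each memory tier. */
  private static final int LOW_TIER_MULTIPLIER = 4;
  private static final int MEDIUM_TIER_MULTIPLIER = 8;
  private static final int HIGH_TIER_MULTIPLIER = 16;

  private MaxPoolSizeSketch() {
  }

  /**
   * Calculates the max thread pool size using a multiplier based on
   * memory per core. Higher memory per core results in a larger multiplier.
   *
   * @param availableProcessors        Number of CPU cores.
   * @param initialAvailableHeapMemory Heap memory, in bytes, available when
   *                                   the manager is created.
   * @return the computed maximum pool size, never less than 1.
   */
  static int getComputedMaxPoolSize(int availableProcessors,
      long initialAvailableHeapMemory) {
    int cores = Math.max(1, availableProcessors);
    double heapGbPerCore =
        (double) initialAvailableHeapMemory / BYTES_PER_GIGABYTE / cores;
    int multiplier;
    if (heapGbPerCore <= LOW_TIER_MAX_GB_PER_CORE) {
      multiplier = LOW_TIER_MULTIPLIER;
    } else if (heapGbPerCore <= MEDIUM_TIER_MAX_GB_PER_CORE) {
      multiplier = MEDIUM_TIER_MULTIPLIER;
    } else {
      multiplier = HIGH_TIER_MULTIPLIER;
    }
    return Math.max(1, cores * multiplier);
  }

  public static void main(String[] args) {
    // 16 GB over 8 cores is 2 GB per core, i.e. the medium tier: 8 * 8 = 64.
    System.out.println(getComputedMaxPoolSize(8, 16L * BYTES_PER_GIGABYTE));
  }
}
```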



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java:
##########
@@ -277,11 +278,19 @@ public AzureBlobFileSystemStore(
     }
     this.blockFactory = abfsStoreBuilder.blockFactory;
     this.blockOutputActiveBlocks = abfsStoreBuilder.blockOutputActiveBlocks;
-    this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
-        abfsConfiguration.getWriteMaxConcurrentRequestCount(),
-        abfsConfiguration.getMaxWriteRequestsToQueue(),
-        10L, TimeUnit.SECONDS,
-        "abfs-bounded");
+    if (abfsConfiguration.isDynamicWriteThreadPoolEnablement()) {
+      this.poolSizeManager = WriteThreadPoolSizeManager.getInstance(
+          getClient().getFileSystem() + "-" + UUID.randomUUID(),
+          abfsConfiguration);
+      poolSizeManager.startCPUMonitoring();

Review Comment:
   WriteThreadPoolSizeManager.getInstance(...) can return null, in which case this line throws a NullPointerException.
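A minimal sketch of guarding the call follows. It assumes the store should fall back to a fixed-size pool when no manager is returned; that pool stands in for the previous `BlockingThreadPoolExecutorService` setup. If a null manager is never expected, failing fast with `Objects.requireNonNull` and a descriptive message would be the alternative. `PoolSizeManagerStub`, its `getInstance(String, boolean)` signature, and the fallback pool size are hypothetical stand-ins, not the PR's API.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch of null-safe initialization of the write pool manager. */
public final class NullSafeInitSketch {

  /** Stand-in for WriteThreadPoolSizeManager whose getInstance may return null. */
  static final class PoolSizeManagerStub {
    private static final ConcurrentHashMap<String, PoolSizeManagerStub> MAP =
        new ConcurrentHashMap<>();
    private volatile boolean monitoring;

    /** Returns null when {@code available} is false, mimicking the reviewed path. */
    static PoolSizeManagerStub getInstance(String name, boolean available) {
      return available ? MAP.computeIfAbsent(name, n -> new PoolSizeManagerStub()) : null;
    }

    void startCPUMonitoring() {
      monitoring = true;
    }

    boolean isMonitoring() {
      return monitoring;
    }
  }

  /* Assumed fallback pool size; the real store reads this from configuration. */
  private static final int FALLBACK_POOL_SIZE = 4;

  private PoolSizeManagerStub poolSizeManager;
  private ExecutorService boundedThreadPool;

  void init(boolean dynamicEnabled, boolean managerAvailable) {
    if (dynamicEnabled) {
      poolSizeManager = PoolSizeManagerStub.getInstance(
          "fs-" + UUID.randomUUID(), managerAvailable);
    }
    if (poolSizeManager != null) {
      // Only dereference the manager once it is known to be non-null.
      poolSizeManager.startCPUMonitoring();
    } else {
      // Dynamic sizing disabled or unavailable: use a fixed-size pool instead.
      boundedThreadPool = Executors.newFixedThreadPool(FALLBACK_POOL_SIZE);
    }
  }

  PoolSizeManagerStub getPoolSizeManager() {
    return poolSizeManager;
  }

  ExecutorService getBoundedThreadPool() {
    return boundedThreadPool;
  }

  public static void main(String[] args) {
    NullSafeInitSketch store = new NullSafeInitSketch();
    store.init(true, false); // getInstance returns null; no NPE is thrown
    System.out.println("manager=" + store.getPoolSizeManager()
        + ", fallback pool=" + (store.getBoundedThreadPool() != null));
    store.getBoundedThreadPool().shutdown();
  }
}
```

Running `main` prints `manager=null, fallback pool=true`.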





> ABFS: Enhance performance of ABFS driver for write-heavy workloads
> ------------------------------------------------------------------
>
>                 Key: HADOOP-19472
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19472
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/azure
>    Affects Versions: 3.4.1
>            Reporter: Anmol Asrani
>            Assignee: Anmol Asrani
>            Priority: Minor
>              Labels: pull-request-available
>             Fix For: 3.4.1
>
>
> The goal of this work item is to enhance the performance of the ABFS driver for write-heavy workloads by improving concurrency within writes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
