[ 
https://issues.apache.org/jira/browse/HADOOP-19737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18040219#comment-18040219
 ] 

ASF GitHub Bot commented on HADOOP-19737:
-----------------------------------------

anujmodi2021 commented on code in PR #8056:
URL: https://github.com/apache/hadoop/pull/8056#discussion_r2554604616


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java:
##########
@@ -244,9 +243,12 @@ public void constructHeader(AbfsHttpOperation 
httpOperation, String previousFail
     if (listener != null) { //for testing
       listener.callTracingHeaderValidator(header, format);
     }
-    
httpOperation.setRequestProperty(HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID,
 header);
+    // If metricHeader is present, append it to the client request ID header 
for tracing
     if (!metricHeader.equals(EMPTY_STRING)) {
-      
httpOperation.setRequestProperty(HttpHeaderConfigurations.X_MS_FECLIENT_METRICS,
 metricHeader);
+      
httpOperation.setRequestProperty(HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID,
 header + COLON + metricHeader);

Review Comment:
   This would cause disparity in schema. With v2 we have made every field 
mandatory. If metrics are not there we should simply keep empty string but 
colon should always be there.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java:
##########
@@ -398,4 +400,12 @@ public void setReadType(ReadType readType) {
   public ReadType getReadType() {
     return readType;
   }
+
+  /**
+   * Sets the metric results string used for tracing or logging.
+   * @param metricResults the formatted metric data to store.
+   */
+  public void setMetricResults(final String metricResults) {

Review Comment:
   These are only for Thread Pool metrics right?
   Let's rename accordingly.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java:
##########
@@ -831,41 +861,79 @@ private boolean manualEviction(final ReadBuffer buf) {
    */
   private void adjustThreadPool() {
     int currentPoolSize = workerRefs.size();
-    double cpuLoad = getCpuLoad();
+    double cpuLoad = getJvmCpuLoad();
+    if (cpuLoad > maxCpuUtilization) {
+      maxCpuUtilization = cpuLoad;
+    }
     int requiredPoolSize = getRequiredThreadPoolSize();
     int newThreadPoolSize;
     printTraceLog(
         "Current CPU load: {}, Current worker pool size: {}, Current queue 
size: {}",
         cpuLoad, currentPoolSize, requiredPoolSize);
     if (currentPoolSize < requiredPoolSize && cpuLoad < cpuThreshold) {
+      lastScaleDirection = "I";
       // Submit more background tasks.
       newThreadPoolSize = Math.min(maxThreadPoolSize,
           (int) Math.ceil(
               (currentPoolSize * (HUNDRED_D + threadPoolUpscalePercentage))
                   / HUNDRED_D));
+      if (newThreadPoolSize == maxThreadPoolSize) {
+        lastScaleDirection = "+F";   // Already full, cannot scale up
+      } else {
+        lastScaleDirection = "I";    // Normal scale-up
+      }
       // Create new Worker Threads
-      for (int i = currentPoolSize; i < newThreadPoolSize; i++) {
-        ReadBufferWorker worker = new ReadBufferWorker(i, getBufferManager());
-        workerRefs.add(worker);
-        workerPool.submit(worker);
+      if ("I".equals(lastScaleDirection)) {

Review Comment:
   Let's define constants.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java:
##########
@@ -831,41 +861,79 @@ private boolean manualEviction(final ReadBuffer buf) {
    */
   private void adjustThreadPool() {
     int currentPoolSize = workerRefs.size();
-    double cpuLoad = getCpuLoad();
+    double cpuLoad = getJvmCpuLoad();
+    if (cpuLoad > maxCpuUtilization) {
+      maxCpuUtilization = cpuLoad;
+    }
     int requiredPoolSize = getRequiredThreadPoolSize();
     int newThreadPoolSize;
     printTraceLog(
         "Current CPU load: {}, Current worker pool size: {}, Current queue 
size: {}",
         cpuLoad, currentPoolSize, requiredPoolSize);
     if (currentPoolSize < requiredPoolSize && cpuLoad < cpuThreshold) {
+      lastScaleDirection = "I";
       // Submit more background tasks.
       newThreadPoolSize = Math.min(maxThreadPoolSize,
           (int) Math.ceil(
               (currentPoolSize * (HUNDRED_D + threadPoolUpscalePercentage))
                   / HUNDRED_D));
+      if (newThreadPoolSize == maxThreadPoolSize) {

Review Comment:
   There should be a check on currentPoolSize as well. new can become max even 
after upscale



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderVersion.java:
##########
@@ -38,7 +38,16 @@ public enum TracingHeaderVersion {
    *         :primaryRequestId:streamId:opType:retryHeader:ingressHandler
    *         
:position:operatedBlobCount:operationSpecificHeader:httpOperationHeader
    */
-  V1("v1", 13);
+  V1("v1", 13),
+  /**
+   * Version 1 of the tracing header, which includes a version prefix and has 
13 permanent fields.
+   * This version is used for the current tracing header schema.
+   * Schema: version:clientCorrelationId:clientRequestId:fileSystemId
+   *         :primaryRequestId:streamId:opType:retryHeader:ingressHandler
+   *         
:position:operatedBlobCount:operationSpecificHeader:httpOperationHeader
+   *         :networkLibrary:operationMetrics

Review Comment:
   Why 15?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java:
##########
@@ -328,6 +338,10 @@ public AbfsClientHandler getClientHandler() {
     return clientHandler;
   }
 
+  public WriteThreadPoolSizeManager getPoolSizeManager() {

Review Comment:
   Nit: `getWriteThreadPoolSizeManager`



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java:
##########
@@ -105,14 +110,39 @@ public final class ReadBufferManagerV2 extends 
ReadBufferManager {
 
   private static AtomicBoolean isConfigured = new AtomicBoolean(false);
 
+  /* Metrics collector for monitoring the performance of the ABFS read thread 
pool.  */
+  private final AbfsReadThreadPoolMetrics readThreadPoolMetrics;
+
+  /* Last recorded CPU time used for computing CPU utilization deltas.  */
+  private static long lastCpuTime = 0;

Review Comment:
   Remove no longer used variables



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java:
##########
@@ -1346,7 +1346,13 @@ public AbfsRestOperation read(final String path,
     final AbfsUriQueryBuilder abfsUriQueryBuilder = 
createDefaultUriQueryBuilder();
     String sasTokenForReuse = appendSASTokenToQuery(path, 
SASTokenProvider.READ_OPERATION,
         abfsUriQueryBuilder, cachedSasToken);
-
+    // Retrieve the read thread pool metrics from the ABFS counters.
+    AbfsReadThreadPoolMetrics metrics = getAbfsCounters()

Review Comment:
   Does `getAbfsReadThreadPoolMetrics()` returns aggregated metrics?
   What defines the period over which these metrics are aggregated?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java:
##########
@@ -242,6 +253,18 @@ public AzureIngressHandler getIngressHandler() {
 
   private volatile boolean switchCompleted = false;
 
+  /**
+   * Starts CPU monitoring in the thread pool size manager if it
+   * is initialized and not already monitoring.
+   */
+  private void initializeMonitoringIfNeeded() {
+    if (poolSizeManager != null && !poolSizeManager.isMonitoringStarted()) {
+      synchronized (this) {
+        poolSizeManager.startCPUMonitoring();
+      }
+    }
+  }

Review Comment:
   There was a discussion to accomodate stopping of resource utilization as 
well if all output streams are closed. Have we taken that?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java:
##########
@@ -831,41 +861,79 @@ private boolean manualEviction(final ReadBuffer buf) {
    */
   private void adjustThreadPool() {
     int currentPoolSize = workerRefs.size();
-    double cpuLoad = getCpuLoad();
+    double cpuLoad = getJvmCpuLoad();
+    if (cpuLoad > maxCpuUtilization) {
+      maxCpuUtilization = cpuLoad;
+    }
     int requiredPoolSize = getRequiredThreadPoolSize();
     int newThreadPoolSize;
     printTraceLog(
         "Current CPU load: {}, Current worker pool size: {}, Current queue 
size: {}",
         cpuLoad, currentPoolSize, requiredPoolSize);
     if (currentPoolSize < requiredPoolSize && cpuLoad < cpuThreshold) {
+      lastScaleDirection = "I";
       // Submit more background tasks.
       newThreadPoolSize = Math.min(maxThreadPoolSize,
           (int) Math.ceil(
               (currentPoolSize * (HUNDRED_D + threadPoolUpscalePercentage))
                   / HUNDRED_D));
+      if (newThreadPoolSize == maxThreadPoolSize) {
+        lastScaleDirection = "+F";   // Already full, cannot scale up
+      } else {
+        lastScaleDirection = "I";    // Normal scale-up

Review Comment:
   Nit: Redundant, Already set above.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/AbfsReadThreadPoolMetricsEnum.java:
##########
@@ -0,0 +1,95 @@
+/**

Review Comment:
   All of these metrics might be a overhead for some customer who does not wish 
to use new design for read and write and they might not wish to get these 
metrics.
   
   If not already done, can we put this change behind a config so that these 
metrics comes only when enabled.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsCounters.java:
##########
@@ -83,5 +83,14 @@ String formString(String prefix, String separator, String 
suffix,
 
   AbfsReadFooterMetrics getAbfsReadFooterMetrics();
 
+  void initializeReadMetrics();

Review Comment:
   Nit: rename to accomodate the fact that these are resource management 
related.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java:
##########
@@ -1097,4 +1222,210 @@ private void incrementActiveBufferCount() {
   private void decrementActiveBufferCount() {
     numberOfActiveBuffers.getAndDecrement();
   }
+
+  /**
+   * Returns the process ID (PID) of the currently running JVM.
+   * This method uses {@link ProcessHandle#current()} to obtain the ID of the
+   * Java process.
+   *
+   * @return the PID of the current JVM process
+   */
+  public long getJvmProcessId() {
+    return ProcessHandle.current().pid();
+  }
+
+
+  /**
+   * Represents current statistics of the read thread pool and system.
+   */
+  public static class ReadThreadPoolStats {

Review Comment:
   These again seems to be common for both read and write. We can Move out of 
this file as `ResourceUtilizationMetrics`



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java:
##########
@@ -831,41 +861,79 @@ private boolean manualEviction(final ReadBuffer buf) {
    */
   private void adjustThreadPool() {
     int currentPoolSize = workerRefs.size();
-    double cpuLoad = getCpuLoad();
+    double cpuLoad = getJvmCpuLoad();
+    if (cpuLoad > maxCpuUtilization) {
+      maxCpuUtilization = cpuLoad;
+    }
     int requiredPoolSize = getRequiredThreadPoolSize();
     int newThreadPoolSize;
     printTraceLog(
         "Current CPU load: {}, Current worker pool size: {}, Current queue 
size: {}",
         cpuLoad, currentPoolSize, requiredPoolSize);
     if (currentPoolSize < requiredPoolSize && cpuLoad < cpuThreshold) {
+      lastScaleDirection = "I";
       // Submit more background tasks.
       newThreadPoolSize = Math.min(maxThreadPoolSize,
           (int) Math.ceil(
               (currentPoolSize * (HUNDRED_D + threadPoolUpscalePercentage))
                   / HUNDRED_D));
+      if (newThreadPoolSize == maxThreadPoolSize) {
+        lastScaleDirection = "+F";   // Already full, cannot scale up
+      } else {
+        lastScaleDirection = "I";    // Normal scale-up
+      }
       // Create new Worker Threads
-      for (int i = currentPoolSize; i < newThreadPoolSize; i++) {
-        ReadBufferWorker worker = new ReadBufferWorker(i, getBufferManager());
-        workerRefs.add(worker);
-        workerPool.submit(worker);
+      if ("I".equals(lastScaleDirection)) {
+        for (int i = currentPoolSize; i < newThreadPoolSize; i++) {
+          ReadBufferWorker worker = new ReadBufferWorker(i,
+              getBufferManager(abfsClient));
+          workerRefs.add(worker);
+          workerPool.submit(worker);
+        }
       }
+      // Capture the latest thread pool statistics (pool size, CPU, memory, 
etc.)
+      ReadThreadPoolStats stats = getCurrentStats(cpuLoad, maxCpuUtilization);
+      // Update the read thread pool metrics with the latest statistics 
snapshot.
+      readThreadPoolMetrics.update(stats);
       printTraceLog("Increased worker pool size from {} to {}", 
currentPoolSize,
           newThreadPoolSize);
+    } else if (cpuLoad < cpuThreshold && currentPoolSize > requiredPoolSize) {
+      lastScaleDirection = "NA";
+      // Capture the latest thread pool statistics (pool size, CPU, memory, 
etc.)
+      ReadThreadPoolStats stats = getCurrentStats(cpuLoad, maxCpuUtilization);
+      // Update the read thread pool metrics with the latest statistics 
snapshot.
+      readThreadPoolMetrics.update(stats);
     } else if (cpuLoad > cpuThreshold || currentPoolSize > requiredPoolSize) {
       newThreadPoolSize = Math.max(minThreadPoolSize,
           (int) Math.ceil(
               (currentPoolSize * (HUNDRED_D - threadPoolDownscalePercentage))
                   / HUNDRED_D));
-      // Signal the extra workers to stop
-      while (workerRefs.size() > newThreadPoolSize) {
-        ReadBufferWorker worker = workerRefs.remove(workerRefs.size() - 1);
-        worker.stop();
+      if (newThreadPoolSize == minThreadPoolSize) {

Review Comment:
   Same here. check for current also needed



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java:
##########
@@ -831,41 +861,79 @@ private boolean manualEviction(final ReadBuffer buf) {
    */
   private void adjustThreadPool() {
     int currentPoolSize = workerRefs.size();
-    double cpuLoad = getCpuLoad();
+    double cpuLoad = getJvmCpuLoad();
+    if (cpuLoad > maxCpuUtilization) {
+      maxCpuUtilization = cpuLoad;
+    }
     int requiredPoolSize = getRequiredThreadPoolSize();
     int newThreadPoolSize;
     printTraceLog(
         "Current CPU load: {}, Current worker pool size: {}, Current queue 
size: {}",
         cpuLoad, currentPoolSize, requiredPoolSize);
     if (currentPoolSize < requiredPoolSize && cpuLoad < cpuThreshold) {
+      lastScaleDirection = "I";
       // Submit more background tasks.
       newThreadPoolSize = Math.min(maxThreadPoolSize,
           (int) Math.ceil(
               (currentPoolSize * (HUNDRED_D + threadPoolUpscalePercentage))
                   / HUNDRED_D));
+      if (newThreadPoolSize == maxThreadPoolSize) {
+        lastScaleDirection = "+F";   // Already full, cannot scale up
+      } else {
+        lastScaleDirection = "I";    // Normal scale-up
+      }
       // Create new Worker Threads
-      for (int i = currentPoolSize; i < newThreadPoolSize; i++) {
-        ReadBufferWorker worker = new ReadBufferWorker(i, getBufferManager());
-        workerRefs.add(worker);
-        workerPool.submit(worker);
+      if ("I".equals(lastScaleDirection)) {
+        for (int i = currentPoolSize; i < newThreadPoolSize; i++) {
+          ReadBufferWorker worker = new ReadBufferWorker(i,
+              getBufferManager(abfsClient));
+          workerRefs.add(worker);
+          workerPool.submit(worker);
+        }
       }
+      // Capture the latest thread pool statistics (pool size, CPU, memory, 
etc.)
+      ReadThreadPoolStats stats = getCurrentStats(cpuLoad, maxCpuUtilization);

Review Comment:
   Nit: These 2 statements are repeated in if elseif else block. They can be 
moved at last in common flow.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java:
##########


Review Comment:
   Do we need to make changes here as well as per the latest finding on 
resource utilization APIs



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java:
##########
@@ -1057,6 +1171,17 @@ public ScheduledExecutorService getCpuMonitoringThread() 
{
     return cpuMonitorThread;
   }
 
+  /**
+   * Returns the maximum JVM CPU utilization observed during the current
+   * monitoring interval or since the last reset.
+   *
+   * @return the highest JVM CPU utilization percentage recorded
+   */
+  @VisibleForTesting
+  public double getMaxCpuUtilization() {

Review Comment:
   Nit: maxJvmCPUUtilization



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderVersion.java:
##########
@@ -38,7 +38,16 @@ public enum TracingHeaderVersion {
    *         :primaryRequestId:streamId:opType:retryHeader:ingressHandler
    *         
:position:operatedBlobCount:operationSpecificHeader:httpOperationHeader
    */
-  V1("v1", 13);
+  V1("v1", 13),
+  /**
+   * Version 1 of the tracing header, which includes a version prefix and has 
13 permanent fields.
+   * This version is used for the current tracing header schema.
+   * Schema: version:clientCorrelationId:clientRequestId:fileSystemId
+   *         :primaryRequestId:streamId:opType:retryHeader:ingressHandler
+   *         
:position:operatedBlobCount:operationSpecificHeader:httpOperationHeader
+   *         :networkLibrary:operationMetrics

Review Comment:
   nit: instaed of operationMetrics, use resourceUtilizationMetrics



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderVersion.java:
##########
@@ -38,7 +38,16 @@ public enum TracingHeaderVersion {
    *         :primaryRequestId:streamId:opType:retryHeader:ingressHandler
    *         
:position:operatedBlobCount:operationSpecificHeader:httpOperationHeader
    */
-  V1("v1", 13);
+  V1("v1", 13),
+  /**
+   * Version 1 of the tracing header, which includes a version prefix and has 
13 permanent fields.

Review Comment:
   Nit: Edit javadoc to reflect v2 



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/WriteThreadPoolSizeManager.java:
##########
@@ -154,15 +173,51 @@ private int getComputedMaxPoolSize(final int 
availableProcessors, long initialAv
    *
    * @return the available heap memory in gigabytes
    */
-  private long getAvailableHeapMemory() {
+  @VisibleForTesting
+  public long getAvailableHeapMemory() {

Review Comment:
   Common methods between read and write.
   Can be moved to some util class



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/AbfsWriteThreadPoolMetricsEnum.java:
##########
@@ -0,0 +1,95 @@
+/**

Review Comment:
   Also these are not only threads but memory related metrics as well.
   We can rename to something like: `ABFSResourceUtilizationMetrics`





> ABFS: Add metrics to identify improvements with read and write aggressiveness
> -----------------------------------------------------------------------------
>
>                 Key: HADOOP-19737
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19737
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/azure
>    Affects Versions: 3.5.0, 3.4.2
>            Reporter: Anmol Asrani
>            Assignee: Anmol Asrani
>            Priority: Major
>              Labels: pull-request-available
>
> Introduces new performance metrics in the ABFS driver to monitor and evaluate 
> the effectiveness of read and write aggressiveness tuning. These metrics help 
> in understanding how thread pool behavior, CPU utilization, and heap 
> availability impact overall I/O throughput and latency. By capturing detailed 
> statistics such as active thread count, pool size, and system resource 
> utilization, this enhancement enables data-driven analysis of optimizations 
> made to improve ABFS read and write performance under varying workloads.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to