[ 
https://issues.apache.org/jira/browse/HADOOP-19311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17910264#comment-17910264
 ] 

ASF GitHub Bot commented on HADOOP-19311:
-----------------------------------------

bhattmanish98 commented on code in PR #7122:
URL: https://github.com/apache/hadoop/pull/7122#discussion_r1904265337


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadFooterMetrics.java:
##########
@@ -17,533 +17,494 @@
  */
 package org.apache.hadoop.fs.azurebfs.services;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.StringJoiner;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.HashMap;
+import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
 
+import org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum;
+import org.apache.hadoop.fs.azurebfs.enums.FileType;
+import org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+
+import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_UNDERSCORE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON;
 import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
+import static 
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.CHAR_DOLLAR;
+import static 
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.DOUBLE_PRECISION_FORMAT;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.FILE;
+import static 
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.FIRST_READ;
+import static 
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.SECOND_READ;
+import static 
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.FILE_LENGTH;
+import static 
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.READ_LENGTH;
+import static 
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.TOTAL_FILES;
+import static 
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_FILE_LENGTH;
+import static 
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_SIZE_READ_BY_FIRST_READ;
+import static 
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_OFFSET_DIFF_BETWEEN_FIRST_AND_SECOND_READ;
+import static 
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_READ_LEN_REQUESTED;
+import static 
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_FIRST_OFFSET_DIFF;
+import static 
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_SECOND_OFFSET_DIFF;
+import static org.apache.hadoop.fs.azurebfs.enums.FileType.PARQUET;
+import static org.apache.hadoop.fs.azurebfs.enums.FileType.NON_PARQUET;
+import static 
org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum.TYPE_COUNTER;
+import static org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum.TYPE_MEAN;
+import static 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
+import static org.apache.hadoop.util.StringUtils.format;
+
+/**
+ * This class is responsible for tracking and updating metrics related to reading footers in files.
+ */
+public class AbfsReadFooterMetrics extends AbstractAbfsStatisticsSource {
+    private static final String FOOTER_LENGTH = "20";
+    private static final List<FileType> FILE_TYPE_LIST = 
Arrays.asList(PARQUET, NON_PARQUET);
+
+    /**
+     * Inner class to handle file type checks.
+     */
+    private static final class FileTypeMetrics {
+        private final AtomicBoolean collectMetrics;
+        private final AtomicBoolean collectMetricsForNextRead;
+        private final AtomicBoolean collectLenMetrics;
+        private final AtomicLong readCount;
+        private final AtomicLong offsetOfFirstRead;
+        private FileType fileType = null;
+        private String sizeReadByFirstRead;
+        private String offsetDiffBetweenFirstAndSecondRead;
+
+        /**
+         * Constructor to initialize the file type metrics.
+         */
+        private FileTypeMetrics() {
+            collectMetrics = new AtomicBoolean(false);
+            collectMetricsForNextRead = new AtomicBoolean(false);
+            collectLenMetrics = new AtomicBoolean(false);
+            readCount = new AtomicLong(0);
+            offsetOfFirstRead = new AtomicLong(0);
+        }
+
+        /**
+         * Updates the file type based on the metrics collected.
+         */
+        private void updateFileType() {
+            if (fileType == null) {
+                fileType = collectMetrics.get() && readCount.get() >= 2
+                        && haveEqualValues(sizeReadByFirstRead)
+                        && 
haveEqualValues(offsetDiffBetweenFirstAndSecondRead) ? PARQUET : NON_PARQUET;
+            }
+        }
+
+        /**
+         * Checks if the given value has equal parts.
+         *
+         * @param value the value to check
+         * @return true if the value has equal parts, false otherwise
+         */
+        private boolean haveEqualValues(String value) {
+            String[] parts = value.split("_");
+            return parts.length == 2
+                    && parts[0].equals(parts[1]);
+        }
+
+        /**
+         * Increments the read count.
+         */
+        private void incrementReadCount() {
+            readCount.incrementAndGet();
+        }
+
+        /**
+         * Returns the read count.
+         *
+         * @return the read count
+         */
+        private long getReadCount() {
+            return readCount.get();
+        }
+
+        /**
+         * Sets the collect metrics flag.
+         *
+         * @param collect the value to set
+         */
+        private void setCollectMetrics(boolean collect) {
+            collectMetrics.set(collect);
+        }
+
+        /**
+         * Returns the collect metrics flag.
+         *
+         * @return the collect metrics flag
+         */
+        private boolean getCollectMetrics() {
+            return collectMetrics.get();
+        }
+
+        /**
+         * Sets the collect metrics for the next read flag.
+         *
+         * @param collect the value to set
+         */
+        private void setCollectMetricsForNextRead(boolean collect) {
+            collectMetricsForNextRead.set(collect);
+        }
+
+        /**
+         * Returns the collect metrics for the next read flag.
+         *
+         * @return the collect metrics for the next read flag
+         */
+        private boolean getCollectMetricsForNextRead() {
+            return collectMetricsForNextRead.get();
+        }
+
+        /**
+         * Returns the collect length metrics flag.
+         *
+         * @return the collect length metrics flag
+         */
+        private boolean getCollectLenMetrics() {
+            return collectLenMetrics.get();
+        }
+
+        /**
+         * Sets the collect length metrics flag.
+         *
+         * @param collect the value to set
+         */
+        private void setCollectLenMetrics(boolean collect) {
+            collectLenMetrics.set(collect);
+        }
+
+        /**
+         * Sets the offset of the first read.
+         *
+         * @param offset the value to set
+         */
+        private void setOffsetOfFirstRead(long offset) {
+            offsetOfFirstRead.set(offset);
+        }
+
+        /**
+         * Returns the offset of the first read.
+         *
+         * @return the offset of the first read
+         */
+        private long getOffsetOfFirstRead() {
+            return offsetOfFirstRead.get();
+        }
+
+        /**
+         * Sets the size read by the first read.
+         *
+         * @param size the value to set
+         */
+        private void setSizeReadByFirstRead(String size) {
+            sizeReadByFirstRead = size;
+        }
+
+        /**
+         * Returns the size read by the first read.
+         *
+         * @return the size read by the first read
+         */
+        private String getSizeReadByFirstRead() {
+            return sizeReadByFirstRead;
+        }
 
-public class AbfsReadFooterMetrics {
-  private final AtomicBoolean isParquetFile;
-  private final AtomicBoolean isParquetEvaluated;
-  private final AtomicBoolean isLenUpdated;
-  private String sizeReadByFirstRead;
-  private String offsetDiffBetweenFirstAndSecondRead;
-  private final AtomicLong fileLength;
-  private double avgFileLength;
-  private double avgReadLenRequested;
-  private final AtomicBoolean collectMetrics;
-  private final AtomicBoolean collectMetricsForNextRead;
-  private final AtomicBoolean collectLenMetrics;
-  private final AtomicLong dataLenRequested;
-  private final AtomicLong offsetOfFirstRead;
-  private final AtomicInteger readCount;
-  private final ConcurrentSkipListMap<String, AbfsReadFooterMetrics> 
metricsMap;
-  private static final String FOOTER_LENGTH = "20";
-
-  public AbfsReadFooterMetrics() {
-    this.isParquetFile = new AtomicBoolean(false);
-    this.isParquetEvaluated = new AtomicBoolean(false);
-    this.isLenUpdated = new AtomicBoolean(false);
-    this.fileLength = new AtomicLong();
-    this.readCount = new AtomicInteger(0);
-    this.offsetOfFirstRead = new AtomicLong();
-    this.collectMetrics = new AtomicBoolean(false);
-    this.collectMetricsForNextRead = new AtomicBoolean(false);
-    this.collectLenMetrics = new AtomicBoolean(false);
-    this.dataLenRequested = new AtomicLong(0);
-    this.metricsMap = new ConcurrentSkipListMap<>();
-  }
-
-  public Map<String, AbfsReadFooterMetrics> getMetricsMap() {
-    return metricsMap;
-  }
-
-  private boolean getIsParquetFile() {
-    return isParquetFile.get();
-  }
-
-  public void setIsParquetFile(boolean isParquetFile) {
-    this.isParquetFile.set(isParquetFile);
-  }
-
-  private String getSizeReadByFirstRead() {
-    return sizeReadByFirstRead;
-  }
-
-  public void setSizeReadByFirstRead(final String sizeReadByFirstRead) {
-    this.sizeReadByFirstRead = sizeReadByFirstRead;
-  }
-
-  private String getOffsetDiffBetweenFirstAndSecondRead() {
-    return offsetDiffBetweenFirstAndSecondRead;
-  }
-
-  public void setOffsetDiffBetweenFirstAndSecondRead(final String 
offsetDiffBetweenFirstAndSecondRead) {
-    this.offsetDiffBetweenFirstAndSecondRead
-        = offsetDiffBetweenFirstAndSecondRead;
-  }
-
-  private long getFileLength() {
-    return fileLength.get();
-  }
-
-  private void setFileLength(long fileLength) {
-    this.fileLength.set(fileLength);
-  }
-
-  private double getAvgFileLength() {
-    return avgFileLength;
-  }
-
-  public void setAvgFileLength(final double avgFileLength) {
-    this.avgFileLength = avgFileLength;
-  }
-
-  private double getAvgReadLenRequested() {
-    return avgReadLenRequested;
-  }
-
-  public void setAvgReadLenRequested(final double avgReadLenRequested) {
-    this.avgReadLenRequested = avgReadLenRequested;
-  }
-
-  private boolean getCollectMetricsForNextRead() {
-    return collectMetricsForNextRead.get();
-  }
-
-  private void setCollectMetricsForNextRead(boolean collectMetricsForNextRead) 
{
-    this.collectMetricsForNextRead.set(collectMetricsForNextRead);
-  }
-
-  private long getOffsetOfFirstRead() {
-    return offsetOfFirstRead.get();
-  }
-
-  private void setOffsetOfFirstRead(long offsetOfFirstRead) {
-    this.offsetOfFirstRead.set(offsetOfFirstRead);
-  }
-
-  private int getReadCount() {
-    return readCount.get();
-  }
-
-  private void setReadCount(int readCount) {
-    this.readCount.set(readCount);
-  }
-
-  private int incrementReadCount() {
-    this.readCount.incrementAndGet();
-    return getReadCount();
-  }
-
-  private boolean getCollectLenMetrics() {
-    return collectLenMetrics.get();
-  }
-
-  private void setCollectLenMetrics(boolean collectLenMetrics) {
-    this.collectLenMetrics.set(collectLenMetrics);
-
-  }
-
-  private long getDataLenRequested() {
-    return dataLenRequested.get();
-  }
-
-  private void setDataLenRequested(long dataLenRequested) {
-    this.dataLenRequested.set(dataLenRequested);
-  }
-
-  private void updateDataLenRequested(long dataLenRequested){
-    this.dataLenRequested.addAndGet(dataLenRequested);
-  }
-
-  private boolean getCollectMetrics() {
-    return collectMetrics.get();
-  }
-
-  private void setCollectMetrics(boolean collectMetrics) {
-    this.collectMetrics.set(collectMetrics);
-  }
-
-  private boolean getIsParquetEvaluated() {
-    return isParquetEvaluated.get();
-  }
-
-  private void setIsParquetEvaluated(boolean isParquetEvaluated) {
-    this.isParquetEvaluated.set(isParquetEvaluated);
-  }
-
-  private boolean getIsLenUpdated() {
-    return isLenUpdated.get();
-  }
-
-  private void setIsLenUpdated(boolean isLenUpdated) {
-    this.isLenUpdated.set(isLenUpdated);
-  }
-
-  /**
-   * Updates the metrics map with an entry for the specified file if it doesn't already exist.
-   *
-   * @param filePathIdentifier The unique identifier for the file.
-   */
-  public void updateMap(String filePathIdentifier) {
-    // If the file is not already in the metrics map, add it with a new 
AbfsReadFooterMetrics object.
-    metricsMap.computeIfAbsent(filePathIdentifier, key -> new 
AbfsReadFooterMetrics());
-  }
-
-  /**
-   * Checks and updates metrics for a specific file identified by filePathIdentifier.
-   * If the metrics do not exist for the file, they are initialized.
-   *
-   * @param filePathIdentifier The unique identifier for the file.
-   * @param len               The length of the read operation.
-   * @param contentLength     The total content length of the file.
-   * @param nextReadPos       The position of the next read operation.
-   */
-  public void checkMetricUpdate(final String filePathIdentifier, final int 
len, final long contentLength,
-      final long nextReadPos) {
-    AbfsReadFooterMetrics readFooterMetrics = metricsMap.computeIfAbsent(
-            filePathIdentifier, key -> new AbfsReadFooterMetrics());
-    if (readFooterMetrics.getReadCount() == 0
-        || (readFooterMetrics.getReadCount() >= 1
-        && readFooterMetrics.getCollectMetrics())) {
-      updateMetrics(filePathIdentifier, len, contentLength, nextReadPos);
+        /**
+         * Sets the offset difference between the first and second read.
+         *
+         * @param offsetDiff the value to set
+         */
+        private void setOffsetDiffBetweenFirstAndSecondRead(String offsetDiff) 
{
+            offsetDiffBetweenFirstAndSecondRead = offsetDiff;
+        }
+
+        /**
+         * Returns the offset difference between the first and second read.
+         *
+         * @return the offset difference between the first and second read
+         */
+        private String getOffsetDiffBetweenFirstAndSecondRead() {
+            return offsetDiffBetweenFirstAndSecondRead;
+        }
+
+        /**
+         * Returns the file type.
+         *
+         * @return the file type
+         */
+        private FileType getFileType() {
+            return fileType;
+        }
     }
-  }
-
-  /**
-   * Updates metrics for a specific file identified by filePathIdentifier.
-   *
-   * @param filePathIdentifier  The unique identifier for the file.
-   * @param len                The length of the read operation.
-   * @param contentLength      The total content length of the file.
-   * @param nextReadPos        The position of the next read operation.
-   */
-  private void updateMetrics(final String filePathIdentifier, final int len, 
final long contentLength,
-                             final long nextReadPos) {
-    AbfsReadFooterMetrics readFooterMetrics = 
metricsMap.get(filePathIdentifier);
-
-    // Create a new AbfsReadFooterMetrics object if it doesn't exist in the 
metricsMap.
-    if (readFooterMetrics == null) {
-      readFooterMetrics = new AbfsReadFooterMetrics();
-      metricsMap.put(filePathIdentifier, readFooterMetrics);
+
+    private final Map<String, FileTypeMetrics> fileTypeMetricsMap = new 
HashMap<>();
+
+    /**
+     * Constructor to initialize the IOStatisticsStore with counters and mean statistics.
+     */
+    public AbfsReadFooterMetrics() {
+        IOStatisticsStore ioStatisticsStore = iostatisticsStore()
+                .withCounters(getMetricNames(TYPE_COUNTER))
+                .withMeanStatistics(getMetricNames(TYPE_MEAN))
+                .build();
+        setIOStatistics(ioStatisticsStore);
     }
 
-    int readCount;
-    synchronized (this) {
-      readCount = readFooterMetrics.incrementReadCount();
+    /**
+     * Returns the metric names for a specific statistic type.
+     *
+     * @param type the statistic type
+     * @return the metric names
+     */
+    private String[] getMetricNames(StatisticTypeEnum type) {
+        return Arrays.stream(AbfsReadFooterMetricsEnum.values())
+                .filter(readFooterMetricsEnum -> 
readFooterMetricsEnum.getStatisticType().equals(type))
+                .flatMap(readFooterMetricsEnum ->
+                        FILE.equals(readFooterMetricsEnum.getType())
+                                ? FILE_TYPE_LIST.stream().map(fileType -> 
fileType + COLON + readFooterMetricsEnum.getName())
+                                : Stream.of(readFooterMetricsEnum.getName()))
+                .toArray(String[]::new);
     }
 
-    if (readCount == 1) {
-      // Update metrics for the first read.
-      updateMetricsOnFirstRead(readFooterMetrics, nextReadPos, len, 
contentLength);
+    /**
+     * Looks up the counter value for a specific metric.
+     *
+     * @param fileType the type of the file
+     * @param metric the metric to look up
+     * @return the counter value
+     */
+    private long getCounterMetricValue(FileType fileType, 
AbfsReadFooterMetricsEnum metric) {
+        return lookupCounterValue(fileType + COLON + metric.getName());
     }
 
-    synchronized (this) {
-      if (readFooterMetrics.getCollectLenMetrics()) {
-        readFooterMetrics.updateDataLenRequested(len);
-      }
+    /**
+     * Looks up the mean statistic value for a specific metric.
+     *
+     * @param fileType the type of the file
+     * @param metric the metric to look up
+     * @return the mean statistic value
+     */
+    private String getMeanMetricValue(FileType fileType, 
AbfsReadFooterMetricsEnum metric) {
+        return format(DOUBLE_PRECISION_FORMAT, lookupMeanStatistic(fileType + 
COLON + metric.getName()));
     }
 
-    if (readCount == 2) {
-      // Update metrics for the second read.
-      updateMetricsOnSecondRead(readFooterMetrics, nextReadPos, len);
+    /**
+     * Increments the value of a specific metric.
+     *
+     * @param fileType the type of the file
+     * @param metricName the metric to increment
+     */
+    public void incrementMetricValue(FileType fileType, 
AbfsReadFooterMetricsEnum metricName) {
+        incCounterValue(fileType + COLON + metricName.getName());
     }
-  }
-
-  /**
-   * Updates metrics for the first read operation.
-   *
-   * @param readFooterMetrics The metrics object to update.
-   * @param nextReadPos       The position of the next read operation.
-   * @param len               The length of the read operation.
-   * @param contentLength     The total content length of the file.
-   */
-  private void updateMetricsOnFirstRead(AbfsReadFooterMetrics 
readFooterMetrics, long nextReadPos, int len, long contentLength) {
-    if (nextReadPos >= contentLength - (long) Integer.parseInt(FOOTER_LENGTH) 
* ONE_KB) {
-      readFooterMetrics.setCollectMetrics(true);
-      readFooterMetrics.setCollectMetricsForNextRead(true);
-      readFooterMetrics.setOffsetOfFirstRead(nextReadPos);
-      readFooterMetrics.setSizeReadByFirstRead(len + "_" + 
Math.abs(contentLength - nextReadPos));
-      readFooterMetrics.setFileLength(contentLength);
+
+    /**
+     * Adds a mean statistic value for a specific metric.
+     *
+     * @param fileType the type of the file
+     * @param metricName the metric to update
+     * @param value the new value of the metric
+     */
+    public void addMeanMetricValue(FileType fileType, 
AbfsReadFooterMetricsEnum metricName, long value) {
+        addMeanStatistic(fileType + COLON + metricName.getName(), value);
     }
-  }
-
-  /**
-   * Updates metrics for the second read operation.
-   *
-   * @param readFooterMetrics The metrics object to update.
-   * @param nextReadPos       The position of the next read operation.
-   * @param len               The length of the read operation.
-   */
-  private void updateMetricsOnSecondRead(AbfsReadFooterMetrics 
readFooterMetrics, long nextReadPos, int len) {
-    if (readFooterMetrics.getCollectMetricsForNextRead()) {
-      long offsetDiff = Math.abs(nextReadPos - 
readFooterMetrics.getOffsetOfFirstRead());
-      readFooterMetrics.setOffsetDiffBetweenFirstAndSecondRead(len + "_" + 
offsetDiff);
-      readFooterMetrics.setCollectLenMetrics(true);
+
+    /**
+     * Returns the total number of files.
+     *
+     * @return the total number of files
+     */
+    public Long getTotalFiles() {
+        return getCounterMetricValue(PARQUET, TOTAL_FILES) + 
getCounterMetricValue(NON_PARQUET, TOTAL_FILES);
     }
-  }
-
-
-  /**
-   * Check if the given file should be marked as a Parquet file.
-   *
-   * @param metrics The metrics to evaluate.
-   * @return True if the file meet the criteria for being marked as a Parquet 
file, false otherwise.
-   */
-  private boolean shouldMarkAsParquet(AbfsReadFooterMetrics metrics) {
-    return metrics.getCollectMetrics()
-            && metrics.getReadCount() >= 2
-            && !metrics.getIsParquetEvaluated()
-            && haveEqualValues(metrics.getSizeReadByFirstRead())
-            && 
haveEqualValues(metrics.getOffsetDiffBetweenFirstAndSecondRead());
-  }
-
-  /**
-   * Check if two values are equal, considering they are in the format "value1_value2".
-   *
-   * @param value The value to check.
-   * @return True if the two parts of the value are equal, false otherwise.
-   */
-  private boolean haveEqualValues(String value) {
-    String[] parts = value.split("_");
-    return parts.length == 2 && parts[0].equals(parts[1]);
-  }
-
-  /**
-   * Mark the given metrics as a Parquet file and update related values.
-   *
-   * @param metrics The metrics to mark as Parquet.
-   */
-  private void markAsParquet(AbfsReadFooterMetrics metrics) {
-    metrics.setIsParquetFile(true);
-    String[] parts = metrics.getSizeReadByFirstRead().split("_");
-    metrics.setSizeReadByFirstRead(parts[0]);
-    parts = metrics.getOffsetDiffBetweenFirstAndSecondRead().split("_");
-    metrics.setOffsetDiffBetweenFirstAndSecondRead(parts[0]);
-    metrics.setIsParquetEvaluated(true);
-  }
-
-  /**
-   * Check each metric in the provided map and mark them as Parquet files if they meet the criteria.
-   *
-   * @param metricsMap The map containing metrics to evaluate.
-   */
-  public void checkIsParquet(Map<String, AbfsReadFooterMetrics> metricsMap) {
-    for (Map.Entry<String, AbfsReadFooterMetrics> entry : 
metricsMap.entrySet()) {
-      AbfsReadFooterMetrics readFooterMetrics = entry.getValue();
-      if (shouldMarkAsParquet(readFooterMetrics)) {
-        markAsParquet(readFooterMetrics);
-        metricsMap.replace(entry.getKey(), readFooterMetrics);
-      }
+
+    /**
+     * Updates the map with a new file path identifier.
+     *
+     * @param filePathIdentifier the file path identifier
+     */
+    public void updateMap(String filePathIdentifier) {
+        fileTypeMetricsMap.computeIfAbsent(filePathIdentifier, key -> new 
FileTypeMetrics());
     }
-  }
-
-  /**
-   * Updates the average read length requested for metrics of all files in the metrics map.
-   * If the metrics indicate that the update is needed, it calculates the average read length and updates the metrics.
-   *
-   * @param metricsMap A map containing metrics for different files with unique identifiers.
-   */
-  private void updateLenRequested(Map<String, AbfsReadFooterMetrics> 
metricsMap) {
-    for (AbfsReadFooterMetrics readFooterMetrics : metricsMap.values()) {
-      if (shouldUpdateLenRequested(readFooterMetrics)) {
-        int readReqCount = readFooterMetrics.getReadCount() - 2;
-        readFooterMetrics.setAvgReadLenRequested(
-                (double) readFooterMetrics.getDataLenRequested() / 
readReqCount);
-        readFooterMetrics.setIsLenUpdated(true);
-      }
+
+    /**
+     * Checks and updates the metrics for a given file read.
+     *
+     * @param filePathIdentifier the file path identifier
+     * @param len the length of the read
+     * @param contentLength the total content length of the file
+     * @param nextReadPos the position of the next read
+     */
+    public void updateReadMetrics(final String filePathIdentifier,
+                                  final int len,
+                                  final long contentLength,
+                                  final long nextReadPos) {
+        FileTypeMetrics fileTypeMetrics = 
fileTypeMetricsMap.computeIfAbsent(filePathIdentifier, key -> new 
FileTypeMetrics());
+        if (fileTypeMetrics.getReadCount() == 0 || 
(fileTypeMetrics.getReadCount() >= 1 && fileTypeMetrics.getCollectMetrics())) {
+            updateMetrics(fileTypeMetrics, len, contentLength, nextReadPos);
+        }
     }
-  }
-
-  /**
-   * Checks whether the average read length requested should be updated for the given metrics.
-   *
-   * The method returns true if the following conditions are met:
-   * - Metrics collection is enabled.
-   * - The number of read counts is greater than 2.
-   * - The average read length has not been updated previously.
-   *
-   * @param readFooterMetrics The metrics object to evaluate.
-   * @return True if the average read length should be updated, false otherwise.
-   */
-  private boolean shouldUpdateLenRequested(AbfsReadFooterMetrics 
readFooterMetrics) {
-    return readFooterMetrics.getCollectMetrics()
-            && readFooterMetrics.getReadCount() > 2
-            && !readFooterMetrics.getIsLenUpdated();
-  }
-
-  /**
-   * Calculates the average metrics from a list of AbfsReadFooterMetrics and 
sets the values in the provided 'avgParquetReadFooterMetrics' object.
-   *
-   * @param isParquetList The list of AbfsReadFooterMetrics to compute the 
averages from.
-   * @param avgParquetReadFooterMetrics The target AbfsReadFooterMetrics 
object to store the computed average values.
-   *
-   * This method calculates various average metrics from the provided list and 
sets them in the 'avgParquetReadFooterMetrics' object.
-   * The metrics include:
-   * - Size read by the first read
-   * - Offset difference between the first and second read
-   * - Average file length
-   * - Average requested read length
-   */
-  private void getParquetReadFooterMetricsAverage(List<AbfsReadFooterMetrics> 
isParquetList,
-      AbfsReadFooterMetrics avgParquetReadFooterMetrics){
-    avgParquetReadFooterMetrics.setSizeReadByFirstRead(
-        String.format("%.3f", isParquetList.stream()
-            .map(AbfsReadFooterMetrics::getSizeReadByFirstRead).mapToDouble(
-                Double::parseDouble).average().orElse(0.0)));
-    avgParquetReadFooterMetrics.setOffsetDiffBetweenFirstAndSecondRead(
-        String.format("%.3f", isParquetList.stream()
-            .map(AbfsReadFooterMetrics::getOffsetDiffBetweenFirstAndSecondRead)
-            .mapToDouble(Double::parseDouble).average().orElse(0.0)));
-    avgParquetReadFooterMetrics.setAvgFileLength(isParquetList.stream()
-        
.mapToDouble(AbfsReadFooterMetrics::getFileLength).average().orElse(0.0));
-    avgParquetReadFooterMetrics.setAvgReadLenRequested(isParquetList.stream().
-        map(AbfsReadFooterMetrics::getAvgReadLenRequested).
-        mapToDouble(Double::doubleValue).average().orElse(0.0));
-  }
-
-  /**
-   * Calculates the average metrics from a list of non-Parquet 
AbfsReadFooterMetrics instances.
-   *
-   * This method takes a list of AbfsReadFooterMetrics representing 
non-Parquet reads and calculates
-   * the average values for the size read by the first read and the offset 
difference between the first
-   * and second read. The averages are then set in the provided 
AbfsReadFooterMetrics instance.
-   *
-   * @param isNonParquetList A list of AbfsReadFooterMetrics instances 
representing non-Parquet reads.
-   * @param avgNonParquetReadFooterMetrics The AbfsReadFooterMetrics instance 
to store the calculated averages.
-   *                                      It is assumed that the size of the 
list is at least 1, and the first
-   *                                      element of the list is used to 
determine the size of arrays.
-   *                                      The instance is modified in-place 
with the calculated averages.
-   *
-   *
-   **/
-  private void 
getNonParquetReadFooterMetricsAverage(List<AbfsReadFooterMetrics> 
isNonParquetList,
-                                                     AbfsReadFooterMetrics 
avgNonParquetReadFooterMetrics) {
-    int size = 
isNonParquetList.get(0).getSizeReadByFirstRead().split("_").length;
-    double[] store = new double[2 * size];
-    // Calculating sum of individual values
-    isNonParquetList.forEach(abfsReadFooterMetrics -> {
-      String[] firstReadSize = 
abfsReadFooterMetrics.getSizeReadByFirstRead().split("_");
-      String[] offDiffFirstSecondRead = 
abfsReadFooterMetrics.getOffsetDiffBetweenFirstAndSecondRead().split("_");
-
-      for (int i = 0; i < firstReadSize.length; i++) {
-        store[i] += Long.parseLong(firstReadSize[i]);
-        store[i + size] += Long.parseLong(offDiffFirstSecondRead[i]);
-      }
-    });
-
-    // Calculating averages and creating formatted strings
-    StringJoiner firstReadSize = new StringJoiner("_");
-    StringJoiner offDiffFirstSecondRead = new StringJoiner("_");
-
-    for (int j = 0; j < size; j++) {
-      firstReadSize.add(String.format("%.3f", store[j] / 
isNonParquetList.size()));
-      offDiffFirstSecondRead.add(String.format("%.3f", store[j + size] / 
isNonParquetList.size()));
+
+    /**
+     * Updates metrics for a specific file identified by filePathIdentifier.
+     *
+     * @param fileTypeMetrics    File metadata to know file type.
+     * @param len                The length of the read operation.
+     * @param contentLength      The total content length of the file.
+     * @param nextReadPos        The position of the next read operation.
+     */
+    private void updateMetrics(FileTypeMetrics fileTypeMetrics,
+                               int len,
+                               long contentLength,
+                               long nextReadPos) {
+        synchronized (this) {
+            fileTypeMetrics.incrementReadCount();
+        }
+
+        long readCount = fileTypeMetrics.getReadCount();
+
+        if (readCount == 1) {
+            handleFirstRead(fileTypeMetrics, nextReadPos, len, contentLength);
+        } else if (readCount == 2) {
+            handleSecondRead(fileTypeMetrics, nextReadPos, len, contentLength);
+        } else {
+            handleFurtherRead(fileTypeMetrics, len);
+        }
     }
 
-    
avgNonParquetReadFooterMetrics.setSizeReadByFirstRead(firstReadSize.toString());
-    
avgNonParquetReadFooterMetrics.setOffsetDiffBetweenFirstAndSecondRead(offDiffFirstSecondRead.toString());
-    avgNonParquetReadFooterMetrics.setAvgFileLength(isNonParquetList.stream()
-            
.mapToDouble(AbfsReadFooterMetrics::getFileLength).average().orElse(0.0));
-    
avgNonParquetReadFooterMetrics.setAvgReadLenRequested(isNonParquetList.stream()
-            
.mapToDouble(AbfsReadFooterMetrics::getAvgReadLenRequested).average().orElse(0.0));
-  }
-
-  /*
-  Acronyms:
-  1.FR :- First Read (In case of parquet we only maintain the size requested by application for
-  the first read, in case of non parquet we maintain a string separated by "_" delimiter where the first
-  substring represents the len requested for first read and the second substring represents the seek pointer difference from the
-  end of the file.)
-  2.SR :- Second Read (In case of parquet we only maintain the size requested by application for
-  the second read, in case of non parquet we maintain a string separated by "_" delimiter where the first
-  substring represents the len requested for second read and the second substring represents the seek pointer difference from the
-  offset of the first read.)
-  3.FL :- Total length of the file requested for read
-   */
-  public String getReadFooterMetrics(AbfsReadFooterMetrics 
avgReadFooterMetrics) {
-    String readFooterMetric = "";
-    if (avgReadFooterMetrics.getIsParquetFile()) {
-      readFooterMetric += "$Parquet:";
-    } else {
-      readFooterMetric += "$NonParquet:";
+    /**
+     * Handles the first read operation by checking if the current read 
position is near the end of the file.
+     * If it is, updates the {@link FileTypeMetrics} object to enable metrics 
collection and records the first read's
+     * offset and size.
+     *
+     * @param fileTypeMetrics The {@link FileTypeMetrics} object to update 
with metrics and read details.
+     * @param nextReadPos The position where the next read will start.
+     * @param len The length of the current read operation.
+     * @param contentLength The total length of the file content.
+     */
+    private void handleFirstRead(FileTypeMetrics fileTypeMetrics,
+                                 long nextReadPos,
+                                 int len,
+                                 long contentLength) {
+        if (nextReadPos >= contentLength - (long) 
Integer.parseInt(FOOTER_LENGTH) * ONE_KB) {
+            fileTypeMetrics.setCollectMetrics(true);
+            fileTypeMetrics.setCollectMetricsForNextRead(true);
+            fileTypeMetrics.setOffsetOfFirstRead(nextReadPos);
+            fileTypeMetrics.setSizeReadByFirstRead(len + "_" + 
Math.abs(contentLength - nextReadPos));
+        }
     }
-    readFooterMetric += "$FR=" + avgReadFooterMetrics.getSizeReadByFirstRead()
-        + "$SR="
-        + avgReadFooterMetrics.getOffsetDiffBetweenFirstAndSecondRead()
-        + "$FL=" + String.format("%.3f",
-        avgReadFooterMetrics.getAvgFileLength())
-        + "$RL=" + String.format("%.3f",
-        avgReadFooterMetrics.getAvgReadLenRequested());
-    return readFooterMetric;
-  }
 
-/**
- * Retrieves and aggregates read footer metrics for both Parquet and 
non-Parquet files from a list
- * of AbfsReadFooterMetrics instances. The function calculates the average 
metrics separately for
- * Parquet and non-Parquet files and returns a formatted string containing the 
aggregated metrics.
- *
- * @param readFooterMetricsList A list of AbfsReadFooterMetrics instances 
containing read footer metrics
- *                              for both Parquet and non-Parquet files.
- *
- * @return A formatted string containing the aggregated read footer metrics 
for both Parquet and non-Parquet files.
- *
- **/
-private String getFooterMetrics(List<AbfsReadFooterMetrics> 
readFooterMetricsList) {
-  List<AbfsReadFooterMetrics> isParquetList = new ArrayList<>();
-  List<AbfsReadFooterMetrics> isNonParquetList = new ArrayList<>();
-  for (AbfsReadFooterMetrics abfsReadFooterMetrics : readFooterMetricsList) {
-    if (abfsReadFooterMetrics.getIsParquetFile()) {
-      isParquetList.add(abfsReadFooterMetrics);
-    } else {
-      if (abfsReadFooterMetrics.getReadCount() >= 2) {
-        isNonParquetList.add(abfsReadFooterMetrics);
-      }
+    /**
+     * Handles the second read operation by checking if metrics collection is 
enabled for the next read.
+     * If it is, calculates the offset difference between the first and second 
reads, updates the {@link FileTypeMetrics}
+     * object with this information, and sets the file type. Then, updates the 
metrics data.
+     *
+     * @param fileTypeMetrics The {@link FileTypeMetrics} object to update 
with metrics and read details.
+     * @param nextReadPos The position where the next read will start.
+     * @param len The length of the current read operation.
+     * @param contentLength The total length of the file content.
+     */
+    private void handleSecondRead(FileTypeMetrics fileTypeMetrics,
+                                  long nextReadPos,
+                                  int len,
+                                  long contentLength) {
+        if (fileTypeMetrics.getCollectMetricsForNextRead()) {
+            long offsetDiff = Math.abs(nextReadPos - 
fileTypeMetrics.getOffsetOfFirstRead());
+            fileTypeMetrics.setOffsetDiffBetweenFirstAndSecondRead(len + "_" + 
offsetDiff);
+            fileTypeMetrics.setCollectLenMetrics(true);
+            fileTypeMetrics.updateFileType();
+            updateMetricsData(fileTypeMetrics, len, contentLength);
+        }
     }
-  }
-  AbfsReadFooterMetrics avgParquetReadFooterMetrics = new 
AbfsReadFooterMetrics();
-  AbfsReadFooterMetrics avgNonparquetReadFooterMetrics = new 
AbfsReadFooterMetrics();
-  String readFooterMetric = "";
-  if (!isParquetList.isEmpty()) {
-    avgParquetReadFooterMetrics.setIsParquetFile(true);
-    getParquetReadFooterMetricsAverage(isParquetList, 
avgParquetReadFooterMetrics);
-    readFooterMetric += getReadFooterMetrics(avgParquetReadFooterMetrics);
-  }
-  if (!isNonParquetList.isEmpty()) {
-    avgNonparquetReadFooterMetrics.setIsParquetFile(false);
-    getNonParquetReadFooterMetricsAverage(isNonParquetList, 
avgNonparquetReadFooterMetrics);
-    readFooterMetric += getReadFooterMetrics(avgNonparquetReadFooterMetrics);
-  }
-  return readFooterMetric;
-}
 
+    /**
+     * Handles further read operations beyond the second read. If metrics 
collection is enabled and the file type is set,
+     * updates the read length requested and increments the read count for the 
specific file type.
+     *
+     * @param fileTypeMetrics The {@link FileTypeMetrics} object containing 
metrics and read details.
+     * @param len The length of the current read operation.
+     */

Review Comment:
   Yes, synchronization is needed only to update the Mean Statistic, and as you 
rightly mentioned, it is already concurrent, so we don't need this. Reverted.





> [ABFS] Implement Backoff and Read Footer metrics using IOStatistics Class
> -------------------------------------------------------------------------
>
>                 Key: HADOOP-19311
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19311
>             Project: Hadoop Common
>          Issue Type: Task
>          Components: fs/azure
>    Affects Versions: 3.5.0, 3.4.1
>            Reporter: Manish Bhatt
>            Assignee: Manish Bhatt
>            Priority: Major
>              Labels: pull-request-available
>
> Current Flow: We have implemented metrics collection in the ABFS flow. We 
> have created custom AbfsBackoffMetrics and AbfsReadFooterMetrics classes, 
> which store all the metrics at the file system level. Our objective is to 
> move away from the custom class implementation and store the metrics in 
> IOStatisticsStore, which is present in hadoop-common.
> Changes Made: This PR contains the changes related to storing the metrics of 
> the above-mentioned classes in IOStatisticsStore, which is present in 
> hadoop-common. An abstract class, AbstractAbfsStatisticsSource, is created; 
> it implements the IOStatisticsSource interface and stores the IOStatistics 
> of the child metrics class.
> Both AbfsBackoffMetrics and AbfsReadFooterMetrics inherit from 
> AbstractAbfsStatisticsSource and store their respective metrics in 
> IOStatisticsStore.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to