anmolanmol1234 commented on code in PR #7122:
URL: https://github.com/apache/hadoop/pull/7122#discussion_r1843595536


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadFooterMetrics.java:
##########
@@ -17,533 +17,348 @@
  */
 package org.apache.hadoop.fs.azurebfs.services;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.StringJoiner;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.HashMap;
+import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
+
+import org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum;
+import org.apache.hadoop.fs.azurebfs.enums.FileType;
+import org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum;
+import org.apache.hadoop.fs.azurebfs.statistics.AbstractAbfsStatisticsSource;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
 
 import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.FILE;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.COLON;
+import static 
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.TOTAL_FILES;
+import static 
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.FILE_LENGTH;
+import static 
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.SIZE_READ_BY_FIRST_READ;
+import static 
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.OFFSET_DIFF_BETWEEN_FIRST_AND_SECOND_READ;
+import static 
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.READ_LEN_REQUESTED;
+import static 
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.READ_COUNT;
+import static 
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.FIRST_OFFSET_DIFF;
+import static 
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.SECOND_OFFSET_DIFF;
+import static org.apache.hadoop.fs.azurebfs.enums.FileType.PARQUET;
+import static org.apache.hadoop.fs.azurebfs.enums.FileType.NON_PARQUET;
+import static 
org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum.TYPE_COUNTER;
+import static org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum.TYPE_GAUGE;
+import static 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
+
+/**
+ * This class is responsible for tracking and updating metrics related to 
reading footers in files.
+ */
+public class AbfsReadFooterMetrics extends AbstractAbfsStatisticsSource {
+    private static final String FOOTER_LENGTH = "20";
+    private static final List<FileType> FILE_TYPE_LIST = 
Arrays.asList(PARQUET, NON_PARQUET);
+
+    /**
+     * Inner class to handle file type checks.
+     */
+    private static final class CheckFileType {
+        private final AtomicBoolean collectMetrics;
+        private final AtomicBoolean collectMetricsForNextRead;
+        private final AtomicBoolean collectLenMetrics;
+        private final AtomicLong readCount;
+        private final AtomicLong offsetOfFirstRead;
+        private FileType fileType = null;
+        private String sizeReadByFirstRead;
+        private String offsetDiffBetweenFirstAndSecondRead;
+
+        private CheckFileType() {
+            collectMetrics = new AtomicBoolean(false);
+            collectMetricsForNextRead = new AtomicBoolean(false);
+            collectLenMetrics = new AtomicBoolean(false);
+            readCount = new AtomicLong(0);
+            offsetOfFirstRead = new AtomicLong(0);
+        }
+
+        private void updateFileType() {
+            if (fileType == null) {
+                fileType = collectMetrics.get() && readCount.get() >= 2
+                        && haveEqualValues(sizeReadByFirstRead)
+                        && 
haveEqualValues(offsetDiffBetweenFirstAndSecondRead) ? PARQUET : NON_PARQUET;
+            }
+        }
+
+        private boolean haveEqualValues(String value) {
+            String[] parts = value.split("_");
+            return parts.length == 2
+                    && parts[0].equals(parts[1]);
+        }
+
+        private void incrementReadCount() {
+            readCount.incrementAndGet();
+        }
+
+        private long getReadCount() {
+            return readCount.get();
+        }
+
+        private void setCollectMetrics(boolean collect) {
+            collectMetrics.set(collect);
+        }
+
+        private boolean getCollectMetrics() {
+            return collectMetrics.get();
+        }
+
+        private void setCollectMetricsForNextRead(boolean collect) {
+            collectMetricsForNextRead.set(collect);
+        }
+
+        private boolean getCollectMetricsForNextRead() {
+            return collectMetricsForNextRead.get();
+        }
+
+        private boolean getCollectLenMetrics() {
+            return collectLenMetrics.get();
+        }
+
+        private void setCollectLenMetrics(boolean collect) {
+            collectLenMetrics.set(collect);
+        }
+
+        private void setOffsetOfFirstRead(long offset) {
+            offsetOfFirstRead.set(offset);
+        }
+
+        private long getOffsetOfFirstRead() {
+            return offsetOfFirstRead.get();
+        }
+
+        private void setSizeReadByFirstRead(String size) {
+            sizeReadByFirstRead = size;
+        }
+
+        private String getSizeReadByFirstRead() {
+            return sizeReadByFirstRead;
+        }
+
+        private void setOffsetDiffBetweenFirstAndSecondRead(String offsetDiff) 
{
+            offsetDiffBetweenFirstAndSecondRead = offsetDiff;
+        }
 
-public class AbfsReadFooterMetrics {
-  private final AtomicBoolean isParquetFile;
-  private final AtomicBoolean isParquetEvaluated;
-  private final AtomicBoolean isLenUpdated;
-  private String sizeReadByFirstRead;
-  private String offsetDiffBetweenFirstAndSecondRead;
-  private final AtomicLong fileLength;
-  private double avgFileLength;
-  private double avgReadLenRequested;
-  private final AtomicBoolean collectMetrics;
-  private final AtomicBoolean collectMetricsForNextRead;
-  private final AtomicBoolean collectLenMetrics;
-  private final AtomicLong dataLenRequested;
-  private final AtomicLong offsetOfFirstRead;
-  private final AtomicInteger readCount;
-  private final ConcurrentSkipListMap<String, AbfsReadFooterMetrics> 
metricsMap;
-  private static final String FOOTER_LENGTH = "20";
-
-  public AbfsReadFooterMetrics() {
-    this.isParquetFile = new AtomicBoolean(false);
-    this.isParquetEvaluated = new AtomicBoolean(false);
-    this.isLenUpdated = new AtomicBoolean(false);
-    this.fileLength = new AtomicLong();
-    this.readCount = new AtomicInteger(0);
-    this.offsetOfFirstRead = new AtomicLong();
-    this.collectMetrics = new AtomicBoolean(false);
-    this.collectMetricsForNextRead = new AtomicBoolean(false);
-    this.collectLenMetrics = new AtomicBoolean(false);
-    this.dataLenRequested = new AtomicLong(0);
-    this.metricsMap = new ConcurrentSkipListMap<>();
-  }
-
-  public Map<String, AbfsReadFooterMetrics> getMetricsMap() {
-    return metricsMap;
-  }
-
-  private boolean getIsParquetFile() {
-    return isParquetFile.get();
-  }
-
-  public void setIsParquetFile(boolean isParquetFile) {
-    this.isParquetFile.set(isParquetFile);
-  }
-
-  private String getSizeReadByFirstRead() {
-    return sizeReadByFirstRead;
-  }
-
-  public void setSizeReadByFirstRead(final String sizeReadByFirstRead) {
-    this.sizeReadByFirstRead = sizeReadByFirstRead;
-  }
-
-  private String getOffsetDiffBetweenFirstAndSecondRead() {
-    return offsetDiffBetweenFirstAndSecondRead;
-  }
-
-  public void setOffsetDiffBetweenFirstAndSecondRead(final String 
offsetDiffBetweenFirstAndSecondRead) {
-    this.offsetDiffBetweenFirstAndSecondRead
-        = offsetDiffBetweenFirstAndSecondRead;
-  }
-
-  private long getFileLength() {
-    return fileLength.get();
-  }
-
-  private void setFileLength(long fileLength) {
-    this.fileLength.set(fileLength);
-  }
-
-  private double getAvgFileLength() {
-    return avgFileLength;
-  }
-
-  public void setAvgFileLength(final double avgFileLength) {
-    this.avgFileLength = avgFileLength;
-  }
-
-  private double getAvgReadLenRequested() {
-    return avgReadLenRequested;
-  }
-
-  public void setAvgReadLenRequested(final double avgReadLenRequested) {
-    this.avgReadLenRequested = avgReadLenRequested;
-  }
-
-  private boolean getCollectMetricsForNextRead() {
-    return collectMetricsForNextRead.get();
-  }
-
-  private void setCollectMetricsForNextRead(boolean collectMetricsForNextRead) 
{
-    this.collectMetricsForNextRead.set(collectMetricsForNextRead);
-  }
-
-  private long getOffsetOfFirstRead() {
-    return offsetOfFirstRead.get();
-  }
-
-  private void setOffsetOfFirstRead(long offsetOfFirstRead) {
-    this.offsetOfFirstRead.set(offsetOfFirstRead);
-  }
-
-  private int getReadCount() {
-    return readCount.get();
-  }
-
-  private void setReadCount(int readCount) {
-    this.readCount.set(readCount);
-  }
-
-  private int incrementReadCount() {
-    this.readCount.incrementAndGet();
-    return getReadCount();
-  }
-
-  private boolean getCollectLenMetrics() {
-    return collectLenMetrics.get();
-  }
-
-  private void setCollectLenMetrics(boolean collectLenMetrics) {
-    this.collectLenMetrics.set(collectLenMetrics);
-
-  }
-
-  private long getDataLenRequested() {
-    return dataLenRequested.get();
-  }
-
-  private void setDataLenRequested(long dataLenRequested) {
-    this.dataLenRequested.set(dataLenRequested);
-  }
-
-  private void updateDataLenRequested(long dataLenRequested){
-    this.dataLenRequested.addAndGet(dataLenRequested);
-  }
-
-  private boolean getCollectMetrics() {
-    return collectMetrics.get();
-  }
-
-  private void setCollectMetrics(boolean collectMetrics) {
-    this.collectMetrics.set(collectMetrics);
-  }
-
-  private boolean getIsParquetEvaluated() {
-    return isParquetEvaluated.get();
-  }
-
-  private void setIsParquetEvaluated(boolean isParquetEvaluated) {
-    this.isParquetEvaluated.set(isParquetEvaluated);
-  }
-
-  private boolean getIsLenUpdated() {
-    return isLenUpdated.get();
-  }
-
-  private void setIsLenUpdated(boolean isLenUpdated) {
-    this.isLenUpdated.set(isLenUpdated);
-  }
-
-  /**
-   * Updates the metrics map with an entry for the specified file if it 
doesn't already exist.
-   *
-   * @param filePathIdentifier The unique identifier for the file.
-   */
-  public void updateMap(String filePathIdentifier) {
-    // If the file is not already in the metrics map, add it with a new 
AbfsReadFooterMetrics object.
-    metricsMap.computeIfAbsent(filePathIdentifier, key -> new 
AbfsReadFooterMetrics());
-  }
-
-  /**
-   * Checks and updates metrics for a specific file identified by 
filePathIdentifier.
-   * If the metrics do not exist for the file, they are initialized.
-   *
-   * @param filePathIdentifier The unique identifier for the file.
-   * @param len               The length of the read operation.
-   * @param contentLength     The total content length of the file.
-   * @param nextReadPos       The position of the next read operation.
-   */
-  public void checkMetricUpdate(final String filePathIdentifier, final int 
len, final long contentLength,
-      final long nextReadPos) {
-    AbfsReadFooterMetrics readFooterMetrics = metricsMap.computeIfAbsent(
-            filePathIdentifier, key -> new AbfsReadFooterMetrics());
-    if (readFooterMetrics.getReadCount() == 0
-        || (readFooterMetrics.getReadCount() >= 1
-        && readFooterMetrics.getCollectMetrics())) {
-      updateMetrics(filePathIdentifier, len, contentLength, nextReadPos);
+        private String getOffsetDiffBetweenFirstAndSecondRead() {
+            return offsetDiffBetweenFirstAndSecondRead;
+        }
+
+        private FileType getFileType() {
+            return fileType;
+        }
     }
-  }
-
-  /**
-   * Updates metrics for a specific file identified by filePathIdentifier.
-   *
-   * @param filePathIdentifier  The unique identifier for the file.
-   * @param len                The length of the read operation.
-   * @param contentLength      The total content length of the file.
-   * @param nextReadPos        The position of the next read operation.
-   */
-  private void updateMetrics(final String filePathIdentifier, final int len, 
final long contentLength,
-                             final long nextReadPos) {
-    AbfsReadFooterMetrics readFooterMetrics = 
metricsMap.get(filePathIdentifier);
-
-    // Create a new AbfsReadFooterMetrics object if it doesn't exist in the 
metricsMap.
-    if (readFooterMetrics == null) {
-      readFooterMetrics = new AbfsReadFooterMetrics();
-      metricsMap.put(filePathIdentifier, readFooterMetrics);
+
+    private final Map<String, CheckFileType> checkFileMap = new HashMap<>();
+
+    /**
+     * Constructor to initialize the IOStatisticsStore with counters and 
gauges.
+     */
+    public AbfsReadFooterMetrics() {
+        IOStatisticsStore ioStatisticsStore = iostatisticsStore()
+                .withCounters(getMetricNames(TYPE_COUNTER))
+                .withGauges(getMetricNames(TYPE_GAUGE))
+                .build();
+        setIOStatistics(ioStatisticsStore);
     }
 
-    int readCount;
-    synchronized (this) {
-      readCount = readFooterMetrics.incrementReadCount();
+    private String[] getMetricNames(StatisticTypeEnum type) {
+        return Arrays.stream(AbfsReadFooterMetricsEnum.values())
+                .filter(readFooterMetricsEnum -> 
readFooterMetricsEnum.getStatisticType().equals(type))
+                .flatMap(readFooterMetricsEnum ->
+                        FILE.equals(readFooterMetricsEnum.getType())
+                                ? FILE_TYPE_LIST.stream().map(fileType -> 
fileType + COLON + readFooterMetricsEnum.getName())
+                                : Stream.of(readFooterMetricsEnum.getName()))
+                .toArray(String[]::new);
     }
 
-    if (readCount == 1) {
-      // Update metrics for the first read.
-      updateMetricsOnFirstRead(readFooterMetrics, nextReadPos, len, 
contentLength);
+    private long getMetricValue(FileType fileType, AbfsReadFooterMetricsEnum 
metric) {
+        switch (metric.getStatisticType()) {
+            case TYPE_COUNTER:
+                return lookupCounterValue(fileType + COLON + metric.getName());
+            case TYPE_GAUGE:
+                return lookupGaugeValue(fileType + COLON + metric.getName());
+            default:
+                return 0;
+        }
     }
 
-    synchronized (this) {
-      if (readFooterMetrics.getCollectLenMetrics()) {
-        readFooterMetrics.updateDataLenRequested(len);
-      }
+    /**
+     * Updates the value of a specific metric.
+     *
+     * @param fileType the type of the file
+     * @param metric the metric to update
+     * @param value the new value of the metric
+     */
+    public void updateMetricValue(FileType fileType, AbfsReadFooterMetricsEnum 
metric, long value) {
+        updateGaugeValue(fileType + COLON + metric.getName(), value);
     }
 
-    if (readCount == 2) {
-      // Update metrics for the second read.
-      updateMetricsOnSecondRead(readFooterMetrics, nextReadPos, len);
+    /**
+     * Increments the value of a specific metric.
+     *
+     * @param fileType the type of the file
+     * @param metricName the metric to increment
+     */
+    public void incrementMetricValue(FileType fileType, 
AbfsReadFooterMetricsEnum metricName) {
+        incCounterValue(fileType + COLON + metricName.getName());
     }
-  }
-
-  /**
-   * Updates metrics for the first read operation.
-   *
-   * @param readFooterMetrics The metrics object to update.
-   * @param nextReadPos       The position of the next read operation.
-   * @param len               The length of the read operation.
-   * @param contentLength     The total content length of the file.
-   */
-  private void updateMetricsOnFirstRead(AbfsReadFooterMetrics 
readFooterMetrics, long nextReadPos, int len, long contentLength) {
-    if (nextReadPos >= contentLength - (long) Integer.parseInt(FOOTER_LENGTH) 
* ONE_KB) {
-      readFooterMetrics.setCollectMetrics(true);
-      readFooterMetrics.setCollectMetricsForNextRead(true);
-      readFooterMetrics.setOffsetOfFirstRead(nextReadPos);
-      readFooterMetrics.setSizeReadByFirstRead(len + "_" + 
Math.abs(contentLength - nextReadPos));
-      readFooterMetrics.setFileLength(contentLength);
+
+    /**
+     * Returns the total number of files.
+     *
+     * @return the total number of files
+     */
+    public Long getTotalFiles() {
+        return getMetricValue(PARQUET, TOTAL_FILES) + 
getMetricValue(NON_PARQUET, TOTAL_FILES);
     }
-  }
-
-  /**
-   * Updates metrics for the second read operation.
-   *
-   * @param readFooterMetrics The metrics object to update.
-   * @param nextReadPos       The position of the next read operation.
-   * @param len               The length of the read operation.
-   */
-  private void updateMetricsOnSecondRead(AbfsReadFooterMetrics 
readFooterMetrics, long nextReadPos, int len) {
-    if (readFooterMetrics.getCollectMetricsForNextRead()) {
-      long offsetDiff = Math.abs(nextReadPos - 
readFooterMetrics.getOffsetOfFirstRead());
-      readFooterMetrics.setOffsetDiffBetweenFirstAndSecondRead(len + "_" + 
offsetDiff);
-      readFooterMetrics.setCollectLenMetrics(true);
+
+    /**
+     * Returns the total read count.
+     *
+     * @return the total read count
+     */
+    public Long getTotalReadCount() {
+        return getMetricValue(PARQUET, READ_COUNT) + 
getMetricValue(NON_PARQUET, READ_COUNT);
     }
-  }
-
-
-  /**
-   * Check if the given file should be marked as a Parquet file.
-   *
-   * @param metrics The metrics to evaluate.
-   * @return True if the file meet the criteria for being marked as a Parquet 
file, false otherwise.
-   */
-  private boolean shouldMarkAsParquet(AbfsReadFooterMetrics metrics) {
-    return metrics.getCollectMetrics()
-            && metrics.getReadCount() >= 2
-            && !metrics.getIsParquetEvaluated()
-            && haveEqualValues(metrics.getSizeReadByFirstRead())
-            && 
haveEqualValues(metrics.getOffsetDiffBetweenFirstAndSecondRead());
-  }
-
-  /**
-   * Check if two values are equal, considering they are in the format 
"value1_value2".
-   *
-   * @param value The value to check.
-   * @return True if the two parts of the value are equal, false otherwise.
-   */
-  private boolean haveEqualValues(String value) {
-    String[] parts = value.split("_");
-    return parts.length == 2 && parts[0].equals(parts[1]);
-  }
-
-  /**
-   * Mark the given metrics as a Parquet file and update related values.
-   *
-   * @param metrics The metrics to mark as Parquet.
-   */
-  private void markAsParquet(AbfsReadFooterMetrics metrics) {
-    metrics.setIsParquetFile(true);
-    String[] parts = metrics.getSizeReadByFirstRead().split("_");
-    metrics.setSizeReadByFirstRead(parts[0]);
-    parts = metrics.getOffsetDiffBetweenFirstAndSecondRead().split("_");
-    metrics.setOffsetDiffBetweenFirstAndSecondRead(parts[0]);
-    metrics.setIsParquetEvaluated(true);
-  }
-
-  /**
-   * Check each metric in the provided map and mark them as Parquet files if 
they meet the criteria.
-   *
-   * @param metricsMap The map containing metrics to evaluate.
-   */
-  public void checkIsParquet(Map<String, AbfsReadFooterMetrics> metricsMap) {
-    for (Map.Entry<String, AbfsReadFooterMetrics> entry : 
metricsMap.entrySet()) {
-      AbfsReadFooterMetrics readFooterMetrics = entry.getValue();
-      if (shouldMarkAsParquet(readFooterMetrics)) {
-        markAsParquet(readFooterMetrics);
-        metricsMap.replace(entry.getKey(), readFooterMetrics);
-      }
+
+    /**
+     * Updates the map with a new file path identifier.
+     *
+     * @param filePathIdentifier the file path identifier
+     */
+    public void updateMap(String filePathIdentifier) {
+        checkFileMap.computeIfAbsent(filePathIdentifier, key -> new 
CheckFileType());
     }
-  }
-
-  /**
-   * Updates the average read length requested for metrics of all files in the 
metrics map.
-   * If the metrics indicate that the update is needed, it calculates the 
average read length and updates the metrics.
-   *
-   * @param metricsMap A map containing metrics for different files with 
unique identifiers.
-   */
-  private void updateLenRequested(Map<String, AbfsReadFooterMetrics> 
metricsMap) {
-    for (AbfsReadFooterMetrics readFooterMetrics : metricsMap.values()) {
-      if (shouldUpdateLenRequested(readFooterMetrics)) {
-        int readReqCount = readFooterMetrics.getReadCount() - 2;
-        readFooterMetrics.setAvgReadLenRequested(
-                (double) readFooterMetrics.getDataLenRequested() / 
readReqCount);
-        readFooterMetrics.setIsLenUpdated(true);
-      }
+
+    /**
+     * Checks and updates the metrics for a given file read.
+     *
+     * @param filePathIdentifier the file path identifier
+     * @param len the length of the read
+     * @param contentLength the total content length of the file
+     * @param nextReadPos the position of the next read
+     */
+    public void checkMetricUpdate(final String filePathIdentifier, final int 
len, final long contentLength, final long nextReadPos) {
+        CheckFileType checkFileType = 
checkFileMap.computeIfAbsent(filePathIdentifier, key -> new CheckFileType());

Review Comment:
   The variable name should be fileType rather than checkFileType; the same
   applies to the inner class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to