bhattmanish98 commented on code in PR #7122:
URL: https://github.com/apache/hadoop/pull/7122#discussion_r1904258316
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadFooterMetrics.java:
##########
@@ -17,533 +17,494 @@
*/
package org.apache.hadoop.fs.azurebfs.services;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.StringJoiner;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.HashMap;
+import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
+import org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum;
+import org.apache.hadoop.fs.azurebfs.enums.FileType;
+import org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_UNDERSCORE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.CHAR_DOLLAR;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.DOUBLE_PRECISION_FORMAT;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.FILE;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.FIRST_READ;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.SECOND_READ;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.FILE_LENGTH;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.READ_LENGTH;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.TOTAL_FILES;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_FILE_LENGTH;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_SIZE_READ_BY_FIRST_READ;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_OFFSET_DIFF_BETWEEN_FIRST_AND_SECOND_READ;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_READ_LEN_REQUESTED;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_FIRST_OFFSET_DIFF;
+import static org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_SECOND_OFFSET_DIFF;
+import static org.apache.hadoop.fs.azurebfs.enums.FileType.PARQUET;
+import static org.apache.hadoop.fs.azurebfs.enums.FileType.NON_PARQUET;
+import static org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum.TYPE_COUNTER;
+import static org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum.TYPE_MEAN;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
+import static org.apache.hadoop.util.StringUtils.format;
+
+/**
+ * This class is responsible for tracking and updating metrics related to reading footers in files.
+ */
+public class AbfsReadFooterMetrics extends AbstractAbfsStatisticsSource {
+ private static final String FOOTER_LENGTH = "20";
+ private static final List<FileType> FILE_TYPE_LIST = Arrays.asList(PARQUET, NON_PARQUET);
+
+ /**
+ * Inner class to handle file type checks.
+ */
+ private static final class FileTypeMetrics {
+ private final AtomicBoolean collectMetrics;
+ private final AtomicBoolean collectMetricsForNextRead;
+ private final AtomicBoolean collectLenMetrics;
+ private final AtomicLong readCount;
+ private final AtomicLong offsetOfFirstRead;
+ private FileType fileType = null;
+ private String sizeReadByFirstRead;
+ private String offsetDiffBetweenFirstAndSecondRead;
+
+ /**
+ * Constructor to initialize the file type metrics.
+ */
+ private FileTypeMetrics() {
+ collectMetrics = new AtomicBoolean(false);
+ collectMetricsForNextRead = new AtomicBoolean(false);
+ collectLenMetrics = new AtomicBoolean(false);
+ readCount = new AtomicLong(0);
+ offsetOfFirstRead = new AtomicLong(0);
+ }
+
+ /**
+ * Updates the file type based on the metrics collected.
+ */
+ private void updateFileType() {
+ if (fileType == null) {
+ fileType = collectMetrics.get() && readCount.get() >= 2
+ && haveEqualValues(sizeReadByFirstRead)
+ && haveEqualValues(offsetDiffBetweenFirstAndSecondRead) ? PARQUET : NON_PARQUET;
+ }
+ }
+
+ /**
+ * Checks if the given value has equal parts.
+ *
+ * @param value the value to check
+ * @return true if the value has equal parts, false otherwise
+ */
+ private boolean haveEqualValues(String value) {
+ String[] parts = value.split("_");
+ return parts.length == 2
+ && parts[0].equals(parts[1]);
+ }
+
+ /**
+ * Increments the read count.
+ */
+ private void incrementReadCount() {
+ readCount.incrementAndGet();
+ }
+
+ /**
+ * Returns the read count.
+ *
+ * @return the read count
+ */
+ private long getReadCount() {
+ return readCount.get();
+ }
+
+ /**
+ * Sets the collect metrics flag.
+ *
+ * @param collect the value to set
+ */
+ private void setCollectMetrics(boolean collect) {
+ collectMetrics.set(collect);
+ }
+
+ /**
+ * Returns the collect metrics flag.
+ *
+ * @return the collect metrics flag
+ */
+ private boolean getCollectMetrics() {
+ return collectMetrics.get();
+ }
+
+ /**
+ * Sets the collect metrics for the next read flag.
+ *
+ * @param collect the value to set
+ */
+ private void setCollectMetricsForNextRead(boolean collect) {
+ collectMetricsForNextRead.set(collect);
+ }
+
+ /**
+ * Returns the collect metrics for the next read flag.
+ *
+ * @return the collect metrics for the next read flag
+ */
+ private boolean getCollectMetricsForNextRead() {
+ return collectMetricsForNextRead.get();
+ }
+
+ /**
+ * Returns the collect length metrics flag.
+ *
+ * @return the collect length metrics flag
+ */
+ private boolean getCollectLenMetrics() {
+ return collectLenMetrics.get();
+ }
+
+ /**
+ * Sets the collect length metrics flag.
+ *
+ * @param collect the value to set
+ */
+ private void setCollectLenMetrics(boolean collect) {
+ collectLenMetrics.set(collect);
+ }
+
+ /**
+ * Sets the offset of the first read.
+ *
+ * @param offset the value to set
+ */
+ private void setOffsetOfFirstRead(long offset) {
+ offsetOfFirstRead.set(offset);
+ }
+
+ /**
+ * Returns the offset of the first read.
+ *
+ * @return the offset of the first read
+ */
+ private long getOffsetOfFirstRead() {
+ return offsetOfFirstRead.get();
+ }
+
+ /**
+ * Sets the size read by the first read.
+ *
+ * @param size the value to set
+ */
+ private void setSizeReadByFirstRead(String size) {
+ sizeReadByFirstRead = size;
+ }
+
+ /**
+ * Returns the size read by the first read.
+ *
+ * @return the size read by the first read
+ */
+ private String getSizeReadByFirstRead() {
+ return sizeReadByFirstRead;
+ }
-public class AbfsReadFooterMetrics {
- private final AtomicBoolean isParquetFile;
- private final AtomicBoolean isParquetEvaluated;
- private final AtomicBoolean isLenUpdated;
- private String sizeReadByFirstRead;
- private String offsetDiffBetweenFirstAndSecondRead;
- private final AtomicLong fileLength;
- private double avgFileLength;
- private double avgReadLenRequested;
- private final AtomicBoolean collectMetrics;
- private final AtomicBoolean collectMetricsForNextRead;
- private final AtomicBoolean collectLenMetrics;
- private final AtomicLong dataLenRequested;
- private final AtomicLong offsetOfFirstRead;
- private final AtomicInteger readCount;
- private final ConcurrentSkipListMap<String, AbfsReadFooterMetrics> metricsMap;
- private static final String FOOTER_LENGTH = "20";
-
- public AbfsReadFooterMetrics() {
- this.isParquetFile = new AtomicBoolean(false);
- this.isParquetEvaluated = new AtomicBoolean(false);
- this.isLenUpdated = new AtomicBoolean(false);
- this.fileLength = new AtomicLong();
- this.readCount = new AtomicInteger(0);
- this.offsetOfFirstRead = new AtomicLong();
- this.collectMetrics = new AtomicBoolean(false);
- this.collectMetricsForNextRead = new AtomicBoolean(false);
- this.collectLenMetrics = new AtomicBoolean(false);
- this.dataLenRequested = new AtomicLong(0);
- this.metricsMap = new ConcurrentSkipListMap<>();
- }
-
- public Map<String, AbfsReadFooterMetrics> getMetricsMap() {
- return metricsMap;
- }
-
- private boolean getIsParquetFile() {
- return isParquetFile.get();
- }
-
- public void setIsParquetFile(boolean isParquetFile) {
- this.isParquetFile.set(isParquetFile);
- }
-
- private String getSizeReadByFirstRead() {
- return sizeReadByFirstRead;
- }
-
- public void setSizeReadByFirstRead(final String sizeReadByFirstRead) {
- this.sizeReadByFirstRead = sizeReadByFirstRead;
- }
-
- private String getOffsetDiffBetweenFirstAndSecondRead() {
- return offsetDiffBetweenFirstAndSecondRead;
- }
-
- public void setOffsetDiffBetweenFirstAndSecondRead(final String offsetDiffBetweenFirstAndSecondRead) {
- this.offsetDiffBetweenFirstAndSecondRead
- = offsetDiffBetweenFirstAndSecondRead;
- }
-
- private long getFileLength() {
- return fileLength.get();
- }
-
- private void setFileLength(long fileLength) {
- this.fileLength.set(fileLength);
- }
-
- private double getAvgFileLength() {
- return avgFileLength;
- }
-
- public void setAvgFileLength(final double avgFileLength) {
- this.avgFileLength = avgFileLength;
- }
-
- private double getAvgReadLenRequested() {
- return avgReadLenRequested;
- }
-
- public void setAvgReadLenRequested(final double avgReadLenRequested) {
- this.avgReadLenRequested = avgReadLenRequested;
- }
-
- private boolean getCollectMetricsForNextRead() {
- return collectMetricsForNextRead.get();
- }
-
- private void setCollectMetricsForNextRead(boolean collectMetricsForNextRead) {
- this.collectMetricsForNextRead.set(collectMetricsForNextRead);
- }
-
- private long getOffsetOfFirstRead() {
- return offsetOfFirstRead.get();
- }
-
- private void setOffsetOfFirstRead(long offsetOfFirstRead) {
- this.offsetOfFirstRead.set(offsetOfFirstRead);
- }
-
- private int getReadCount() {
- return readCount.get();
- }
-
- private void setReadCount(int readCount) {
- this.readCount.set(readCount);
- }
-
- private int incrementReadCount() {
- this.readCount.incrementAndGet();
- return getReadCount();
- }
-
- private boolean getCollectLenMetrics() {
- return collectLenMetrics.get();
- }
-
- private void setCollectLenMetrics(boolean collectLenMetrics) {
- this.collectLenMetrics.set(collectLenMetrics);
-
- }
-
- private long getDataLenRequested() {
- return dataLenRequested.get();
- }
-
- private void setDataLenRequested(long dataLenRequested) {
- this.dataLenRequested.set(dataLenRequested);
- }
-
- private void updateDataLenRequested(long dataLenRequested){
- this.dataLenRequested.addAndGet(dataLenRequested);
- }
-
- private boolean getCollectMetrics() {
- return collectMetrics.get();
- }
-
- private void setCollectMetrics(boolean collectMetrics) {
- this.collectMetrics.set(collectMetrics);
- }
-
- private boolean getIsParquetEvaluated() {
- return isParquetEvaluated.get();
- }
-
- private void setIsParquetEvaluated(boolean isParquetEvaluated) {
- this.isParquetEvaluated.set(isParquetEvaluated);
- }
-
- private boolean getIsLenUpdated() {
- return isLenUpdated.get();
- }
-
- private void setIsLenUpdated(boolean isLenUpdated) {
- this.isLenUpdated.set(isLenUpdated);
- }
-
- /**
- * Updates the metrics map with an entry for the specified file if it doesn't already exist.
- *
- * @param filePathIdentifier The unique identifier for the file.
- */
- public void updateMap(String filePathIdentifier) {
- // If the file is not already in the metrics map, add it with a new AbfsReadFooterMetrics object.
- metricsMap.computeIfAbsent(filePathIdentifier, key -> new AbfsReadFooterMetrics());
- }
-
- /**
- * Checks and updates metrics for a specific file identified by filePathIdentifier.
- * If the metrics do not exist for the file, they are initialized.
- *
- * @param filePathIdentifier The unique identifier for the file.
- * @param len The length of the read operation.
- * @param contentLength The total content length of the file.
- * @param nextReadPos The position of the next read operation.
- */
- public void checkMetricUpdate(final String filePathIdentifier, final int len, final long contentLength,
- final long nextReadPos) {
- AbfsReadFooterMetrics readFooterMetrics = metricsMap.computeIfAbsent(
- filePathIdentifier, key -> new AbfsReadFooterMetrics());
- if (readFooterMetrics.getReadCount() == 0
- || (readFooterMetrics.getReadCount() >= 1
- && readFooterMetrics.getCollectMetrics())) {
- updateMetrics(filePathIdentifier, len, contentLength, nextReadPos);
+ /**
+ * Sets the offset difference between the first and second read.
+ *
+ * @param offsetDiff the value to set
+ */
+ private void setOffsetDiffBetweenFirstAndSecondRead(String offsetDiff) {
+ offsetDiffBetweenFirstAndSecondRead = offsetDiff;
+ }
+
+ /**
+ * Returns the offset difference between the first and second read.
+ *
+ * @return the offset difference between the first and second read
+ */
+ private String getOffsetDiffBetweenFirstAndSecondRead() {
+ return offsetDiffBetweenFirstAndSecondRead;
+ }
+
+ /**
+ * Returns the file type.
+ *
+ * @return the file type
+ */
+ private FileType getFileType() {
+ return fileType;
+ }
}
- }
-
- /**
- * Updates metrics for a specific file identified by filePathIdentifier.
- *
- * @param filePathIdentifier The unique identifier for the file.
- * @param len The length of the read operation.
- * @param contentLength The total content length of the file.
- * @param nextReadPos The position of the next read operation.
- */
- private void updateMetrics(final String filePathIdentifier, final int len, final long contentLength,
- final long nextReadPos) {
- AbfsReadFooterMetrics readFooterMetrics = metricsMap.get(filePathIdentifier);
-
- // Create a new AbfsReadFooterMetrics object if it doesn't exist in the metricsMap.
- if (readFooterMetrics == null) {
- readFooterMetrics = new AbfsReadFooterMetrics();
- metricsMap.put(filePathIdentifier, readFooterMetrics);
+
+ private final Map<String, FileTypeMetrics> fileTypeMetricsMap = new HashMap<>();
Review Comment:
Taken!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]