[
https://issues.apache.org/jira/browse/HADOOP-19311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17910264#comment-17910264
]
ASF GitHub Bot commented on HADOOP-19311:
-----------------------------------------
bhattmanish98 commented on code in PR #7122:
URL: https://github.com/apache/hadoop/pull/7122#discussion_r1904265337
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadFooterMetrics.java:
##########
@@ -17,533 +17,494 @@
*/
package org.apache.hadoop.fs.azurebfs.services;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.StringJoiner;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.HashMap;
+import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
+import org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum;
+import org.apache.hadoop.fs.azurebfs.enums.FileType;
+import org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_UNDERSCORE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON;
import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
+import static
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.CHAR_DOLLAR;
+import static
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.DOUBLE_PRECISION_FORMAT;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.FILE;
+import static
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.FIRST_READ;
+import static
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.SECOND_READ;
+import static
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.FILE_LENGTH;
+import static
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.READ_LENGTH;
+import static
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.TOTAL_FILES;
+import static
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_FILE_LENGTH;
+import static
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_SIZE_READ_BY_FIRST_READ;
+import static
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_OFFSET_DIFF_BETWEEN_FIRST_AND_SECOND_READ;
+import static
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_READ_LEN_REQUESTED;
+import static
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_FIRST_OFFSET_DIFF;
+import static
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_SECOND_OFFSET_DIFF;
+import static org.apache.hadoop.fs.azurebfs.enums.FileType.PARQUET;
+import static org.apache.hadoop.fs.azurebfs.enums.FileType.NON_PARQUET;
+import static
org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum.TYPE_COUNTER;
+import static org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum.TYPE_MEAN;
+import static
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
+import static org.apache.hadoop.util.StringUtils.format;
+
+/**
+ * This class is responsible for tracking and updating metrics related to
reading footers in files.
+ */
+public class AbfsReadFooterMetrics extends AbstractAbfsStatisticsSource {
+ private static final String FOOTER_LENGTH = "20";
+ private static final List<FileType> FILE_TYPE_LIST =
Arrays.asList(PARQUET, NON_PARQUET);
+
+ /**
+ * Inner class to handle file type checks.
+ */
+ private static final class FileTypeMetrics {
+ private final AtomicBoolean collectMetrics;
+ private final AtomicBoolean collectMetricsForNextRead;
+ private final AtomicBoolean collectLenMetrics;
+ private final AtomicLong readCount;
+ private final AtomicLong offsetOfFirstRead;
+ private FileType fileType = null;
+ private String sizeReadByFirstRead;
+ private String offsetDiffBetweenFirstAndSecondRead;
+
+ /**
+ * Constructor to initialize the file type metrics.
+ */
+ private FileTypeMetrics() {
+ collectMetrics = new AtomicBoolean(false);
+ collectMetricsForNextRead = new AtomicBoolean(false);
+ collectLenMetrics = new AtomicBoolean(false);
+ readCount = new AtomicLong(0);
+ offsetOfFirstRead = new AtomicLong(0);
+ }
+
+ /**
+ * Updates the file type based on the metrics collected.
+ */
+ private void updateFileType() {
+ if (fileType == null) {
+ fileType = collectMetrics.get() && readCount.get() >= 2
+ && haveEqualValues(sizeReadByFirstRead)
+ &&
haveEqualValues(offsetDiffBetweenFirstAndSecondRead) ? PARQUET : NON_PARQUET;
+ }
+ }
+
+ /**
+ * Checks if the given value has equal parts.
+ *
+ * @param value the value to check
+ * @return true if the value has equal parts, false otherwise
+ */
+ private boolean haveEqualValues(String value) {
+ String[] parts = value.split("_");
+ return parts.length == 2
+ && parts[0].equals(parts[1]);
+ }
+
+ /**
+ * Increments the read count.
+ */
+ private void incrementReadCount() {
+ readCount.incrementAndGet();
+ }
+
+ /**
+ * Returns the read count.
+ *
+ * @return the read count
+ */
+ private long getReadCount() {
+ return readCount.get();
+ }
+
+ /**
+ * Sets the collect metrics flag.
+ *
+ * @param collect the value to set
+ */
+ private void setCollectMetrics(boolean collect) {
+ collectMetrics.set(collect);
+ }
+
+ /**
+ * Returns the collect metrics flag.
+ *
+ * @return the collect metrics flag
+ */
+ private boolean getCollectMetrics() {
+ return collectMetrics.get();
+ }
+
+ /**
+ * Sets the collect metrics for the next read flag.
+ *
+ * @param collect the value to set
+ */
+ private void setCollectMetricsForNextRead(boolean collect) {
+ collectMetricsForNextRead.set(collect);
+ }
+
+ /**
+ * Returns the collect metrics for the next read flag.
+ *
+ * @return the collect metrics for the next read flag
+ */
+ private boolean getCollectMetricsForNextRead() {
+ return collectMetricsForNextRead.get();
+ }
+
+ /**
+ * Returns the collect length metrics flag.
+ *
+ * @return the collect length metrics flag
+ */
+ private boolean getCollectLenMetrics() {
+ return collectLenMetrics.get();
+ }
+
+ /**
+ * Sets the collect length metrics flag.
+ *
+ * @param collect the value to set
+ */
+ private void setCollectLenMetrics(boolean collect) {
+ collectLenMetrics.set(collect);
+ }
+
+ /**
+ * Sets the offset of the first read.
+ *
+ * @param offset the value to set
+ */
+ private void setOffsetOfFirstRead(long offset) {
+ offsetOfFirstRead.set(offset);
+ }
+
+ /**
+ * Returns the offset of the first read.
+ *
+ * @return the offset of the first read
+ */
+ private long getOffsetOfFirstRead() {
+ return offsetOfFirstRead.get();
+ }
+
+ /**
+ * Sets the size read by the first read.
+ *
+ * @param size the value to set
+ */
+ private void setSizeReadByFirstRead(String size) {
+ sizeReadByFirstRead = size;
+ }
+
+ /**
+ * Returns the size read by the first read.
+ *
+ * @return the size read by the first read
+ */
+ private String getSizeReadByFirstRead() {
+ return sizeReadByFirstRead;
+ }
-public class AbfsReadFooterMetrics {
- private final AtomicBoolean isParquetFile;
- private final AtomicBoolean isParquetEvaluated;
- private final AtomicBoolean isLenUpdated;
- private String sizeReadByFirstRead;
- private String offsetDiffBetweenFirstAndSecondRead;
- private final AtomicLong fileLength;
- private double avgFileLength;
- private double avgReadLenRequested;
- private final AtomicBoolean collectMetrics;
- private final AtomicBoolean collectMetricsForNextRead;
- private final AtomicBoolean collectLenMetrics;
- private final AtomicLong dataLenRequested;
- private final AtomicLong offsetOfFirstRead;
- private final AtomicInteger readCount;
- private final ConcurrentSkipListMap<String, AbfsReadFooterMetrics>
metricsMap;
- private static final String FOOTER_LENGTH = "20";
-
- public AbfsReadFooterMetrics() {
- this.isParquetFile = new AtomicBoolean(false);
- this.isParquetEvaluated = new AtomicBoolean(false);
- this.isLenUpdated = new AtomicBoolean(false);
- this.fileLength = new AtomicLong();
- this.readCount = new AtomicInteger(0);
- this.offsetOfFirstRead = new AtomicLong();
- this.collectMetrics = new AtomicBoolean(false);
- this.collectMetricsForNextRead = new AtomicBoolean(false);
- this.collectLenMetrics = new AtomicBoolean(false);
- this.dataLenRequested = new AtomicLong(0);
- this.metricsMap = new ConcurrentSkipListMap<>();
- }
-
- public Map<String, AbfsReadFooterMetrics> getMetricsMap() {
- return metricsMap;
- }
-
- private boolean getIsParquetFile() {
- return isParquetFile.get();
- }
-
- public void setIsParquetFile(boolean isParquetFile) {
- this.isParquetFile.set(isParquetFile);
- }
-
- private String getSizeReadByFirstRead() {
- return sizeReadByFirstRead;
- }
-
- public void setSizeReadByFirstRead(final String sizeReadByFirstRead) {
- this.sizeReadByFirstRead = sizeReadByFirstRead;
- }
-
- private String getOffsetDiffBetweenFirstAndSecondRead() {
- return offsetDiffBetweenFirstAndSecondRead;
- }
-
- public void setOffsetDiffBetweenFirstAndSecondRead(final String
offsetDiffBetweenFirstAndSecondRead) {
- this.offsetDiffBetweenFirstAndSecondRead
- = offsetDiffBetweenFirstAndSecondRead;
- }
-
- private long getFileLength() {
- return fileLength.get();
- }
-
- private void setFileLength(long fileLength) {
- this.fileLength.set(fileLength);
- }
-
- private double getAvgFileLength() {
- return avgFileLength;
- }
-
- public void setAvgFileLength(final double avgFileLength) {
- this.avgFileLength = avgFileLength;
- }
-
- private double getAvgReadLenRequested() {
- return avgReadLenRequested;
- }
-
- public void setAvgReadLenRequested(final double avgReadLenRequested) {
- this.avgReadLenRequested = avgReadLenRequested;
- }
-
- private boolean getCollectMetricsForNextRead() {
- return collectMetricsForNextRead.get();
- }
-
- private void setCollectMetricsForNextRead(boolean collectMetricsForNextRead)
{
- this.collectMetricsForNextRead.set(collectMetricsForNextRead);
- }
-
- private long getOffsetOfFirstRead() {
- return offsetOfFirstRead.get();
- }
-
- private void setOffsetOfFirstRead(long offsetOfFirstRead) {
- this.offsetOfFirstRead.set(offsetOfFirstRead);
- }
-
- private int getReadCount() {
- return readCount.get();
- }
-
- private void setReadCount(int readCount) {
- this.readCount.set(readCount);
- }
-
- private int incrementReadCount() {
- this.readCount.incrementAndGet();
- return getReadCount();
- }
-
- private boolean getCollectLenMetrics() {
- return collectLenMetrics.get();
- }
-
- private void setCollectLenMetrics(boolean collectLenMetrics) {
- this.collectLenMetrics.set(collectLenMetrics);
-
- }
-
- private long getDataLenRequested() {
- return dataLenRequested.get();
- }
-
- private void setDataLenRequested(long dataLenRequested) {
- this.dataLenRequested.set(dataLenRequested);
- }
-
- private void updateDataLenRequested(long dataLenRequested){
- this.dataLenRequested.addAndGet(dataLenRequested);
- }
-
- private boolean getCollectMetrics() {
- return collectMetrics.get();
- }
-
- private void setCollectMetrics(boolean collectMetrics) {
- this.collectMetrics.set(collectMetrics);
- }
-
- private boolean getIsParquetEvaluated() {
- return isParquetEvaluated.get();
- }
-
- private void setIsParquetEvaluated(boolean isParquetEvaluated) {
- this.isParquetEvaluated.set(isParquetEvaluated);
- }
-
- private boolean getIsLenUpdated() {
- return isLenUpdated.get();
- }
-
- private void setIsLenUpdated(boolean isLenUpdated) {
- this.isLenUpdated.set(isLenUpdated);
- }
-
- /**
- * Updates the metrics map with an entry for the specified file if it
doesn't already exist.
- *
- * @param filePathIdentifier The unique identifier for the file.
- */
- public void updateMap(String filePathIdentifier) {
- // If the file is not already in the metrics map, add it with a new
AbfsReadFooterMetrics object.
- metricsMap.computeIfAbsent(filePathIdentifier, key -> new
AbfsReadFooterMetrics());
- }
-
- /**
- * Checks and updates metrics for a specific file identified by
filePathIdentifier.
- * If the metrics do not exist for the file, they are initialized.
- *
- * @param filePathIdentifier The unique identifier for the file.
- * @param len The length of the read operation.
- * @param contentLength The total content length of the file.
- * @param nextReadPos The position of the next read operation.
- */
- public void checkMetricUpdate(final String filePathIdentifier, final int
len, final long contentLength,
- final long nextReadPos) {
- AbfsReadFooterMetrics readFooterMetrics = metricsMap.computeIfAbsent(
- filePathIdentifier, key -> new AbfsReadFooterMetrics());
- if (readFooterMetrics.getReadCount() == 0
- || (readFooterMetrics.getReadCount() >= 1
- && readFooterMetrics.getCollectMetrics())) {
- updateMetrics(filePathIdentifier, len, contentLength, nextReadPos);
+ /**
+ * Sets the offset difference between the first and second read.
+ *
+ * @param offsetDiff the value to set
+ */
+ private void setOffsetDiffBetweenFirstAndSecondRead(String offsetDiff)
{
+ offsetDiffBetweenFirstAndSecondRead = offsetDiff;
+ }
+
+ /**
+ * Returns the offset difference between the first and second read.
+ *
+ * @return the offset difference between the first and second read
+ */
+ private String getOffsetDiffBetweenFirstAndSecondRead() {
+ return offsetDiffBetweenFirstAndSecondRead;
+ }
+
+ /**
+ * Returns the file type.
+ *
+ * @return the file type
+ */
+ private FileType getFileType() {
+ return fileType;
+ }
}
- }
-
- /**
- * Updates metrics for a specific file identified by filePathIdentifier.
- *
- * @param filePathIdentifier The unique identifier for the file.
- * @param len The length of the read operation.
- * @param contentLength The total content length of the file.
- * @param nextReadPos The position of the next read operation.
- */
- private void updateMetrics(final String filePathIdentifier, final int len,
final long contentLength,
- final long nextReadPos) {
- AbfsReadFooterMetrics readFooterMetrics =
metricsMap.get(filePathIdentifier);
-
- // Create a new AbfsReadFooterMetrics object if it doesn't exist in the
metricsMap.
- if (readFooterMetrics == null) {
- readFooterMetrics = new AbfsReadFooterMetrics();
- metricsMap.put(filePathIdentifier, readFooterMetrics);
+
+ private final Map<String, FileTypeMetrics> fileTypeMetricsMap = new
HashMap<>();
+
+ /**
+ * Constructor to initialize the IOStatisticsStore with counters and mean
statistics.
+ */
+ public AbfsReadFooterMetrics() {
+ IOStatisticsStore ioStatisticsStore = iostatisticsStore()
+ .withCounters(getMetricNames(TYPE_COUNTER))
+ .withMeanStatistics(getMetricNames(TYPE_MEAN))
+ .build();
+ setIOStatistics(ioStatisticsStore);
}
- int readCount;
- synchronized (this) {
- readCount = readFooterMetrics.incrementReadCount();
+ /**
+ * Returns the metric names for a specific statistic type.
+ *
+ * @param type the statistic type
+ * @return the metric names
+ */
+ private String[] getMetricNames(StatisticTypeEnum type) {
+ return Arrays.stream(AbfsReadFooterMetricsEnum.values())
+ .filter(readFooterMetricsEnum ->
readFooterMetricsEnum.getStatisticType().equals(type))
+ .flatMap(readFooterMetricsEnum ->
+ FILE.equals(readFooterMetricsEnum.getType())
+ ? FILE_TYPE_LIST.stream().map(fileType ->
fileType + COLON + readFooterMetricsEnum.getName())
+ : Stream.of(readFooterMetricsEnum.getName()))
+ .toArray(String[]::new);
}
- if (readCount == 1) {
- // Update metrics for the first read.
- updateMetricsOnFirstRead(readFooterMetrics, nextReadPos, len,
contentLength);
+ /**
+ * Looks up the counter value for a specific metric.
+ *
+ * @param fileType the type of the file
+ * @param metric the metric to look up
+ * @return the counter value
+ */
+ private long getCounterMetricValue(FileType fileType,
AbfsReadFooterMetricsEnum metric) {
+ return lookupCounterValue(fileType + COLON + metric.getName());
}
- synchronized (this) {
- if (readFooterMetrics.getCollectLenMetrics()) {
- readFooterMetrics.updateDataLenRequested(len);
- }
+ /**
+ * Looks up the mean statistic value for a specific metric.
+ *
+ * @param fileType the type of the file
+ * @param metric the metric to look up
+ * @return the mean statistic value
+ */
+ private String getMeanMetricValue(FileType fileType,
AbfsReadFooterMetricsEnum metric) {
+ return format(DOUBLE_PRECISION_FORMAT, lookupMeanStatistic(fileType +
COLON + metric.getName()));
}
- if (readCount == 2) {
- // Update metrics for the second read.
- updateMetricsOnSecondRead(readFooterMetrics, nextReadPos, len);
+ /**
+ * Increments the value of a specific metric.
+ *
+ * @param fileType the type of the file
+ * @param metricName the metric to increment
+ */
+ public void incrementMetricValue(FileType fileType,
AbfsReadFooterMetricsEnum metricName) {
+ incCounterValue(fileType + COLON + metricName.getName());
}
- }
-
- /**
- * Updates metrics for the first read operation.
- *
- * @param readFooterMetrics The metrics object to update.
- * @param nextReadPos The position of the next read operation.
- * @param len The length of the read operation.
- * @param contentLength The total content length of the file.
- */
- private void updateMetricsOnFirstRead(AbfsReadFooterMetrics
readFooterMetrics, long nextReadPos, int len, long contentLength) {
- if (nextReadPos >= contentLength - (long) Integer.parseInt(FOOTER_LENGTH)
* ONE_KB) {
- readFooterMetrics.setCollectMetrics(true);
- readFooterMetrics.setCollectMetricsForNextRead(true);
- readFooterMetrics.setOffsetOfFirstRead(nextReadPos);
- readFooterMetrics.setSizeReadByFirstRead(len + "_" +
Math.abs(contentLength - nextReadPos));
- readFooterMetrics.setFileLength(contentLength);
+
+ /**
+ * Adds a mean statistic value for a specific metric.
+ *
+ * @param fileType the type of the file
+ * @param metricName the metric to update
+ * @param value the new value of the metric
+ */
+ public void addMeanMetricValue(FileType fileType,
AbfsReadFooterMetricsEnum metricName, long value) {
+ addMeanStatistic(fileType + COLON + metricName.getName(), value);
}
- }
-
- /**
- * Updates metrics for the second read operation.
- *
- * @param readFooterMetrics The metrics object to update.
- * @param nextReadPos The position of the next read operation.
- * @param len The length of the read operation.
- */
- private void updateMetricsOnSecondRead(AbfsReadFooterMetrics
readFooterMetrics, long nextReadPos, int len) {
- if (readFooterMetrics.getCollectMetricsForNextRead()) {
- long offsetDiff = Math.abs(nextReadPos -
readFooterMetrics.getOffsetOfFirstRead());
- readFooterMetrics.setOffsetDiffBetweenFirstAndSecondRead(len + "_" +
offsetDiff);
- readFooterMetrics.setCollectLenMetrics(true);
+
+ /**
+ * Returns the total number of files.
+ *
+ * @return the total number of files
+ */
+ public Long getTotalFiles() {
+ return getCounterMetricValue(PARQUET, TOTAL_FILES) +
getCounterMetricValue(NON_PARQUET, TOTAL_FILES);
}
- }
-
-
- /**
- * Check if the given file should be marked as a Parquet file.
- *
- * @param metrics The metrics to evaluate.
- * @return True if the file meet the criteria for being marked as a Parquet
file, false otherwise.
- */
- private boolean shouldMarkAsParquet(AbfsReadFooterMetrics metrics) {
- return metrics.getCollectMetrics()
- && metrics.getReadCount() >= 2
- && !metrics.getIsParquetEvaluated()
- && haveEqualValues(metrics.getSizeReadByFirstRead())
- &&
haveEqualValues(metrics.getOffsetDiffBetweenFirstAndSecondRead());
- }
-
- /**
- * Check if two values are equal, considering they are in the format
"value1_value2".
- *
- * @param value The value to check.
- * @return True if the two parts of the value are equal, false otherwise.
- */
- private boolean haveEqualValues(String value) {
- String[] parts = value.split("_");
- return parts.length == 2 && parts[0].equals(parts[1]);
- }
-
- /**
- * Mark the given metrics as a Parquet file and update related values.
- *
- * @param metrics The metrics to mark as Parquet.
- */
- private void markAsParquet(AbfsReadFooterMetrics metrics) {
- metrics.setIsParquetFile(true);
- String[] parts = metrics.getSizeReadByFirstRead().split("_");
- metrics.setSizeReadByFirstRead(parts[0]);
- parts = metrics.getOffsetDiffBetweenFirstAndSecondRead().split("_");
- metrics.setOffsetDiffBetweenFirstAndSecondRead(parts[0]);
- metrics.setIsParquetEvaluated(true);
- }
-
- /**
- * Check each metric in the provided map and mark them as Parquet files if
they meet the criteria.
- *
- * @param metricsMap The map containing metrics to evaluate.
- */
- public void checkIsParquet(Map<String, AbfsReadFooterMetrics> metricsMap) {
- for (Map.Entry<String, AbfsReadFooterMetrics> entry :
metricsMap.entrySet()) {
- AbfsReadFooterMetrics readFooterMetrics = entry.getValue();
- if (shouldMarkAsParquet(readFooterMetrics)) {
- markAsParquet(readFooterMetrics);
- metricsMap.replace(entry.getKey(), readFooterMetrics);
- }
+
+ /**
+ * Updates the map with a new file path identifier.
+ *
+ * @param filePathIdentifier the file path identifier
+ */
+ public void updateMap(String filePathIdentifier) {
+ fileTypeMetricsMap.computeIfAbsent(filePathIdentifier, key -> new
FileTypeMetrics());
}
- }
-
- /**
- * Updates the average read length requested for metrics of all files in the
metrics map.
- * If the metrics indicate that the update is needed, it calculates the
average read length and updates the metrics.
- *
- * @param metricsMap A map containing metrics for different files with
unique identifiers.
- */
- private void updateLenRequested(Map<String, AbfsReadFooterMetrics>
metricsMap) {
- for (AbfsReadFooterMetrics readFooterMetrics : metricsMap.values()) {
- if (shouldUpdateLenRequested(readFooterMetrics)) {
- int readReqCount = readFooterMetrics.getReadCount() - 2;
- readFooterMetrics.setAvgReadLenRequested(
- (double) readFooterMetrics.getDataLenRequested() /
readReqCount);
- readFooterMetrics.setIsLenUpdated(true);
- }
+
+ /**
+ * Checks and updates the metrics for a given file read.
+ *
+ * @param filePathIdentifier the file path identifier
+ * @param len the length of the read
+ * @param contentLength the total content length of the file
+ * @param nextReadPos the position of the next read
+ */
+ public void updateReadMetrics(final String filePathIdentifier,
+ final int len,
+ final long contentLength,
+ final long nextReadPos) {
+ FileTypeMetrics fileTypeMetrics =
fileTypeMetricsMap.computeIfAbsent(filePathIdentifier, key -> new
FileTypeMetrics());
+ if (fileTypeMetrics.getReadCount() == 0 ||
(fileTypeMetrics.getReadCount() >= 1 && fileTypeMetrics.getCollectMetrics())) {
+ updateMetrics(fileTypeMetrics, len, contentLength, nextReadPos);
+ }
}
- }
-
- /**
- * Checks whether the average read length requested should be updated for
the given metrics.
- *
- * The method returns true if the following conditions are met:
- * - Metrics collection is enabled.
- * - The number of read counts is greater than 2.
- * - The average read length has not been updated previously.
- *
- * @param readFooterMetrics The metrics object to evaluate.
- * @return True if the average read length should be updated, false
otherwise.
- */
- private boolean shouldUpdateLenRequested(AbfsReadFooterMetrics
readFooterMetrics) {
- return readFooterMetrics.getCollectMetrics()
- && readFooterMetrics.getReadCount() > 2
- && !readFooterMetrics.getIsLenUpdated();
- }
-
- /**
- * Calculates the average metrics from a list of AbfsReadFooterMetrics and
sets the values in the provided 'avgParquetReadFooterMetrics' object.
- *
- * @param isParquetList The list of AbfsReadFooterMetrics to compute the
averages from.
- * @param avgParquetReadFooterMetrics The target AbfsReadFooterMetrics
object to store the computed average values.
- *
- * This method calculates various average metrics from the provided list and
sets them in the 'avgParquetReadFooterMetrics' object.
- * The metrics include:
- * - Size read by the first read
- * - Offset difference between the first and second read
- * - Average file length
- * - Average requested read length
- */
- private void getParquetReadFooterMetricsAverage(List<AbfsReadFooterMetrics>
isParquetList,
- AbfsReadFooterMetrics avgParquetReadFooterMetrics){
- avgParquetReadFooterMetrics.setSizeReadByFirstRead(
- String.format("%.3f", isParquetList.stream()
- .map(AbfsReadFooterMetrics::getSizeReadByFirstRead).mapToDouble(
- Double::parseDouble).average().orElse(0.0)));
- avgParquetReadFooterMetrics.setOffsetDiffBetweenFirstAndSecondRead(
- String.format("%.3f", isParquetList.stream()
- .map(AbfsReadFooterMetrics::getOffsetDiffBetweenFirstAndSecondRead)
- .mapToDouble(Double::parseDouble).average().orElse(0.0)));
- avgParquetReadFooterMetrics.setAvgFileLength(isParquetList.stream()
-
.mapToDouble(AbfsReadFooterMetrics::getFileLength).average().orElse(0.0));
- avgParquetReadFooterMetrics.setAvgReadLenRequested(isParquetList.stream().
- map(AbfsReadFooterMetrics::getAvgReadLenRequested).
- mapToDouble(Double::doubleValue).average().orElse(0.0));
- }
-
- /**
- * Calculates the average metrics from a list of non-Parquet
AbfsReadFooterMetrics instances.
- *
- * This method takes a list of AbfsReadFooterMetrics representing
non-Parquet reads and calculates
- * the average values for the size read by the first read and the offset
difference between the first
- * and second read. The averages are then set in the provided
AbfsReadFooterMetrics instance.
- *
- * @param isNonParquetList A list of AbfsReadFooterMetrics instances
representing non-Parquet reads.
- * @param avgNonParquetReadFooterMetrics The AbfsReadFooterMetrics instance
to store the calculated averages.
- * It is assumed that the size of the
list is at least 1, and the first
- * element of the list is used to
determine the size of arrays.
- * The instance is modified in-place
with the calculated averages.
- *
- *
- **/
- private void
getNonParquetReadFooterMetricsAverage(List<AbfsReadFooterMetrics>
isNonParquetList,
- AbfsReadFooterMetrics
avgNonParquetReadFooterMetrics) {
- int size =
isNonParquetList.get(0).getSizeReadByFirstRead().split("_").length;
- double[] store = new double[2 * size];
- // Calculating sum of individual values
- isNonParquetList.forEach(abfsReadFooterMetrics -> {
- String[] firstReadSize =
abfsReadFooterMetrics.getSizeReadByFirstRead().split("_");
- String[] offDiffFirstSecondRead =
abfsReadFooterMetrics.getOffsetDiffBetweenFirstAndSecondRead().split("_");
-
- for (int i = 0; i < firstReadSize.length; i++) {
- store[i] += Long.parseLong(firstReadSize[i]);
- store[i + size] += Long.parseLong(offDiffFirstSecondRead[i]);
- }
- });
-
- // Calculating averages and creating formatted strings
- StringJoiner firstReadSize = new StringJoiner("_");
- StringJoiner offDiffFirstSecondRead = new StringJoiner("_");
-
- for (int j = 0; j < size; j++) {
- firstReadSize.add(String.format("%.3f", store[j] /
isNonParquetList.size()));
- offDiffFirstSecondRead.add(String.format("%.3f", store[j + size] /
isNonParquetList.size()));
+
+ /**
+ * Updates metrics for a specific file identified by filePathIdentifier.
+ *
+ * @param fileTypeMetrics File metadata to know file type.
+ * @param len The length of the read operation.
+ * @param contentLength The total content length of the file.
+ * @param nextReadPos The position of the next read operation.
+ */
+ private void updateMetrics(FileTypeMetrics fileTypeMetrics,
+ int len,
+ long contentLength,
+ long nextReadPos) {
+ synchronized (this) {
+ fileTypeMetrics.incrementReadCount();
+ }
+
+ long readCount = fileTypeMetrics.getReadCount();
+
+ if (readCount == 1) {
+ handleFirstRead(fileTypeMetrics, nextReadPos, len, contentLength);
+ } else if (readCount == 2) {
+ handleSecondRead(fileTypeMetrics, nextReadPos, len, contentLength);
+ } else {
+ handleFurtherRead(fileTypeMetrics, len);
+ }
}
-
avgNonParquetReadFooterMetrics.setSizeReadByFirstRead(firstReadSize.toString());
-
avgNonParquetReadFooterMetrics.setOffsetDiffBetweenFirstAndSecondRead(offDiffFirstSecondRead.toString());
- avgNonParquetReadFooterMetrics.setAvgFileLength(isNonParquetList.stream()
-
.mapToDouble(AbfsReadFooterMetrics::getFileLength).average().orElse(0.0));
-
avgNonParquetReadFooterMetrics.setAvgReadLenRequested(isNonParquetList.stream()
-
.mapToDouble(AbfsReadFooterMetrics::getAvgReadLenRequested).average().orElse(0.0));
- }
-
- /*
- Acronyms:
- 1.FR :- First Read (In case of parquet we only maintain the size requested
by application for
- the first read, in case of non parquet we maintain a string separated by "_"
delimiter where the first
- substring represents the len requested for first read and the second
substring represents the seek pointer difference from the
- end of the file.)
- 2.SR :- Second Read (In case of parquet we only maintain the size requested
by application for
- the second read, in case of non parquet we maintain a string separated by
"_" delimiter where the first
- substring represents the len requested for second read and the second
substring represents the seek pointer difference from the
- offset of the first read.)
- 3.FL :- Total length of the file requested for read
- */
- public String getReadFooterMetrics(AbfsReadFooterMetrics
avgReadFooterMetrics) {
- String readFooterMetric = "";
- if (avgReadFooterMetrics.getIsParquetFile()) {
- readFooterMetric += "$Parquet:";
- } else {
- readFooterMetric += "$NonParquet:";
+ /**
+ * Handles the first read operation by checking if the current read
position is near the end of the file.
+ * If it is, updates the {@link FileTypeMetrics} object to enable metrics
collection and records the first read's
+ * offset and size.
+ *
+ * @param fileTypeMetrics The {@link FileTypeMetrics} object to update
with metrics and read details.
+ * @param nextReadPos The position where the next read will start.
+ * @param len The length of the current read operation.
+ * @param contentLength The total length of the file content.
+ */
+ private void handleFirstRead(FileTypeMetrics fileTypeMetrics,
+ long nextReadPos,
+ int len,
+ long contentLength) {
+ if (nextReadPos >= contentLength - (long)
Integer.parseInt(FOOTER_LENGTH) * ONE_KB) {
+ fileTypeMetrics.setCollectMetrics(true);
+ fileTypeMetrics.setCollectMetricsForNextRead(true);
+ fileTypeMetrics.setOffsetOfFirstRead(nextReadPos);
+ fileTypeMetrics.setSizeReadByFirstRead(len + "_" +
Math.abs(contentLength - nextReadPos));
+ }
}
- readFooterMetric += "$FR=" + avgReadFooterMetrics.getSizeReadByFirstRead()
- + "$SR="
- + avgReadFooterMetrics.getOffsetDiffBetweenFirstAndSecondRead()
- + "$FL=" + String.format("%.3f",
- avgReadFooterMetrics.getAvgFileLength())
- + "$RL=" + String.format("%.3f",
- avgReadFooterMetrics.getAvgReadLenRequested());
- return readFooterMetric;
- }
-/**
- * Retrieves and aggregates read footer metrics for both Parquet and
non-Parquet files from a list
- * of AbfsReadFooterMetrics instances. The function calculates the average
metrics separately for
- * Parquet and non-Parquet files and returns a formatted string containing the
aggregated metrics.
- *
- * @param readFooterMetricsList A list of AbfsReadFooterMetrics instances
containing read footer metrics
- * for both Parquet and non-Parquet files.
- *
- * @return A formatted string containing the aggregated read footer metrics
for both Parquet and non-Parquet files.
- *
- **/
-private String getFooterMetrics(List<AbfsReadFooterMetrics>
readFooterMetricsList) {
- List<AbfsReadFooterMetrics> isParquetList = new ArrayList<>();
- List<AbfsReadFooterMetrics> isNonParquetList = new ArrayList<>();
- for (AbfsReadFooterMetrics abfsReadFooterMetrics : readFooterMetricsList) {
- if (abfsReadFooterMetrics.getIsParquetFile()) {
- isParquetList.add(abfsReadFooterMetrics);
- } else {
- if (abfsReadFooterMetrics.getReadCount() >= 2) {
- isNonParquetList.add(abfsReadFooterMetrics);
- }
+ /**
+ * Handles the second read operation by checking if metrics collection is
enabled for the next read.
+ * If it is, calculates the offset difference between the first and second
reads, updates the {@link FileTypeMetrics}
+ * object with this information, and sets the file type. Then, updates the
metrics data.
+ *
+ * @param fileTypeMetrics The {@link FileTypeMetrics} object to update
with metrics and read details.
+ * @param nextReadPos The position where the next read will start.
+ * @param len The length of the current read operation.
+ * @param contentLength The total length of the file content.
+ */
+ private void handleSecondRead(FileTypeMetrics fileTypeMetrics,
+ long nextReadPos,
+ int len,
+ long contentLength) {
+ if (fileTypeMetrics.getCollectMetricsForNextRead()) {
+ long offsetDiff = Math.abs(nextReadPos -
fileTypeMetrics.getOffsetOfFirstRead());
+ fileTypeMetrics.setOffsetDiffBetweenFirstAndSecondRead(len + "_" +
offsetDiff);
+ fileTypeMetrics.setCollectLenMetrics(true);
+ fileTypeMetrics.updateFileType();
+ updateMetricsData(fileTypeMetrics, len, contentLength);
+ }
}
- }
- AbfsReadFooterMetrics avgParquetReadFooterMetrics = new
AbfsReadFooterMetrics();
- AbfsReadFooterMetrics avgNonparquetReadFooterMetrics = new
AbfsReadFooterMetrics();
- String readFooterMetric = "";
- if (!isParquetList.isEmpty()) {
- avgParquetReadFooterMetrics.setIsParquetFile(true);
- getParquetReadFooterMetricsAverage(isParquetList,
avgParquetReadFooterMetrics);
- readFooterMetric += getReadFooterMetrics(avgParquetReadFooterMetrics);
- }
- if (!isNonParquetList.isEmpty()) {
- avgNonparquetReadFooterMetrics.setIsParquetFile(false);
- getNonParquetReadFooterMetricsAverage(isNonParquetList,
avgNonparquetReadFooterMetrics);
- readFooterMetric += getReadFooterMetrics(avgNonparquetReadFooterMetrics);
- }
- return readFooterMetric;
-}
+ /**
+ * Handles further read operations beyond the second read. If metrics
collection is enabled and the file type is set,
+ * updates the read length requested and increments the read count for the
specific file type.
+ *
+ * @param fileTypeMetrics The {@link FileTypeMetrics} object containing
metrics and read details.
+ * @param len The length of the current read operation.
+ */
Review Comment:
Yes, the synchronization was needed only for updating the mean statistic and,
as rightly mentioned, that is already concurrent, so we don't need it. Reverted this.
> [ABFS] Implement Backoff and Read Footer metrics using IOStatistics Class
> -------------------------------------------------------------------------
>
> Key: HADOOP-19311
> URL: https://issues.apache.org/jira/browse/HADOOP-19311
> Project: Hadoop Common
> Issue Type: Task
> Components: fs/azure
> Affects Versions: 3.5.0, 3.4.1
> Reporter: Manish Bhatt
> Assignee: Manish Bhatt
> Priority: Major
> Labels: pull-request-available
>
> Current Flow: We have implemented metrics collection in the ABFS flow. We have
> created custom AbfsBackoffMetrics and AbfsReadFooterMetrics classes, which
> store all the metrics at the file system level. Our objective is to move
> away from the custom class implementation and store the metrics in
> IOStatisticsStore, which is present in hadoop-common.
> Changes Made: This PR contains the changes for storing the metrics of the
> above-mentioned classes in IOStatisticsStore, which is present in
> hadoop-common. An abstract class, AbstractAbfsStatisticsSource, is created,
> which implements the IOStatisticsSource interface. It stores the IOStatistics
> of the child metrics class.
> Both AbfsBackoffMetrics and AbfsReadFooterMetrics extend
> AbstractAbfsStatisticsSource and store their respective metrics in
> IOStatisticsStore.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]