bhattmanish98 commented on code in PR #7122:
URL: https://github.com/apache/hadoop/pull/7122#discussion_r1904257729
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadFooterMetrics.java:
##########
@@ -17,533 +17,494 @@
*/
package org.apache.hadoop.fs.azurebfs.services;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.StringJoiner;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.HashMap;
+import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
+import org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum;
+import org.apache.hadoop.fs.azurebfs.enums.FileType;
+import org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_UNDERSCORE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON;
import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
+import static
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.CHAR_DOLLAR;
+import static
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.DOUBLE_PRECISION_FORMAT;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.FILE;
+import static
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.FIRST_READ;
+import static
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.SECOND_READ;
+import static
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.FILE_LENGTH;
+import static
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.READ_LENGTH;
+import static
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.TOTAL_FILES;
+import static
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_FILE_LENGTH;
+import static
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_SIZE_READ_BY_FIRST_READ;
+import static
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_OFFSET_DIFF_BETWEEN_FIRST_AND_SECOND_READ;
+import static
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_READ_LEN_REQUESTED;
+import static
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_FIRST_OFFSET_DIFF;
+import static
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_SECOND_OFFSET_DIFF;
+import static org.apache.hadoop.fs.azurebfs.enums.FileType.PARQUET;
+import static org.apache.hadoop.fs.azurebfs.enums.FileType.NON_PARQUET;
+import static
org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum.TYPE_COUNTER;
+import static org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum.TYPE_MEAN;
+import static
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
+import static org.apache.hadoop.util.StringUtils.format;
+
+/**
+ * This class is responsible for tracking and updating metrics related to
reading footers in files.
+ */
+public class AbfsReadFooterMetrics extends AbstractAbfsStatisticsSource {
+ private static final String FOOTER_LENGTH = "20";
+ private static final List<FileType> FILE_TYPE_LIST =
Arrays.asList(PARQUET, NON_PARQUET);
+
+ /**
+ * Inner class to handle file type checks.
+ */
+ private static final class FileTypeMetrics {
+ private final AtomicBoolean collectMetrics;
+ private final AtomicBoolean collectMetricsForNextRead;
+ private final AtomicBoolean collectLenMetrics;
+ private final AtomicLong readCount;
+ private final AtomicLong offsetOfFirstRead;
+ private FileType fileType = null;
+ private String sizeReadByFirstRead;
+ private String offsetDiffBetweenFirstAndSecondRead;
+
+ /**
+ * Constructor to initialize the file type metrics.
+ */
+ private FileTypeMetrics() {
+ collectMetrics = new AtomicBoolean(false);
+ collectMetricsForNextRead = new AtomicBoolean(false);
+ collectLenMetrics = new AtomicBoolean(false);
+ readCount = new AtomicLong(0);
+ offsetOfFirstRead = new AtomicLong(0);
+ }
+
+ /**
+ * Updates the file type based on the metrics collected.
+ */
+ private void updateFileType() {
+ if (fileType == null) {
+ fileType = collectMetrics.get() && readCount.get() >= 2
+ && haveEqualValues(sizeReadByFirstRead)
+ &&
haveEqualValues(offsetDiffBetweenFirstAndSecondRead) ? PARQUET : NON_PARQUET;
+ }
+ }
+
+ /**
+ * Checks if the given value has equal parts.
+ *
+ * @param value the value to check
+ * @return true if the value has equal parts, false otherwise
+ */
+ private boolean haveEqualValues(String value) {
+ String[] parts = value.split("_");
+ return parts.length == 2
+ && parts[0].equals(parts[1]);
+ }
+
+ /**
+ * Increments the read count.
+ */
+ private void incrementReadCount() {
+ readCount.incrementAndGet();
+ }
+
+ /**
+ * Returns the read count.
+ *
+ * @return the read count
+ */
+ private long getReadCount() {
+ return readCount.get();
+ }
+
+ /**
+ * Sets the collect metrics flag.
+ *
+ * @param collect the value to set
+ */
+ private void setCollectMetrics(boolean collect) {
+ collectMetrics.set(collect);
+ }
+
+ /**
+ * Returns the collect metrics flag.
+ *
+ * @return the collect metrics flag
+ */
+ private boolean getCollectMetrics() {
+ return collectMetrics.get();
+ }
+
+ /**
+ * Sets the collect metrics for the next read flag.
+ *
+ * @param collect the value to set
+ */
+ private void setCollectMetricsForNextRead(boolean collect) {
+ collectMetricsForNextRead.set(collect);
+ }
+
+ /**
+ * Returns the collect metrics for the next read flag.
+ *
+ * @return the collect metrics for the next read flag
+ */
+ private boolean getCollectMetricsForNextRead() {
+ return collectMetricsForNextRead.get();
+ }
+
+ /**
+ * Returns the collect length metrics flag.
+ *
+ * @return the collect length metrics flag
+ */
+ private boolean getCollectLenMetrics() {
+ return collectLenMetrics.get();
+ }
+
+ /**
+ * Sets the collect length metrics flag.
+ *
+ * @param collect the value to set
+ */
+ private void setCollectLenMetrics(boolean collect) {
+ collectLenMetrics.set(collect);
+ }
+
+ /**
+ * Sets the offset of the first read.
+ *
+ * @param offset the value to set
+ */
+ private void setOffsetOfFirstRead(long offset) {
+ offsetOfFirstRead.set(offset);
+ }
+
+ /**
+ * Returns the offset of the first read.
+ *
+ * @return the offset of the first read
+ */
+ private long getOffsetOfFirstRead() {
+ return offsetOfFirstRead.get();
+ }
+
+ /**
+ * Sets the size read by the first read.
+ *
+ * @param size the value to set
+ */
+ private void setSizeReadByFirstRead(String size) {
+ sizeReadByFirstRead = size;
+ }
+
+ /**
+ * Returns the size read by the first read.
+ *
+ * @return the size read by the first read
+ */
+ private String getSizeReadByFirstRead() {
+ return sizeReadByFirstRead;
+ }
-public class AbfsReadFooterMetrics {
- private final AtomicBoolean isParquetFile;
- private final AtomicBoolean isParquetEvaluated;
- private final AtomicBoolean isLenUpdated;
- private String sizeReadByFirstRead;
- private String offsetDiffBetweenFirstAndSecondRead;
- private final AtomicLong fileLength;
- private double avgFileLength;
- private double avgReadLenRequested;
- private final AtomicBoolean collectMetrics;
- private final AtomicBoolean collectMetricsForNextRead;
- private final AtomicBoolean collectLenMetrics;
- private final AtomicLong dataLenRequested;
- private final AtomicLong offsetOfFirstRead;
- private final AtomicInteger readCount;
- private final ConcurrentSkipListMap<String, AbfsReadFooterMetrics>
metricsMap;
- private static final String FOOTER_LENGTH = "20";
-
- public AbfsReadFooterMetrics() {
- this.isParquetFile = new AtomicBoolean(false);
- this.isParquetEvaluated = new AtomicBoolean(false);
- this.isLenUpdated = new AtomicBoolean(false);
- this.fileLength = new AtomicLong();
- this.readCount = new AtomicInteger(0);
- this.offsetOfFirstRead = new AtomicLong();
- this.collectMetrics = new AtomicBoolean(false);
- this.collectMetricsForNextRead = new AtomicBoolean(false);
- this.collectLenMetrics = new AtomicBoolean(false);
- this.dataLenRequested = new AtomicLong(0);
- this.metricsMap = new ConcurrentSkipListMap<>();
- }
-
- public Map<String, AbfsReadFooterMetrics> getMetricsMap() {
- return metricsMap;
- }
-
- private boolean getIsParquetFile() {
- return isParquetFile.get();
- }
-
- public void setIsParquetFile(boolean isParquetFile) {
- this.isParquetFile.set(isParquetFile);
- }
-
- private String getSizeReadByFirstRead() {
- return sizeReadByFirstRead;
- }
-
- public void setSizeReadByFirstRead(final String sizeReadByFirstRead) {
- this.sizeReadByFirstRead = sizeReadByFirstRead;
- }
-
- private String getOffsetDiffBetweenFirstAndSecondRead() {
- return offsetDiffBetweenFirstAndSecondRead;
- }
-
- public void setOffsetDiffBetweenFirstAndSecondRead(final String
offsetDiffBetweenFirstAndSecondRead) {
- this.offsetDiffBetweenFirstAndSecondRead
- = offsetDiffBetweenFirstAndSecondRead;
- }
-
- private long getFileLength() {
- return fileLength.get();
- }
-
- private void setFileLength(long fileLength) {
- this.fileLength.set(fileLength);
- }
-
- private double getAvgFileLength() {
- return avgFileLength;
- }
-
- public void setAvgFileLength(final double avgFileLength) {
- this.avgFileLength = avgFileLength;
- }
-
- private double getAvgReadLenRequested() {
- return avgReadLenRequested;
- }
-
- public void setAvgReadLenRequested(final double avgReadLenRequested) {
- this.avgReadLenRequested = avgReadLenRequested;
- }
-
- private boolean getCollectMetricsForNextRead() {
- return collectMetricsForNextRead.get();
- }
-
- private void setCollectMetricsForNextRead(boolean collectMetricsForNextRead)
{
- this.collectMetricsForNextRead.set(collectMetricsForNextRead);
- }
-
- private long getOffsetOfFirstRead() {
- return offsetOfFirstRead.get();
- }
-
- private void setOffsetOfFirstRead(long offsetOfFirstRead) {
- this.offsetOfFirstRead.set(offsetOfFirstRead);
- }
-
- private int getReadCount() {
- return readCount.get();
- }
-
- private void setReadCount(int readCount) {
- this.readCount.set(readCount);
- }
-
- private int incrementReadCount() {
- this.readCount.incrementAndGet();
- return getReadCount();
- }
-
- private boolean getCollectLenMetrics() {
- return collectLenMetrics.get();
- }
-
- private void setCollectLenMetrics(boolean collectLenMetrics) {
- this.collectLenMetrics.set(collectLenMetrics);
-
- }
-
- private long getDataLenRequested() {
- return dataLenRequested.get();
- }
-
- private void setDataLenRequested(long dataLenRequested) {
- this.dataLenRequested.set(dataLenRequested);
- }
-
- private void updateDataLenRequested(long dataLenRequested){
- this.dataLenRequested.addAndGet(dataLenRequested);
- }
-
- private boolean getCollectMetrics() {
- return collectMetrics.get();
- }
-
- private void setCollectMetrics(boolean collectMetrics) {
- this.collectMetrics.set(collectMetrics);
- }
-
- private boolean getIsParquetEvaluated() {
- return isParquetEvaluated.get();
- }
-
- private void setIsParquetEvaluated(boolean isParquetEvaluated) {
- this.isParquetEvaluated.set(isParquetEvaluated);
- }
-
- private boolean getIsLenUpdated() {
- return isLenUpdated.get();
- }
-
- private void setIsLenUpdated(boolean isLenUpdated) {
- this.isLenUpdated.set(isLenUpdated);
- }
-
- /**
- * Updates the metrics map with an entry for the specified file if it
doesn't already exist.
- *
- * @param filePathIdentifier The unique identifier for the file.
- */
- public void updateMap(String filePathIdentifier) {
- // If the file is not already in the metrics map, add it with a new
AbfsReadFooterMetrics object.
- metricsMap.computeIfAbsent(filePathIdentifier, key -> new
AbfsReadFooterMetrics());
- }
-
- /**
- * Checks and updates metrics for a specific file identified by
filePathIdentifier.
- * If the metrics do not exist for the file, they are initialized.
- *
- * @param filePathIdentifier The unique identifier for the file.
- * @param len The length of the read operation.
- * @param contentLength The total content length of the file.
- * @param nextReadPos The position of the next read operation.
- */
- public void checkMetricUpdate(final String filePathIdentifier, final int
len, final long contentLength,
- final long nextReadPos) {
- AbfsReadFooterMetrics readFooterMetrics = metricsMap.computeIfAbsent(
- filePathIdentifier, key -> new AbfsReadFooterMetrics());
- if (readFooterMetrics.getReadCount() == 0
- || (readFooterMetrics.getReadCount() >= 1
- && readFooterMetrics.getCollectMetrics())) {
- updateMetrics(filePathIdentifier, len, contentLength, nextReadPos);
+ /**
+ * Sets the offset difference between the first and second read.
+ *
+ * @param offsetDiff the value to set
+ */
+ private void setOffsetDiffBetweenFirstAndSecondRead(String offsetDiff)
{
+ offsetDiffBetweenFirstAndSecondRead = offsetDiff;
+ }
+
+ /**
+ * Returns the offset difference between the first and second read.
+ *
+ * @return the offset difference between the first and second read
+ */
+ private String getOffsetDiffBetweenFirstAndSecondRead() {
+ return offsetDiffBetweenFirstAndSecondRead;
+ }
+
+ /**
+ * Returns the file type.
+ *
+ * @return the file type
+ */
+ private FileType getFileType() {
+ return fileType;
+ }
}
- }
-
- /**
- * Updates metrics for a specific file identified by filePathIdentifier.
- *
- * @param filePathIdentifier The unique identifier for the file.
- * @param len The length of the read operation.
- * @param contentLength The total content length of the file.
- * @param nextReadPos The position of the next read operation.
- */
- private void updateMetrics(final String filePathIdentifier, final int len,
final long contentLength,
- final long nextReadPos) {
- AbfsReadFooterMetrics readFooterMetrics =
metricsMap.get(filePathIdentifier);
-
- // Create a new AbfsReadFooterMetrics object if it doesn't exist in the
metricsMap.
- if (readFooterMetrics == null) {
- readFooterMetrics = new AbfsReadFooterMetrics();
- metricsMap.put(filePathIdentifier, readFooterMetrics);
+
+ private final Map<String, FileTypeMetrics> fileTypeMetricsMap = new
HashMap<>();
+
+ /**
+ * Constructor to initialize the IOStatisticsStore with counters and mean
statistics.
+ */
+ public AbfsReadFooterMetrics() {
+ IOStatisticsStore ioStatisticsStore = iostatisticsStore()
+ .withCounters(getMetricNames(TYPE_COUNTER))
+ .withMeanStatistics(getMetricNames(TYPE_MEAN))
+ .build();
+ setIOStatistics(ioStatisticsStore);
}
- int readCount;
- synchronized (this) {
- readCount = readFooterMetrics.incrementReadCount();
+ /**
+ * Returns the metric names for a specific statistic type.
+ *
+ * @param type the statistic type
+ * @return the metric names
+ */
+ private String[] getMetricNames(StatisticTypeEnum type) {
+ return Arrays.stream(AbfsReadFooterMetricsEnum.values())
+ .filter(readFooterMetricsEnum ->
readFooterMetricsEnum.getStatisticType().equals(type))
+ .flatMap(readFooterMetricsEnum ->
+ FILE.equals(readFooterMetricsEnum.getType())
+ ? FILE_TYPE_LIST.stream().map(fileType ->
fileType + COLON + readFooterMetricsEnum.getName())
+ : Stream.of(readFooterMetricsEnum.getName()))
+ .toArray(String[]::new);
}
- if (readCount == 1) {
- // Update metrics for the first read.
- updateMetricsOnFirstRead(readFooterMetrics, nextReadPos, len,
contentLength);
+ /**
+ * Looks up the counter value for a specific metric.
+ *
+ * @param fileType the type of the file
+ * @param metric the metric to look up
+ * @return the counter value
+ */
+ private long getCounterMetricValue(FileType fileType,
AbfsReadFooterMetricsEnum metric) {
+ return lookupCounterValue(fileType + COLON + metric.getName());
}
- synchronized (this) {
- if (readFooterMetrics.getCollectLenMetrics()) {
- readFooterMetrics.updateDataLenRequested(len);
- }
+ /**
+ * Looks up the mean statistic value for a specific metric.
+ *
+ * @param fileType the type of the file
+ * @param metric the metric to look up
+ * @return the mean statistic value
+ */
+ private String getMeanMetricValue(FileType fileType,
AbfsReadFooterMetricsEnum metric) {
+ return format(DOUBLE_PRECISION_FORMAT, lookupMeanStatistic(fileType +
COLON + metric.getName()));
}
- if (readCount == 2) {
- // Update metrics for the second read.
- updateMetricsOnSecondRead(readFooterMetrics, nextReadPos, len);
+ /**
+ * Increments the value of a specific metric.
+ *
+ * @param fileType the type of the file
+ * @param metricName the metric to increment
+ */
+ public void incrementMetricValue(FileType fileType,
AbfsReadFooterMetricsEnum metricName) {
+ incCounterValue(fileType + COLON + metricName.getName());
}
- }
-
- /**
- * Updates metrics for the first read operation.
- *
- * @param readFooterMetrics The metrics object to update.
- * @param nextReadPos The position of the next read operation.
- * @param len The length of the read operation.
- * @param contentLength The total content length of the file.
- */
- private void updateMetricsOnFirstRead(AbfsReadFooterMetrics
readFooterMetrics, long nextReadPos, int len, long contentLength) {
- if (nextReadPos >= contentLength - (long) Integer.parseInt(FOOTER_LENGTH)
* ONE_KB) {
- readFooterMetrics.setCollectMetrics(true);
- readFooterMetrics.setCollectMetricsForNextRead(true);
- readFooterMetrics.setOffsetOfFirstRead(nextReadPos);
- readFooterMetrics.setSizeReadByFirstRead(len + "_" +
Math.abs(contentLength - nextReadPos));
- readFooterMetrics.setFileLength(contentLength);
+
+ /**
+ * Adds a mean statistic value for a specific metric.
+ *
+ * @param fileType the type of the file
+ * @param metricName the metric to update
+ * @param value the new value of the metric
+ */
+ public void addMeanMetricValue(FileType fileType,
AbfsReadFooterMetricsEnum metricName, long value) {
+ addMeanStatistic(fileType + COLON + metricName.getName(), value);
}
- }
-
- /**
- * Updates metrics for the second read operation.
- *
- * @param readFooterMetrics The metrics object to update.
- * @param nextReadPos The position of the next read operation.
- * @param len The length of the read operation.
- */
- private void updateMetricsOnSecondRead(AbfsReadFooterMetrics
readFooterMetrics, long nextReadPos, int len) {
- if (readFooterMetrics.getCollectMetricsForNextRead()) {
- long offsetDiff = Math.abs(nextReadPos -
readFooterMetrics.getOffsetOfFirstRead());
- readFooterMetrics.setOffsetDiffBetweenFirstAndSecondRead(len + "_" +
offsetDiff);
- readFooterMetrics.setCollectLenMetrics(true);
+
+ /**
+ * Returns the total number of files.
+ *
+ * @return the total number of files
+ */
+ public Long getTotalFiles() {
+ return getCounterMetricValue(PARQUET, TOTAL_FILES) +
getCounterMetricValue(NON_PARQUET, TOTAL_FILES);
}
- }
-
-
- /**
- * Check if the given file should be marked as a Parquet file.
- *
- * @param metrics The metrics to evaluate.
- * @return True if the file meet the criteria for being marked as a Parquet
file, false otherwise.
- */
- private boolean shouldMarkAsParquet(AbfsReadFooterMetrics metrics) {
- return metrics.getCollectMetrics()
- && metrics.getReadCount() >= 2
- && !metrics.getIsParquetEvaluated()
- && haveEqualValues(metrics.getSizeReadByFirstRead())
- &&
haveEqualValues(metrics.getOffsetDiffBetweenFirstAndSecondRead());
- }
-
- /**
- * Check if two values are equal, considering they are in the format
"value1_value2".
- *
- * @param value The value to check.
- * @return True if the two parts of the value are equal, false otherwise.
- */
- private boolean haveEqualValues(String value) {
- String[] parts = value.split("_");
- return parts.length == 2 && parts[0].equals(parts[1]);
- }
-
- /**
- * Mark the given metrics as a Parquet file and update related values.
- *
- * @param metrics The metrics to mark as Parquet.
- */
- private void markAsParquet(AbfsReadFooterMetrics metrics) {
- metrics.setIsParquetFile(true);
- String[] parts = metrics.getSizeReadByFirstRead().split("_");
- metrics.setSizeReadByFirstRead(parts[0]);
- parts = metrics.getOffsetDiffBetweenFirstAndSecondRead().split("_");
- metrics.setOffsetDiffBetweenFirstAndSecondRead(parts[0]);
- metrics.setIsParquetEvaluated(true);
- }
-
- /**
- * Check each metric in the provided map and mark them as Parquet files if
they meet the criteria.
- *
- * @param metricsMap The map containing metrics to evaluate.
- */
- public void checkIsParquet(Map<String, AbfsReadFooterMetrics> metricsMap) {
- for (Map.Entry<String, AbfsReadFooterMetrics> entry :
metricsMap.entrySet()) {
- AbfsReadFooterMetrics readFooterMetrics = entry.getValue();
- if (shouldMarkAsParquet(readFooterMetrics)) {
- markAsParquet(readFooterMetrics);
- metricsMap.replace(entry.getKey(), readFooterMetrics);
- }
+
+ /**
+ * Updates the map with a new file path identifier.
+ *
+ * @param filePathIdentifier the file path identifier
+ */
+ public void updateMap(String filePathIdentifier) {
+ fileTypeMetricsMap.computeIfAbsent(filePathIdentifier, key -> new
FileTypeMetrics());
}
- }
-
- /**
- * Updates the average read length requested for metrics of all files in the
metrics map.
- * If the metrics indicate that the update is needed, it calculates the
average read length and updates the metrics.
- *
- * @param metricsMap A map containing metrics for different files with
unique identifiers.
- */
- private void updateLenRequested(Map<String, AbfsReadFooterMetrics>
metricsMap) {
- for (AbfsReadFooterMetrics readFooterMetrics : metricsMap.values()) {
- if (shouldUpdateLenRequested(readFooterMetrics)) {
- int readReqCount = readFooterMetrics.getReadCount() - 2;
- readFooterMetrics.setAvgReadLenRequested(
- (double) readFooterMetrics.getDataLenRequested() /
readReqCount);
- readFooterMetrics.setIsLenUpdated(true);
- }
+
+ /**
+ * Checks and updates the metrics for a given file read.
+ *
+ * @param filePathIdentifier the file path identifier
+ * @param len the length of the read
+ * @param contentLength the total content length of the file
+ * @param nextReadPos the position of the next read
+ */
+ public void updateReadMetrics(final String filePathIdentifier,
+ final int len,
+ final long contentLength,
+ final long nextReadPos) {
+ FileTypeMetrics fileTypeMetrics =
fileTypeMetricsMap.computeIfAbsent(filePathIdentifier, key -> new
FileTypeMetrics());
+ if (fileTypeMetrics.getReadCount() == 0 ||
(fileTypeMetrics.getReadCount() >= 1 && fileTypeMetrics.getCollectMetrics())) {
+ updateMetrics(fileTypeMetrics, len, contentLength, nextReadPos);
+ }
}
- }
-
- /**
- * Checks whether the average read length requested should be updated for
the given metrics.
- *
- * The method returns true if the following conditions are met:
- * - Metrics collection is enabled.
- * - The number of read counts is greater than 2.
- * - The average read length has not been updated previously.
- *
- * @param readFooterMetrics The metrics object to evaluate.
- * @return True if the average read length should be updated, false
otherwise.
- */
- private boolean shouldUpdateLenRequested(AbfsReadFooterMetrics
readFooterMetrics) {
- return readFooterMetrics.getCollectMetrics()
- && readFooterMetrics.getReadCount() > 2
- && !readFooterMetrics.getIsLenUpdated();
- }
-
- /**
- * Calculates the average metrics from a list of AbfsReadFooterMetrics and
sets the values in the provided 'avgParquetReadFooterMetrics' object.
- *
- * @param isParquetList The list of AbfsReadFooterMetrics to compute the
averages from.
- * @param avgParquetReadFooterMetrics The target AbfsReadFooterMetrics
object to store the computed average values.
- *
- * This method calculates various average metrics from the provided list and
sets them in the 'avgParquetReadFooterMetrics' object.
- * The metrics include:
- * - Size read by the first read
- * - Offset difference between the first and second read
- * - Average file length
- * - Average requested read length
- */
- private void getParquetReadFooterMetricsAverage(List<AbfsReadFooterMetrics>
isParquetList,
- AbfsReadFooterMetrics avgParquetReadFooterMetrics){
- avgParquetReadFooterMetrics.setSizeReadByFirstRead(
- String.format("%.3f", isParquetList.stream()
- .map(AbfsReadFooterMetrics::getSizeReadByFirstRead).mapToDouble(
- Double::parseDouble).average().orElse(0.0)));
- avgParquetReadFooterMetrics.setOffsetDiffBetweenFirstAndSecondRead(
- String.format("%.3f", isParquetList.stream()
- .map(AbfsReadFooterMetrics::getOffsetDiffBetweenFirstAndSecondRead)
- .mapToDouble(Double::parseDouble).average().orElse(0.0)));
- avgParquetReadFooterMetrics.setAvgFileLength(isParquetList.stream()
-
.mapToDouble(AbfsReadFooterMetrics::getFileLength).average().orElse(0.0));
- avgParquetReadFooterMetrics.setAvgReadLenRequested(isParquetList.stream().
- map(AbfsReadFooterMetrics::getAvgReadLenRequested).
- mapToDouble(Double::doubleValue).average().orElse(0.0));
- }
-
- /**
- * Calculates the average metrics from a list of non-Parquet
AbfsReadFooterMetrics instances.
- *
- * This method takes a list of AbfsReadFooterMetrics representing
non-Parquet reads and calculates
- * the average values for the size read by the first read and the offset
difference between the first
- * and second read. The averages are then set in the provided
AbfsReadFooterMetrics instance.
- *
- * @param isNonParquetList A list of AbfsReadFooterMetrics instances
representing non-Parquet reads.
- * @param avgNonParquetReadFooterMetrics The AbfsReadFooterMetrics instance
to store the calculated averages.
- * It is assumed that the size of the
list is at least 1, and the first
- * element of the list is used to
determine the size of arrays.
- * The instance is modified in-place
with the calculated averages.
- *
- *
- **/
- private void
getNonParquetReadFooterMetricsAverage(List<AbfsReadFooterMetrics>
isNonParquetList,
- AbfsReadFooterMetrics
avgNonParquetReadFooterMetrics) {
- int size =
isNonParquetList.get(0).getSizeReadByFirstRead().split("_").length;
- double[] store = new double[2 * size];
- // Calculating sum of individual values
- isNonParquetList.forEach(abfsReadFooterMetrics -> {
- String[] firstReadSize =
abfsReadFooterMetrics.getSizeReadByFirstRead().split("_");
- String[] offDiffFirstSecondRead =
abfsReadFooterMetrics.getOffsetDiffBetweenFirstAndSecondRead().split("_");
-
- for (int i = 0; i < firstReadSize.length; i++) {
- store[i] += Long.parseLong(firstReadSize[i]);
- store[i + size] += Long.parseLong(offDiffFirstSecondRead[i]);
- }
- });
-
- // Calculating averages and creating formatted strings
- StringJoiner firstReadSize = new StringJoiner("_");
- StringJoiner offDiffFirstSecondRead = new StringJoiner("_");
-
- for (int j = 0; j < size; j++) {
- firstReadSize.add(String.format("%.3f", store[j] /
isNonParquetList.size()));
- offDiffFirstSecondRead.add(String.format("%.3f", store[j + size] /
isNonParquetList.size()));
+
+ /**
+ * Updates metrics for a specific file identified by filePathIdentifier.
+ *
+ * @param fileTypeMetrics File metadata to know file type.
+ * @param len The length of the read operation.
+ * @param contentLength The total content length of the file.
+ * @param nextReadPos The position of the next read operation.
+ */
+ private void updateMetrics(FileTypeMetrics fileTypeMetrics,
+ int len,
+ long contentLength,
+ long nextReadPos) {
+ synchronized (this) {
Review Comment:
Makes sense; removed the synchronized block.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]