anmolanmol1234 commented on code in PR #7122:
URL: https://github.com/apache/hadoop/pull/7122#discussion_r1843606379
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadFooterMetrics.java:
##########
@@ -17,533 +17,348 @@
*/
package org.apache.hadoop.fs.azurebfs.services;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.StringJoiner;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.HashMap;
+import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
+
+import org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum;
+import org.apache.hadoop.fs.azurebfs.enums.FileType;
+import org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum;
+import org.apache.hadoop.fs.azurebfs.statistics.AbstractAbfsStatisticsSource;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.FILE;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.COLON;
+import static
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.TOTAL_FILES;
+import static
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.FILE_LENGTH;
+import static
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.SIZE_READ_BY_FIRST_READ;
+import static
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.OFFSET_DIFF_BETWEEN_FIRST_AND_SECOND_READ;
+import static
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.READ_LEN_REQUESTED;
+import static
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.READ_COUNT;
+import static
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.FIRST_OFFSET_DIFF;
+import static
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.SECOND_OFFSET_DIFF;
+import static org.apache.hadoop.fs.azurebfs.enums.FileType.PARQUET;
+import static org.apache.hadoop.fs.azurebfs.enums.FileType.NON_PARQUET;
+import static
org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum.TYPE_COUNTER;
+import static org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum.TYPE_GAUGE;
+import static
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
+
+/**
+ * This class is responsible for tracking and updating metrics related to
reading footers in files.
+ */
+public class AbfsReadFooterMetrics extends AbstractAbfsStatisticsSource {
+ private static final String FOOTER_LENGTH = "20";
+ private static final List<FileType> FILE_TYPE_LIST =
Arrays.asList(PARQUET, NON_PARQUET);
+
+ /**
+ * Inner class to handle file type checks.
+ */
+ private static final class CheckFileType {
+ private final AtomicBoolean collectMetrics;
+ private final AtomicBoolean collectMetricsForNextRead;
+ private final AtomicBoolean collectLenMetrics;
+ private final AtomicLong readCount;
+ private final AtomicLong offsetOfFirstRead;
+ private FileType fileType = null;
+ private String sizeReadByFirstRead;
+ private String offsetDiffBetweenFirstAndSecondRead;
+
+ private CheckFileType() {
+ collectMetrics = new AtomicBoolean(false);
+ collectMetricsForNextRead = new AtomicBoolean(false);
+ collectLenMetrics = new AtomicBoolean(false);
+ readCount = new AtomicLong(0);
+ offsetOfFirstRead = new AtomicLong(0);
+ }
+
+ private void updateFileType() {
+ if (fileType == null) {
+ fileType = collectMetrics.get() && readCount.get() >= 2
+ && haveEqualValues(sizeReadByFirstRead)
+ &&
haveEqualValues(offsetDiffBetweenFirstAndSecondRead) ? PARQUET : NON_PARQUET;
+ }
+ }
+
+ private boolean haveEqualValues(String value) {
+ String[] parts = value.split("_");
+ return parts.length == 2
+ && parts[0].equals(parts[1]);
+ }
+
+ private void incrementReadCount() {
+ readCount.incrementAndGet();
+ }
+
+ private long getReadCount() {
+ return readCount.get();
+ }
+
+ private void setCollectMetrics(boolean collect) {
+ collectMetrics.set(collect);
+ }
+
+ private boolean getCollectMetrics() {
+ return collectMetrics.get();
+ }
+
+ private void setCollectMetricsForNextRead(boolean collect) {
+ collectMetricsForNextRead.set(collect);
+ }
+
+ private boolean getCollectMetricsForNextRead() {
+ return collectMetricsForNextRead.get();
+ }
+
+ private boolean getCollectLenMetrics() {
+ return collectLenMetrics.get();
+ }
+
+ private void setCollectLenMetrics(boolean collect) {
+ collectLenMetrics.set(collect);
+ }
+
+ private void setOffsetOfFirstRead(long offset) {
+ offsetOfFirstRead.set(offset);
+ }
+
+ private long getOffsetOfFirstRead() {
+ return offsetOfFirstRead.get();
+ }
+
+ private void setSizeReadByFirstRead(String size) {
+ sizeReadByFirstRead = size;
+ }
+
+ private String getSizeReadByFirstRead() {
+ return sizeReadByFirstRead;
+ }
+
+ private void setOffsetDiffBetweenFirstAndSecondRead(String offsetDiff)
{
+ offsetDiffBetweenFirstAndSecondRead = offsetDiff;
+ }
-public class AbfsReadFooterMetrics {
- private final AtomicBoolean isParquetFile;
- private final AtomicBoolean isParquetEvaluated;
- private final AtomicBoolean isLenUpdated;
- private String sizeReadByFirstRead;
- private String offsetDiffBetweenFirstAndSecondRead;
- private final AtomicLong fileLength;
- private double avgFileLength;
- private double avgReadLenRequested;
- private final AtomicBoolean collectMetrics;
- private final AtomicBoolean collectMetricsForNextRead;
- private final AtomicBoolean collectLenMetrics;
- private final AtomicLong dataLenRequested;
- private final AtomicLong offsetOfFirstRead;
- private final AtomicInteger readCount;
- private final ConcurrentSkipListMap<String, AbfsReadFooterMetrics>
metricsMap;
- private static final String FOOTER_LENGTH = "20";
-
- public AbfsReadFooterMetrics() {
- this.isParquetFile = new AtomicBoolean(false);
- this.isParquetEvaluated = new AtomicBoolean(false);
- this.isLenUpdated = new AtomicBoolean(false);
- this.fileLength = new AtomicLong();
- this.readCount = new AtomicInteger(0);
- this.offsetOfFirstRead = new AtomicLong();
- this.collectMetrics = new AtomicBoolean(false);
- this.collectMetricsForNextRead = new AtomicBoolean(false);
- this.collectLenMetrics = new AtomicBoolean(false);
- this.dataLenRequested = new AtomicLong(0);
- this.metricsMap = new ConcurrentSkipListMap<>();
- }
-
- public Map<String, AbfsReadFooterMetrics> getMetricsMap() {
- return metricsMap;
- }
-
- private boolean getIsParquetFile() {
- return isParquetFile.get();
- }
-
- public void setIsParquetFile(boolean isParquetFile) {
- this.isParquetFile.set(isParquetFile);
- }
-
- private String getSizeReadByFirstRead() {
- return sizeReadByFirstRead;
- }
-
- public void setSizeReadByFirstRead(final String sizeReadByFirstRead) {
- this.sizeReadByFirstRead = sizeReadByFirstRead;
- }
-
- private String getOffsetDiffBetweenFirstAndSecondRead() {
- return offsetDiffBetweenFirstAndSecondRead;
- }
-
- public void setOffsetDiffBetweenFirstAndSecondRead(final String
offsetDiffBetweenFirstAndSecondRead) {
- this.offsetDiffBetweenFirstAndSecondRead
- = offsetDiffBetweenFirstAndSecondRead;
- }
-
- private long getFileLength() {
- return fileLength.get();
- }
-
- private void setFileLength(long fileLength) {
- this.fileLength.set(fileLength);
- }
-
- private double getAvgFileLength() {
- return avgFileLength;
- }
-
- public void setAvgFileLength(final double avgFileLength) {
- this.avgFileLength = avgFileLength;
- }
-
- private double getAvgReadLenRequested() {
- return avgReadLenRequested;
- }
-
- public void setAvgReadLenRequested(final double avgReadLenRequested) {
- this.avgReadLenRequested = avgReadLenRequested;
- }
-
- private boolean getCollectMetricsForNextRead() {
- return collectMetricsForNextRead.get();
- }
-
- private void setCollectMetricsForNextRead(boolean collectMetricsForNextRead)
{
- this.collectMetricsForNextRead.set(collectMetricsForNextRead);
- }
-
- private long getOffsetOfFirstRead() {
- return offsetOfFirstRead.get();
- }
-
- private void setOffsetOfFirstRead(long offsetOfFirstRead) {
- this.offsetOfFirstRead.set(offsetOfFirstRead);
- }
-
- private int getReadCount() {
- return readCount.get();
- }
-
- private void setReadCount(int readCount) {
- this.readCount.set(readCount);
- }
-
- private int incrementReadCount() {
- this.readCount.incrementAndGet();
- return getReadCount();
- }
-
- private boolean getCollectLenMetrics() {
- return collectLenMetrics.get();
- }
-
- private void setCollectLenMetrics(boolean collectLenMetrics) {
- this.collectLenMetrics.set(collectLenMetrics);
-
- }
-
- private long getDataLenRequested() {
- return dataLenRequested.get();
- }
-
- private void setDataLenRequested(long dataLenRequested) {
- this.dataLenRequested.set(dataLenRequested);
- }
-
- private void updateDataLenRequested(long dataLenRequested){
- this.dataLenRequested.addAndGet(dataLenRequested);
- }
-
- private boolean getCollectMetrics() {
- return collectMetrics.get();
- }
-
- private void setCollectMetrics(boolean collectMetrics) {
- this.collectMetrics.set(collectMetrics);
- }
-
- private boolean getIsParquetEvaluated() {
- return isParquetEvaluated.get();
- }
-
- private void setIsParquetEvaluated(boolean isParquetEvaluated) {
- this.isParquetEvaluated.set(isParquetEvaluated);
- }
-
- private boolean getIsLenUpdated() {
- return isLenUpdated.get();
- }
-
- private void setIsLenUpdated(boolean isLenUpdated) {
- this.isLenUpdated.set(isLenUpdated);
- }
-
- /**
- * Updates the metrics map with an entry for the specified file if it
doesn't already exist.
- *
- * @param filePathIdentifier The unique identifier for the file.
- */
- public void updateMap(String filePathIdentifier) {
- // If the file is not already in the metrics map, add it with a new
AbfsReadFooterMetrics object.
- metricsMap.computeIfAbsent(filePathIdentifier, key -> new
AbfsReadFooterMetrics());
- }
-
- /**
- * Checks and updates metrics for a specific file identified by
filePathIdentifier.
- * If the metrics do not exist for the file, they are initialized.
- *
- * @param filePathIdentifier The unique identifier for the file.
- * @param len The length of the read operation.
- * @param contentLength The total content length of the file.
- * @param nextReadPos The position of the next read operation.
- */
- public void checkMetricUpdate(final String filePathIdentifier, final int
len, final long contentLength,
- final long nextReadPos) {
- AbfsReadFooterMetrics readFooterMetrics = metricsMap.computeIfAbsent(
- filePathIdentifier, key -> new AbfsReadFooterMetrics());
- if (readFooterMetrics.getReadCount() == 0
- || (readFooterMetrics.getReadCount() >= 1
- && readFooterMetrics.getCollectMetrics())) {
- updateMetrics(filePathIdentifier, len, contentLength, nextReadPos);
+ private String getOffsetDiffBetweenFirstAndSecondRead() {
+ return offsetDiffBetweenFirstAndSecondRead;
+ }
+
+ private FileType getFileType() {
+ return fileType;
+ }
}
- }
-
- /**
- * Updates metrics for a specific file identified by filePathIdentifier.
- *
- * @param filePathIdentifier The unique identifier for the file.
- * @param len The length of the read operation.
- * @param contentLength The total content length of the file.
- * @param nextReadPos The position of the next read operation.
- */
- private void updateMetrics(final String filePathIdentifier, final int len,
final long contentLength,
- final long nextReadPos) {
- AbfsReadFooterMetrics readFooterMetrics =
metricsMap.get(filePathIdentifier);
-
- // Create a new AbfsReadFooterMetrics object if it doesn't exist in the
metricsMap.
- if (readFooterMetrics == null) {
- readFooterMetrics = new AbfsReadFooterMetrics();
- metricsMap.put(filePathIdentifier, readFooterMetrics);
+
+ private final Map<String, CheckFileType> checkFileMap = new HashMap<>();
+
+ /**
+ * Constructor to initialize the IOStatisticsStore with counters and
gauges.
+ */
+ public AbfsReadFooterMetrics() {
+ IOStatisticsStore ioStatisticsStore = iostatisticsStore()
+ .withCounters(getMetricNames(TYPE_COUNTER))
+ .withGauges(getMetricNames(TYPE_GAUGE))
+ .build();
+ setIOStatistics(ioStatisticsStore);
}
- int readCount;
- synchronized (this) {
- readCount = readFooterMetrics.incrementReadCount();
+ private String[] getMetricNames(StatisticTypeEnum type) {
+ return Arrays.stream(AbfsReadFooterMetricsEnum.values())
+ .filter(readFooterMetricsEnum ->
readFooterMetricsEnum.getStatisticType().equals(type))
+ .flatMap(readFooterMetricsEnum ->
+ FILE.equals(readFooterMetricsEnum.getType())
+ ? FILE_TYPE_LIST.stream().map(fileType ->
fileType + COLON + readFooterMetricsEnum.getName())
+ : Stream.of(readFooterMetricsEnum.getName()))
+ .toArray(String[]::new);
}
- if (readCount == 1) {
- // Update metrics for the first read.
- updateMetricsOnFirstRead(readFooterMetrics, nextReadPos, len,
contentLength);
+ private long getMetricValue(FileType fileType, AbfsReadFooterMetricsEnum
metric) {
+ switch (metric.getStatisticType()) {
+ case TYPE_COUNTER:
+ return lookupCounterValue(fileType + COLON + metric.getName());
+ case TYPE_GAUGE:
+ return lookupGaugeValue(fileType + COLON + metric.getName());
+ default:
+ return 0;
+ }
}
- synchronized (this) {
- if (readFooterMetrics.getCollectLenMetrics()) {
- readFooterMetrics.updateDataLenRequested(len);
- }
+ /**
+ * Updates the value of a specific metric.
+ *
+ * @param fileType the type of the file
+ * @param metric the metric to update
+ * @param value the new value of the metric
+ */
+ public void updateMetricValue(FileType fileType, AbfsReadFooterMetricsEnum
metric, long value) {
+ updateGaugeValue(fileType + COLON + metric.getName(), value);
}
- if (readCount == 2) {
- // Update metrics for the second read.
- updateMetricsOnSecondRead(readFooterMetrics, nextReadPos, len);
+ /**
+ * Increments the value of a specific metric.
+ *
+ * @param fileType the type of the file
+ * @param metricName the metric to increment
+ */
+ public void incrementMetricValue(FileType fileType,
AbfsReadFooterMetricsEnum metricName) {
+ incCounterValue(fileType + COLON + metricName.getName());
}
- }
-
- /**
- * Updates metrics for the first read operation.
- *
- * @param readFooterMetrics The metrics object to update.
- * @param nextReadPos The position of the next read operation.
- * @param len The length of the read operation.
- * @param contentLength The total content length of the file.
- */
- private void updateMetricsOnFirstRead(AbfsReadFooterMetrics
readFooterMetrics, long nextReadPos, int len, long contentLength) {
- if (nextReadPos >= contentLength - (long) Integer.parseInt(FOOTER_LENGTH)
* ONE_KB) {
- readFooterMetrics.setCollectMetrics(true);
- readFooterMetrics.setCollectMetricsForNextRead(true);
- readFooterMetrics.setOffsetOfFirstRead(nextReadPos);
- readFooterMetrics.setSizeReadByFirstRead(len + "_" +
Math.abs(contentLength - nextReadPos));
- readFooterMetrics.setFileLength(contentLength);
+
+ /**
+ * Returns the total number of files.
+ *
+ * @return the total number of files
+ */
+ public Long getTotalFiles() {
+ return getMetricValue(PARQUET, TOTAL_FILES) +
getMetricValue(NON_PARQUET, TOTAL_FILES);
}
- }
-
- /**
- * Updates metrics for the second read operation.
- *
- * @param readFooterMetrics The metrics object to update.
- * @param nextReadPos The position of the next read operation.
- * @param len The length of the read operation.
- */
- private void updateMetricsOnSecondRead(AbfsReadFooterMetrics
readFooterMetrics, long nextReadPos, int len) {
- if (readFooterMetrics.getCollectMetricsForNextRead()) {
- long offsetDiff = Math.abs(nextReadPos -
readFooterMetrics.getOffsetOfFirstRead());
- readFooterMetrics.setOffsetDiffBetweenFirstAndSecondRead(len + "_" +
offsetDiff);
- readFooterMetrics.setCollectLenMetrics(true);
+
+ /**
+ * Returns the total read count.
+ *
+ * @return the total read count
+ */
+ public Long getTotalReadCount() {
+ return getMetricValue(PARQUET, READ_COUNT) +
getMetricValue(NON_PARQUET, READ_COUNT);
}
- }
-
-
- /**
- * Check if the given file should be marked as a Parquet file.
- *
- * @param metrics The metrics to evaluate.
- * @return True if the file meet the criteria for being marked as a Parquet
file, false otherwise.
- */
- private boolean shouldMarkAsParquet(AbfsReadFooterMetrics metrics) {
- return metrics.getCollectMetrics()
- && metrics.getReadCount() >= 2
- && !metrics.getIsParquetEvaluated()
- && haveEqualValues(metrics.getSizeReadByFirstRead())
- &&
haveEqualValues(metrics.getOffsetDiffBetweenFirstAndSecondRead());
- }
-
- /**
- * Check if two values are equal, considering they are in the format
"value1_value2".
- *
- * @param value The value to check.
- * @return True if the two parts of the value are equal, false otherwise.
- */
- private boolean haveEqualValues(String value) {
- String[] parts = value.split("_");
- return parts.length == 2 && parts[0].equals(parts[1]);
- }
-
- /**
- * Mark the given metrics as a Parquet file and update related values.
- *
- * @param metrics The metrics to mark as Parquet.
- */
- private void markAsParquet(AbfsReadFooterMetrics metrics) {
- metrics.setIsParquetFile(true);
- String[] parts = metrics.getSizeReadByFirstRead().split("_");
- metrics.setSizeReadByFirstRead(parts[0]);
- parts = metrics.getOffsetDiffBetweenFirstAndSecondRead().split("_");
- metrics.setOffsetDiffBetweenFirstAndSecondRead(parts[0]);
- metrics.setIsParquetEvaluated(true);
- }
-
- /**
- * Check each metric in the provided map and mark them as Parquet files if
they meet the criteria.
- *
- * @param metricsMap The map containing metrics to evaluate.
- */
- public void checkIsParquet(Map<String, AbfsReadFooterMetrics> metricsMap) {
- for (Map.Entry<String, AbfsReadFooterMetrics> entry :
metricsMap.entrySet()) {
- AbfsReadFooterMetrics readFooterMetrics = entry.getValue();
- if (shouldMarkAsParquet(readFooterMetrics)) {
- markAsParquet(readFooterMetrics);
- metricsMap.replace(entry.getKey(), readFooterMetrics);
- }
+
+ /**
+ * Updates the map with a new file path identifier.
+ *
+ * @param filePathIdentifier the file path identifier
+ */
+ public void updateMap(String filePathIdentifier) {
+ checkFileMap.computeIfAbsent(filePathIdentifier, key -> new
CheckFileType());
}
- }
-
- /**
- * Updates the average read length requested for metrics of all files in the
metrics map.
- * If the metrics indicate that the update is needed, it calculates the
average read length and updates the metrics.
- *
- * @param metricsMap A map containing metrics for different files with
unique identifiers.
- */
- private void updateLenRequested(Map<String, AbfsReadFooterMetrics>
metricsMap) {
- for (AbfsReadFooterMetrics readFooterMetrics : metricsMap.values()) {
- if (shouldUpdateLenRequested(readFooterMetrics)) {
- int readReqCount = readFooterMetrics.getReadCount() - 2;
- readFooterMetrics.setAvgReadLenRequested(
- (double) readFooterMetrics.getDataLenRequested() /
readReqCount);
- readFooterMetrics.setIsLenUpdated(true);
- }
+
+ /**
+ * Checks and updates the metrics for a given file read.
+ *
+ * @param filePathIdentifier the file path identifier
+ * @param len the length of the read
+ * @param contentLength the total content length of the file
+ * @param nextReadPos the position of the next read
+ */
+ public void checkMetricUpdate(final String filePathIdentifier, final int
len, final long contentLength, final long nextReadPos) {
+ CheckFileType checkFileType =
checkFileMap.computeIfAbsent(filePathIdentifier, key -> new CheckFileType());
+ if (checkFileType.getReadCount() == 0 || (checkFileType.getReadCount()
>= 1 && checkFileType.getCollectMetrics())) {
+ updateMetrics(checkFileType, len, contentLength, nextReadPos);
+ }
}
- }
-
- /**
- * Checks whether the average read length requested should be updated for
the given metrics.
- *
- * The method returns true if the following conditions are met:
- * - Metrics collection is enabled.
- * - The number of read counts is greater than 2.
- * - The average read length has not been updated previously.
- *
- * @param readFooterMetrics The metrics object to evaluate.
- * @return True if the average read length should be updated, false
otherwise.
- */
- private boolean shouldUpdateLenRequested(AbfsReadFooterMetrics
readFooterMetrics) {
- return readFooterMetrics.getCollectMetrics()
- && readFooterMetrics.getReadCount() > 2
- && !readFooterMetrics.getIsLenUpdated();
- }
-
- /**
- * Calculates the average metrics from a list of AbfsReadFooterMetrics and
sets the values in the provided 'avgParquetReadFooterMetrics' object.
- *
- * @param isParquetList The list of AbfsReadFooterMetrics to compute the
averages from.
- * @param avgParquetReadFooterMetrics The target AbfsReadFooterMetrics
object to store the computed average values.
- *
- * This method calculates various average metrics from the provided list and
sets them in the 'avgParquetReadFooterMetrics' object.
- * The metrics include:
- * - Size read by the first read
- * - Offset difference between the first and second read
- * - Average file length
- * - Average requested read length
- */
- private void getParquetReadFooterMetricsAverage(List<AbfsReadFooterMetrics>
isParquetList,
- AbfsReadFooterMetrics avgParquetReadFooterMetrics){
- avgParquetReadFooterMetrics.setSizeReadByFirstRead(
- String.format("%.3f", isParquetList.stream()
- .map(AbfsReadFooterMetrics::getSizeReadByFirstRead).mapToDouble(
- Double::parseDouble).average().orElse(0.0)));
- avgParquetReadFooterMetrics.setOffsetDiffBetweenFirstAndSecondRead(
- String.format("%.3f", isParquetList.stream()
- .map(AbfsReadFooterMetrics::getOffsetDiffBetweenFirstAndSecondRead)
- .mapToDouble(Double::parseDouble).average().orElse(0.0)));
- avgParquetReadFooterMetrics.setAvgFileLength(isParquetList.stream()
-
.mapToDouble(AbfsReadFooterMetrics::getFileLength).average().orElse(0.0));
- avgParquetReadFooterMetrics.setAvgReadLenRequested(isParquetList.stream().
- map(AbfsReadFooterMetrics::getAvgReadLenRequested).
- mapToDouble(Double::doubleValue).average().orElse(0.0));
- }
-
- /**
- * Calculates the average metrics from a list of non-Parquet
AbfsReadFooterMetrics instances.
- *
- * This method takes a list of AbfsReadFooterMetrics representing
non-Parquet reads and calculates
- * the average values for the size read by the first read and the offset
difference between the first
- * and second read. The averages are then set in the provided
AbfsReadFooterMetrics instance.
- *
- * @param isNonParquetList A list of AbfsReadFooterMetrics instances
representing non-Parquet reads.
- * @param avgNonParquetReadFooterMetrics The AbfsReadFooterMetrics instance
to store the calculated averages.
- * It is assumed that the size of the
list is at least 1, and the first
- * element of the list is used to
determine the size of arrays.
- * The instance is modified in-place
with the calculated averages.
- *
- *
- **/
- private void
getNonParquetReadFooterMetricsAverage(List<AbfsReadFooterMetrics>
isNonParquetList,
- AbfsReadFooterMetrics
avgNonParquetReadFooterMetrics) {
- int size =
isNonParquetList.get(0).getSizeReadByFirstRead().split("_").length;
- double[] store = new double[2 * size];
- // Calculating sum of individual values
- isNonParquetList.forEach(abfsReadFooterMetrics -> {
- String[] firstReadSize =
abfsReadFooterMetrics.getSizeReadByFirstRead().split("_");
- String[] offDiffFirstSecondRead =
abfsReadFooterMetrics.getOffsetDiffBetweenFirstAndSecondRead().split("_");
-
- for (int i = 0; i < firstReadSize.length; i++) {
- store[i] += Long.parseLong(firstReadSize[i]);
- store[i + size] += Long.parseLong(offDiffFirstSecondRead[i]);
- }
- });
-
- // Calculating averages and creating formatted strings
- StringJoiner firstReadSize = new StringJoiner("_");
- StringJoiner offDiffFirstSecondRead = new StringJoiner("_");
-
- for (int j = 0; j < size; j++) {
- firstReadSize.add(String.format("%.3f", store[j] /
isNonParquetList.size()));
- offDiffFirstSecondRead.add(String.format("%.3f", store[j + size] /
isNonParquetList.size()));
+
+ /**
+ * Updates metrics for a specific file identified by filePathIdentifier.
+ *
+ * @param checkFileType File metadata to know file type.
+ * @param len The length of the read operation.
+ * @param contentLength The total content length of the file.
+ * @param nextReadPos The position of the next read operation.
+ */
+ private void updateMetrics(CheckFileType checkFileType, int len, long
contentLength, long nextReadPos) {
+ synchronized (this) {
+ checkFileType.incrementReadCount();
+ }
+
+ long readCount = checkFileType.getReadCount();
+
+ if (readCount == 1) {
+ handleFirstRead(checkFileType, nextReadPos, len, contentLength);
+ } else if (readCount == 2) {
+ handleSecondRead(checkFileType, nextReadPos, len, contentLength);
+ } else {
+ handleFurtherRead(checkFileType, len);
+ }
}
-
avgNonParquetReadFooterMetrics.setSizeReadByFirstRead(firstReadSize.toString());
-
avgNonParquetReadFooterMetrics.setOffsetDiffBetweenFirstAndSecondRead(offDiffFirstSecondRead.toString());
- avgNonParquetReadFooterMetrics.setAvgFileLength(isNonParquetList.stream()
-
.mapToDouble(AbfsReadFooterMetrics::getFileLength).average().orElse(0.0));
-
avgNonParquetReadFooterMetrics.setAvgReadLenRequested(isNonParquetList.stream()
-
.mapToDouble(AbfsReadFooterMetrics::getAvgReadLenRequested).average().orElse(0.0));
- }
-
- /*
- Acronyms:
- 1.FR :- First Read (In case of parquet we only maintain the size requested
by application for
- the first read, in case of non parquet we maintain a string separated by "_"
delimiter where the first
- substring represents the len requested for first read and the second
substring represents the seek pointer difference from the
- end of the file.)
- 2.SR :- Second Read (In case of parquet we only maintain the size requested
by application for
- the second read, in case of non parquet we maintain a string separated by
"_" delimiter where the first
- substring represents the len requested for second read and the second
substring represents the seek pointer difference from the
- offset of the first read.)
- 3.FL :- Total length of the file requested for read
- */
- public String getReadFooterMetrics(AbfsReadFooterMetrics
avgReadFooterMetrics) {
- String readFooterMetric = "";
- if (avgReadFooterMetrics.getIsParquetFile()) {
- readFooterMetric += "$Parquet:";
- } else {
- readFooterMetric += "$NonParquet:";
+ private void handleFirstRead(CheckFileType checkFileType, long
nextReadPos, int len, long contentLength) {
+ if (nextReadPos >= contentLength - (long)
Integer.parseInt(FOOTER_LENGTH) * ONE_KB) {
+ checkFileType.setCollectMetrics(true);
+ checkFileType.setCollectMetricsForNextRead(true);
+ checkFileType.setOffsetOfFirstRead(nextReadPos);
+ checkFileType.setSizeReadByFirstRead(len + "_" +
Math.abs(contentLength - nextReadPos));
+ }
}
- readFooterMetric += "$FR=" + avgReadFooterMetrics.getSizeReadByFirstRead()
- + "$SR="
- + avgReadFooterMetrics.getOffsetDiffBetweenFirstAndSecondRead()
- + "$FL=" + String.format("%.3f",
- avgReadFooterMetrics.getAvgFileLength())
- + "$RL=" + String.format("%.3f",
- avgReadFooterMetrics.getAvgReadLenRequested());
- return readFooterMetric;
- }
-/**
- * Retrieves and aggregates read footer metrics for both Parquet and
non-Parquet files from a list
- * of AbfsReadFooterMetrics instances. The function calculates the average
metrics separately for
- * Parquet and non-Parquet files and returns a formatted string containing the
aggregated metrics.
- *
- * @param readFooterMetricsList A list of AbfsReadFooterMetrics instances
containing read footer metrics
- * for both Parquet and non-Parquet files.
- *
- * @return A formatted string containing the aggregated read footer metrics
for both Parquet and non-Parquet files.
- *
- **/
-private String getFooterMetrics(List<AbfsReadFooterMetrics>
readFooterMetricsList) {
- List<AbfsReadFooterMetrics> isParquetList = new ArrayList<>();
- List<AbfsReadFooterMetrics> isNonParquetList = new ArrayList<>();
- for (AbfsReadFooterMetrics abfsReadFooterMetrics : readFooterMetricsList) {
- if (abfsReadFooterMetrics.getIsParquetFile()) {
- isParquetList.add(abfsReadFooterMetrics);
- } else {
- if (abfsReadFooterMetrics.getReadCount() >= 2) {
- isNonParquetList.add(abfsReadFooterMetrics);
- }
+ private void handleSecondRead(CheckFileType checkFileType, long
nextReadPos, int len, long contentLength) {
+ if (checkFileType.getCollectMetricsForNextRead()) {
+ long offsetDiff = Math.abs(nextReadPos -
checkFileType.getOffsetOfFirstRead());
+ checkFileType.setOffsetDiffBetweenFirstAndSecondRead(len + "_" +
offsetDiff);
+ checkFileType.setCollectLenMetrics(true);
+ checkFileType.updateFileType();
+ updateMetricsData(checkFileType, len, contentLength);
+ }
}
- }
- AbfsReadFooterMetrics avgParquetReadFooterMetrics = new
AbfsReadFooterMetrics();
- AbfsReadFooterMetrics avgNonparquetReadFooterMetrics = new
AbfsReadFooterMetrics();
- String readFooterMetric = "";
- if (!isParquetList.isEmpty()) {
- avgParquetReadFooterMetrics.setIsParquetFile(true);
- getParquetReadFooterMetricsAverage(isParquetList,
avgParquetReadFooterMetrics);
- readFooterMetric += getReadFooterMetrics(avgParquetReadFooterMetrics);
- }
- if (!isNonParquetList.isEmpty()) {
- avgNonparquetReadFooterMetrics.setIsParquetFile(false);
- getNonParquetReadFooterMetricsAverage(isNonParquetList,
avgNonparquetReadFooterMetrics);
- readFooterMetric += getReadFooterMetrics(avgNonparquetReadFooterMetrics);
- }
- return readFooterMetric;
-}
+ private synchronized void handleFurtherRead(CheckFileType checkFileType,
int len) {
+ if (checkFileType.getCollectLenMetrics() &&
checkFileType.getFileType() != null) {
+ FileType fileType = checkFileType.getFileType();
+ updateMetricValue(fileType, READ_LEN_REQUESTED, len);
+ incrementMetricValue(fileType, READ_COUNT);
+ }
+ }
+
+ private synchronized void updateMetricsData(CheckFileType checkFileType,
int len, long contentLength) {
+ long sizeReadByFirstRead =
Long.parseLong(checkFileType.getSizeReadByFirstRead().split("_")[0]);
+ long firstOffsetDiff =
Long.parseLong(checkFileType.getSizeReadByFirstRead().split("_")[1]);
+ long secondOffsetDiff =
Long.parseLong(checkFileType.getOffsetDiffBetweenFirstAndSecondRead().split("_")[1]);
+ FileType fileType = checkFileType.getFileType();
+
+ updateMetricValue(fileType, READ_LEN_REQUESTED, len +
sizeReadByFirstRead);
+ updateMetricValue(fileType, FILE_LENGTH, contentLength);
+ updateMetricValue(fileType, SIZE_READ_BY_FIRST_READ,
sizeReadByFirstRead);
+ updateMetricValue(fileType, OFFSET_DIFF_BETWEEN_FIRST_AND_SECOND_READ,
len);
+ updateMetricValue(fileType, FIRST_OFFSET_DIFF, firstOffsetDiff);
+ updateMetricValue(fileType, SECOND_OFFSET_DIFF, secondOffsetDiff);
+ incrementMetricValue(fileType, TOTAL_FILES);
+ }
- @Override
- public String toString() {
- Map<String, AbfsReadFooterMetrics> metricsMap = getMetricsMap();
- List<AbfsReadFooterMetrics> readFooterMetricsList = new ArrayList<>();
- if (metricsMap != null && !(metricsMap.isEmpty())) {
- checkIsParquet(metricsMap);
- updateLenRequested(metricsMap);
- for (Map.Entry<String, AbfsReadFooterMetrics> entry :
metricsMap.entrySet()) {
- AbfsReadFooterMetrics abfsReadFooterMetrics = entry.getValue();
- if (abfsReadFooterMetrics.getCollectMetrics()) {
- readFooterMetricsList.add(entry.getValue());
+ /**
+ * Returns the read footer metrics for a given file type.
+ *
+ * @param fileType the type of the file
+ * @return the read footer metrics as a string
+ */
+ public String getReadFooterMetrics(FileType fileType) {
+ StringBuilder readFooterMetric = new StringBuilder();
+ appendMetrics(readFooterMetric, fileType);
+ return readFooterMetric.toString();
+ }
+
+ private void appendMetrics(StringBuilder metricBuilder, FileType fileType)
{
+ long totalFiles = getMetricValue(fileType, TOTAL_FILES);
Review Comment:
Use StringBuilder
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]