[ https://issues.apache.org/jira/browse/HADOOP-19311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17910263#comment-17910263 ]
ASF GitHub Bot commented on HADOOP-19311:
-----------------------------------------
bhattmanish98 commented on code in PR #7122:
URL: https://github.com/apache/hadoop/pull/7122#discussion_r1904258316
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadFooterMetrics.java:
##########
@@ -17,533 +17,494 @@
*/
package org.apache.hadoop.fs.azurebfs.services;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.StringJoiner;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.HashMap;
+import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
+import org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum;
+import org.apache.hadoop.fs.azurebfs.enums.FileType;
+import org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_UNDERSCORE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON;
import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
+import static
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.CHAR_DOLLAR;
+import static
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.DOUBLE_PRECISION_FORMAT;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.FILE;
+import static
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.FIRST_READ;
+import static
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.SECOND_READ;
+import static
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.FILE_LENGTH;
+import static
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.READ_LENGTH;
+import static
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.TOTAL_FILES;
+import static
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_FILE_LENGTH;
+import static
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_SIZE_READ_BY_FIRST_READ;
+import static
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_OFFSET_DIFF_BETWEEN_FIRST_AND_SECOND_READ;
+import static
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_READ_LEN_REQUESTED;
+import static
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_FIRST_OFFSET_DIFF;
+import static
org.apache.hadoop.fs.azurebfs.enums.AbfsReadFooterMetricsEnum.AVG_SECOND_OFFSET_DIFF;
+import static org.apache.hadoop.fs.azurebfs.enums.FileType.PARQUET;
+import static org.apache.hadoop.fs.azurebfs.enums.FileType.NON_PARQUET;
+import static
org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum.TYPE_COUNTER;
+import static org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum.TYPE_MEAN;
+import static
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
+import static org.apache.hadoop.util.StringUtils.format;
+
+/**
+ * This class is responsible for tracking and updating metrics related to
reading footers in files.
+ */
+public class AbfsReadFooterMetrics extends AbstractAbfsStatisticsSource {
+ private static final String FOOTER_LENGTH = "20";
+ private static final List<FileType> FILE_TYPE_LIST =
Arrays.asList(PARQUET, NON_PARQUET);
+
+ /**
+ * Inner class to handle file type checks.
+ */
+ private static final class FileTypeMetrics {
+ private final AtomicBoolean collectMetrics;
+ private final AtomicBoolean collectMetricsForNextRead;
+ private final AtomicBoolean collectLenMetrics;
+ private final AtomicLong readCount;
+ private final AtomicLong offsetOfFirstRead;
+ private FileType fileType = null;
+ private String sizeReadByFirstRead;
+ private String offsetDiffBetweenFirstAndSecondRead;
+
+ /**
+ * Constructor to initialize the file type metrics.
+ */
+ private FileTypeMetrics() {
+ collectMetrics = new AtomicBoolean(false);
+ collectMetricsForNextRead = new AtomicBoolean(false);
+ collectLenMetrics = new AtomicBoolean(false);
+ readCount = new AtomicLong(0);
+ offsetOfFirstRead = new AtomicLong(0);
+ }
+
+ /**
+ * Updates the file type based on the metrics collected.
+ */
+ private void updateFileType() {
+ if (fileType == null) {
+ fileType = collectMetrics.get() && readCount.get() >= 2
+ && haveEqualValues(sizeReadByFirstRead)
+ &&
haveEqualValues(offsetDiffBetweenFirstAndSecondRead) ? PARQUET : NON_PARQUET;
+ }
+ }
+
+ /**
+ * Checks if the given value has equal parts.
+ *
+ * @param value the value to check
+ * @return true if the value has equal parts, false otherwise
+ */
+ private boolean haveEqualValues(String value) {
+ String[] parts = value.split("_");
+ return parts.length == 2
+ && parts[0].equals(parts[1]);
+ }
+
+ /**
+ * Increments the read count.
+ */
+ private void incrementReadCount() {
+ readCount.incrementAndGet();
+ }
+
+ /**
+ * Returns the read count.
+ *
+ * @return the read count
+ */
+ private long getReadCount() {
+ return readCount.get();
+ }
+
+ /**
+ * Sets the collect metrics flag.
+ *
+ * @param collect the value to set
+ */
+ private void setCollectMetrics(boolean collect) {
+ collectMetrics.set(collect);
+ }
+
+ /**
+ * Returns the collect metrics flag.
+ *
+ * @return the collect metrics flag
+ */
+ private boolean getCollectMetrics() {
+ return collectMetrics.get();
+ }
+
+ /**
+ * Sets the collect metrics for the next read flag.
+ *
+ * @param collect the value to set
+ */
+ private void setCollectMetricsForNextRead(boolean collect) {
+ collectMetricsForNextRead.set(collect);
+ }
+
+ /**
+ * Returns the collect metrics for the next read flag.
+ *
+ * @return the collect metrics for the next read flag
+ */
+ private boolean getCollectMetricsForNextRead() {
+ return collectMetricsForNextRead.get();
+ }
+
+ /**
+ * Returns the collect length metrics flag.
+ *
+ * @return the collect length metrics flag
+ */
+ private boolean getCollectLenMetrics() {
+ return collectLenMetrics.get();
+ }
+
+ /**
+ * Sets the collect length metrics flag.
+ *
+ * @param collect the value to set
+ */
+ private void setCollectLenMetrics(boolean collect) {
+ collectLenMetrics.set(collect);
+ }
+
+ /**
+ * Sets the offset of the first read.
+ *
+ * @param offset the value to set
+ */
+ private void setOffsetOfFirstRead(long offset) {
+ offsetOfFirstRead.set(offset);
+ }
+
+ /**
+ * Returns the offset of the first read.
+ *
+ * @return the offset of the first read
+ */
+ private long getOffsetOfFirstRead() {
+ return offsetOfFirstRead.get();
+ }
+
+ /**
+ * Sets the size read by the first read.
+ *
+ * @param size the value to set
+ */
+ private void setSizeReadByFirstRead(String size) {
+ sizeReadByFirstRead = size;
+ }
+
+ /**
+ * Returns the size read by the first read.
+ *
+ * @return the size read by the first read
+ */
+ private String getSizeReadByFirstRead() {
+ return sizeReadByFirstRead;
+ }
-public class AbfsReadFooterMetrics {
- private final AtomicBoolean isParquetFile;
- private final AtomicBoolean isParquetEvaluated;
- private final AtomicBoolean isLenUpdated;
- private String sizeReadByFirstRead;
- private String offsetDiffBetweenFirstAndSecondRead;
- private final AtomicLong fileLength;
- private double avgFileLength;
- private double avgReadLenRequested;
- private final AtomicBoolean collectMetrics;
- private final AtomicBoolean collectMetricsForNextRead;
- private final AtomicBoolean collectLenMetrics;
- private final AtomicLong dataLenRequested;
- private final AtomicLong offsetOfFirstRead;
- private final AtomicInteger readCount;
- private final ConcurrentSkipListMap<String, AbfsReadFooterMetrics>
metricsMap;
- private static final String FOOTER_LENGTH = "20";
-
- public AbfsReadFooterMetrics() {
- this.isParquetFile = new AtomicBoolean(false);
- this.isParquetEvaluated = new AtomicBoolean(false);
- this.isLenUpdated = new AtomicBoolean(false);
- this.fileLength = new AtomicLong();
- this.readCount = new AtomicInteger(0);
- this.offsetOfFirstRead = new AtomicLong();
- this.collectMetrics = new AtomicBoolean(false);
- this.collectMetricsForNextRead = new AtomicBoolean(false);
- this.collectLenMetrics = new AtomicBoolean(false);
- this.dataLenRequested = new AtomicLong(0);
- this.metricsMap = new ConcurrentSkipListMap<>();
- }
-
- public Map<String, AbfsReadFooterMetrics> getMetricsMap() {
- return metricsMap;
- }
-
- private boolean getIsParquetFile() {
- return isParquetFile.get();
- }
-
- public void setIsParquetFile(boolean isParquetFile) {
- this.isParquetFile.set(isParquetFile);
- }
-
- private String getSizeReadByFirstRead() {
- return sizeReadByFirstRead;
- }
-
- public void setSizeReadByFirstRead(final String sizeReadByFirstRead) {
- this.sizeReadByFirstRead = sizeReadByFirstRead;
- }
-
- private String getOffsetDiffBetweenFirstAndSecondRead() {
- return offsetDiffBetweenFirstAndSecondRead;
- }
-
- public void setOffsetDiffBetweenFirstAndSecondRead(final String
offsetDiffBetweenFirstAndSecondRead) {
- this.offsetDiffBetweenFirstAndSecondRead
- = offsetDiffBetweenFirstAndSecondRead;
- }
-
- private long getFileLength() {
- return fileLength.get();
- }
-
- private void setFileLength(long fileLength) {
- this.fileLength.set(fileLength);
- }
-
- private double getAvgFileLength() {
- return avgFileLength;
- }
-
- public void setAvgFileLength(final double avgFileLength) {
- this.avgFileLength = avgFileLength;
- }
-
- private double getAvgReadLenRequested() {
- return avgReadLenRequested;
- }
-
- public void setAvgReadLenRequested(final double avgReadLenRequested) {
- this.avgReadLenRequested = avgReadLenRequested;
- }
-
- private boolean getCollectMetricsForNextRead() {
- return collectMetricsForNextRead.get();
- }
-
- private void setCollectMetricsForNextRead(boolean collectMetricsForNextRead)
{
- this.collectMetricsForNextRead.set(collectMetricsForNextRead);
- }
-
- private long getOffsetOfFirstRead() {
- return offsetOfFirstRead.get();
- }
-
- private void setOffsetOfFirstRead(long offsetOfFirstRead) {
- this.offsetOfFirstRead.set(offsetOfFirstRead);
- }
-
- private int getReadCount() {
- return readCount.get();
- }
-
- private void setReadCount(int readCount) {
- this.readCount.set(readCount);
- }
-
- private int incrementReadCount() {
- this.readCount.incrementAndGet();
- return getReadCount();
- }
-
- private boolean getCollectLenMetrics() {
- return collectLenMetrics.get();
- }
-
- private void setCollectLenMetrics(boolean collectLenMetrics) {
- this.collectLenMetrics.set(collectLenMetrics);
-
- }
-
- private long getDataLenRequested() {
- return dataLenRequested.get();
- }
-
- private void setDataLenRequested(long dataLenRequested) {
- this.dataLenRequested.set(dataLenRequested);
- }
-
- private void updateDataLenRequested(long dataLenRequested){
- this.dataLenRequested.addAndGet(dataLenRequested);
- }
-
- private boolean getCollectMetrics() {
- return collectMetrics.get();
- }
-
- private void setCollectMetrics(boolean collectMetrics) {
- this.collectMetrics.set(collectMetrics);
- }
-
- private boolean getIsParquetEvaluated() {
- return isParquetEvaluated.get();
- }
-
- private void setIsParquetEvaluated(boolean isParquetEvaluated) {
- this.isParquetEvaluated.set(isParquetEvaluated);
- }
-
- private boolean getIsLenUpdated() {
- return isLenUpdated.get();
- }
-
- private void setIsLenUpdated(boolean isLenUpdated) {
- this.isLenUpdated.set(isLenUpdated);
- }
-
- /**
- * Updates the metrics map with an entry for the specified file if it
doesn't already exist.
- *
- * @param filePathIdentifier The unique identifier for the file.
- */
- public void updateMap(String filePathIdentifier) {
- // If the file is not already in the metrics map, add it with a new
AbfsReadFooterMetrics object.
- metricsMap.computeIfAbsent(filePathIdentifier, key -> new
AbfsReadFooterMetrics());
- }
-
- /**
- * Checks and updates metrics for a specific file identified by
filePathIdentifier.
- * If the metrics do not exist for the file, they are initialized.
- *
- * @param filePathIdentifier The unique identifier for the file.
- * @param len The length of the read operation.
- * @param contentLength The total content length of the file.
- * @param nextReadPos The position of the next read operation.
- */
- public void checkMetricUpdate(final String filePathIdentifier, final int
len, final long contentLength,
- final long nextReadPos) {
- AbfsReadFooterMetrics readFooterMetrics = metricsMap.computeIfAbsent(
- filePathIdentifier, key -> new AbfsReadFooterMetrics());
- if (readFooterMetrics.getReadCount() == 0
- || (readFooterMetrics.getReadCount() >= 1
- && readFooterMetrics.getCollectMetrics())) {
- updateMetrics(filePathIdentifier, len, contentLength, nextReadPos);
+ /**
+ * Sets the offset difference between the first and second read.
+ *
+ * @param offsetDiff the value to set
+ */
+ private void setOffsetDiffBetweenFirstAndSecondRead(String offsetDiff)
{
+ offsetDiffBetweenFirstAndSecondRead = offsetDiff;
+ }
+
+ /**
+ * Returns the offset difference between the first and second read.
+ *
+ * @return the offset difference between the first and second read
+ */
+ private String getOffsetDiffBetweenFirstAndSecondRead() {
+ return offsetDiffBetweenFirstAndSecondRead;
+ }
+
+ /**
+ * Returns the file type.
+ *
+ * @return the file type
+ */
+ private FileType getFileType() {
+ return fileType;
+ }
}
- }
-
- /**
- * Updates metrics for a specific file identified by filePathIdentifier.
- *
- * @param filePathIdentifier The unique identifier for the file.
- * @param len The length of the read operation.
- * @param contentLength The total content length of the file.
- * @param nextReadPos The position of the next read operation.
- */
- private void updateMetrics(final String filePathIdentifier, final int len,
final long contentLength,
- final long nextReadPos) {
- AbfsReadFooterMetrics readFooterMetrics =
metricsMap.get(filePathIdentifier);
-
- // Create a new AbfsReadFooterMetrics object if it doesn't exist in the
metricsMap.
- if (readFooterMetrics == null) {
- readFooterMetrics = new AbfsReadFooterMetrics();
- metricsMap.put(filePathIdentifier, readFooterMetrics);
+
+ private final Map<String, FileTypeMetrics> fileTypeMetricsMap = new
HashMap<>();
Review Comment:
Taken!
> [ABFS] Implement Backoff and Read Footer metrics using IOStatistics Class
> -------------------------------------------------------------------------
>
> Key: HADOOP-19311
> URL: https://issues.apache.org/jira/browse/HADOOP-19311
> Project: Hadoop Common
> Issue Type: Task
> Components: fs/azure
> Affects Versions: 3.5.0, 3.4.1
> Reporter: Manish Bhatt
> Assignee: Manish Bhatt
> Priority: Major
> Labels: pull-request-available
>
> Current Flow: We have implemented metrics collection in the ABFS flow. We
> created custom AbfsBackoffMetrics and AbfsReadFooterMetrics classes, which
> store all the metrics at the file system level. Our objective is to move
> away from the custom class implementation and instead use IOStatisticsStore,
> which is present in hadoop-common, to store the metrics.
> Changes Made: This PR stores the metrics of the above-mentioned classes in
> IOStatisticsStore, which is present in hadoop-common. A new abstract class,
> AbstractAbfsStatisticsSource, implements the IOStatisticsSource interface and
> stores the IOStatistics of its child metrics classes.
> Both AbfsBackoffMetrics and AbfsReadFooterMetrics inherit from
> AbstractAbfsStatisticsSource and store their respective metrics in
> IOStatisticsStore.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]