This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e0cd1925 PARQUET-2374: Add metrics support for parquet file reader 
(#1187)
2e0cd1925 is described below

commit 2e0cd1925546d2560f7658086251851e6fa68559
Author: Parth Chandra <[email protected]>
AuthorDate: Fri Jan 12 07:24:18 2024 -0800

    PARQUET-2374: Add metrics support for parquet file reader (#1187)
---
 .../java/org/apache/parquet/HadoopReadOptions.java |  8 +++-
 .../org/apache/parquet/ParquetReadOptions.java     | 20 +++++++-
 .../parquet/hadoop/ColumnChunkPageReadStore.java   | 29 ++++++++++++
 .../apache/parquet/hadoop/ParquetFileReader.java   | 54 +++++++++++++++++++++-
 .../parquet/hadoop/ParquetFileReaderMetrics.java   | 41 ++++++++++++++++
 .../parquet/hadoop/ParquetMetricsCallback.java     | 46 ++++++++++++++++++
 6 files changed, 194 insertions(+), 4 deletions(-)

diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
index 78a24a03f..51a89a678 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
@@ -29,6 +29,7 @@ import org.apache.parquet.crypto.DecryptionPropertiesFactory;
 import org.apache.parquet.crypto.FileDecryptionProperties;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import 
org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
+import org.apache.parquet.hadoop.ParquetMetricsCallback;
 
 public class HadoopReadOptions extends ParquetReadOptions {
   private final Configuration conf;
@@ -49,7 +50,8 @@ public class HadoopReadOptions extends ParquetReadOptions {
       int maxAllocationSize,
       Map<String, String> properties,
       Configuration conf,
-      FileDecryptionProperties fileDecryptionProperties) {
+      FileDecryptionProperties fileDecryptionProperties,
+      ParquetMetricsCallback metricsCallback) {
     super(
         useSignedStringMinMax,
         useStatsFilter,
@@ -66,6 +68,7 @@ public class HadoopReadOptions extends ParquetReadOptions {
         maxAllocationSize,
         properties,
         fileDecryptionProperties,
+        metricsCallback,
         new HadoopParquetConfiguration(conf));
     this.conf = conf;
   }
@@ -127,7 +130,8 @@ public class HadoopReadOptions extends ParquetReadOptions {
           maxAllocationSize,
           properties,
           conf,
-          fileDecryptionProperties);
+          fileDecryptionProperties,
+          metricsCallback);
     }
   }
 
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
index ee7595212..72fe230bd 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
@@ -43,6 +43,7 @@ import org.apache.parquet.conf.ParquetConfiguration;
 import org.apache.parquet.crypto.FileDecryptionProperties;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetMetricsCallback;
 import org.apache.parquet.hadoop.util.HadoopCodecs;
 
 // Internal use only
@@ -75,6 +76,7 @@ public class ParquetReadOptions {
   private final Map<String, String> properties;
   private final FileDecryptionProperties fileDecryptionProperties;
   private final ParquetConfiguration conf;
+  private final ParquetMetricsCallback metricsCallback;
 
   ParquetReadOptions(
       boolean useSignedStringMinMax,
@@ -91,7 +93,8 @@ public class ParquetReadOptions {
       ByteBufferAllocator allocator,
       int maxAllocationSize,
       Map<String, String> properties,
-      FileDecryptionProperties fileDecryptionProperties) {
+      FileDecryptionProperties fileDecryptionProperties,
+      ParquetMetricsCallback metricsCallback) {
     this(
         useSignedStringMinMax,
         useStatsFilter,
@@ -108,6 +111,7 @@ public class ParquetReadOptions {
         maxAllocationSize,
         properties,
         fileDecryptionProperties,
+        metricsCallback,
         new HadoopParquetConfiguration());
   }
 
@@ -127,6 +131,7 @@ public class ParquetReadOptions {
       int maxAllocationSize,
       Map<String, String> properties,
       FileDecryptionProperties fileDecryptionProperties,
+      ParquetMetricsCallback metricsCallback,
       ParquetConfiguration conf) {
     this.useSignedStringMinMax = useSignedStringMinMax;
     this.useStatsFilter = useStatsFilter;
@@ -143,6 +148,7 @@ public class ParquetReadOptions {
     this.maxAllocationSize = maxAllocationSize;
     this.properties = Collections.unmodifiableMap(properties);
     this.fileDecryptionProperties = fileDecryptionProperties;
+    this.metricsCallback = metricsCallback;
     this.conf = conf;
   }
 
@@ -210,6 +216,10 @@ public class ParquetReadOptions {
     return fileDecryptionProperties;
   }
 
+  public ParquetMetricsCallback getMetricsCallback() {
+    return metricsCallback;
+  }
+
   public boolean isEnabled(String property, boolean defaultValue) {
     Optional<String> propValue = Optional.ofNullable(properties.get(property));
     return propValue.map(Boolean::parseBoolean).orElse(defaultValue);
@@ -245,6 +255,7 @@ public class ParquetReadOptions {
     protected Map<String, String> properties = new HashMap<>();
     protected FileDecryptionProperties fileDecryptionProperties = null;
     protected ParquetConfiguration conf;
+    protected ParquetMetricsCallback metricsCallback;
 
     public Builder() {
       this(new HadoopParquetConfiguration());
@@ -391,6 +402,11 @@ public class ParquetReadOptions {
       return this;
     }
 
+    public Builder withMetricsCallback(ParquetMetricsCallback metricsCallback) 
{
+      this.metricsCallback = metricsCallback;
+      return this;
+    }
+
     public Builder set(String key, String value) {
       properties.put(key, value);
       return this;
@@ -407,6 +423,7 @@ public class ParquetReadOptions {
       withAllocator(options.allocator);
       withPageChecksumVerification(options.usePageChecksumVerification);
       withDecryption(options.fileDecryptionProperties);
+      withMetricsCallback(options.metricsCallback);
       conf = options.conf;
       for (Map.Entry<String, String> keyValue : options.properties.entrySet()) 
{
         set(keyValue.getKey(), keyValue.getValue());
@@ -439,6 +456,7 @@ public class ParquetReadOptions {
           maxAllocationSize,
           properties,
           fileDecryptionProperties,
+          metricsCallback,
           conf);
     }
   }
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
index 4905e94d2..5c376c8ce 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
@@ -156,11 +156,13 @@ class ColumnChunkPageReadStore implements PageReadStore, 
DictionaryPageReadStore
 
               ByteBuffer decompressedBuffer =
                   
options.getAllocator().allocate(dataPageV1.getUncompressedSize());
+              long start = System.nanoTime();
               decompressor.decompress(
                   byteBuffer,
                   (int) compressedSize,
                   decompressedBuffer,
                   dataPageV1.getUncompressedSize());
+              setDecompressMetrics(bytes, start);
 
               // HACKY: sometimes we need to do `flip` because the position of 
output bytebuffer is
               // not reset.
@@ -172,7 +174,9 @@ class ColumnChunkPageReadStore implements PageReadStore, 
DictionaryPageReadStore
               if (null != blockDecryptor) {
                 bytes = 
BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dataPageAAD));
               }
+              long start = System.nanoTime();
               decompressed = decompressor.decompress(bytes, 
dataPageV1.getUncompressedSize());
+              setDecompressMetrics(bytes, start);
             }
 
             final DataPageV1 decompressedPage;
@@ -234,8 +238,10 @@ class ColumnChunkPageReadStore implements PageReadStore, 
DictionaryPageReadStore
                     - dataPageV2.getRepetitionLevels().size());
                 ByteBuffer decompressedBuffer =
                     options.getAllocator().allocate(uncompressedSize);
+                long start = System.nanoTime();
                 decompressor.decompress(
                     byteBuffer, (int) compressedSize, decompressedBuffer, 
uncompressedSize);
+                setDecompressMetrics(pageBytes, start);
 
                 // HACKY: sometimes we need to do `flip` because the position 
of output bytebuffer is
                 // not reset.
@@ -255,7 +261,9 @@ class ColumnChunkPageReadStore implements PageReadStore, 
DictionaryPageReadStore
                 int uncompressedSize = 
Math.toIntExact(dataPageV2.getUncompressedSize()
                     - dataPageV2.getDefinitionLevels().size()
                     - dataPageV2.getRepetitionLevels().size());
+                long start = System.nanoTime();
                 pageBytes = decompressor.decompress(pageBytes, 
uncompressedSize);
+                setDecompressMetrics(pageBytes, start);
               }
             }
           } catch (IOException e) {
@@ -293,6 +301,23 @@ class ColumnChunkPageReadStore implements PageReadStore, 
DictionaryPageReadStore
       });
     }
 
+    private void setDecompressMetrics(BytesInput bytes, long start) {
+      final ParquetMetricsCallback metricsCallback = 
options.getMetricsCallback();
+      if (metricsCallback != null) {
+        long time = Math.max(System.nanoTime() - start, 0);
+        long len = bytes.size();
+        double throughput = ((double) len / time) * ((double) 1000_000_000L) / 
(1024 * 1024);
+        LOG.debug(
+            "Decompress block: Length: {} MB, Time: {} msecs, throughput: {} 
MB/s",
+            len / (1024 * 1024),
+            time / 1000_000L,
+            throughput);
+        
metricsCallback.setDuration(ParquetFileReaderMetrics.DecompressTime.name(), 
time);
+        
metricsCallback.setValueLong(ParquetFileReaderMetrics.DecompressSize.name(), 
len);
+        
metricsCallback.setValueDouble(ParquetFileReaderMetrics.DecompressThroughput.name(),
 throughput);
+      }
+    }
+
     @Override
     public DictionaryPage readDictionaryPage() {
       if (compressedDictionaryPage == null) {
@@ -303,6 +328,10 @@ class ColumnChunkPageReadStore implements PageReadStore, 
DictionaryPageReadStore
         if (null != blockDecryptor) {
           bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), 
dictionaryPageAAD));
         }
+        long start = System.nanoTime();
+        BytesInput decompressed =
+            decompressor.decompress(bytes, 
compressedDictionaryPage.getUncompressedSize());
+        setDecompressMetrics(bytes, start);
         DictionaryPage decompressedPage = new DictionaryPage(
             decompressor.decompress(bytes, 
compressedDictionaryPage.getUncompressedSize()),
             compressedDictionaryPage.getDictionarySize(),
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index ce154e0aa..91be68ed6 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -823,6 +823,38 @@ public class ParquetFileReader implements Closeable {
     this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
   }
 
+  /**
+   * @param conf   the Hadoop Configuration
+   * @param file   Path to a parquet file
+   * @param footer a {@link ParquetMetadata} footer already read from the file
+   * @param options {@link ParquetReadOptions}
+   * @throws IOException if the file can not be opened
+   */
+  public ParquetFileReader(Configuration conf, Path file, ParquetMetadata 
footer, ParquetReadOptions options)
+      throws IOException {
+    this.converter = new ParquetMetadataConverter(conf);
+    this.file = HadoopInputFile.fromPath(file, conf);
+    this.f = this.file.newStream();
+    this.fileMetaData = footer.getFileMetaData();
+    this.fileDecryptor = fileMetaData.getFileDecryptor();
+    this.options = options;
+    this.footer = footer;
+    try {
+      this.blocks = filterRowGroups(footer.getBlocks());
+    } catch (Exception e) {
+      // In case that filterRowGroups throws an exception in the constructor, 
the new stream
+      // should be closed. Otherwise, there's no way to close this outside.
+      f.close();
+      throw e;
+    }
+    this.blockIndexStores = listWithNulls(this.blocks.size());
+    this.blockRowRanges = listWithNulls(this.blocks.size());
+    for (ColumnDescriptor col : 
footer.getFileMetaData().getSchema().getColumns()) {
+      paths.put(ColumnPath.get(col.getPath()), col);
+    }
+    this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
+  }
+
   public ParquetFileReader(InputFile file, ParquetReadOptions options) throws 
IOException {
     this.converter = new ParquetMetadataConverter(options);
     this.file = file;
@@ -1003,7 +1035,7 @@ public class ParquetFileReader implements Closeable {
     ColumnChunkPageReadStore rowGroup =
         new ColumnChunkPageReadStore(block.getRowCount(), 
block.getRowIndexOffset());
     // prepare the list of consecutive parts to read them in one scan
-    List<ConsecutivePartList> allParts = new ArrayList<ConsecutivePartList>();
+    List<ConsecutivePartList> allParts = new ArrayList<>();
     ConsecutivePartList currentParts = null;
     for (ColumnChunkMetaData mc : block.getColumns()) {
       ColumnPath pathKey = mc.getPath();
@@ -1961,10 +1993,12 @@ public class ParquetFileReader implements Closeable {
         buffers.add(options.getAllocator().allocate(lastAllocationSize));
       }
 
+      long readStart = System.nanoTime();
       for (ByteBuffer buffer : buffers) {
         f.readFully(buffer);
         buffer.flip();
       }
+      setReadMetrics(readStart);
 
       // report in a counter the data we just scanned
       BenchmarkCounter.incrementBytesRead(length);
@@ -1974,6 +2008,24 @@ public class ParquetFileReader implements Closeable {
       }
     }
 
+    private void setReadMetrics(long startNs) {
+      ParquetMetricsCallback metricsCallback = options.getMetricsCallback();
+      if (metricsCallback != null) {
+        long totalFileReadTimeNs = Math.max(System.nanoTime() - startNs, 0);
+        double sizeInMb = ((double) length) / (1024 * 1024);
+        double timeInSec = ((double) totalFileReadTimeNs) / 1000_0000_0000L;
+        double throughput = sizeInMb / timeInSec;
+        LOG.debug(
+            "Parquet: File Read stats:  Length: {} MB, Time: {} secs, 
throughput: {} MB/sec ",
+            sizeInMb,
+            timeInSec,
+            throughput);
+        metricsCallback.setDuration(ParquetFileReaderMetrics.ReadTime.name(), 
totalFileReadTimeNs);
+        metricsCallback.setValueLong(ParquetFileReaderMetrics.ReadSize.name(), 
length);
+        
metricsCallback.setValueDouble(ParquetFileReaderMetrics.ReadThroughput.name(), 
throughput);
+      }
+    }
+
     /**
      * @return the position following the last byte of these chunks
      */
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReaderMetrics.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReaderMetrics.java
new file mode 100644
index 000000000..737e6abb9
--- /dev/null
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReaderMetrics.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+public enum ParquetFileReaderMetrics {
+
+  // metrics
+  ReadTime("time spent in reading Parquet file from storage"),
+  SeekTime("time spent in seek when reading Parquet file from storage"),
+  ReadSize("read size when reading Parquet file from storage (MB)"),
+  ReadThroughput("read throughput when reading Parquet file from storage 
(MB/sec)"),
+  DecompressTime("time spent in block decompression"),
+  DecompressSize("decompressed data size (MB)"),
+  DecompressThroughput("block decompression throughput (MB/sec)");
+
+  private final String desc;
+
+  ParquetFileReaderMetrics(String desc) {
+    this.desc = desc;
+  }
+
+  public String description() {
+    return desc;
+  }
+}
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetMetricsCallback.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetMetricsCallback.java
new file mode 100644
index 000000000..6527fa1f7
--- /dev/null
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetMetricsCallback.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ *  A simple interface to pass basic metric values by name to any 
implementation. Typically, an
+ *  implementation of this interface will serve as a bridge to pass metric 
values on
+ *  to the metrics system of a distributed engine (hadoop, spark, etc).
+ *  <br>
+ *  Development Note: This interface should provide a default implementation 
for any new metric tracker
+ *  added to allow for backward compatibility
+ *  <br>
+ *   e.g.
+ *  <br>
+ *  <code>default addMaximum(key, value) { } ; </code>
+ */
[email protected]
+public interface ParquetMetricsCallback {
+  void setValueInt(String name, int value);
+
+  void setValueLong(String name, long value);
+
+  void setValueFloat(String name, float value);
+
+  void setValueDouble(String name, double value);
+
+  void setDuration(String name, long value);
+}

Reply via email to