This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 337d0825d PARQUET-2171: (followup) add read metrics and hadoop conf integration for vector io reader (#1330)
337d0825d is described below

commit 337d0825d30bdcfcde884fb7509d5d6f6d5f8416
Author: Parth Chandra <[email protected]>
AuthorDate: Mon Apr 29 01:30:03 2024 -0700

    PARQUET-2171: (followup) add read metrics and hadoop conf integration for vector io reader (#1330)
---
 .../src/main/java/org/apache/parquet/ParquetReadOptions.java      | 2 ++
 .../main/java/org/apache/parquet/hadoop/ParquetFileReader.java    | 8 +++++---
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
index e737c799b..8c05d0224 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
@@ -23,6 +23,7 @@ import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FI
 import static org.apache.parquet.hadoop.ParquetInputFormat.BLOOM_FILTERING_ENABLED;
 import static org.apache.parquet.hadoop.ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED;
 import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED;
 import static org.apache.parquet.hadoop.ParquetInputFormat.OFF_HEAP_DECRYPT_BUFFER_ENABLED;
 import static org.apache.parquet.hadoop.ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED;
 import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED;
@@ -285,6 +286,7 @@ public class ParquetReadOptions {
       withCodecFactory(HadoopCodecs.newFactory(conf, 0));
       withRecordFilter(getFilter(conf));
       withMaxAllocationInBytes(conf.getInt(ALLOCATION_SIZE, 8388608));
+      withUseHadoopVectoredIo(conf.getBoolean(HADOOP_VECTORED_IO_ENABLED, HADOOP_VECTORED_IO_ENABLED_DEFAULT));
       String badRecordThresh = conf.get(BAD_RECORD_THRESHOLD_CONF_KEY);
       if (badRecordThresh != null) {
         set(BAD_RECORD_THRESHOLD_CONF_KEY, badRecordThresh);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 8776e85e6..1d8cce3d8 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -2165,7 +2165,7 @@ public class ParquetFileReader implements Closeable {
         f.readFully(buffer);
         buffer.flip();
       }
-      setReadMetrics(readStart);
+      setReadMetrics(readStart, length);
 
       // report in a counter the data we just scanned
       BenchmarkCounter.incrementBytesRead(length);
@@ -2175,11 +2175,11 @@ public class ParquetFileReader implements Closeable {
       }
     }
 
-    private void setReadMetrics(long startNs) {
+    private void setReadMetrics(long startNs, long len) {
       ParquetMetricsCallback metricsCallback = options.getMetricsCallback();
       if (metricsCallback != null) {
         long totalFileReadTimeNs = Math.max(System.nanoTime() - startNs, 0);
-        double sizeInMb = ((double) length) / (1024 * 1024);
+        double sizeInMb = ((double) len) / (1024 * 1024);
         double timeInSec = ((double) totalFileReadTimeNs) / 1000_0000_0000L;
         double throughput = sizeInMb / timeInSec;
         LOG.debug(
@@ -2203,12 +2203,14 @@ public class ParquetFileReader implements Closeable {
    public void readFromVectoredRange(ParquetFileRange currRange, ChunkListBuilder builder) throws IOException {
       ByteBuffer buffer;
       final long timeoutSeconds = HADOOP_VECTORED_READ_TIMEOUT_SECONDS;
+      long readStart = System.nanoTime();
       try {
         LOG.debug(
            "Waiting for vectored read to finish for range {} with timeout {} seconds",
             currRange,
             timeoutSeconds);
        buffer = FutureIO.awaitFuture(currRange.getDataReadFuture(), timeoutSeconds, TimeUnit.SECONDS);
+        setReadMetrics(readStart, currRange.getLength());
         // report in a counter the data we just scanned
         BenchmarkCounter.incrementBytesRead(currRange.getLength());
       } catch (TimeoutException e) {

Reply via email to