This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 337d0825d PARQUET-2171: (followup) add read metrics and hadoop conf integration for vector io reader (#1330)
337d0825d is described below
commit 337d0825d30bdcfcde884fb7509d5d6f6d5f8416
Author: Parth Chandra <[email protected]>
AuthorDate: Mon Apr 29 01:30:03 2024 -0700
PARQUET-2171: (followup) add read metrics and hadoop conf integration for vector io reader (#1330)
---
.../src/main/java/org/apache/parquet/ParquetReadOptions.java | 2 ++
.../main/java/org/apache/parquet/hadoop/ParquetFileReader.java | 8 +++++---
2 files changed, 7 insertions(+), 3 deletions(-)
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
index e737c799b..8c05d0224 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
@@ -23,6 +23,7 @@ import static
org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FI
import static
org.apache.parquet.hadoop.ParquetInputFormat.BLOOM_FILTERING_ENABLED;
import static
org.apache.parquet.hadoop.ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED;
import static
org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED;
+import static
org.apache.parquet.hadoop.ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED;
import static
org.apache.parquet.hadoop.ParquetInputFormat.OFF_HEAP_DECRYPT_BUFFER_ENABLED;
import static
org.apache.parquet.hadoop.ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED;
import static
org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED;
@@ -285,6 +286,7 @@ public class ParquetReadOptions {
withCodecFactory(HadoopCodecs.newFactory(conf, 0));
withRecordFilter(getFilter(conf));
withMaxAllocationInBytes(conf.getInt(ALLOCATION_SIZE, 8388608));
+ withUseHadoopVectoredIo(conf.getBoolean(HADOOP_VECTORED_IO_ENABLED,
HADOOP_VECTORED_IO_ENABLED_DEFAULT));
String badRecordThresh = conf.get(BAD_RECORD_THRESHOLD_CONF_KEY);
if (badRecordThresh != null) {
set(BAD_RECORD_THRESHOLD_CONF_KEY, badRecordThresh);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 8776e85e6..1d8cce3d8 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -2165,7 +2165,7 @@ public class ParquetFileReader implements Closeable {
f.readFully(buffer);
buffer.flip();
}
- setReadMetrics(readStart);
+ setReadMetrics(readStart, length);
// report in a counter the data we just scanned
BenchmarkCounter.incrementBytesRead(length);
@@ -2175,11 +2175,11 @@ public class ParquetFileReader implements Closeable {
}
}
- private void setReadMetrics(long startNs) {
+ private void setReadMetrics(long startNs, long len) {
ParquetMetricsCallback metricsCallback = options.getMetricsCallback();
if (metricsCallback != null) {
long totalFileReadTimeNs = Math.max(System.nanoTime() - startNs, 0);
- double sizeInMb = ((double) length) / (1024 * 1024);
+ double sizeInMb = ((double) len) / (1024 * 1024);
double timeInSec = ((double) totalFileReadTimeNs) / 1000_0000_0000L;
double throughput = sizeInMb / timeInSec;
LOG.debug(
@@ -2203,12 +2203,14 @@ public class ParquetFileReader implements Closeable {
public void readFromVectoredRange(ParquetFileRange currRange,
ChunkListBuilder builder) throws IOException {
ByteBuffer buffer;
final long timeoutSeconds = HADOOP_VECTORED_READ_TIMEOUT_SECONDS;
+ long readStart = System.nanoTime();
try {
LOG.debug(
"Waiting for vectored read to finish for range {} with timeout {}
seconds",
currRange,
timeoutSeconds);
buffer = FutureIO.awaitFuture(currRange.getDataReadFuture(),
timeoutSeconds, TimeUnit.SECONDS);
+ setReadMetrics(readStart, currRange.getLength());
// report in a counter the data we just scanned
BenchmarkCounter.incrementBytesRead(currRange.getLength());
} catch (TimeoutException e) {