This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 2e0cd1925 PARQUET-2374: Add metrics support for parquet file reader (#1187)
2e0cd1925 is described below
commit 2e0cd1925546d2560f7658086251851e6fa68559
Author: Parth Chandra <[email protected]>
AuthorDate: Fri Jan 12 07:24:18 2024 -0800
PARQUET-2374: Add metrics support for parquet file reader (#1187)
---
.../java/org/apache/parquet/HadoopReadOptions.java | 8 +++-
.../org/apache/parquet/ParquetReadOptions.java | 20 +++++++-
.../parquet/hadoop/ColumnChunkPageReadStore.java | 29 ++++++++++++
.../apache/parquet/hadoop/ParquetFileReader.java | 54 +++++++++++++++++++++-
.../parquet/hadoop/ParquetFileReaderMetrics.java | 41 ++++++++++++++++
.../parquet/hadoop/ParquetMetricsCallback.java | 46 ++++++++++++++++++
6 files changed, 194 insertions(+), 4 deletions(-)
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
index 78a24a03f..51a89a678 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
@@ -29,6 +29,7 @@ import org.apache.parquet.crypto.DecryptionPropertiesFactory;
import org.apache.parquet.crypto.FileDecryptionProperties;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
+import org.apache.parquet.hadoop.ParquetMetricsCallback;
public class HadoopReadOptions extends ParquetReadOptions {
private final Configuration conf;
@@ -49,7 +50,8 @@ public class HadoopReadOptions extends ParquetReadOptions {
int maxAllocationSize,
Map<String, String> properties,
Configuration conf,
- FileDecryptionProperties fileDecryptionProperties) {
+ FileDecryptionProperties fileDecryptionProperties,
+ ParquetMetricsCallback metricsCallback) {
super(
useSignedStringMinMax,
useStatsFilter,
@@ -66,6 +68,7 @@ public class HadoopReadOptions extends ParquetReadOptions {
maxAllocationSize,
properties,
fileDecryptionProperties,
+ metricsCallback,
new HadoopParquetConfiguration(conf));
this.conf = conf;
}
@@ -127,7 +130,8 @@ public class HadoopReadOptions extends ParquetReadOptions {
maxAllocationSize,
properties,
conf,
- fileDecryptionProperties);
+ fileDecryptionProperties,
+ metricsCallback);
}
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
index ee7595212..72fe230bd 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
@@ -43,6 +43,7 @@ import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.crypto.FileDecryptionProperties;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetMetricsCallback;
import org.apache.parquet.hadoop.util.HadoopCodecs;
// Internal use only
@@ -75,6 +76,7 @@ public class ParquetReadOptions {
private final Map<String, String> properties;
private final FileDecryptionProperties fileDecryptionProperties;
private final ParquetConfiguration conf;
+ private final ParquetMetricsCallback metricsCallback;
ParquetReadOptions(
boolean useSignedStringMinMax,
@@ -91,7 +93,8 @@ public class ParquetReadOptions {
ByteBufferAllocator allocator,
int maxAllocationSize,
Map<String, String> properties,
- FileDecryptionProperties fileDecryptionProperties) {
+ FileDecryptionProperties fileDecryptionProperties,
+ ParquetMetricsCallback metricsCallback) {
this(
useSignedStringMinMax,
useStatsFilter,
@@ -108,6 +111,7 @@ public class ParquetReadOptions {
maxAllocationSize,
properties,
fileDecryptionProperties,
+ metricsCallback,
new HadoopParquetConfiguration());
}
@@ -127,6 +131,7 @@ public class ParquetReadOptions {
int maxAllocationSize,
Map<String, String> properties,
FileDecryptionProperties fileDecryptionProperties,
+ ParquetMetricsCallback metricsCallback,
ParquetConfiguration conf) {
this.useSignedStringMinMax = useSignedStringMinMax;
this.useStatsFilter = useStatsFilter;
@@ -143,6 +148,7 @@ public class ParquetReadOptions {
this.maxAllocationSize = maxAllocationSize;
this.properties = Collections.unmodifiableMap(properties);
this.fileDecryptionProperties = fileDecryptionProperties;
+ this.metricsCallback = metricsCallback;
this.conf = conf;
}
@@ -210,6 +216,10 @@ public class ParquetReadOptions {
return fileDecryptionProperties;
}
+ public ParquetMetricsCallback getMetricsCallback() {
+ return metricsCallback;
+ }
+
public boolean isEnabled(String property, boolean defaultValue) {
Optional<String> propValue = Optional.ofNullable(properties.get(property));
return propValue.map(Boolean::parseBoolean).orElse(defaultValue);
@@ -245,6 +255,7 @@ public class ParquetReadOptions {
protected Map<String, String> properties = new HashMap<>();
protected FileDecryptionProperties fileDecryptionProperties = null;
protected ParquetConfiguration conf;
+ protected ParquetMetricsCallback metricsCallback;
public Builder() {
this(new HadoopParquetConfiguration());
@@ -391,6 +402,11 @@ public class ParquetReadOptions {
return this;
}
+ public Builder withMetricsCallback(ParquetMetricsCallback metricsCallback) {
+ this.metricsCallback = metricsCallback;
+ return this;
+ }
+
public Builder set(String key, String value) {
properties.put(key, value);
return this;
@@ -407,6 +423,7 @@ public class ParquetReadOptions {
withAllocator(options.allocator);
withPageChecksumVerification(options.usePageChecksumVerification);
withDecryption(options.fileDecryptionProperties);
+ withMetricsCallback(options.metricsCallback);
conf = options.conf;
for (Map.Entry<String, String> keyValue : options.properties.entrySet()) {
set(keyValue.getKey(), keyValue.getValue());
@@ -439,6 +456,7 @@ public class ParquetReadOptions {
maxAllocationSize,
properties,
fileDecryptionProperties,
+ metricsCallback,
conf);
}
}
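For reference, a minimal usage sketch (not part of this commit) of wiring a callback in through the new builder method; `inputFile` and `myCallback` are assumed to exist, while `ParquetReadOptions.builder()` and `ParquetFileReader.open(InputFile, ParquetReadOptions)` are existing API:

    // Sketch only: myCallback is any implementation of the new
    // ParquetMetricsCallback interface (one is sketched at the end of this mail).
    ParquetReadOptions options = ParquetReadOptions.builder()
        .withMetricsCallback(myCallback) // added by this commit
        .build();
    try (ParquetFileReader reader = ParquetFileReader.open(inputFile, options)) {
        // reads performed through this reader report metrics to myCallback
    }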
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
index 4905e94d2..5c376c8ce 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
@@ -156,11 +156,13 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore
ByteBuffer decompressedBuffer =
options.getAllocator().allocate(dataPageV1.getUncompressedSize());
+ long start = System.nanoTime();
decompressor.decompress(
byteBuffer,
(int) compressedSize,
decompressedBuffer,
dataPageV1.getUncompressedSize());
+ setDecompressMetrics(bytes, start);
// HACKY: sometimes we need to do `flip` because the position of output bytebuffer is
// not reset.
@@ -172,7 +174,9 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore
if (null != blockDecryptor) {
bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dataPageAAD));
}
+ long start = System.nanoTime();
decompressed = decompressor.decompress(bytes, dataPageV1.getUncompressedSize());
+ setDecompressMetrics(bytes, start);
}
final DataPageV1 decompressedPage;
@@ -234,8 +238,10 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore
- dataPageV2.getRepetitionLevels().size());
ByteBuffer decompressedBuffer = options.getAllocator().allocate(uncompressedSize);
+ long start = System.nanoTime();
decompressor.decompress(
byteBuffer, (int) compressedSize, decompressedBuffer, uncompressedSize);
+ setDecompressMetrics(pageBytes, start);
// HACKY: sometimes we need to do `flip` because the position of output bytebuffer is
// not reset.
@@ -255,7 +261,9 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore
int uncompressedSize = Math.toIntExact(dataPageV2.getUncompressedSize()
- dataPageV2.getDefinitionLevels().size()
- dataPageV2.getRepetitionLevels().size());
+ long start = System.nanoTime();
pageBytes = decompressor.decompress(pageBytes, uncompressedSize);
+ setDecompressMetrics(pageBytes, start);
}
}
} catch (IOException e) {
@@ -293,6 +301,23 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore
});
}
+ private void setDecompressMetrics(BytesInput bytes, long start) {
+ final ParquetMetricsCallback metricsCallback = options.getMetricsCallback();
+ if (metricsCallback != null) {
+ long time = Math.max(System.nanoTime() - start, 0);
+ long len = bytes.size();
+ double throughput = ((double) len / time) * ((double) 1000_000_000L) / (1024 * 1024);
+ LOG.debug(
+ "Decompress block: Length: {} MB, Time: {} msecs, throughput: {} MB/s",
+ len / (1024 * 1024),
+ time / 1000_000L,
+ throughput);
+ metricsCallback.setDuration(ParquetFileReaderMetrics.DecompressTime.name(), time);
+ metricsCallback.setValueLong(ParquetFileReaderMetrics.DecompressSize.name(), len);
+ metricsCallback.setValueDouble(ParquetFileReaderMetrics.DecompressThroughput.name(), throughput);
+ }
+ }
+
@Override
public DictionaryPage readDictionaryPage() {
if (compressedDictionaryPage == null) {
@@ -303,6 +328,10 @@ class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore
if (null != blockDecryptor) {
bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dictionaryPageAAD));
}
+ long start = System.nanoTime();
+ BytesInput decompressed =
+ decompressor.decompress(bytes, compressedDictionaryPage.getUncompressedSize());
+ setDecompressMetrics(bytes, start);
DictionaryPage decompressedPage = new DictionaryPage(
- decompressor.decompress(bytes, compressedDictionaryPage.getUncompressedSize()),
+ decompressed,
compressedDictionaryPage.getDictionarySize(),
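For clarity, the throughput expression in setDecompressMetrics converts bytes per nanosecond to MB/sec. A standalone sketch of the same arithmetic (helper name is hypothetical, not from this commit), with a guard for a zero elapsed time, which the inline expression would turn into Infinity:

    // Hypothetical helper: (bytes / nanos) * 1e9 gives bytes/sec;
    // dividing by 2^20 gives MB/sec.
    static double throughputMBPerSec(long bytes, long elapsedNanos) {
        if (elapsedNanos <= 0) {
            return 0.0; // avoid division by zero for sub-resolution timings
        }
        double bytesPerSec = ((double) bytes / elapsedNanos) * 1_000_000_000L;
        return bytesPerSec / (1024 * 1024);
    }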
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index ce154e0aa..91be68ed6 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -823,6 +823,38 @@ public class ParquetFileReader implements Closeable {
this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
}
+ /**
+ * @param conf the Hadoop Configuration
+ * @param file Path to a parquet file
+ * @param footer a {@link ParquetMetadata} footer already read from the file
+ * @param options {@link ParquetReadOptions}
+ * @throws IOException if the file cannot be opened
+ */
+ public ParquetFileReader(Configuration conf, Path file, ParquetMetadata footer, ParquetReadOptions options)
+ throws IOException {
+ this.converter = new ParquetMetadataConverter(conf);
+ this.file = HadoopInputFile.fromPath(file, conf);
+ this.f = this.file.newStream();
+ this.fileMetaData = footer.getFileMetaData();
+ this.fileDecryptor = fileMetaData.getFileDecryptor();
+ this.options = options;
+ this.footer = footer;
+ try {
+ this.blocks = filterRowGroups(footer.getBlocks());
+ } catch (Exception e) {
+ // If filterRowGroups throws an exception in the constructor, the newly opened stream
+ // must be closed here; otherwise there is no way to close it from outside.
+ f.close();
+ throw e;
+ }
+ this.blockIndexStores = listWithNulls(this.blocks.size());
+ this.blockRowRanges = listWithNulls(this.blocks.size());
+ for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
+ paths.put(ColumnPath.get(col.getPath()), col);
+ }
+ this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
+ }
+
public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException {
this.converter = new ParquetMetadataConverter(options);
this.file = file;
@@ -1003,7 +1035,7 @@ public class ParquetFileReader implements Closeable {
ColumnChunkPageReadStore rowGroup =
new ColumnChunkPageReadStore(block.getRowCount(), block.getRowIndexOffset());
// prepare the list of consecutive parts to read them in one scan
- List<ConsecutivePartList> allParts = new ArrayList<ConsecutivePartList>();
+ List<ConsecutivePartList> allParts = new ArrayList<>();
ConsecutivePartList currentParts = null;
for (ColumnChunkMetaData mc : block.getColumns()) {
ColumnPath pathKey = mc.getPath();
@@ -1961,10 +1993,12 @@ public class ParquetFileReader implements Closeable {
buffers.add(options.getAllocator().allocate(lastAllocationSize));
}
+ long readStart = System.nanoTime();
for (ByteBuffer buffer : buffers) {
f.readFully(buffer);
buffer.flip();
}
+ setReadMetrics(readStart);
// report in a counter the data we just scanned
BenchmarkCounter.incrementBytesRead(length);
@@ -1974,6 +2008,24 @@ public class ParquetFileReader implements Closeable {
}
}
+ private void setReadMetrics(long startNs) {
+ ParquetMetricsCallback metricsCallback = options.getMetricsCallback();
+ if (metricsCallback != null) {
+ long totalFileReadTimeNs = Math.max(System.nanoTime() - startNs, 0);
+ double sizeInMb = ((double) length) / (1024 * 1024);
+ double timeInSec = ((double) totalFileReadTimeNs) / 1_000_000_000L;
+ double throughput = sizeInMb / timeInSec;
+ LOG.debug(
+ "Parquet: File Read stats: Length: {} MB, Time: {} secs, throughput: {} MB/sec",
+ sizeInMb,
+ timeInSec,
+ throughput);
+ metricsCallback.setDuration(ParquetFileReaderMetrics.ReadTime.name(), totalFileReadTimeNs);
+ metricsCallback.setValueLong(ParquetFileReaderMetrics.ReadSize.name(), length);
+ metricsCallback.setValueDouble(ParquetFileReaderMetrics.ReadThroughput.name(), throughput);
+ }
+ }
+
/**
* @return the position following the last byte of these chunks
*/
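The new constructor above lets a caller supply a footer that was already read, for example from a metadata cache, instead of re-reading it from the file. A hedged usage sketch; `conf`, `path`, `cachedFooter`, and `myCallback` are assumptions, not from this commit:

    // Sketch only: assumes a footer obtained earlier (e.g. from a cache).
    ParquetReadOptions options = HadoopReadOptions.builder(conf)
        .withMetricsCallback(myCallback)
        .build();
    try (ParquetFileReader reader = new ParquetFileReader(conf, path, cachedFooter, options)) {
        // page reads now also report ReadTime/ReadSize/ReadThroughput via myCallback
    }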
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReaderMetrics.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReaderMetrics.java
new file mode 100644
index 000000000..737e6abb9
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReaderMetrics.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+public enum ParquetFileReaderMetrics {
+
+ // metrics
+ ReadTime("time spent in reading Parquet file from storage"),
+ SeekTime("time spent in seek when reading Parquet file from storage"),
+ ReadSize("read size when reading Parquet file from storage (MB)"),
+ ReadThroughput("read throughput when reading Parquet file from storage
(MB/sec)"),
+ DecompressTime("time spent in block decompression"),
+ DecompressSize("decompressed data size (MB)"),
+ DecompressThroughput("block decompression throughput (MB/sec)");
+
+ private final String desc;
+
+ ParquetFileReaderMetrics(String desc) {
+ this.desc = desc;
+ }
+
+ public String description() {
+ return desc;
+ }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetMetricsCallback.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetMetricsCallback.java
new file mode 100644
index 000000000..6527fa1f7
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetMetricsCallback.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A simple interface for passing basic metric values by name to any implementation. Typically, an
+ * implementation of this interface will serve as a bridge that passes metric values on
+ * to the metrics system of a distributed engine (Hadoop, Spark, etc.).
+ * <br>
+ * Development note: to preserve backward compatibility, this interface should provide a default
+ * implementation for any new metric method that is added, e.g.
+ * <br>
+ * <code>default void addMaximum(String key, long value) {}</code>
+ */
+@InterfaceStability.Unstable
+public interface ParquetMetricsCallback {
+ void setValueInt(String name, int value);
+
+ void setValueLong(String name, long value);
+
+ void setValueFloat(String name, float value);
+
+ void setValueDouble(String name, double value);
+
+ void setDuration(String name, long value);
+}
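Finally, a minimal sketch of an implementation (an illustration, not part of this commit): it accumulates values in a map, where a real bridge would forward to Hadoop counters, Spark accumulators, or a similar metrics system:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical in-memory implementation, for illustration only.
    public class InMemoryParquetMetrics implements ParquetMetricsCallback {
        private final Map<String, Number> values = new ConcurrentHashMap<>();

        @Override
        public void setValueInt(String name, int value) { values.put(name, value); }

        @Override
        public void setValueLong(String name, long value) { values.put(name, value); }

        @Override
        public void setValueFloat(String name, float value) { values.put(name, value); }

        @Override
        public void setValueDouble(String name, double value) { values.put(name, value); }

        @Override
        public void setDuration(String name, long value) { values.put(name, value); }

        public Number get(String name) { return values.get(name); }
    }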