This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 3f3feed9b4 [Parquet] Reduce one copy in `SerializedPageReader` (#8745)
3f3feed9b4 is described below
commit 3f3feed9b45c9be4367ed1a874fd2d48df77e5c7
Author: Xiangpeng Hao <[email protected]>
AuthorDate: Sat Nov 1 05:42:52 2025 -0500
[Parquet] Reduce one copy in `SerializedPageReader` (#8745)
This was originally found by @MikeWalrus
Basically, the `ChunkReader` for the async reader is `ColumnChunkData`:
https://github.com/apache/arrow-rs/blob/2eabb595d20e691cf0c9c3ccf6a5e1b67472b07b/parquet/src/arrow/in_memory_row_group.rs#L282-L292
Which is itself backed by `Bytes`. The original implementation copied the
data out of it, only to turn that copy into a new `Bytes` afterwards.
This PR removes that copy.
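For illustration only (not code from this commit), here is a minimal sketch
of the difference, assuming the page data already lives in a `Bytes` chunk;
the function names are hypothetical:

```rust
use bytes::Bytes;

// Old pattern: copy the page bytes into a Vec, then wrap the Vec in a new
// `Bytes` (one extra allocation plus memcpy per page).
fn copy_then_wrap(chunk: &Bytes, start: usize, len: usize) -> Bytes {
    let mut buffer = Vec::with_capacity(len);
    buffer.extend_from_slice(&chunk[start..start + len]);
    Bytes::from(buffer)
}

// New pattern: `Bytes::slice` returns a reference-counted view into the
// same buffer, so nothing is copied, but the whole chunk stays alive for
// as long as the page does.
fn zero_copy(chunk: &Bytes, start: usize, len: usize) -> Bytes {
    chunk.slice(start..start + len)
}
```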
Normally this should mean performance improvements across the board, but
there are some nuances:
1. Zero-copy means we need to hold the underlying buffer for longer.
2. The original implementation "accidentally" (I'm not sure whether it was
intentional) freed the buffer early.
3. To show a meaningful performance difference, we need to use a proper
allocator, e.g., mimalloc.
tl;dr: with mimalloc this change always improves performance, or is at
least as fast as the original implementation; tested locally with
`arrow_reader_clickbench`.
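For anyone reproducing the numbers, here is a minimal sketch of the mimalloc
setup assumed above (illustrative, not part of this diff): the `mimalloc`
crate installed as the global allocator in the benchmark binary.

```rust
use mimalloc::MiMalloc;

// Route all heap allocations through mimalloc instead of the system
// allocator, so allocation/free costs don't dominate the comparison.
#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;
```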
cc @tustvold and @alamb who might know this better
---
parquet/src/file/serialized_reader.rs | 18 +++++-------------
1 file changed, 5 insertions(+), 13 deletions(-)
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 3f95ea9d49..ef71b4b6ac 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -397,9 +397,9 @@ pub(crate) fn decode_page(
}
let decompressed_size = uncompressed_page_size - offset;
let mut decompressed = Vec::with_capacity(uncompressed_page_size);
- decompressed.extend_from_slice(&buffer.as_ref()[..offset]);
+ decompressed.extend_from_slice(&buffer[..offset]);
if decompressed_size > 0 {
- let compressed = &buffer.as_ref()[offset..];
+ let compressed = &buffer[offset..];
decompressor.decompress(compressed, &mut decompressed, Some(decompressed_size))?;
}
@@ -897,6 +897,7 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
*remaining,
)?;
let data_len = header.compressed_page_size as usize;
+ let data_start = *offset;
*offset += data_len as u64;
*remaining -= data_len as u64;
@@ -904,16 +905,7 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
continue;
}
- let mut buffer = Vec::with_capacity(data_len);
- let read = read.take(data_len as u64).read_to_end(&mut buffer)?;
-
- if read != data_len {
- return Err(eof_err!(
- "Expected to read {} bytes of page, read only {}",
- data_len,
- read
- ));
- }
+ let buffer = self.reader.get_bytes(data_start, data_len)?;
let buffer =
self.context
@@ -921,7 +913,7 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
let page = decode_page(
header,
- Bytes::from(buffer),
+ buffer,
self.physical_type,
self.decompressor.as_mut(),
)?;