This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 3f3feed9b4 [Parquet] Reduce one copy in `SerializedPageReader` (#8745)
3f3feed9b4 is described below

commit 3f3feed9b45c9be4367ed1a874fd2d48df77e5c7
Author: Xiangpeng Hao <[email protected]>
AuthorDate: Sat Nov 1 05:42:52 2025 -0500

    [Parquet] Reduce one copy in `SerializedPageReader` (#8745)
    
    This was originally found by @MikeWalrus
    
    Basically the ChunkReader for the async reader is `ColumnChunkData`:
    
https://github.com/apache/arrow-rs/blob/2eabb595d20e691cf0c9c3ccf6a5e1b67472b07b/parquet/src/arrow/in_memory_row_group.rs#L282-L292
    
    which is itself backed by `Bytes`. The original implementation copied the
    data out of it, only to wrap that copy in a new `Bytes` later.
    This PR removes that extra copy.
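    
    A minimal sketch of the difference, using illustrative (hypothetical)
    names and assuming a `Bytes`-backed chunk like `ColumnChunkData`; this is
    not the exact PR code:
    
        use bytes::Bytes;
        use std::io::Read;
        
        /// Hypothetical stand-in for a `Bytes`-backed `ChunkReader`.
        struct InMemoryChunk {
            data: Bytes,
        }
        
        impl InMemoryChunk {
            /// Old path: read through an `io::Read`, copying into a fresh
            /// `Vec`, then wrap the copy in a new `Bytes` (one extra copy).
            fn page_via_copy(&self, start: usize, len: usize) -> std::io::Result<Bytes> {
                let reader = &self.data[start..];
                let mut buffer = Vec::with_capacity(len);
                reader.take(len as u64).read_to_end(&mut buffer)?;
                Ok(Bytes::from(buffer))
            }
        
            /// New path: `Bytes::slice` only bumps a refcount and records the
            /// range, so the page shares the existing allocation (zero copy).
            fn page_via_slice(&self, start: usize, len: usize) -> Bytes {
                self.data.slice(start..start + len)
            }
        }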
    
    Normally this should mean performance improvements across the board, but
    here are the nuances:
    1. Zero-copy means we need to hold on to the underlying buffer for longer
    2. The original implementation "accidentally" (I'm not sure it was
    intentional) released the underlying buffer earlier
    3. To show a meaningful performance difference, we need to use a proper
    allocator, i.e., mimalloc
    
    tl;dr: with mimalloc, this change always improves performance, or is at
    least as fast as the original implementation; tested locally with
    `arrow_reader_clickbench` (see the allocator sketch below)
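    
    For reference, a minimal sketch of how mimalloc is typically enabled for
    such a benchmark (assuming the `mimalloc` crate; this setup is not part
    of the PR itself):
    
        use mimalloc::MiMalloc;
        
        // Route all heap allocations in this binary through mimalloc.
        #[global_allocator]
        static GLOBAL: MiMalloc = MiMalloc;
        
        fn main() {
            // run the arrow_reader_clickbench-style workload here
        }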
    
    cc @tustvold and @alamb who might know this better
---
 parquet/src/file/serialized_reader.rs | 18 +++++-------------
 1 file changed, 5 insertions(+), 13 deletions(-)

diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 3f95ea9d49..ef71b4b6ac 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -397,9 +397,9 @@ pub(crate) fn decode_page(
             }
             let decompressed_size = uncompressed_page_size - offset;
             let mut decompressed = Vec::with_capacity(uncompressed_page_size);
-            decompressed.extend_from_slice(&buffer.as_ref()[..offset]);
+            decompressed.extend_from_slice(&buffer[..offset]);
             if decompressed_size > 0 {
-                let compressed = &buffer.as_ref()[offset..];
+                let compressed = &buffer[offset..];
                 decompressor.decompress(compressed, &mut decompressed, Some(decompressed_size))?;
             }
 
@@ -897,6 +897,7 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
                         *remaining,
                     )?;
                     let data_len = header.compressed_page_size as usize;
+                    let data_start = *offset;
                     *offset += data_len as u64;
                     *remaining -= data_len as u64;
 
@@ -904,16 +905,7 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
                         continue;
                     }
 
-                    let mut buffer = Vec::with_capacity(data_len);
-                    let read = read.take(data_len as u64).read_to_end(&mut buffer)?;
-
-                    if read != data_len {
-                        return Err(eof_err!(
-                            "Expected to read {} bytes of page, read only {}",
-                            data_len,
-                            read
-                        ));
-                    }
+                    let buffer = self.reader.get_bytes(data_start, data_len)?;
 
                     let buffer =
                         self.context
@@ -921,7 +913,7 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
 
                     let page = decode_page(
                         header,
-                        Bytes::from(buffer),
+                        buffer,
                         self.physical_type,
                         self.decompressor.as_mut(),
                     )?;
