This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new fcf655e19 Zero copy page decoding from bytes (#1810)
fcf655e19 is described below

commit fcf655e1939d7791fc0bd69299a704ad4bbe1f5a
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon Jun 13 13:36:42 2022 +0100

    Zero copy page decoding from bytes (#1810)
---
 parquet/src/arrow/async_reader.rs     |  85 +++++++++++-
 parquet/src/file/serialized_reader.rs | 237 +++++++++++++++++++---------------
 2 files changed, 215 insertions(+), 107 deletions(-)

diff --git a/parquet/src/arrow/async_reader.rs 
b/parquet/src/arrow/async_reader.rs
index 541c98109..3f14114e3 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader.rs
@@ -77,7 +77,7 @@
 
 use std::collections::VecDeque;
 use std::fmt::Formatter;
-use std::io::SeekFrom;
+use std::io::{Cursor, SeekFrom};
 use std::ops::Range;
 use std::pin::Pin;
 use std::sync::Arc;
@@ -86,6 +86,7 @@ use std::task::{Context, Poll};
 use bytes::{Buf, Bytes};
 use futures::future::{BoxFuture, FutureExt};
 use futures::stream::Stream;
+use parquet_format::PageType;
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
 
 use arrow::datatypes::SchemaRef;
@@ -96,11 +97,13 @@ use crate::arrow::arrow_reader::ParquetRecordBatchReader;
 use crate::arrow::schema::parquet_to_arrow_schema;
 use crate::arrow::ProjectionMask;
 use crate::basic::Compression;
-use crate::column::page::{PageIterator, PageReader};
+use crate::column::page::{Page, PageIterator, PageReader};
+use crate::compression::{create_codec, Codec};
 use crate::errors::{ParquetError, Result};
 use crate::file::footer::{decode_footer, decode_metadata};
 use crate::file::metadata::ParquetMetaData;
 use crate::file::reader::SerializedPageReader;
+use crate::file::serialized_reader::{decode_page, read_page_header};
 use crate::file::FOOTER_SIZE;
 use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
 
@@ -433,6 +436,7 @@ where
     }
 }
 
+/// An in-memory collection of column chunks
 struct InMemoryRowGroup {
     schema: SchemaDescPtr,
     column_chunks: Vec<Option<InMemoryColumnChunk>>,
@@ -459,6 +463,7 @@ impl RowGroupCollection for InMemoryRowGroup {
     }
 }
 
+/// Data for a single column chunk
 #[derive(Clone)]
 struct InMemoryColumnChunk {
     num_values: i64,
@@ -480,6 +485,82 @@ impl InMemoryColumnChunk {
     }
 }
 
+// A serialized implementation for Parquet [`PageReader`].
+struct InMemoryColumnChunkReader {
+    chunk: InMemoryColumnChunk,
+    decompressor: Option<Box<dyn Codec>>,
+    offset: usize,
+    seen_num_values: i64,
+}
+
+impl InMemoryColumnChunkReader {
+    /// Creates a new serialized page reader from file source.
+    pub fn new(chunk: InMemoryColumnChunk) -> Result<Self> {
+        let decompressor = create_codec(chunk.compression)?;
+        let result = Self {
+            chunk,
+            decompressor,
+            offset: 0,
+            seen_num_values: 0,
+        };
+        Ok(result)
+    }
+}
+
+impl Iterator for InMemoryColumnChunkReader {
+    type Item = Result<Page>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.get_next_page().transpose()
+    }
+}
+
+impl PageReader for InMemoryColumnChunkReader {
+    fn get_next_page(&mut self) -> Result<Option<Page>> {
+        while self.seen_num_values < self.chunk.num_values {
+            let mut cursor = 
Cursor::new(&self.chunk.data.as_ref()[self.offset..]);
+            let page_header = read_page_header(&mut cursor)?;
+            let compressed_size = page_header.compressed_page_size as usize;
+
+            self.offset += cursor.position() as usize;
+            let start_offset = self.offset;
+            let end_offset = self.offset + compressed_size;
+            self.offset = end_offset;
+
+            let buffer = self.chunk.data.slice(start_offset..end_offset);
+
+            let result = match page_header.type_ {
+                PageType::DataPage | PageType::DataPageV2 => {
+                    let decoded = decode_page(
+                        page_header,
+                        buffer.into(),
+                        self.chunk.physical_type,
+                        self.decompressor.as_mut(),
+                    )?;
+                    self.seen_num_values += decoded.num_values() as i64;
+                    decoded
+                }
+                PageType::DictionaryPage => decode_page(
+                    page_header,
+                    buffer.into(),
+                    self.chunk.physical_type,
+                    self.decompressor.as_mut(),
+                )?,
+                _ => {
+                    // For unknown page type (e.g., INDEX_PAGE), skip and read 
next.
+                    continue;
+                }
+            };
+
+            return Ok(Some(result));
+        }
+
+        // We are at the end of this column chunk and no more page left. 
Return None.
+        Ok(None)
+    }
+}
+
+/// Implements [`PageIterator`] for a single column chunk, yielding a single 
[`PageReader`]
 struct ColumnChunkIterator {
     schema: SchemaDescPtr,
     column_schema: ColumnDescPtr,
diff --git a/parquet/src/file/serialized_reader.rs 
b/parquet/src/file/serialized_reader.rs
index 119429235..6ff73e041 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -358,6 +358,108 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for 
SerializedRowGroupReader<'
     }
 }
 
+/// Reads a [`PageHeader`] from the provided [`Read`]
+pub(crate) fn read_page_header<T: Read>(input: &mut T) -> Result<PageHeader> {
+    let mut prot = TCompactInputProtocol::new(input);
+    let page_header = PageHeader::read_from_in_protocol(&mut prot)?;
+    Ok(page_header)
+}
+
+/// Decodes a [`Page`] from the provided `buffer`
+pub(crate) fn decode_page(
+    page_header: PageHeader,
+    buffer: ByteBufferPtr,
+    physical_type: Type,
+    decompressor: Option<&mut Box<dyn Codec>>,
+) -> Result<Page> {
+    // When processing data page v2, depending on enabled compression for the
+    // page, we should account for uncompressed data ('offset') of
+    // repetition and definition levels.
+    //
+    // We always use 0 offset for other pages other than v2, `true` flag means
+    // that compression will be applied if decompressor is defined
+    let mut offset: usize = 0;
+    let mut can_decompress = true;
+
+    if let Some(ref header_v2) = page_header.data_page_header_v2 {
+        offset = (header_v2.definition_levels_byte_length
+            + header_v2.repetition_levels_byte_length) as usize;
+        // When is_compressed flag is missing the page is considered compressed
+        can_decompress = header_v2.is_compressed.unwrap_or(true);
+    }
+
+    // TODO: page header could be huge because of statistics. We should set a
+    // maximum page header size and abort if that is exceeded.
+    let buffer = match decompressor {
+        Some(decompressor) if can_decompress => {
+            let uncompressed_size = page_header.uncompressed_page_size as 
usize;
+            let mut decompressed = Vec::with_capacity(uncompressed_size);
+            let compressed = &buffer.as_ref()[offset..];
+            decompressed.extend_from_slice(&buffer.as_ref()[..offset]);
+            decompressor.decompress(compressed, &mut decompressed)?;
+
+            if decompressed.len() != uncompressed_size {
+                return Err(general_err!(
+                    "Actual decompressed size doesn't match the expected one 
({} vs {})",
+                    decompressed.len(),
+                    uncompressed_size
+                ));
+            }
+
+            ByteBufferPtr::new(decompressed)
+        }
+        _ => buffer,
+    };
+
+    let result = match page_header.type_ {
+        PageType::DictionaryPage => {
+            assert!(page_header.dictionary_page_header.is_some());
+            let dict_header = 
page_header.dictionary_page_header.as_ref().unwrap();
+            let is_sorted = dict_header.is_sorted.unwrap_or(false);
+            Page::DictionaryPage {
+                buf: buffer,
+                num_values: dict_header.num_values as u32,
+                encoding: Encoding::from(dict_header.encoding),
+                is_sorted,
+            }
+        }
+        PageType::DataPage => {
+            assert!(page_header.data_page_header.is_some());
+            let header = page_header.data_page_header.unwrap();
+            Page::DataPage {
+                buf: buffer,
+                num_values: header.num_values as u32,
+                encoding: Encoding::from(header.encoding),
+                def_level_encoding: 
Encoding::from(header.definition_level_encoding),
+                rep_level_encoding: 
Encoding::from(header.repetition_level_encoding),
+                statistics: statistics::from_thrift(physical_type, 
header.statistics),
+            }
+        }
+        PageType::DataPageV2 => {
+            assert!(page_header.data_page_header_v2.is_some());
+            let header = page_header.data_page_header_v2.unwrap();
+            let is_compressed = header.is_compressed.unwrap_or(true);
+            Page::DataPageV2 {
+                buf: buffer,
+                num_values: header.num_values as u32,
+                encoding: Encoding::from(header.encoding),
+                num_nulls: header.num_nulls as u32,
+                num_rows: header.num_rows as u32,
+                def_levels_byte_len: header.definition_levels_byte_length as 
u32,
+                rep_levels_byte_len: header.repetition_levels_byte_length as 
u32,
+                is_compressed,
+                statistics: statistics::from_thrift(physical_type, 
header.statistics),
+            }
+        }
+        _ => {
+            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
+            unimplemented!("Page type {:?} is not supported", 
page_header.type_)
+        }
+    };
+
+    Ok(result)
+}
+
 /// A serialized implementation for Parquet [`PageReader`].
 pub struct SerializedPageReader<T: Read> {
     // The file source buffer which references exactly the bytes for the 
column trunk
@@ -395,13 +497,6 @@ impl<T: Read> SerializedPageReader<T> {
         };
         Ok(result)
     }
-
-    /// Reads Page header from Thrift.
-    fn read_page_header(&mut self) -> Result<PageHeader> {
-        let mut prot = TCompactInputProtocol::new(&mut self.buf);
-        let page_header = PageHeader::read_from_in_protocol(&mut prot)?;
-        Ok(page_header)
-    }
 }
 
 impl<T: Read + Send> Iterator for SerializedPageReader<T> {
@@ -415,108 +510,40 @@ impl<T: Read + Send> Iterator for 
SerializedPageReader<T> {
 impl<T: Read + Send> PageReader for SerializedPageReader<T> {
     fn get_next_page(&mut self) -> Result<Option<Page>> {
         while self.seen_num_values < self.total_num_values {
-            let page_header = self.read_page_header()?;
-
-            // When processing data page v2, depending on enabled compression 
for the
-            // page, we should account for uncompressed data ('offset') of
-            // repetition and definition levels.
-            //
-            // We always use 0 offset for other pages other than v2, `true` 
flag means
-            // that compression will be applied if decompressor is defined
-            let mut offset: usize = 0;
-            let mut can_decompress = true;
-
-            if let Some(ref header_v2) = page_header.data_page_header_v2 {
-                offset = (header_v2.definition_levels_byte_length
-                    + header_v2.repetition_levels_byte_length)
-                    as usize;
-                // When is_compressed flag is missing the page is considered 
compressed
-                can_decompress = header_v2.is_compressed.unwrap_or(true);
-            }
-
-            let compressed_len = page_header.compressed_page_size as usize - 
offset;
-            let uncompressed_len = page_header.uncompressed_page_size as usize 
- offset;
-            // We still need to read all bytes from buffered stream
-            let mut buffer = vec![0; offset + compressed_len];
-            self.buf.read_exact(&mut buffer)?;
-
-            // TODO: page header could be huge because of statistics. We 
should set a
-            // maximum page header size and abort if that is exceeded.
-            if let Some(decompressor) = self.decompressor.as_mut() {
-                if can_decompress {
-                    let mut decompressed_buffer = 
Vec::with_capacity(uncompressed_len);
-                    let decompressed_size = decompressor
-                        .decompress(&buffer[offset..], &mut 
decompressed_buffer)?;
-                    if decompressed_size != uncompressed_len {
-                        return Err(general_err!(
-              "Actual decompressed size doesn't match the expected one ({} vs 
{})",
-              decompressed_size,
-              uncompressed_len
-            ));
-                    }
-                    if offset == 0 {
-                        buffer = decompressed_buffer;
-                    } else {
-                        // Prepend saved offsets to the buffer
-                        buffer.truncate(offset);
-                        buffer.append(&mut decompressed_buffer);
-                    }
-                }
+            let page_header = read_page_header(&mut self.buf)?;
+
+            let to_read = page_header.compressed_page_size as usize;
+            let mut buffer = Vec::with_capacity(to_read);
+            let read = (&mut self.buf)
+                .take(to_read as u64)
+                .read_to_end(&mut buffer)?;
+
+            if read != to_read {
+                return Err(eof_err!(
+                    "Expected to read {} bytes of page, read only {}",
+                    to_read,
+                    read
+                ));
             }
 
+            let buffer = ByteBufferPtr::new(buffer);
             let result = match page_header.type_ {
-                PageType::DictionaryPage => {
-                    assert!(page_header.dictionary_page_header.is_some());
-                    let dict_header =
-                        page_header.dictionary_page_header.as_ref().unwrap();
-                    let is_sorted = dict_header.is_sorted.unwrap_or(false);
-                    Page::DictionaryPage {
-                        buf: ByteBufferPtr::new(buffer),
-                        num_values: dict_header.num_values as u32,
-                        encoding: Encoding::from(dict_header.encoding),
-                        is_sorted,
-                    }
-                }
-                PageType::DataPage => {
-                    assert!(page_header.data_page_header.is_some());
-                    let header = page_header.data_page_header.unwrap();
-                    self.seen_num_values += header.num_values as i64;
-                    Page::DataPage {
-                        buf: ByteBufferPtr::new(buffer),
-                        num_values: header.num_values as u32,
-                        encoding: Encoding::from(header.encoding),
-                        def_level_encoding: Encoding::from(
-                            header.definition_level_encoding,
-                        ),
-                        rep_level_encoding: Encoding::from(
-                            header.repetition_level_encoding,
-                        ),
-                        statistics: statistics::from_thrift(
-                            self.physical_type,
-                            header.statistics,
-                        ),
-                    }
-                }
-                PageType::DataPageV2 => {
-                    assert!(page_header.data_page_header_v2.is_some());
-                    let header = page_header.data_page_header_v2.unwrap();
-                    let is_compressed = header.is_compressed.unwrap_or(true);
-                    self.seen_num_values += header.num_values as i64;
-                    Page::DataPageV2 {
-                        buf: ByteBufferPtr::new(buffer),
-                        num_values: header.num_values as u32,
-                        encoding: Encoding::from(header.encoding),
-                        num_nulls: header.num_nulls as u32,
-                        num_rows: header.num_rows as u32,
-                        def_levels_byte_len: 
header.definition_levels_byte_length as u32,
-                        rep_levels_byte_len: 
header.repetition_levels_byte_length as u32,
-                        is_compressed,
-                        statistics: statistics::from_thrift(
-                            self.physical_type,
-                            header.statistics,
-                        ),
-                    }
+                PageType::DataPage | PageType::DataPageV2 => {
+                    let decoded = decode_page(
+                        page_header,
+                        buffer,
+                        self.physical_type,
+                        self.decompressor.as_mut(),
+                    )?;
+                    self.seen_num_values += decoded.num_values() as i64;
+                    decoded
                 }
+                PageType::DictionaryPage => decode_page(
+                    page_header,
+                    buffer,
+                    self.physical_type,
+                    self.decompressor.as_mut(),
+                )?,
                 _ => {
                     // For unknown page type (e.g., INDEX_PAGE), skip and read 
next.
                     continue;

Reply via email to