This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new fcf655e19 Zero copy page decoding from bytes (#1810)
fcf655e19 is described below
commit fcf655e1939d7791fc0bd69299a704ad4bbe1f5a
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon Jun 13 13:36:42 2022 +0100
Zero copy page decoding from bytes (#1810)
---
parquet/src/arrow/async_reader.rs | 85 +++++++++++-
parquet/src/file/serialized_reader.rs | 237 +++++++++++++++++++---------------
2 files changed, 215 insertions(+), 107 deletions(-)
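For context: the "zero copy" in the title comes from slicing the shared buffer that backs a column chunk (see `self.chunk.data.slice(start_offset..end_offset)` in the new reader below) instead of copying page bytes into a fresh `Vec`. Assuming that buffer is a `bytes::Bytes`, slicing is a refcount bump rather than a memcpy; a minimal standalone sketch of that property (not part of the diff):

    use bytes::Bytes;

    fn main() {
        let chunk = Bytes::from(vec![42u8; 1024]);
        // `slice` returns a new `Bytes` view over the same allocation:
        // it bumps a reference count and stores offsets, no memcpy.
        let page = chunk.slice(128..256);
        assert_eq!(page.len(), 128);
        assert_eq!(page[0], 42);
    }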
diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs
index 541c98109..3f14114e3 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader.rs
@@ -77,7 +77,7 @@
use std::collections::VecDeque;
use std::fmt::Formatter;
-use std::io::SeekFrom;
+use std::io::{Cursor, SeekFrom};
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
@@ -86,6 +86,7 @@ use std::task::{Context, Poll};
use bytes::{Buf, Bytes};
use futures::future::{BoxFuture, FutureExt};
use futures::stream::Stream;
+use parquet_format::PageType;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
use arrow::datatypes::SchemaRef;
@@ -96,11 +97,13 @@ use crate::arrow::arrow_reader::ParquetRecordBatchReader;
use crate::arrow::schema::parquet_to_arrow_schema;
use crate::arrow::ProjectionMask;
use crate::basic::Compression;
-use crate::column::page::{PageIterator, PageReader};
+use crate::column::page::{Page, PageIterator, PageReader};
+use crate::compression::{create_codec, Codec};
use crate::errors::{ParquetError, Result};
use crate::file::footer::{decode_footer, decode_metadata};
use crate::file::metadata::ParquetMetaData;
use crate::file::reader::SerializedPageReader;
+use crate::file::serialized_reader::{decode_page, read_page_header};
use crate::file::FOOTER_SIZE;
use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
@@ -433,6 +436,7 @@ where
}
}
+/// An in-memory collection of column chunks
struct InMemoryRowGroup {
schema: SchemaDescPtr,
column_chunks: Vec<Option<InMemoryColumnChunk>>,
@@ -459,6 +463,7 @@ impl RowGroupCollection for InMemoryRowGroup {
}
}
+/// Data for a single column chunk
#[derive(Clone)]
struct InMemoryColumnChunk {
num_values: i64,
@@ -480,6 +485,82 @@ impl InMemoryColumnChunk {
}
}
+// An in-memory implementation of the Parquet [`PageReader`].
+struct InMemoryColumnChunkReader {
+ chunk: InMemoryColumnChunk,
+ decompressor: Option<Box<dyn Codec>>,
+ offset: usize,
+ seen_num_values: i64,
+}
+
+impl InMemoryColumnChunkReader {
+ /// Creates a new page reader from the provided in-memory column chunk.
+ pub fn new(chunk: InMemoryColumnChunk) -> Result<Self> {
+ let decompressor = create_codec(chunk.compression)?;
+ let result = Self {
+ chunk,
+ decompressor,
+ offset: 0,
+ seen_num_values: 0,
+ };
+ Ok(result)
+ }
+}
+
+impl Iterator for InMemoryColumnChunkReader {
+ type Item = Result<Page>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ self.get_next_page().transpose()
+ }
+}
+
+impl PageReader for InMemoryColumnChunkReader {
+ fn get_next_page(&mut self) -> Result<Option<Page>> {
+ while self.seen_num_values < self.chunk.num_values {
+ let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]);
+ let page_header = read_page_header(&mut cursor)?;
+ let compressed_size = page_header.compressed_page_size as usize;
+
+ self.offset += cursor.position() as usize;
+ let start_offset = self.offset;
+ let end_offset = self.offset + compressed_size;
+ self.offset = end_offset;
+
+ let buffer = self.chunk.data.slice(start_offset..end_offset);
+
+ let result = match page_header.type_ {
+ PageType::DataPage | PageType::DataPageV2 => {
+ let decoded = decode_page(
+ page_header,
+ buffer.into(),
+ self.chunk.physical_type,
+ self.decompressor.as_mut(),
+ )?;
+ self.seen_num_values += decoded.num_values() as i64;
+ decoded
+ }
+ PageType::DictionaryPage => decode_page(
+ page_header,
+ buffer.into(),
+ self.chunk.physical_type,
+ self.decompressor.as_mut(),
+ )?,
+ _ => {
+ // For unknown page type (e.g., INDEX_PAGE), skip and read next.
+ continue;
+ }
+ };
+
+ return Ok(Some(result));
+ }
+
+ // We are at the end of this column chunk and no more pages are left. Return None.
+ Ok(None)
+ }
+}
+
+/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`]
struct ColumnChunkIterator {
schema: SchemaDescPtr,
column_schema: ColumnDescPtr,
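The new reader above tracks its position with a plain `usize` offset plus an `std::io::Cursor`: the cursor is created over the remaining bytes, the Thrift page header is decoded from it, and `cursor.position()` then reveals how many bytes the header occupied. A standalone sketch of the same pattern, with a little-endian u32 length prefix standing in for the Thrift `PageHeader`:

    use std::io::{Cursor, Read, Result};

    fn next_record<'a>(data: &'a [u8], offset: &mut usize) -> Result<&'a [u8]> {
        // Decode a header from the remaining bytes; here it is just a
        // 4-byte length prefix rather than a Thrift struct.
        let mut cursor = Cursor::new(&data[*offset..]);
        let mut len = [0u8; 4];
        cursor.read_exact(&mut len)?;
        let body_len = u32::from_le_bytes(len) as usize;

        // `position()` reports how many bytes the header decoding consumed,
        // so the record body can be sliced straight out of the source buffer.
        *offset += cursor.position() as usize;
        let body = &data[*offset..*offset + body_len];
        *offset += body_len;
        Ok(body)
    }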
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 119429235..6ff73e041 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -358,6 +358,108 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'
}
}
+/// Reads a [`PageHeader`] from the provided [`Read`]
+pub(crate) fn read_page_header<T: Read>(input: &mut T) -> Result<PageHeader> {
+ let mut prot = TCompactInputProtocol::new(input);
+ let page_header = PageHeader::read_from_in_protocol(&mut prot)?;
+ Ok(page_header)
+}
+
+/// Decodes a [`Page`] from the provided `buffer`
+pub(crate) fn decode_page(
+ page_header: PageHeader,
+ buffer: ByteBufferPtr,
+ physical_type: Type,
+ decompressor: Option<&mut Box<dyn Codec>>,
+) -> Result<Page> {
+ // When processing a data page v2, depending on the compression enabled for
+ // the page, we should account for the uncompressed data ('offset') of the
+ // repetition and definition levels.
+ //
+ // We always use a 0 offset for pages other than v2; the `true` flag means
+ // that compression will be applied if a decompressor is defined.
+ let mut offset: usize = 0;
+ let mut can_decompress = true;
+
+ if let Some(ref header_v2) = page_header.data_page_header_v2 {
+ offset = (header_v2.definition_levels_byte_length
+ + header_v2.repetition_levels_byte_length) as usize;
+ // When the is_compressed flag is missing, the page is considered compressed.
+ can_decompress = header_v2.is_compressed.unwrap_or(true);
+ }
+
+ // TODO: page header could be huge because of statistics. We should set a
+ // maximum page header size and abort if that is exceeded.
+ let buffer = match decompressor {
+ Some(decompressor) if can_decompress => {
+ let uncompressed_size = page_header.uncompressed_page_size as usize;
+ let mut decompressed = Vec::with_capacity(uncompressed_size);
+ let compressed = &buffer.as_ref()[offset..];
+ decompressed.extend_from_slice(&buffer.as_ref()[..offset]);
+ decompressor.decompress(compressed, &mut decompressed)?;
+
+ if decompressed.len() != uncompressed_size {
+ return Err(general_err!(
+ "Actual decompressed size doesn't match the expected one ({} vs {})",
+ decompressed.len(),
+ uncompressed_size
+ ));
+ }
+
+ ByteBufferPtr::new(decompressed)
+ }
+ _ => buffer,
+ };
+
+ let result = match page_header.type_ {
+ PageType::DictionaryPage => {
+ assert!(page_header.dictionary_page_header.is_some());
+ let dict_header = page_header.dictionary_page_header.as_ref().unwrap();
+ let is_sorted = dict_header.is_sorted.unwrap_or(false);
+ Page::DictionaryPage {
+ buf: buffer,
+ num_values: dict_header.num_values as u32,
+ encoding: Encoding::from(dict_header.encoding),
+ is_sorted,
+ }
+ }
+ PageType::DataPage => {
+ assert!(page_header.data_page_header.is_some());
+ let header = page_header.data_page_header.unwrap();
+ Page::DataPage {
+ buf: buffer,
+ num_values: header.num_values as u32,
+ encoding: Encoding::from(header.encoding),
+ def_level_encoding: Encoding::from(header.definition_level_encoding),
+ rep_level_encoding: Encoding::from(header.repetition_level_encoding),
+ statistics: statistics::from_thrift(physical_type, header.statistics),
+ }
+ }
+ PageType::DataPageV2 => {
+ assert!(page_header.data_page_header_v2.is_some());
+ let header = page_header.data_page_header_v2.unwrap();
+ let is_compressed = header.is_compressed.unwrap_or(true);
+ Page::DataPageV2 {
+ buf: buffer,
+ num_values: header.num_values as u32,
+ encoding: Encoding::from(header.encoding),
+ num_nulls: header.num_nulls as u32,
+ num_rows: header.num_rows as u32,
+ def_levels_byte_len: header.definition_levels_byte_length as u32,
+ rep_levels_byte_len: header.repetition_levels_byte_length as u32,
+ is_compressed,
+ statistics: statistics::from_thrift(physical_type, header.statistics),
+ }
+ }
+ _ => {
+ // Unknown page types (e.g., INDEX_PAGE) are not supported.
+ unimplemented!("Page type {:?} is not supported", page_header.type_)
+ }
+ };
+
+ Ok(result)
+}
+
/// A serialized implementation for Parquet [`PageReader`].
pub struct SerializedPageReader<T: Read> {
// The file source buffer which references exactly the bytes for the column chunk
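A condensed sketch of the decompression branch in `decode_page` above, with a hypothetical `Decompress` trait standing in for the crate's `Codec`: the first `offset` bytes (the v2 repetition/definition levels, which are never compressed) are copied through verbatim, and only the remainder is handed to the decompressor.

    // Hypothetical stand-in for `crate::compression::Codec`.
    trait Decompress {
        fn decompress(&mut self, input: &[u8], output: &mut Vec<u8>) -> std::io::Result<usize>;
    }

    fn decompress_page(
        buffer: &[u8],
        levels_len: usize,        // uncompressed rep/def level bytes (0 for v1 pages)
        uncompressed_size: usize, // from the page header
        codec: &mut dyn Decompress,
    ) -> std::io::Result<Vec<u8>> {
        let mut out = Vec::with_capacity(uncompressed_size);
        // Level data in a v2 page is stored uncompressed: copy it through as-is.
        out.extend_from_slice(&buffer[..levels_len]);
        // Decompress everything after the level data.
        codec.decompress(&buffer[levels_len..], &mut out)?;
        assert_eq!(out.len(), uncompressed_size, "decompressed size mismatch");
        Ok(out)
    }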
@@ -395,13 +497,6 @@ impl<T: Read> SerializedPageReader<T> {
};
Ok(result)
}
-
- /// Reads Page header from Thrift.
- fn read_page_header(&mut self) -> Result<PageHeader> {
- let mut prot = TCompactInputProtocol::new(&mut self.buf);
- let page_header = PageHeader::read_from_in_protocol(&mut prot)?;
- Ok(page_header)
- }
}
impl<T: Read + Send> Iterator for SerializedPageReader<T> {
@@ -415,108 +510,40 @@ impl<T: Read + Send> Iterator for SerializedPageReader<T> {
impl<T: Read + Send> PageReader for SerializedPageReader<T> {
fn get_next_page(&mut self) -> Result<Option<Page>> {
while self.seen_num_values < self.total_num_values {
- let page_header = self.read_page_header()?;
-
- // When processing data page v2, depending on enabled compression for the
- // page, we should account for uncompressed data ('offset') of
- // repetition and definition levels.
- //
- // We always use 0 offset for other pages other than v2, `true` flag means
- // that compression will be applied if decompressor is defined
- let mut offset: usize = 0;
- let mut can_decompress = true;
-
- if let Some(ref header_v2) = page_header.data_page_header_v2 {
- offset = (header_v2.definition_levels_byte_length
- + header_v2.repetition_levels_byte_length)
- as usize;
- // When is_compressed flag is missing the page is considered compressed
- can_decompress = header_v2.is_compressed.unwrap_or(true);
- }
-
- let compressed_len = page_header.compressed_page_size as usize - offset;
- let uncompressed_len = page_header.uncompressed_page_size as usize - offset;
- // We still need to read all bytes from buffered stream
- let mut buffer = vec![0; offset + compressed_len];
- self.buf.read_exact(&mut buffer)?;
-
- // TODO: page header could be huge because of statistics. We should set a
- // maximum page header size and abort if that is exceeded.
- if let Some(decompressor) = self.decompressor.as_mut() {
- if can_decompress {
- let mut decompressed_buffer = Vec::with_capacity(uncompressed_len);
- let decompressed_size = decompressor
- .decompress(&buffer[offset..], &mut decompressed_buffer)?;
- if decompressed_size != uncompressed_len {
- return Err(general_err!(
- "Actual decompressed size doesn't match the expected one ({} vs {})",
- decompressed_size,
- uncompressed_len
- ));
- }
- if offset == 0 {
- buffer = decompressed_buffer;
- } else {
- // Prepend saved offsets to the buffer
- buffer.truncate(offset);
- buffer.append(&mut decompressed_buffer);
- }
- }
+ let page_header = read_page_header(&mut self.buf)?;
+
+ let to_read = page_header.compressed_page_size as usize;
+ let mut buffer = Vec::with_capacity(to_read);
+ let read = (&mut self.buf)
+ .take(to_read as u64)
+ .read_to_end(&mut buffer)?;
+
+ if read != to_read {
+ return Err(eof_err!(
+ "Expected to read {} bytes of page, read only {}",
+ to_read,
+ read
+ ));
}
+ let buffer = ByteBufferPtr::new(buffer);
let result = match page_header.type_ {
- PageType::DictionaryPage => {
- assert!(page_header.dictionary_page_header.is_some());
- let dict_header =
- page_header.dictionary_page_header.as_ref().unwrap();
- let is_sorted = dict_header.is_sorted.unwrap_or(false);
- Page::DictionaryPage {
- buf: ByteBufferPtr::new(buffer),
- num_values: dict_header.num_values as u32,
- encoding: Encoding::from(dict_header.encoding),
- is_sorted,
- }
- }
- PageType::DataPage => {
- assert!(page_header.data_page_header.is_some());
- let header = page_header.data_page_header.unwrap();
- self.seen_num_values += header.num_values as i64;
- Page::DataPage {
- buf: ByteBufferPtr::new(buffer),
- num_values: header.num_values as u32,
- encoding: Encoding::from(header.encoding),
- def_level_encoding: Encoding::from(
- header.definition_level_encoding,
- ),
- rep_level_encoding: Encoding::from(
- header.repetition_level_encoding,
- ),
- statistics: statistics::from_thrift(
- self.physical_type,
- header.statistics,
- ),
- }
- }
- PageType::DataPageV2 => {
- assert!(page_header.data_page_header_v2.is_some());
- let header = page_header.data_page_header_v2.unwrap();
- let is_compressed = header.is_compressed.unwrap_or(true);
- self.seen_num_values += header.num_values as i64;
- Page::DataPageV2 {
- buf: ByteBufferPtr::new(buffer),
- num_values: header.num_values as u32,
- encoding: Encoding::from(header.encoding),
- num_nulls: header.num_nulls as u32,
- num_rows: header.num_rows as u32,
- def_levels_byte_len: header.definition_levels_byte_length as u32,
- rep_levels_byte_len: header.repetition_levels_byte_length as u32,
- is_compressed,
- statistics: statistics::from_thrift(
- self.physical_type,
- header.statistics,
- ),
- }
+ PageType::DataPage | PageType::DataPageV2 => {
+ let decoded = decode_page(
+ page_header,
+ buffer,
+ self.physical_type,
+ self.decompressor.as_mut(),
+ )?;
+ self.seen_num_values += decoded.num_values() as i64;
+ decoded
}
+ PageType::DictionaryPage => decode_page(
+ page_header,
+ buffer,
+ self.physical_type,
+ self.decompressor.as_mut(),
+ )?,
_ => {
// For unknown page type (e.g., INDEX_PAGE), skip and read next.
continue;
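Both `SerializedPageReader` and the new `InMemoryColumnChunkReader` implement `Iterator<Item = Result<Page>>` on top of `get_next_page`, so callers can drain pages with ordinary iterator code. A hedged usage sketch, assuming only the parquet crate's public `Page` and `Result` types:

    use parquet::column::page::Page;
    use parquet::errors::Result;

    // Count the values across all pages yielded by any page reader that
    // exposes the `Iterator<Item = Result<Page>>` interface.
    fn count_values(pages: impl Iterator<Item = Result<Page>>) -> Result<u64> {
        let mut total = 0u64;
        for page in pages {
            total += page?.num_values() as u64;
        }
        Ok(total)
    }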