This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 63ab69e5b Push ChunkReader into SerializedPageReader (#2463) (#2464)
63ab69e5b is described below
commit 63ab69e5bf5b97a7ff9ae58a6f20f6065a1ab932
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Wed Aug 17 10:01:33 2022 +0100
Push ChunkReader into SerializedPageReader (#2463) (#2464)
---
parquet/src/column/writer/mod.rs | 46 ++--
parquet/src/file/reader.rs | 2 +-
parquet/src/file/serialized_reader.rs | 383 +++++++++++++++++-----------------
parquet/src/file/writer.rs | 25 ++-
parquet/src/util/mod.rs | 1 -
parquet/src/util/page_util.rs | 96 ---------
6 files changed, 238 insertions(+), 315 deletions(-)
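For readers skimming the diff below: the commit replaces SerializedPageReader's old four-argument constructor (a pre-sliced reader, value count, codec, and physical type) with an Arc'd ChunkReader plus the column chunk's metadata. A minimal sketch of the post-change usage, assuming a local Parquet file; the helper name, path handling, and error type are invented for illustration, while the parquet identifiers are the ones in this diff:

    use std::fs::File;
    use std::sync::Arc;

    use parquet::file::footer::parse_metadata;
    use parquet::file::serialized_reader::SerializedPageReader;

    // Hypothetical helper: iterate the pages of column 0 in row group 0.
    fn dump_first_column(path: &str) -> Result<(), Box<dyn std::error::Error>> {
        let file = File::open(path)?;
        let metadata = parse_metadata(&file)?;
        let row_group = metadata.row_group(0);

        // The reader now derives byte ranges from the metadata itself;
        // passing None for the page locations falls back to parsing
        // page headers sequentially, as before.
        let pages = SerializedPageReader::new(
            Arc::new(file),
            row_group.column(0),
            row_group.num_rows() as usize,
            None,
        )?;

        for page in pages {
            println!("read page with {} values", page?.num_values());
        }
        Ok(())
    }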
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 669cacee6..c7518c89e 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -1082,6 +1082,7 @@ fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
#[cfg(test)]
mod tests {
+ use bytes::Bytes;
use parquet_format::BoundaryOrder;
use rand::distributions::uniform::SampleUniform;
use std::sync::Arc;
@@ -1096,7 +1097,7 @@ mod tests {
writer::SerializedPageWriter,
};
    use crate::schema::types::{ColumnDescriptor, ColumnPath, Type as SchemaType};
-    use crate::util::{io::FileSource, test_common::rand_gen::random_numbers_range};
+ use crate::util::test_common::rand_gen::random_numbers_range;
use super::*;
@@ -1645,7 +1646,7 @@ mod tests {
)
.unwrap();
- let (_, _, metadata, _, _) = writer.close().unwrap();
+ let (_, rows_written, metadata, _, _) = writer.close().unwrap();
let stats = metadata.statistics().unwrap();
assert_eq!(stats.min_bytes(), 1_i32.to_le_bytes());
@@ -1654,10 +1655,10 @@ mod tests {
assert!(stats.distinct_count().is_none());
let reader = SerializedPageReader::new(
- std::io::Cursor::new(buf),
- 7,
- Compression::UNCOMPRESSED,
- Type::INT32,
+ Arc::new(Bytes::from(buf)),
+ &metadata,
+ rows_written as usize,
+ None,
)
.unwrap();
@@ -1690,14 +1691,14 @@ mod tests {
.write_batch(&[1, 2, 3, 4], Some(&[1, 0, 0, 1, 1, 1]), None)
.unwrap();
- let (_, _, metadata, _, _) = writer.close().unwrap();
+ let (_, rows_written, metadata, _, _) = writer.close().unwrap();
assert!(metadata.statistics().is_none());
let reader = SerializedPageReader::new(
- std::io::Cursor::new(buf),
- 6,
- Compression::UNCOMPRESSED,
- Type::INT32,
+ Arc::new(Bytes::from(buf)),
+ &metadata,
+ rows_written as usize,
+ None,
)
.unwrap();
@@ -1818,16 +1819,15 @@ mod tests {
let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
writer.write_batch(data, None, None).unwrap();
- let (bytes_written, _, _, _, _) = writer.close().unwrap();
+ let (_, rows_written, metadata, _, _) = writer.close().unwrap();
// Read pages and check the sequence
- let source = FileSource::new(&file, 0, bytes_written as usize);
let mut page_reader = Box::new(
SerializedPageReader::new(
- source,
- data.len() as i64,
- Compression::UNCOMPRESSED,
- Int32Type::get_physical_type(),
+ Arc::new(file),
+ &metadata,
+ rows_written as usize,
+ None,
)
.unwrap(),
);
@@ -2201,16 +2201,14 @@ mod tests {
        let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
assert_eq!(values_written, values.len());
- let (bytes_written, rows_written, column_metadata, _, _) =
- writer.close().unwrap();
+ let (_, rows_written, column_metadata, _, _) = writer.close().unwrap();
- let source = FileSource::new(&file, 0, bytes_written as usize);
let page_reader = Box::new(
SerializedPageReader::new(
- source,
- column_metadata.num_values(),
- column_metadata.compression(),
- T::get_physical_type(),
+ Arc::new(file),
+ &column_metadata,
+ rows_written as usize,
+ None,
)
.unwrap(),
);
diff --git a/parquet/src/file/reader.rs b/parquet/src/file/reader.rs
index d75227365..2d7c6c5e4 100644
--- a/parquet/src/file/reader.rs
+++ b/parquet/src/file/reader.rs
@@ -45,7 +45,7 @@ pub trait Length {
/// For an object store reader, each read can be mapped to a range request.
pub trait ChunkReader: Length + Send + Sync {
type T: Read + Send;
- /// get a serialy readeable slice of the current reader
+ /// Get a serially readable slice of the current reader
/// This should fail if the slice exceeds the current bounds
fn get_read(&self, start: u64, length: usize) -> Result<Self::T>;
}
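The doc fix above is worth dwelling on, since this trait is now the reader's only I/O surface: get_read must hand back an independent, bounds-checked reader over an arbitrary byte range, which is what lets SerializedPageReader below fetch each page (or page header) on demand. A sketch of a trivial in-memory implementation, with the InMemory type invented for illustration:

    use std::io::Cursor;

    use parquet::errors::{ParquetError, Result};
    use parquet::file::reader::{ChunkReader, Length};

    // Invented example: a fully buffered "file". An object store backed
    // implementation would map each get_read to a range request instead.
    struct InMemory(Vec<u8>);

    impl Length for InMemory {
        fn len(&self) -> u64 {
            self.0.len() as u64
        }
    }

    impl ChunkReader for InMemory {
        type T = Cursor<Vec<u8>>;

        // Each call returns an independent reader over [start, start + length),
        // failing if the slice exceeds the current bounds.
        fn get_read(&self, start: u64, length: usize) -> Result<Self::T> {
            let start = start as usize;
            let end = start
                .checked_add(length)
                .filter(|end| *end <= self.0.len())
                .ok_or_else(|| ParquetError::EOF("slice exceeds current bounds".to_string()))?;
            Ok(Cursor::new(self.0[start..end].to_vec()))
        }
    }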
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index e8ef025ad..1a6a9026d 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -18,14 +18,15 @@
//! Contains implementations of the reader traits FileReader, RowGroupReader and PageReader
//! Also contains implementations of the ChunkReader for files (with buffering) and byte arrays (RAM)
-use bytes::{Buf, Bytes};
use std::collections::VecDeque;
+use std::io::Cursor;
use std::{convert::TryFrom, fs::File, io::Read, path::Path, sync::Arc};
+use bytes::{Buf, Bytes};
use parquet_format::{PageHeader, PageLocation, PageType};
use thrift::protocol::TCompactInputProtocol;
-use crate::basic::{Compression, Encoding, Type};
+use crate::basic::{Encoding, Type};
use crate::column::page::{Page, PageMetadata, PageReader};
use crate::compression::{create_codec, Codec};
use crate::errors::{ParquetError, Result};
@@ -34,9 +35,7 @@ use crate::file::{footer, metadata::*, reader::*, statistics};
use crate::record::reader::RowIter;
use crate::record::Row;
use crate::schema::types::Type as SchemaType;
-use crate::util::page_util::{calculate_row_count, get_pages_readable_slices};
use crate::util::{io::TryClone, memory::ByteBufferPtr};
-
// export `SliceableCursor` and `FileSource` publically so clients can
// re-use the logic in their own ParquetFileWriter wrappers
pub use crate::util::io::FileSource;
@@ -335,33 +334,19 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'
// TODO: fix PARQUET-816
fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
let col = self.metadata.column(i);
- let (col_start, col_length) = col.byte_range();
-        let page_reader = if let Some(offset_index) = self.metadata.page_offset_index() {
- let col_chunk_offset_index = &offset_index[i];
- let (page_bufs, has_dict) = get_pages_readable_slices(
- col_chunk_offset_index,
- col_start,
- self.chunk_reader.clone(),
- )?;
- SerializedPageReader::new_with_page_offsets(
- col.num_values(),
- col.compression(),
- col.column_descr().physical_type(),
- col_chunk_offset_index.clone(),
- has_dict,
- page_bufs,
- )?
- } else {
- let file_chunk =
- self.chunk_reader.get_read(col_start, col_length as usize)?;
- SerializedPageReader::new(
- file_chunk,
- col.num_values(),
- col.compression(),
- col.column_descr().physical_type(),
- )?
- };
- Ok(Box::new(page_reader))
+
+ let page_locations = self
+ .metadata
+ .page_offset_index()
+ .as_ref()
+ .map(|x| x[i].clone());
+
+ Ok(Box::new(SerializedPageReader::new(
+ Arc::clone(&self.chunk_reader),
+ col,
+ self.metadata.num_rows() as usize,
+ page_locations,
+ )?))
}
fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
@@ -376,6 +361,30 @@ pub(crate) fn read_page_header<T: Read>(input: &mut T) -> Result<PageHeader> {
Ok(page_header)
}
+/// Reads a [`PageHeader`] from the provided [`Read`] returning the number of bytes read
+fn read_page_header_len<T: Read>(input: &mut T) -> Result<(usize, PageHeader)> {
+ /// A wrapper around a [`std::io::Read`] that keeps track of the bytes read
+ struct TrackedRead<R> {
+ inner: R,
+ bytes_read: usize,
+ }
+
+ impl<R: Read> Read for TrackedRead<R> {
+ fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+ let v = self.inner.read(buf)?;
+ self.bytes_read += v;
+ Ok(v)
+ }
+ }
+
+ let mut tracked = TrackedRead {
+ inner: input,
+ bytes_read: 0,
+ };
+ let header = read_page_header(&mut tracked)?;
+ Ok((tracked.bytes_read, header))
+}
+
/// Decodes a [`Page`] from the provided `buffer`
pub(crate) fn decode_page(
page_header: PageHeader,
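The TrackedRead shim above exists because thrift's compact protocol decoder does not report how many bytes it consumed, while the sequential read path below must advance its offset by exactly header length plus data length. The counting pattern in isolation, as a std-only sketch with invented names:

    use std::io::Read;

    // Counts every byte pulled through the inner reader.
    struct CountingReader<R> {
        inner: R,
        bytes_read: usize,
    }

    impl<R: Read> Read for CountingReader<R> {
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            let n = self.inner.read(buf)?;
            self.bytes_read += n;
            Ok(n)
        }
    }

    fn main() -> std::io::Result<()> {
        let data = [0u8; 16];
        let mut reader = CountingReader { inner: &data[..], bytes_read: 0 };

        let mut buf = [0u8; 10];
        reader.read_exact(&mut buf)?;

        // After decoding, bytes_read tells the caller how far the stream advanced.
        assert_eq!(reader.bytes_read, 10);
        Ok(())
    }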
@@ -471,83 +480,85 @@ pub(crate) fn decode_page(
Ok(result)
}
-enum SerializedPages<T: Read> {
- /// Read entire chunk
- Chunk { buf: T },
- /// Read operate pages which can skip.
+enum SerializedPageReaderState {
+ Values {
+ /// The current byte offset in the reader
+ offset: usize,
+
+ /// The length of the chunk in bytes
+ remaining_bytes: usize,
+ },
Pages {
- offset_index: Vec<PageLocation>,
- seen_num_data_pages: usize,
- has_dictionary_page_to_read: bool,
- page_bufs: VecDeque<T>,
+ /// Remaining page locations
+ page_locations: VecDeque<PageLocation>,
+ /// Remaining dictionary location if any
+ dictionary_page: Option<PageLocation>,
+ /// The total number of rows in this column chunk
+ total_rows: usize,
},
}
/// A serialized implementation for Parquet [`PageReader`].
-pub struct SerializedPageReader<T: Read> {
-    // The file source buffer which references exactly the bytes for the column trunk
- // to be read by this page reader.
- buf: SerializedPages<T>,
+pub struct SerializedPageReader<R: ChunkReader> {
+ /// The chunk reader
+ reader: Arc<R>,
-    // The compression codec for this column chunk. Only set for non-PLAIN codec.
+    /// The compression codec for this column chunk. Only set for non-PLAIN codec.
decompressor: Option<Box<dyn Codec>>,
- // The number of values we have seen so far.
- seen_num_values: i64,
-
- // The number of total values in this column chunk.
- total_num_values: i64,
-
- // Column chunk type.
+ /// Column chunk type.
physical_type: Type,
+
+ state: SerializedPageReaderState,
}
-impl<T: Read> SerializedPageReader<T> {
- /// Creates a new serialized page reader from file source.
+impl<R: ChunkReader> SerializedPageReader<R> {
+ /// Creates a new serialized page reader from a chunk reader and metadata
pub fn new(
- buf: T,
- total_num_values: i64,
- compression: Compression,
- physical_type: Type,
+ reader: Arc<R>,
+ meta: &ColumnChunkMetaData,
+ total_rows: usize,
+ page_locations: Option<Vec<PageLocation>>,
) -> Result<Self> {
- let decompressor = create_codec(compression)?;
- let result = Self {
- buf: SerializedPages::Chunk { buf },
- total_num_values,
- seen_num_values: 0,
- decompressor,
- physical_type,
- };
- Ok(result)
- }
+ let decompressor = create_codec(meta.compression())?;
+ let (start, len) = meta.byte_range();
+
+ let state = match page_locations {
+ Some(locations) => {
+ let dictionary_page = match locations.first() {
+                Some(dict_offset) if dict_offset.offset as u64 != start => {
+ Some(PageLocation {
+ offset: start as i64,
+                        compressed_page_size: (dict_offset.offset as u64 - start)
+ as i32,
+ first_row_index: 0,
+ })
+ }
+ _ => None,
+ };
- /// Creates a new serialized page reader from file source.
- pub fn new_with_page_offsets(
- total_num_values: i64,
- compression: Compression,
- physical_type: Type,
- offset_index: Vec<PageLocation>,
- has_dictionary_page_to_read: bool,
- page_bufs: VecDeque<T>,
- ) -> Result<Self> {
- let decompressor = create_codec(compression)?;
- let result = Self {
- buf: SerializedPages::Pages {
- offset_index,
- seen_num_data_pages: 0,
- has_dictionary_page_to_read,
- page_bufs,
+ SerializedPageReaderState::Pages {
+ page_locations: locations.into(),
+ dictionary_page,
+ total_rows,
+ }
+ }
+ None => SerializedPageReaderState::Values {
+ offset: start as usize,
+ remaining_bytes: len as usize,
},
- total_num_values,
- seen_num_values: 0,
- decompressor,
- physical_type,
};
- Ok(result)
+
+ Ok(Self {
+ reader,
+ decompressor,
+ state,
+ physical_type: meta.column_type(),
+ })
}
}
-impl<T: Read + Send> Iterator for SerializedPageReader<T> {
+impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
type Item = Result<Page>;
fn next(&mut self) -> Option<Self::Item> {
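One subtlety in the constructor above: a Parquet offset index only lists data pages, so the dictionary page is inferred; whenever the first data page does not begin at the column chunk's starting byte, the gap in between is synthesized into a PageLocation of its own. That inference in isolation, as a hedged sketch with an invented helper name and made-up offsets (PageLocation is the parquet_format struct used throughout this diff):

    use parquet_format::PageLocation;

    // If the first data page starts past the chunk start, the bytes in
    // between can only be the dictionary page.
    fn infer_dictionary(start: u64, locations: &[PageLocation]) -> Option<PageLocation> {
        match locations.first() {
            Some(first) if first.offset as u64 != start => Some(PageLocation {
                offset: start as i64,
                compressed_page_size: (first.offset as u64 - start) as i32,
                first_row_index: 0,
            }),
            _ => None,
        }
    }

    fn main() {
        // Made-up layout: chunk starts at byte 4, first data page at byte 100,
        // so bytes 4..100 are treated as the dictionary page.
        let locations = vec![PageLocation::new(100, 554, 0)];
        let dict = infer_dictionary(4, &locations).unwrap();
        assert_eq!(dict.offset, 4);
        assert_eq!(dict.compressed_page_size, 96);
    }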
@@ -555,133 +566,126 @@ impl<T: Read + Send> Iterator for SerializedPageReader<T> {
}
}
-impl<T: Read + Send> PageReader for SerializedPageReader<T> {
+impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
fn get_next_page(&mut self) -> Result<Option<Page>> {
- let mut cursor;
- let mut dictionary_cursor;
- while self.seen_num_values < self.total_num_values {
- match &mut self.buf {
- SerializedPages::Chunk { buf } => {
- cursor = buf;
- }
- SerializedPages::Pages {
- offset_index,
- seen_num_data_pages,
- has_dictionary_page_to_read,
- page_bufs,
+ loop {
+ let page = match &mut self.state {
+ SerializedPageReaderState::Values {
+ offset,
+ remaining_bytes: remaining,
+ ..
} => {
- if offset_index.len() <= *seen_num_data_pages {
+ if *remaining == 0 {
return Ok(None);
-                    } else if *seen_num_data_pages == 0 && *has_dictionary_page_to_read {
- dictionary_cursor = page_bufs.pop_front().unwrap();
- cursor = &mut dictionary_cursor;
- } else {
-                        cursor = page_bufs.get_mut(*seen_num_data_pages).unwrap();
}
- }
- }
- let page_header = read_page_header(cursor)?;
+                    let mut read = self.reader.get_read(*offset as u64, *remaining)?;
- let to_read = page_header.compressed_page_size as usize;
- let mut buffer = Vec::with_capacity(to_read);
- let read = cursor.take(to_read as u64).read_to_end(&mut buffer)?;
+                    let (header_len, header) = read_page_header_len(&mut read)?;
+ let data_len = header.compressed_page_size as usize;
+ *offset += header_len + data_len;
+ *remaining -= header_len + data_len;
- if read != to_read {
- return Err(eof_err!(
- "Expected to read {} bytes of page, read only {}",
- to_read,
- read
- ));
- }
+ if header.type_ == PageType::IndexPage {
+ continue;
+ }
+
+ let mut buffer = Vec::with_capacity(data_len);
+                    let read = read.take(data_len as u64).read_to_end(&mut buffer)?;
- let buffer = ByteBufferPtr::new(buffer);
- let result = match page_header.type_ {
- PageType::DataPage | PageType::DataPageV2 => {
- let decoded = decode_page(
- page_header,
- buffer,
+ if read != data_len {
+ return Err(eof_err!(
+ "Expected to read {} bytes of page, read only {}",
+ data_len,
+ read
+ ));
+ }
+
+ decode_page(
+ header,
+ ByteBufferPtr::new(buffer),
self.physical_type,
self.decompressor.as_mut(),
- )?;
- self.seen_num_values += decoded.num_values() as i64;
- if let SerializedPages::Pages {
- seen_num_data_pages,
- ..
- } = &mut self.buf
- {
- *seen_num_data_pages += 1;
- }
- decoded
+ )?
}
- PageType::DictionaryPage => {
- if let SerializedPages::Pages {
- has_dictionary_page_to_read,
- ..
- } = &mut self.buf
+ SerializedPageReaderState::Pages {
+ page_locations,
+ dictionary_page,
+ ..
+ } => {
+ let front = match dictionary_page
+ .take()
+ .or_else(|| page_locations.pop_front())
{
- *has_dictionary_page_to_read = false;
+ Some(front) => front,
+ None => return Ok(None),
+ };
+
+ let page_len = front.compressed_page_size as usize;
+
+                    // TODO: Add ChunkReader get_bytes to potentially avoid copy
+ let mut buffer = Vec::with_capacity(page_len);
+ let read = self
+ .reader
+ .get_read(front.offset as u64, page_len)?
+ .read_to_end(&mut buffer)?;
+
+ if read != page_len {
+ return Err(eof_err!(
+ "Expected to read {} bytes of page, read only {}",
+ page_len,
+ read
+ ));
}
+
+ let mut cursor = Cursor::new(buffer);
+ let header = read_page_header(&mut cursor)?;
+ let offset = cursor.position();
+
+                    let bytes = Bytes::from(cursor.into_inner()).slice(offset as usize..);
decode_page(
- page_header,
- buffer,
+ header,
+ bytes.into(),
self.physical_type,
self.decompressor.as_mut(),
)?
}
- _ => {
-                    // For unknown page type (e.g., INDEX_PAGE), skip and read next.
- continue;
- }
};
- return Ok(Some(result));
- }
-        // We are at the end of this column chunk and no more page left. Return None.
- Ok(None)
+ return Ok(Some(page));
+ }
}
fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
- match &mut self.buf {
-            SerializedPages::Chunk { .. } => { Err(general_err!("Must set page_offset_index when using peek_next_page in SerializedPageReader.")) }
-            SerializedPages::Pages { offset_index, seen_num_data_pages, has_dictionary_page_to_read, .. } => {
- if *seen_num_data_pages >= offset_index.len() {
- Ok(None)
-                } else if *seen_num_data_pages == 0 && *has_dictionary_page_to_read {
-                    // Will set `has_dictionary_page_to_read` false in `get_next_page`,
-                    // assume dictionary page must be read and cannot be skipped.
- Ok(Some(PageMetadata {
- num_rows: usize::MIN,
- is_dict: true,
+ match &self.state {
+            SerializedPageReaderState::Values {..} => Err(general_err!("Must set page_offset_index when using peek_next_page in SerializedPageReader.")),
+            SerializedPageReaderState::Pages { page_locations, dictionary_page, total_rows } => {
+ if dictionary_page.is_some() {
+ Ok(Some(PageMetadata{
+ num_rows: 0,
+ is_dict: true
}))
- } else {
- let row_count = calculate_row_count(
- offset_index,
- *seen_num_data_pages,
- self.total_num_values,
- )?;
- Ok(Some(PageMetadata {
- num_rows: row_count,
- is_dict: false,
+ } else if let Some(page) = page_locations.front() {
+                    let next_rows = page_locations.get(1).map(|x| x.first_row_index as usize).unwrap_or(*total_rows);
+
+ Ok(Some(PageMetadata{
+ num_rows: next_rows - page.first_row_index as usize,
+ is_dict: false
}))
+ } else {
+ Ok(None)
}
}
}
}
fn skip_next_page(&mut self) -> Result<()> {
- match &mut self.buf {
-            SerializedPages::Chunk { .. } => { Err(general_err!("Must set page_offset_index when using skip_next_page in SerializedPageReader.")) }
-            SerializedPages::Pages { offset_index, seen_num_data_pages, .. } => {
- if offset_index.len() <= *seen_num_data_pages {
- Err(general_err!(
- "seen_num_data_pages is out of bound in
SerializedPageReader."
- ))
- } else {
- *seen_num_data_pages += 1;
-                    // Notice: maybe need 'self.seen_num_values += xxx', for now we can not get skip values in skip_next_page.
- Ok(())
- }
+ match &mut self.state {
+            SerializedPageReaderState::Values {..} =>{ Err(general_err!("Must set page_offset_index when using skip_next_page in SerializedPageReader.")) },
+ SerializedPageReaderState::Pages { page_locations, .. } => {
+ page_locations.pop_front();
+
+ Ok(())
}
}
}
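With the rewrite above, peek_next_page no longer needs the deleted calculate_row_count helper: a page's row count is simply the next page's first_row_index (or the chunk's total row count, for the last page) minus its own. That arithmetic in isolation, reusing the four-page layout from the worked example in the deleted page_util.rs below (the helper name is invented):

    use parquet_format::PageLocation;

    fn rows_in_page(locations: &[PageLocation], i: usize, total_rows: usize) -> usize {
        // Next page's starting row, or the chunk total for the last page.
        let next = locations
            .get(i + 1)
            .map(|x| x.first_row_index as usize)
            .unwrap_or(total_rows);
        next - locations[i].first_row_index as usize
    }

    fn main() {
        // Same offsets/sizes as the deleted test: 256 rows, four data pages.
        let locations = vec![
            PageLocation::new(4, 554, 0),
            PageLocation::new(558, 554, 64),
            PageLocation::new(1112, 554, 128),
            PageLocation::new(1666, 554, 192),
        ];
        for i in 0..4 {
            assert_eq!(rows_in_page(&locations, i, 256), 64);
        }
    }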
@@ -689,7 +693,10 @@ impl<T: Read + Send> PageReader for SerializedPageReader<T> {
#[cfg(test)]
mod tests {
- use super::*;
+ use std::sync::Arc;
+
+ use parquet_format::BoundaryOrder;
+
use crate::basic::{self, ColumnOrder};
use crate::data_type::private::ParquetValueType;
use crate::file::page_index::index::{ByteArrayIndex, Index, NativeIndex};
@@ -697,8 +704,8 @@ mod tests {
use crate::schema::parser::parse_message_type;
use crate::util::bit_util::from_le_slice;
use crate::util::test_common::file_util::{get_test_file, get_test_path};
- use parquet_format::BoundaryOrder;
- use std::sync::Arc;
+
+ use super::*;
#[test]
fn test_cursor_and_file_has_the_same_behaviour() {
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 863ccf854..d3d1f8809 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -663,7 +663,7 @@ mod tests {
use super::*;
use bytes::Bytes;
- use std::{fs::File, io::Cursor};
+ use std::fs::File;
use crate::basic::{Compression, Encoding, LogicalType, Repetition, Type};
use crate::column::page::PageReader;
@@ -675,6 +675,7 @@ mod tests {
statistics::{from_thrift, to_thrift, Statistics},
};
use crate::record::RowAccessor;
+ use crate::schema::types::{ColumnDescriptor, ColumnPath};
use crate::util::memory::ByteBufferPtr;
#[test]
@@ -1062,11 +1063,25 @@ mod tests {
page_writer.close().unwrap();
}
{
+ let reader = bytes::Bytes::from(buffer);
+
+ let t = types::Type::primitive_type_builder("t", physical_type)
+ .build()
+ .unwrap();
+
+            let desc = ColumnDescriptor::new(Arc::new(t), 0, 0, ColumnPath::new(vec![]));
+ let meta = ColumnChunkMetaData::builder(Arc::new(desc))
+ .set_compression(codec)
+ .set_total_compressed_size(reader.len() as i64)
+ .set_num_values(total_num_values)
+ .build()
+ .unwrap();
+
let mut page_reader = SerializedPageReader::new(
- Cursor::new(&buffer),
- total_num_values,
- codec,
- physical_type,
+ Arc::new(reader),
+ &meta,
+ total_num_values as usize,
+ None,
)
.unwrap();
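The migrated test above also shows the one wrinkle for callers that used to pass raw parameters: they must now synthesize a ColumnChunkMetaData. A standalone sketch of that pattern for an uncompressed INT32 chunk filling an in-memory buffer; the helper name is invented, while the builder calls are the ones used in the diff:

    use std::sync::Arc;

    use parquet::basic::{Compression, Type as PhysicalType};
    use parquet::file::metadata::ColumnChunkMetaData;
    use parquet::schema::types::{ColumnDescriptor, ColumnPath, Type};

    // Just enough metadata for SerializedPageReader to locate and decode
    // a column chunk that spans an entire in-memory buffer.
    fn metadata_for_buffer(len: usize, num_values: i64) -> ColumnChunkMetaData {
        let t = Type::primitive_type_builder("t", PhysicalType::INT32)
            .build()
            .unwrap();
        let desc = ColumnDescriptor::new(Arc::new(t), 0, 0, ColumnPath::new(vec![]));
        ColumnChunkMetaData::builder(Arc::new(desc))
            .set_compression(Compression::UNCOMPRESSED)
            .set_total_compressed_size(len as i64)
            .set_num_values(num_values)
            .build()
            .unwrap()
    }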
diff --git a/parquet/src/util/mod.rs b/parquet/src/util/mod.rs
index d8ad739db..5f4302394 100644
--- a/parquet/src/util/mod.rs
+++ b/parquet/src/util/mod.rs
@@ -21,7 +21,6 @@ pub mod memory;
pub mod bit_util;
mod bit_pack;
pub(crate) mod interner;
-pub(crate) mod page_util;
#[cfg(any(test, feature = "test_common"))]
pub(crate) mod test_common;
diff --git a/parquet/src/util/page_util.rs b/parquet/src/util/page_util.rs
deleted file mode 100644
index 7716b7116..000000000
--- a/parquet/src/util/page_util.rs
+++ /dev/null
@@ -1,96 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::collections::VecDeque;
-use std::io::Read;
-use std::sync::Arc;
-use crate::errors::Result;
-use parquet_format::PageLocation;
-use crate::file::reader::ChunkReader;
-
-/// Use column chunk's offset index to get the `page_num` page row count.
-pub(crate) fn calculate_row_count(indexes: &[PageLocation], page_num: usize, total_row_count: i64) -> Result<usize> {
- if page_num == indexes.len() - 1 {
- // first_row_index start with 0, so no need to plus one additional.
- Ok((total_row_count - indexes[page_num].first_row_index) as usize)
- } else {
-        Ok((indexes[page_num + 1].first_row_index - indexes[page_num].first_row_index) as usize)
- }
-}
-
-/// Use column chunk's offset index to get each page serially readable slice
-/// and a flag indicates whether having one dictionary page in this column chunk.
-pub(crate) fn get_pages_readable_slices<T: Read + Send, R: ChunkReader<T=T>>(col_chunk_offset_index: &[PageLocation], col_start: u64, chunk_reader: Arc<R>) -> Result<(VecDeque<T>, bool)> {
- let first_data_page_offset = col_chunk_offset_index[0].offset as u64;
- let has_dictionary_page = first_data_page_offset != col_start;
-    let mut page_readers = VecDeque::with_capacity(col_chunk_offset_index.len() + 1);
-
- if has_dictionary_page {
- let length = (first_data_page_offset - col_start) as usize;
- let reader: T = chunk_reader.get_read(col_start, length)?;
- page_readers.push_back(reader);
- }
-
- for index in col_chunk_offset_index {
- let start = index.offset as u64;
- let length = index.compressed_page_size as usize;
- let reader: T = chunk_reader.get_read(start, length)?;
- page_readers.push_back(reader)
- }
- Ok((page_readers, has_dictionary_page))
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- /**
- parquet-tools meta ./test.parquet got:
-
- file schema: test_schema
-
--------------------------------------------------------------------------------
- leaf: REQUIRED INT64 R:0 D:
-
- row group 1: RC:256 TS:2216 OFFSET:4
-
--------------------------------------------------------------------------------
-    leaf: INT64 UNCOMPRESSED DO:0 FPO:4 SZ:2216/2216/1.00 VC:256 ENC:PLAIN,RLE ST:[min: 0, max: 255, num_nulls not defined
-
- parquet-tools column-index -c leaf ./test.parquet got:
-
- offset index for column leaf:
- offset compressed size first row index
- page-0 4 554 0
-    page-1 558 554 64
-    page-2 1112 554 128
-    page-3 1666 554 192
-
- **/
- #[test]
- fn test_calculate_row_count() {
- let total_row_count = 256;
- let mut indexes = vec![];
- indexes.push(PageLocation::new(4, 554, 0));
- indexes.push(PageLocation::new(558, 554, 64));
- indexes.push(PageLocation::new(1112, 554, 128));
- indexes.push(PageLocation::new(1666, 554, 192));
- for i in 0..4 {
- // each page should has 64 rows.
-            assert_eq!(64, calculate_row_count(indexes.as_slice(), i, total_row_count).unwrap());
- }
-
- }
-}