tustvold commented on code in PR #2464:
URL: https://github.com/apache/arrow-rs/pull/2464#discussion_r946981025
##########
parquet/src/file/serialized_reader.rs:
##########
@@ -471,234 +480,232 @@ pub(crate) fn decode_page(
     Ok(result)
 }
 
-enum SerializedPages<T: Read> {
-    /// Read entire chunk
-    Chunk { buf: T },
-    /// Read operate pages which can skip.
+enum SerializedPageReaderState {
+    Values {
+        /// The current byte offset in the reader
+        offset: usize,
+
+        /// The length of the chunk in bytes
+        remaining_bytes: usize,
+    },
     Pages {
-        offset_index: Vec<PageLocation>,
-        seen_num_data_pages: usize,
-        has_dictionary_page_to_read: bool,
-        page_bufs: VecDeque<T>,
+        /// Remaining page locations
+        page_locations: VecDeque<PageLocation>,
+        /// Remaining dictionary location if any
+        dictionary_page: Option<PageLocation>,
+        /// The total number of rows in this column chunk
+        total_rows: usize,
     },
 }
 
 /// A serialized implementation for Parquet [`PageReader`].
-pub struct SerializedPageReader<T: Read> {
-    // The file source buffer which references exactly the bytes for the column trunk
-    // to be read by this page reader.
-    buf: SerializedPages<T>,
+pub struct SerializedPageReader<R: ChunkReader> {
+    /// The chunk reader
+    reader: Arc<R>,
 
-    // The compression codec for this column chunk. Only set for non-PLAIN codec.
+    /// The compression codec for this column chunk. Only set for non-PLAIN codec.
     decompressor: Option<Box<dyn Codec>>,
 
-    // The number of values we have seen so far.
-    seen_num_values: i64,
-
-    // The number of total values in this column chunk.
-    total_num_values: i64,
-
-    // Column chunk type.
+    /// Column chunk type.
     physical_type: Type,
+
+    state: SerializedPageReaderState,
 }
 
-impl<T: Read> SerializedPageReader<T> {
-    /// Creates a new serialized page reader from file source.
+impl<R: ChunkReader> SerializedPageReader<R> {
+    /// Creates a new serialized page reader from a chunk reader and metadata
     pub fn new(
-        buf: T,
-        total_num_values: i64,
-        compression: Compression,
-        physical_type: Type,
+        reader: Arc<R>,
+        meta: &ColumnChunkMetaData,
+        total_rows: usize,
+        page_locations: Option<Vec<PageLocation>>,
     ) -> Result<Self> {
-        let decompressor = create_codec(compression)?;
-        let result = Self {
-            buf: SerializedPages::Chunk { buf },
-            total_num_values,
-            seen_num_values: 0,
-            decompressor,
-            physical_type,
-        };
-        Ok(result)
-    }
+        let decompressor = create_codec(meta.compression())?;
+        let (start, len) = meta.byte_range();
+
+        let state = match page_locations {
+            Some(locations) => {
+                let dictionary_page = match locations.first() {
+                    Some(dict_offset) if dict_offset.offset as u64 != start => {
+                        Some(PageLocation {
+                            offset: start as i64,
+                            compressed_page_size: (dict_offset.offset as u64 - start)
+                                as i32,
+                            first_row_index: 0,
+                        })
+                    }
+                    _ => None,
+                };
 
-    /// Creates a new serialized page reader from file source.
-    pub fn new_with_page_offsets(
-        total_num_values: i64,
-        compression: Compression,
-        physical_type: Type,
-        offset_index: Vec<PageLocation>,
-        has_dictionary_page_to_read: bool,
-        page_bufs: VecDeque<T>,
-    ) -> Result<Self> {
-        let decompressor = create_codec(compression)?;
-        let result = Self {
-            buf: SerializedPages::Pages {
-                offset_index,
-                seen_num_data_pages: 0,
-                has_dictionary_page_to_read,
-                page_bufs,
+                SerializedPageReaderState::Pages {
+                    page_locations: locations.into(),
+                    dictionary_page,
+                    total_rows,
+                }
+            }
+            None => SerializedPageReaderState::Values {
+                offset: start as usize,
+                remaining_bytes: len as usize,
             },
-            total_num_values,
-            seen_num_values: 0,
-            decompressor,
-            physical_type,
         };
-        Ok(result)
+
+        Ok(Self {
+            reader,
+            decompressor,
+            state,
+            physical_type: meta.column_type(),
+        })
     }
 }
 
-impl<T: Read + Send> Iterator for SerializedPageReader<T> {
+impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
     type Item = Result<Page>;
 
     fn next(&mut self) -> Option<Self::Item> {
         self.get_next_page().transpose()
     }
 }
 
-impl<T: Read + Send> PageReader for SerializedPageReader<T> {
+impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
     fn get_next_page(&mut self) -> Result<Option<Page>> {
-        let mut cursor;
-        let mut dictionary_cursor;
-        while self.seen_num_values < self.total_num_values {
-            match &mut self.buf {
-                SerializedPages::Chunk { buf } => {
-                    cursor = buf;
-                }
-                SerializedPages::Pages {
-                    offset_index,
-                    seen_num_data_pages,
-                    has_dictionary_page_to_read,
-                    page_bufs,
+        loop {
+            let page = match &mut self.state {
+                SerializedPageReaderState::Values {
+                    offset,
+                    remaining_bytes: remaining,
+                    ..
                 } => {
-                    if offset_index.len() <= *seen_num_data_pages {
+                    if *remaining == 0 {
                         return Ok(None);
-                    } else if *seen_num_data_pages == 0 && *has_dictionary_page_to_read {
-                        dictionary_cursor = page_bufs.pop_front().unwrap();
-                        cursor = &mut dictionary_cursor;
-                    } else {
-                        cursor = page_bufs.get_mut(*seen_num_data_pages).unwrap();
                     }
-                }
-            }
 
-            let page_header = read_page_header(cursor)?;
+                    let mut read = self.reader.get_read(*offset as u64, *remaining)?;
 
-            let to_read = page_header.compressed_page_size as usize;
-            let mut buffer = Vec::with_capacity(to_read);
-            let read = cursor.take(to_read as u64).read_to_end(&mut buffer)?;
+                    let (header_len, header) = read_page_header_len(&mut read)?;
+                    let data_len = header.compressed_page_size as usize;
+                    *offset += header_len + data_len;
+                    *remaining -= header_len + data_len;
 
-            if read != to_read {
-                return Err(eof_err!(
-                    "Expected to read {} bytes of page, read only {}",
-                    to_read,
-                    read
-                ));
-            }
+                    if header.type_ == PageType::IndexPage {
+                        continue;
+                    }
+
+                    let mut buffer = Vec::with_capacity(data_len);
+                    let read = read.take(data_len as u64).read_to_end(&mut buffer)?;
 
-            let buffer = ByteBufferPtr::new(buffer);
-            let result = match page_header.type_ {
-                PageType::DataPage | PageType::DataPageV2 => {
-                    let decoded = decode_page(
-                        page_header,
-                        buffer,
+                    if read != data_len {
+                        return Err(eof_err!(
+                            "Expected to read {} bytes of page, read only {}",
+                            data_len,
+                            read
+                        ));
+                    }
+
+                    decode_page(
+                        header,
+                        ByteBufferPtr::new(buffer),
                         self.physical_type,
                         self.decompressor.as_mut(),
-                    )?;
-                    self.seen_num_values += decoded.num_values() as i64;
-                    if let SerializedPages::Pages {
-                        seen_num_data_pages,
-                        ..
-                    } = &mut self.buf
-                    {
-                        *seen_num_data_pages += 1;
-                    }
-                    decoded
+                    )?
                 }
-                PageType::DictionaryPage => {
-                    if let SerializedPages::Pages {
-                        has_dictionary_page_to_read,
-                        ..
-                    } = &mut self.buf
+                SerializedPageReaderState::Pages {
+                    page_locations,
+                    dictionary_page,
+                    ..
+                } => {
+                    let front = match dictionary_page
+                        .take()
+                        .or_else(|| page_locations.pop_front())
                     {
-                        *has_dictionary_page_to_read = false;
+                        Some(front) => front,
+                        None => return Ok(None),
+                    };
+
+                    let page_len = front.compressed_page_size as usize;
+
+                    // TODO: Add ChunkReader get_bytes to potentially avoid copy
+                    let mut buffer = Vec::with_capacity(page_len);
+                    let read = self
+                        .reader
+                        .get_read(front.offset as u64, page_len)?
+                        .read_to_end(&mut buffer)?;
+
+                    if read != page_len {
+                        return Err(eof_err!(
+                            "Expected to read {} bytes of page, read only {}",
+                            page_len,
+                            read
+                        ));
                     }
+
+                    let mut cursor = Cursor::new(buffer);
+                    let header = read_page_header(&mut cursor)?;
+                    let offset = cursor.position();
+
+                    let bytes = Bytes::from(cursor.into_inner()).slice(offset as usize..);
                     decode_page(
-                        page_header,
-                        buffer,
+                        header,
+                        bytes.into(),
                         self.physical_type,
                         self.decompressor.as_mut(),
                     )?
                 }
-                _ => {
-                    // For unknown page type (e.g., INDEX_PAGE), skip and read next.
-                    continue;
-                }
             };
-            return Ok(Some(result));
-        }
 
-        // We are at the end of this column chunk and no more page left. Return None.
-        Ok(None)
+            return Ok(Some(page));
+        }
     }
 
     fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
-        match &mut self.buf {
-            SerializedPages::Chunk { .. } => { Err(general_err!("Must set page_offset_index when using peek_next_page in SerializedPageReader.")) }
-            SerializedPages::Pages { offset_index, seen_num_data_pages, has_dictionary_page_to_read, .. } => {
-                if *seen_num_data_pages >= offset_index.len() {
-                    Ok(None)
-                } else if *seen_num_data_pages == 0 && *has_dictionary_page_to_read {
-                    // Will set `has_dictionary_page_to_read` false in `get_next_page`,
-                    // assume dictionary page must be read and cannot be skipped.
-                    Ok(Some(PageMetadata {
-                        num_rows: usize::MIN,
-                        is_dict: true,
+        match &self.state {
+            SerializedPageReaderState::Values {..} => Err(general_err!("Must set page_offset_index when using peek_next_page in SerializedPageReader.")),
+            SerializedPageReaderState::Pages { page_locations, dictionary_page, total_rows } => {
+                if dictionary_page.is_some() {
+                    Ok(Some(PageMetadata{
+                        num_rows: 0,
+                        is_dict: true
                     }))
-                } else {
-                    let row_count = calculate_row_count(
-                        offset_index,
-                        *seen_num_data_pages,
-                        self.total_num_values,
-                    )?;
-                    Ok(Some(PageMetadata {
-                        num_rows: row_count,
-                        is_dict: false,
+                } else if let Some(page) = page_locations.front() {
+                    let next_rows = page_locations.get(1).map(|x| x.first_row_index as usize).unwrap_or(*total_rows);
+
+                    Ok(Some(PageMetadata{
+                        num_rows: next_rows - page.first_row_index as usize,
+                        is_dict: false
                    }))
+                } else {
+                    Ok(None)
                 }
             }
        }
    }
 
     fn skip_next_page(&mut self) -> Result<()> {
-        match &mut self.buf {
-            SerializedPages::Chunk { .. } => { Err(general_err!("Must set page_offset_index when using skip_next_page in SerializedPageReader.")) }
-            SerializedPages::Pages { offset_index, seen_num_data_pages, .. } => {
-                if offset_index.len() <= *seen_num_data_pages {
-                    Err(general_err!(
-                        "seen_num_data_pages is out of bound in SerializedPageReader."
-                    ))
-                } else {
-                    *seen_num_data_pages += 1;
-                    // Notice: maybe need 'self.seen_num_values += xxx', for now we can not get skip values in skip_next_page.
-                    Ok(())
-                }
+        match &mut self.state {
+            SerializedPageReaderState::Values {..} =>{ Err(general_err!("Must set page_offset_index when using skip_next_page in SerializedPageReader.")) },

Review Comment:
   It should now be possible to read the header, and then skip by just incrementing the offset @Ted-Jiang
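   For illustration, a rough sketch of what that could look like. This is not code from the PR as reviewed: it reuses the `read_page_header_len` helper and `ChunkReader::get_read` from the diff above, mirrors the offset bookkeeping in `get_next_page`, and the handling of the `Pages` arm and of an exhausted chunk are assumptions, not decisions made in this PR:

   ```rust
   // Sketch only: skip a page in the Values state by decoding just the Thrift
   // page header, then advancing past the header and the compressed page body.
   // No page data is read into a buffer, decompressed, or decoded.
   fn skip_next_page(&mut self) -> Result<()> {
       match &mut self.state {
           SerializedPageReaderState::Values {
               offset,
               remaining_bytes,
           } => {
               if *remaining_bytes == 0 {
                   // Assumed behaviour: nothing left in the chunk, so skipping is a no-op
                   return Ok(());
               }
               let mut read = self.reader.get_read(*offset as u64, *remaining_bytes)?;
               // `read_page_header_len` (from the diff above) returns the header
               // together with its encoded length in bytes
               let (header_len, header) = read_page_header_len(&mut read)?;
               let data_len = header.compressed_page_size as usize;
               // "Skip by just incrementing the offset": step over header + body
               *offset += header_len + data_len;
               *remaining_bytes -= header_len + data_len;
               Ok(())
           }
           SerializedPageReaderState::Pages {
               page_locations,
               dictionary_page,
               ..
           } => {
               // With an offset index, skipping is just dropping the next location
               if dictionary_page.take().is_none() {
                   page_locations.pop_front();
               }
               Ok(())
           }
       }
   }
   ```

   Skipping would then cost a single header decode rather than decompressing and decoding the whole page, and the `Values` arm would no longer need to return an error when no offset index is present.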