thinkharderdev commented on code in PR #2473: URL: https://github.com/apache/arrow-rs/pull/2473#discussion_r947766617
########## parquet/src/arrow/async_reader.rs: ########## @@ -482,210 +480,170 @@ impl InMemoryRowGroup { async fn fetch<T: AsyncFileReader + Send>( &mut self, input: &mut T, - metadata: &RowGroupMetaData, projection: &ProjectionMask, - _selection: Option<&RowSelection>, + selection: Option<&RowSelection>, ) -> Result<()> { - // TODO: Use OffsetIndex and selection to prune pages - - let fetch_ranges = self - .column_chunks - .iter() - .enumerate() - .into_iter() - .filter_map(|(idx, chunk)| { - (chunk.is_none() && projection.leaf_included(idx)).then(|| { - let column = metadata.column(idx); - let (start, length) = column.byte_range(); - start as usize..(start + length) as usize + if let Some((selection, page_locations)) = + selection.zip(self.metadata.page_offset_index().as_ref()) + { + // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the + // `RowSelection` + let mut offsets: Vec<Vec<usize>> = vec![]; + + let fetch_ranges = self + .column_chunks + .iter() + .enumerate() + .into_iter() + .filter_map(|(idx, chunk)| { + (chunk.is_none() && projection.leaf_included(idx)).then(|| { + let (_mask, ranges) = selection.page_mask(&page_locations[idx]); + offsets.push(ranges.iter().map(|range| range.start).collect()); + ranges + }) }) - }) - .collect(); + .flatten() + .collect(); - let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter(); + let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter(); + let mut offsets = offsets.into_iter(); - for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { - if chunk.is_some() || !projection.leaf_included(idx) { - continue; + for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { + if chunk.is_some() || !projection.leaf_included(idx) { + continue; + } + + if let Some(page_offsets) = offsets.next() { + let mut chunks = Vec::with_capacity(page_offsets.len()); + for _ in 0..page_offsets.len() { + chunks.push(chunk_data.next().unwrap()); + } + + 
*chunk = Some(ColumnChunkData::Sparse { + length: self.metadata.column(idx).byte_range().1 as usize, + data: page_offsets.into_iter().zip(chunks.into_iter()).collect(), + }) + } } + } else { + let fetch_ranges = self + .column_chunks + .iter() + .enumerate() + .into_iter() + .filter_map(|(idx, chunk)| { + (chunk.is_none() && projection.leaf_included(idx)).then(|| { + let column = self.metadata.column(idx); + let (start, length) = column.byte_range(); + start as usize..(start + length) as usize + }) + }) + .collect(); - let column = metadata.column(idx); + let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter(); - if let Some(data) = chunk_data.next() { - *chunk = Some(InMemoryColumnChunk { - num_values: column.num_values(), - compression: column.compression(), - physical_type: column.column_type(), - data, - }); + for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { + if chunk.is_some() || !projection.leaf_included(idx) { + continue; + } + + if let Some(data) = chunk_data.next() { + *chunk = Some(ColumnChunkData::Dense { + offset: self.metadata.column(idx).byte_range().0 as usize, + data, + }); + } } } + Ok(()) } } impl RowGroupCollection for InMemoryRowGroup { fn schema(&self) -> SchemaDescPtr { - self.schema.clone() + self.metadata.schema_descr_ptr() } fn num_rows(&self) -> usize { self.row_count } fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> { - let page_reader = self.column_chunks[i].as_ref().unwrap().pages(); - - Ok(Box::new(ColumnChunkIterator { - schema: self.schema.clone(), - column_schema: self.schema.columns()[i].clone(), - reader: Some(page_reader), - })) + match &self.column_chunks[i] { + None => Err(ParquetError::General(format!( + "Invalid column index {}, column was not fetched", + i + ))), + Some(data) => { + let page_locations = self + .metadata + .page_offset_index() + .as_ref() + .map(|index| index[i].clone()); + let page_reader: Box<dyn PageReader> = + Box::new(SerializedPageReader::new( + 
Arc::new(data.clone()), + self.metadata.column(i), + self.row_count, + page_locations, + )?); + + Ok(Box::new(ColumnChunkIterator { + schema: self.metadata.schema_descr_ptr(), + column_schema: self.metadata.schema_descr_ptr().columns()[i].clone(), + reader: Some(Ok(page_reader)), + })) + } + } } } -/// Data for a single column chunk +/// An in-memory column chunk #[derive(Clone)] -struct InMemoryColumnChunk { - num_values: i64, - compression: Compression, - physical_type: crate::basic::Type, - data: Bytes, -} - -impl InMemoryColumnChunk { - fn pages(&self) -> Result<Box<dyn PageReader>> { - let page_reader = InMemoryColumnChunkReader::new(self.clone())?; - Ok(Box::new(page_reader)) - } -} - -// A serialized implementation for Parquet [`PageReader`]. -struct InMemoryColumnChunkReader { - chunk: InMemoryColumnChunk, - decompressor: Option<Box<dyn Codec>>, - offset: usize, - seen_num_values: i64, - // If the next page header has already been "peeked", we will cache it here - next_page_header: Option<PageHeader>, -} - -impl InMemoryColumnChunkReader { - /// Creates a new serialized page reader from file source. 
- fn new(chunk: InMemoryColumnChunk) -> Result<Self> { - let decompressor = create_codec(chunk.compression)?; - let result = Self { - chunk, - decompressor, - offset: 0, - seen_num_values: 0, - next_page_header: None, - }; - Ok(result) - } -} - -impl Iterator for InMemoryColumnChunkReader { - type Item = Result<Page>; - - fn next(&mut self) -> Option<Self::Item> { - self.get_next_page().transpose() - } +enum ColumnChunkData { + /// Column chunk data representing only a subset of data pages + Sparse { + /// Length of the full column chunk + length: usize, + data: Vec<(usize, Bytes)>, + }, + /// Full column chunk + Dense { offset: usize, data: Bytes }, } -impl PageReader for InMemoryColumnChunkReader { - fn get_next_page(&mut self) -> Result<Option<Page>> { - while self.seen_num_values < self.chunk.num_values { - let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]); - let page_header = if let Some(page_header) = self.next_page_header.take() { - // The next page header has already been peeked, so use the cached value - page_header - } else { - let page_header = read_page_header(&mut cursor)?; - self.offset += cursor.position() as usize; - page_header - }; - - let compressed_size = page_header.compressed_page_size as usize; - - let start_offset = self.offset; - let end_offset = self.offset + compressed_size; - self.offset = end_offset; - - let buffer = self.chunk.data.slice(start_offset..end_offset); - - let result = match page_header.type_ { - PageType::DataPage | PageType::DataPageV2 => { - let decoded = decode_page( - page_header, - buffer.into(), - self.chunk.physical_type, - self.decompressor.as_mut(), - )?; - self.seen_num_values += decoded.num_values() as i64; - decoded - } - PageType::DictionaryPage => decode_page( - page_header, - buffer.into(), - self.chunk.physical_type, - self.decompressor.as_mut(), - )?, - _ => { - // For unknown page type (e.g., INDEX_PAGE), skip and read next. 
- continue; - } - }; - - return Ok(Some(result)); +impl Length for ColumnChunkData { + fn len(&self) -> u64 { + match &self { + ColumnChunkData::Sparse { length, .. } => *length as u64, + ColumnChunkData::Dense { data, .. } => data.len() as u64, } - - // We are at the end of this column chunk and no more page left. Return None. - Ok(None) } +} - fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> { - while self.seen_num_values < self.chunk.num_values { - return if let Some(buffered_header) = self.next_page_header.as_ref() { - if let Ok(page_metadata) = buffered_header.try_into() { - Ok(Some(page_metadata)) - } else { - // For unknown page type (e.g., INDEX_PAGE), skip and read next. - self.next_page_header = None; - continue; - } - } else { - let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]); - let page_header = read_page_header(&mut cursor)?; - self.offset += cursor.position() as usize; - - let page_metadata = if let Ok(page_metadata) = (&page_header).try_into() { - Ok(Some(page_metadata)) - } else { - // For unknown page type (e.g., INDEX_PAGE), skip and read next. - continue; - }; - - self.next_page_header = Some(page_header); - page_metadata - }; - } - - Ok(None) - } +impl ChunkReader for ColumnChunkData { + type T = bytes::buf::Reader<Bytes>; - fn skip_next_page(&mut self) -> Result<()> { - if let Some(buffered_header) = self.next_page_header.take() { - // The next page header has already been peeked, so just advance the offset - self.offset += buffered_header.compressed_page_size as usize; - } else { - let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]); - let page_header = read_page_header(&mut cursor)?; - self.offset += cursor.position() as usize; - self.offset += page_header.compressed_page_size as usize; + fn get_read(&self, start: u64, length: usize) -> Result<Self::T> { + match &self { + ColumnChunkData::Sparse { data, .. 
} => data .iter() .find(|(offset, bytes)| { *offset <= start as usize && (start as usize - *offset) < bytes.len() Review Comment: I wasn't sure whether there is ever a case in which we fetch some subset of the page. Thinking about it more, I don't believe that would ever be a valid use case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org