tustvold commented on code in PR #2473: URL: https://github.com/apache/arrow-rs/pull/2473#discussion_r947646115
########## object_store/src/local.rs: ########## @@ -1068,6 +1068,7 @@ mod tests { integration.head(&path).await.unwrap(); } + #[ignore] Review Comment: ? ########## parquet/src/arrow/async_reader.rs: ########## @@ -482,210 +480,170 @@ impl InMemoryRowGroup { async fn fetch<T: AsyncFileReader + Send>( &mut self, input: &mut T, - metadata: &RowGroupMetaData, projection: &ProjectionMask, - _selection: Option<&RowSelection>, + selection: Option<&RowSelection>, ) -> Result<()> { - // TODO: Use OffsetIndex and selection to prune pages - - let fetch_ranges = self - .column_chunks - .iter() - .enumerate() - .into_iter() - .filter_map(|(idx, chunk)| { - (chunk.is_none() && projection.leaf_included(idx)).then(|| { - let column = metadata.column(idx); - let (start, length) = column.byte_range(); - start as usize..(start + length) as usize + if let Some((selection, page_locations)) = + selection.zip(self.metadata.page_offset_index().as_ref()) + { + // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the + // `RowSelection` + let mut offsets: Vec<Vec<usize>> = vec![]; + + let fetch_ranges = self + .column_chunks + .iter() + .enumerate() + .into_iter() + .filter_map(|(idx, chunk)| { + (chunk.is_none() && projection.leaf_included(idx)).then(|| { + let (_mask, ranges) = selection.page_mask(&page_locations[idx]); + offsets.push(ranges.iter().map(|range| range.start).collect()); + ranges + }) }) - }) - .collect(); + .flatten() + .collect(); - let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter(); + let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter(); + let mut offsets = offsets.into_iter(); - for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { - if chunk.is_some() || !projection.leaf_included(idx) { - continue; + for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { + if chunk.is_some() || !projection.leaf_included(idx) { + continue; + } + + if let Some(page_offsets) = 
offsets.next() { + let mut chunks = Vec::with_capacity(page_offsets.len()); + for _ in 0..page_offsets.len() { + chunks.push(chunk_data.next().unwrap()); + } + + *chunk = Some(ColumnChunkData::Sparse { + length: self.metadata.column(idx).byte_range().1 as usize, + data: page_offsets.into_iter().zip(chunks.into_iter()).collect(), + }) + } } + } else { + let fetch_ranges = self + .column_chunks + .iter() + .enumerate() + .into_iter() + .filter_map(|(idx, chunk)| { + (chunk.is_none() && projection.leaf_included(idx)).then(|| { + let column = self.metadata.column(idx); + let (start, length) = column.byte_range(); + start as usize..(start + length) as usize + }) + }) + .collect(); - let column = metadata.column(idx); + let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter(); - if let Some(data) = chunk_data.next() { - *chunk = Some(InMemoryColumnChunk { - num_values: column.num_values(), - compression: column.compression(), - physical_type: column.column_type(), - data, - }); + for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { + if chunk.is_some() || !projection.leaf_included(idx) { + continue; + } + + if let Some(data) = chunk_data.next() { + *chunk = Some(ColumnChunkData::Dense { + offset: self.metadata.column(idx).byte_range().0 as usize, + data, + }); + } } } + Ok(()) } } impl RowGroupCollection for InMemoryRowGroup { fn schema(&self) -> SchemaDescPtr { - self.schema.clone() + self.metadata.schema_descr_ptr() } fn num_rows(&self) -> usize { self.row_count } fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> { - let page_reader = self.column_chunks[i].as_ref().unwrap().pages(); - - Ok(Box::new(ColumnChunkIterator { - schema: self.schema.clone(), - column_schema: self.schema.columns()[i].clone(), - reader: Some(page_reader), - })) + match &self.column_chunks[i] { + None => Err(ParquetError::General(format!( + "Invalid column index {}, column was not fetched", + i + ))), + Some(data) => { + let page_locations = self + 
.metadata + .page_offset_index() + .as_ref() + .map(|index| index[i].clone()); + let page_reader: Box<dyn PageReader> = + Box::new(SerializedPageReader::new( + Arc::new(data.clone()), + self.metadata.column(i), + self.row_count, + page_locations, + )?); + + Ok(Box::new(ColumnChunkIterator { + schema: self.metadata.schema_descr_ptr(), + column_schema: self.metadata.schema_descr_ptr().columns()[i].clone(), + reader: Some(Ok(page_reader)), + })) + } + } } } -/// Data for a single column chunk +/// An in-memory column chunk #[derive(Clone)] -struct InMemoryColumnChunk { - num_values: i64, - compression: Compression, - physical_type: crate::basic::Type, - data: Bytes, -} - -impl InMemoryColumnChunk { - fn pages(&self) -> Result<Box<dyn PageReader>> { - let page_reader = InMemoryColumnChunkReader::new(self.clone())?; - Ok(Box::new(page_reader)) - } -} - -// A serialized implementation for Parquet [`PageReader`]. -struct InMemoryColumnChunkReader { - chunk: InMemoryColumnChunk, - decompressor: Option<Box<dyn Codec>>, - offset: usize, - seen_num_values: i64, - // If the next page header has already been "peeked", we will cache it here - next_page_header: Option<PageHeader>, -} - -impl InMemoryColumnChunkReader { - /// Creates a new serialized page reader from file source. 
- fn new(chunk: InMemoryColumnChunk) -> Result<Self> { - let decompressor = create_codec(chunk.compression)?; - let result = Self { - chunk, - decompressor, - offset: 0, - seen_num_values: 0, - next_page_header: None, - }; - Ok(result) - } -} - -impl Iterator for InMemoryColumnChunkReader { - type Item = Result<Page>; - - fn next(&mut self) -> Option<Self::Item> { - self.get_next_page().transpose() - } +enum ColumnChunkData { + /// Column chunk data representing only a subset of data pages + Sparse { + /// Length of the full column chunk + length: usize, + data: Vec<(usize, Bytes)>, + }, + /// Full column chunk + Dense { offset: usize, data: Bytes }, } -impl PageReader for InMemoryColumnChunkReader { - fn get_next_page(&mut self) -> Result<Option<Page>> { - while self.seen_num_values < self.chunk.num_values { - let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]); - let page_header = if let Some(page_header) = self.next_page_header.take() { - // The next page header has already been peeked, so use the cached value - page_header - } else { - let page_header = read_page_header(&mut cursor)?; - self.offset += cursor.position() as usize; - page_header - }; - - let compressed_size = page_header.compressed_page_size as usize; - - let start_offset = self.offset; - let end_offset = self.offset + compressed_size; - self.offset = end_offset; - - let buffer = self.chunk.data.slice(start_offset..end_offset); - - let result = match page_header.type_ { - PageType::DataPage | PageType::DataPageV2 => { - let decoded = decode_page( - page_header, - buffer.into(), - self.chunk.physical_type, - self.decompressor.as_mut(), - )?; - self.seen_num_values += decoded.num_values() as i64; - decoded - } - PageType::DictionaryPage => decode_page( - page_header, - buffer.into(), - self.chunk.physical_type, - self.decompressor.as_mut(), - )?, - _ => { - // For unknown page type (e.g., INDEX_PAGE), skip and read next. 
- continue; - } - }; - - return Ok(Some(result)); +impl Length for ColumnChunkData { + fn len(&self) -> u64 { + match &self { + ColumnChunkData::Sparse { length, .. } => *length as u64, + ColumnChunkData::Dense { data, .. } => data.len() as u64, } - - // We are at the end of this column chunk and no more page left. Return None. - Ok(None) } +} - fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> { - while self.seen_num_values < self.chunk.num_values { - return if let Some(buffered_header) = self.next_page_header.as_ref() { - if let Ok(page_metadata) = buffered_header.try_into() { - Ok(Some(page_metadata)) - } else { - // For unknown page type (e.g., INDEX_PAGE), skip and read next. - self.next_page_header = None; - continue; - } - } else { - let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]); - let page_header = read_page_header(&mut cursor)?; - self.offset += cursor.position() as usize; - - let page_metadata = if let Ok(page_metadata) = (&page_header).try_into() { - Ok(Some(page_metadata)) - } else { - // For unknown page type (e.g., INDEX_PAGE), skip and read next. 
- continue; - }; - - self.next_page_header = Some(page_header); - page_metadata - }; - } - - Ok(None) - } +impl ChunkReader for ColumnChunkData { + type T = bytes::buf::Reader<Bytes>; - fn skip_next_page(&mut self) -> Result<()> { - if let Some(buffered_header) = self.next_page_header.take() { - // The next page header has already been peeked, so just advance the offset - self.offset += buffered_header.compressed_page_size as usize; - } else { - let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]); - let page_header = read_page_header(&mut cursor)?; - self.offset += cursor.position() as usize; - self.offset += page_header.compressed_page_size as usize; + fn get_read(&self, start: u64, length: usize) -> Result<Self::T> { Review Comment: It is worth noting that this will currently represent a performance regression, as the previous implementation avoided a copy - https://github.com/apache/arrow-rs/pull/2473/files#diff-f6b1a106d47a16504d4a16d57a6632872ddf596f337ac0640a13523dccc2d4d4L615 I will add a get_bytes method to ChunkReader to avoid this. ########## parquet/src/arrow/arrow_reader/selection.rs: ########## @@ -423,4 +481,62 @@ mod tests { assert_eq!(a.and_then(&b), expected); } } + + #[test] + fn test_page_mask() { + let selection = RowSelection::from(vec![ + RowSelector::skip(10), + RowSelector::select(3), + RowSelector::skip(3), + RowSelector::select(4), + RowSelector::skip(5), + RowSelector::select(5), + RowSelector::skip(12), + RowSelector::select(12), + RowSelector::skip(12), + ]); + + let index = vec![ + PageLocation { + offset: 0, + compressed_page_size: 10, + first_row_index: 0, + }, + PageLocation { + offset: 10, + compressed_page_size: 10, + first_row_index: 10, + }, + PageLocation { + offset: 20, + compressed_page_size: 10, + first_row_index: 20, + }, + PageLocation { + offset: 30, + compressed_page_size: 10, + first_row_index: 30, + }, + PageLocation { + offset: 40, + compressed_page_size: 10, + first_row_index: 40, + }, + PageLocation { + offset: 50, + 
compressed_page_size: 10, + first_row_index: 50, + }, + PageLocation { + offset: 60, + compressed_page_size: 10, + first_row_index: 60, + }, + ]; + + let (mask, ranges) = selection.page_mask(&index); + + assert_eq!(mask, vec![false, true, true, false, true, true, false]); + assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60]); Review Comment: Could we get a test where the final PageLocation is selected? ########## parquet/src/arrow/arrow_reader/selection.rs: ########## @@ -116,6 +118,62 @@ impl RowSelection { Self { selectors } } + /// Given an offset index, return a mask indicating which pages are selected along with their locations by `self` + pub fn page_mask( + &self, + page_locations: &[PageLocation], + ) -> (Vec<bool>, Vec<Range<usize>>) { + let mut mask = vec![false; page_locations.len()]; + let mut ranges = vec![]; + let mut row_offset = 0; + + let mut pages = page_locations.iter().enumerate().peekable(); + let mut selectors = self.selectors.iter().cloned(); + let mut current_selector = selectors.next(); + let mut current_page = pages.next(); + + let mut current_page_included = false; + + while let Some((selector, (mut page_idx, page))) = + current_selector.as_mut().zip(current_page) + { + if !selector.skip && !current_page_included && !mask[page_idx] { + mask[page_idx] = true; + let start = page.offset as usize; + let end = start + page.compressed_page_size as usize; + ranges.push(start..end); + } + + if let Some((_, next_page)) = pages.peek() { + if row_offset + selector.row_count > next_page.first_row_index as usize { + let remaining_in_page = + next_page.first_row_index as usize - row_offset; + selector.row_count -= remaining_in_page; + row_offset += remaining_in_page; + current_page = pages.next(); + + continue; + } else { + if row_offset + selector.row_count + == next_page.first_row_index as usize + { + current_page = pages.next(); + } + row_offset += selector.row_count; + current_selector = selectors.next(); + } + } else { + break; + } + } + + 
(mask, ranges) + } + + pub fn selectors(&self) -> &[RowSelector] { Review Comment: This doesn't appear to be used, so I think it can go. I've been trying to avoid exposing the internal layout of this type externally. ########## parquet/src/arrow/arrow_reader/selection.rs: ########## @@ -116,6 +118,62 @@ impl RowSelection { Self { selectors } } + /// Given an offset index, return a mask indicating which pages are selected along with their locations by `self` + pub fn page_mask( + &self, + page_locations: &[PageLocation], + ) -> (Vec<bool>, Vec<Range<usize>>) { Review Comment: It seems strange to me that this method would return `Vec<Range<usize>>` when it is called page_mask, and the caller clearly already has `&[PageLocation]` that can easily be combined with the mask... ########## parquet/src/arrow/async_reader.rs: ########## @@ -482,210 +480,170 @@ impl InMemoryRowGroup { async fn fetch<T: AsyncFileReader + Send>( &mut self, input: &mut T, - metadata: &RowGroupMetaData, projection: &ProjectionMask, - _selection: Option<&RowSelection>, + selection: Option<&RowSelection>, ) -> Result<()> { - // TODO: Use OffsetIndex and selection to prune pages - - let fetch_ranges = self - .column_chunks - .iter() - .enumerate() - .into_iter() - .filter_map(|(idx, chunk)| { - (chunk.is_none() && projection.leaf_included(idx)).then(|| { - let column = metadata.column(idx); - let (start, length) = column.byte_range(); - start as usize..(start + length) as usize + if let Some((selection, page_locations)) = + selection.zip(self.metadata.page_offset_index().as_ref()) + { + // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the + // `RowSelection` + let mut offsets: Vec<Vec<usize>> = vec![]; + + let fetch_ranges = self + .column_chunks + .iter() + .enumerate() + .into_iter() + .filter_map(|(idx, chunk)| { + (chunk.is_none() && projection.leaf_included(idx)).then(|| { + let (_mask, ranges) = selection.page_mask(&page_locations[idx]); + 
offsets.push(ranges.iter().map(|range| range.start).collect()); + ranges + }) }) - }) - .collect(); + .flatten() + .collect(); - let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter(); + let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter(); + let mut offsets = offsets.into_iter(); - for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { - if chunk.is_some() || !projection.leaf_included(idx) { - continue; + for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { + if chunk.is_some() || !projection.leaf_included(idx) { + continue; + } + + if let Some(page_offsets) = offsets.next() { + let mut chunks = Vec::with_capacity(page_offsets.len()); + for _ in 0..page_offsets.len() { + chunks.push(chunk_data.next().unwrap()); + } + + *chunk = Some(ColumnChunkData::Sparse { + length: self.metadata.column(idx).byte_range().1 as usize, + data: page_offsets.into_iter().zip(chunks.into_iter()).collect(), + }) + } } + } else { + let fetch_ranges = self + .column_chunks + .iter() + .enumerate() + .into_iter() + .filter_map(|(idx, chunk)| { + (chunk.is_none() && projection.leaf_included(idx)).then(|| { + let column = self.metadata.column(idx); + let (start, length) = column.byte_range(); + start as usize..(start + length) as usize + }) + }) + .collect(); - let column = metadata.column(idx); + let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter(); - if let Some(data) = chunk_data.next() { - *chunk = Some(InMemoryColumnChunk { - num_values: column.num_values(), - compression: column.compression(), - physical_type: column.column_type(), - data, - }); + for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { + if chunk.is_some() || !projection.leaf_included(idx) { + continue; + } + + if let Some(data) = chunk_data.next() { + *chunk = Some(ColumnChunkData::Dense { + offset: self.metadata.column(idx).byte_range().0 as usize, + data, + }); + } } } + Ok(()) } } impl RowGroupCollection for 
InMemoryRowGroup { fn schema(&self) -> SchemaDescPtr { - self.schema.clone() + self.metadata.schema_descr_ptr() } fn num_rows(&self) -> usize { self.row_count } fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> { - let page_reader = self.column_chunks[i].as_ref().unwrap().pages(); - - Ok(Box::new(ColumnChunkIterator { - schema: self.schema.clone(), - column_schema: self.schema.columns()[i].clone(), - reader: Some(page_reader), - })) + match &self.column_chunks[i] { + None => Err(ParquetError::General(format!( + "Invalid column index {}, column was not fetched", + i + ))), + Some(data) => { + let page_locations = self + .metadata + .page_offset_index() + .as_ref() + .map(|index| index[i].clone()); + let page_reader: Box<dyn PageReader> = + Box::new(SerializedPageReader::new( + Arc::new(data.clone()), + self.metadata.column(i), + self.row_count, + page_locations, + )?); + + Ok(Box::new(ColumnChunkIterator { + schema: self.metadata.schema_descr_ptr(), + column_schema: self.metadata.schema_descr_ptr().columns()[i].clone(), + reader: Some(Ok(page_reader)), + })) + } + } } } -/// Data for a single column chunk +/// An in-memory column chunk #[derive(Clone)] -struct InMemoryColumnChunk { - num_values: i64, - compression: Compression, - physical_type: crate::basic::Type, - data: Bytes, -} - -impl InMemoryColumnChunk { - fn pages(&self) -> Result<Box<dyn PageReader>> { - let page_reader = InMemoryColumnChunkReader::new(self.clone())?; - Ok(Box::new(page_reader)) - } -} - -// A serialized implementation for Parquet [`PageReader`]. -struct InMemoryColumnChunkReader { - chunk: InMemoryColumnChunk, - decompressor: Option<Box<dyn Codec>>, - offset: usize, - seen_num_values: i64, - // If the next page header has already been "peeked", we will cache it here - next_page_header: Option<PageHeader>, -} - -impl InMemoryColumnChunkReader { - /// Creates a new serialized page reader from file source. 
- fn new(chunk: InMemoryColumnChunk) -> Result<Self> { - let decompressor = create_codec(chunk.compression)?; - let result = Self { - chunk, - decompressor, - offset: 0, - seen_num_values: 0, - next_page_header: None, - }; - Ok(result) - } -} - -impl Iterator for InMemoryColumnChunkReader { - type Item = Result<Page>; - - fn next(&mut self) -> Option<Self::Item> { - self.get_next_page().transpose() - } +enum ColumnChunkData { + /// Column chunk data representing only a subset of data pages + Sparse { + /// Length of the full column chunk + length: usize, + data: Vec<(usize, Bytes)>, Review Comment: A comment explaining what these are would go a long way ########## parquet/src/arrow/async_reader.rs: ########## @@ -482,210 +480,170 @@ impl InMemoryRowGroup { async fn fetch<T: AsyncFileReader + Send>( &mut self, input: &mut T, - metadata: &RowGroupMetaData, projection: &ProjectionMask, - _selection: Option<&RowSelection>, + selection: Option<&RowSelection>, ) -> Result<()> { - // TODO: Use OffsetIndex and selection to prune pages - - let fetch_ranges = self - .column_chunks - .iter() - .enumerate() - .into_iter() - .filter_map(|(idx, chunk)| { - (chunk.is_none() && projection.leaf_included(idx)).then(|| { - let column = metadata.column(idx); - let (start, length) = column.byte_range(); - start as usize..(start + length) as usize + if let Some((selection, page_locations)) = + selection.zip(self.metadata.page_offset_index().as_ref()) + { + // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the + // `RowSelection` + let mut offsets: Vec<Vec<usize>> = vec![]; + + let fetch_ranges = self + .column_chunks + .iter() + .enumerate() + .into_iter() + .filter_map(|(idx, chunk)| { + (chunk.is_none() && projection.leaf_included(idx)).then(|| { + let (_mask, ranges) = selection.page_mask(&page_locations[idx]); + offsets.push(ranges.iter().map(|range| range.start).collect()); + ranges + }) }) - }) - .collect(); + .flatten() + .collect(); - let mut 
chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter(); + let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter(); + let mut offsets = offsets.into_iter(); - for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { - if chunk.is_some() || !projection.leaf_included(idx) { - continue; + for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { + if chunk.is_some() || !projection.leaf_included(idx) { + continue; + } + + if let Some(page_offsets) = offsets.next() { + let mut chunks = Vec::with_capacity(page_offsets.len()); + for _ in 0..page_offsets.len() { + chunks.push(chunk_data.next().unwrap()); + } + + *chunk = Some(ColumnChunkData::Sparse { + length: self.metadata.column(idx).byte_range().1 as usize, + data: page_offsets.into_iter().zip(chunks.into_iter()).collect(), + }) + } } + } else { + let fetch_ranges = self + .column_chunks + .iter() + .enumerate() + .into_iter() + .filter_map(|(idx, chunk)| { + (chunk.is_none() && projection.leaf_included(idx)).then(|| { + let column = self.metadata.column(idx); + let (start, length) = column.byte_range(); + start as usize..(start + length) as usize + }) + }) + .collect(); - let column = metadata.column(idx); + let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter(); - if let Some(data) = chunk_data.next() { - *chunk = Some(InMemoryColumnChunk { - num_values: column.num_values(), - compression: column.compression(), - physical_type: column.column_type(), - data, - }); + for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { + if chunk.is_some() || !projection.leaf_included(idx) { + continue; + } + + if let Some(data) = chunk_data.next() { + *chunk = Some(ColumnChunkData::Dense { + offset: self.metadata.column(idx).byte_range().0 as usize, + data, + }); + } } } + Ok(()) } } impl RowGroupCollection for InMemoryRowGroup { fn schema(&self) -> SchemaDescPtr { - self.schema.clone() + self.metadata.schema_descr_ptr() } fn num_rows(&self) -> usize { 
self.row_count } fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> { - let page_reader = self.column_chunks[i].as_ref().unwrap().pages(); - - Ok(Box::new(ColumnChunkIterator { - schema: self.schema.clone(), - column_schema: self.schema.columns()[i].clone(), - reader: Some(page_reader), - })) + match &self.column_chunks[i] { + None => Err(ParquetError::General(format!( + "Invalid column index {}, column was not fetched", + i + ))), + Some(data) => { + let page_locations = self + .metadata + .page_offset_index() + .as_ref() + .map(|index| index[i].clone()); + let page_reader: Box<dyn PageReader> = + Box::new(SerializedPageReader::new( + Arc::new(data.clone()), + self.metadata.column(i), + self.row_count, + page_locations, + )?); + + Ok(Box::new(ColumnChunkIterator { + schema: self.metadata.schema_descr_ptr(), + column_schema: self.metadata.schema_descr_ptr().columns()[i].clone(), + reader: Some(Ok(page_reader)), + })) + } + } } } -/// Data for a single column chunk +/// An in-memory column chunk #[derive(Clone)] -struct InMemoryColumnChunk { - num_values: i64, - compression: Compression, - physical_type: crate::basic::Type, - data: Bytes, -} - -impl InMemoryColumnChunk { - fn pages(&self) -> Result<Box<dyn PageReader>> { - let page_reader = InMemoryColumnChunkReader::new(self.clone())?; - Ok(Box::new(page_reader)) - } -} - -// A serialized implementation for Parquet [`PageReader`]. -struct InMemoryColumnChunkReader { - chunk: InMemoryColumnChunk, - decompressor: Option<Box<dyn Codec>>, - offset: usize, - seen_num_values: i64, - // If the next page header has already been "peeked", we will cache it here - next_page_header: Option<PageHeader>, -} - -impl InMemoryColumnChunkReader { - /// Creates a new serialized page reader from file source. 
- fn new(chunk: InMemoryColumnChunk) -> Result<Self> { - let decompressor = create_codec(chunk.compression)?; - let result = Self { - chunk, - decompressor, - offset: 0, - seen_num_values: 0, - next_page_header: None, - }; - Ok(result) - } -} - -impl Iterator for InMemoryColumnChunkReader { - type Item = Result<Page>; - - fn next(&mut self) -> Option<Self::Item> { - self.get_next_page().transpose() - } +enum ColumnChunkData { + /// Column chunk data representing only a subset of data pages + Sparse { + /// Length of the full column chunk + length: usize, + data: Vec<(usize, Bytes)>, + }, + /// Full column chunk + Dense { offset: usize, data: Bytes }, } -impl PageReader for InMemoryColumnChunkReader { - fn get_next_page(&mut self) -> Result<Option<Page>> { - while self.seen_num_values < self.chunk.num_values { - let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]); - let page_header = if let Some(page_header) = self.next_page_header.take() { - // The next page header has already been peeked, so use the cached value - page_header - } else { - let page_header = read_page_header(&mut cursor)?; - self.offset += cursor.position() as usize; - page_header - }; - - let compressed_size = page_header.compressed_page_size as usize; - - let start_offset = self.offset; - let end_offset = self.offset + compressed_size; - self.offset = end_offset; - - let buffer = self.chunk.data.slice(start_offset..end_offset); - - let result = match page_header.type_ { - PageType::DataPage | PageType::DataPageV2 => { - let decoded = decode_page( - page_header, - buffer.into(), - self.chunk.physical_type, - self.decompressor.as_mut(), - )?; - self.seen_num_values += decoded.num_values() as i64; - decoded - } - PageType::DictionaryPage => decode_page( - page_header, - buffer.into(), - self.chunk.physical_type, - self.decompressor.as_mut(), - )?, - _ => { - // For unknown page type (e.g., INDEX_PAGE), skip and read next. 
- continue; - } - }; - - return Ok(Some(result)); +impl Length for ColumnChunkData { + fn len(&self) -> u64 { + match &self { + ColumnChunkData::Sparse { length, .. } => *length as u64, + ColumnChunkData::Dense { data, .. } => data.len() as u64, } - - // We are at the end of this column chunk and no more page left. Return None. - Ok(None) } +} - fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> { - while self.seen_num_values < self.chunk.num_values { - return if let Some(buffered_header) = self.next_page_header.as_ref() { - if let Ok(page_metadata) = buffered_header.try_into() { - Ok(Some(page_metadata)) - } else { - // For unknown page type (e.g., INDEX_PAGE), skip and read next. - self.next_page_header = None; - continue; - } - } else { - let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]); - let page_header = read_page_header(&mut cursor)?; - self.offset += cursor.position() as usize; - - let page_metadata = if let Ok(page_metadata) = (&page_header).try_into() { - Ok(Some(page_metadata)) - } else { - // For unknown page type (e.g., INDEX_PAGE), skip and read next. - continue; - }; - - self.next_page_header = Some(page_header); - page_metadata - }; - } - - Ok(None) - } +impl ChunkReader for ColumnChunkData { + type T = bytes::buf::Reader<Bytes>; - fn skip_next_page(&mut self) -> Result<()> { - if let Some(buffered_header) = self.next_page_header.take() { - // The next page header has already been peeked, so just advance the offset - self.offset += buffered_header.compressed_page_size as usize; - } else { - let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]); - let page_header = read_page_header(&mut cursor)?; - self.offset += cursor.position() as usize; - self.offset += page_header.compressed_page_size as usize; + fn get_read(&self, start: u64, length: usize) -> Result<Self::T> { + match &self { + ColumnChunkData::Sparse { data, .. 
} => data Review Comment: As data is sorted, you could consider https://doc.rust-lang.org/std/primitive.slice.html#method.binary_search or friends ########## parquet/src/arrow/async_reader.rs: ########## @@ -482,210 +480,170 @@ impl InMemoryRowGroup { async fn fetch<T: AsyncFileReader + Send>( &mut self, input: &mut T, - metadata: &RowGroupMetaData, projection: &ProjectionMask, - _selection: Option<&RowSelection>, + selection: Option<&RowSelection>, ) -> Result<()> { - // TODO: Use OffsetIndex and selection to prune pages - - let fetch_ranges = self - .column_chunks - .iter() - .enumerate() - .into_iter() - .filter_map(|(idx, chunk)| { - (chunk.is_none() && projection.leaf_included(idx)).then(|| { - let column = metadata.column(idx); - let (start, length) = column.byte_range(); - start as usize..(start + length) as usize + if let Some((selection, page_locations)) = + selection.zip(self.metadata.page_offset_index().as_ref()) + { + // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the + // `RowSelection` + let mut offsets: Vec<Vec<usize>> = vec![]; + + let fetch_ranges = self + .column_chunks + .iter() + .enumerate() + .into_iter() + .filter_map(|(idx, chunk)| { + (chunk.is_none() && projection.leaf_included(idx)).then(|| { + let (_mask, ranges) = selection.page_mask(&page_locations[idx]); + offsets.push(ranges.iter().map(|range| range.start).collect()); + ranges + }) }) - }) - .collect(); + .flatten() + .collect(); - let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter(); + let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter(); + let mut offsets = offsets.into_iter(); - for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { - if chunk.is_some() || !projection.leaf_included(idx) { - continue; + for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { + if chunk.is_some() || !projection.leaf_included(idx) { + continue; + } + + if let Some(page_offsets) = offsets.next() { + 
let mut chunks = Vec::with_capacity(page_offsets.len()); + for _ in 0..page_offsets.len() { + chunks.push(chunk_data.next().unwrap()); + } + + *chunk = Some(ColumnChunkData::Sparse { + length: self.metadata.column(idx).byte_range().1 as usize, + data: page_offsets.into_iter().zip(chunks.into_iter()).collect(), + }) + } } + } else { + let fetch_ranges = self + .column_chunks + .iter() + .enumerate() + .into_iter() + .filter_map(|(idx, chunk)| { + (chunk.is_none() && projection.leaf_included(idx)).then(|| { + let column = self.metadata.column(idx); + let (start, length) = column.byte_range(); + start as usize..(start + length) as usize + }) + }) + .collect(); - let column = metadata.column(idx); + let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter(); - if let Some(data) = chunk_data.next() { - *chunk = Some(InMemoryColumnChunk { - num_values: column.num_values(), - compression: column.compression(), - physical_type: column.column_type(), - data, - }); + for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { + if chunk.is_some() || !projection.leaf_included(idx) { + continue; + } + + if let Some(data) = chunk_data.next() { + *chunk = Some(ColumnChunkData::Dense { + offset: self.metadata.column(idx).byte_range().0 as usize, + data, + }); + } } } + Ok(()) } } impl RowGroupCollection for InMemoryRowGroup { fn schema(&self) -> SchemaDescPtr { - self.schema.clone() + self.metadata.schema_descr_ptr() } fn num_rows(&self) -> usize { self.row_count } fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> { - let page_reader = self.column_chunks[i].as_ref().unwrap().pages(); - - Ok(Box::new(ColumnChunkIterator { - schema: self.schema.clone(), - column_schema: self.schema.columns()[i].clone(), - reader: Some(page_reader), - })) + match &self.column_chunks[i] { + None => Err(ParquetError::General(format!( + "Invalid column index {}, column was not fetched", + i + ))), + Some(data) => { + let page_locations = self + .metadata + 
.page_offset_index() + .as_ref() + .map(|index| index[i].clone()); + let page_reader: Box<dyn PageReader> = + Box::new(SerializedPageReader::new( + Arc::new(data.clone()), + self.metadata.column(i), + self.row_count, + page_locations, + )?); + + Ok(Box::new(ColumnChunkIterator { + schema: self.metadata.schema_descr_ptr(), + column_schema: self.metadata.schema_descr_ptr().columns()[i].clone(), + reader: Some(Ok(page_reader)), + })) + } + } } } -/// Data for a single column chunk +/// An in-memory column chunk #[derive(Clone)] -struct InMemoryColumnChunk { - num_values: i64, - compression: Compression, - physical_type: crate::basic::Type, - data: Bytes, -} - -impl InMemoryColumnChunk { - fn pages(&self) -> Result<Box<dyn PageReader>> { - let page_reader = InMemoryColumnChunkReader::new(self.clone())?; - Ok(Box::new(page_reader)) - } -} - -// A serialized implementation for Parquet [`PageReader`]. -struct InMemoryColumnChunkReader { - chunk: InMemoryColumnChunk, - decompressor: Option<Box<dyn Codec>>, - offset: usize, - seen_num_values: i64, - // If the next page header has already been "peeked", we will cache it here - next_page_header: Option<PageHeader>, -} - -impl InMemoryColumnChunkReader { - /// Creates a new serialized page reader from file source. 
- fn new(chunk: InMemoryColumnChunk) -> Result<Self> { - let decompressor = create_codec(chunk.compression)?; - let result = Self { - chunk, - decompressor, - offset: 0, - seen_num_values: 0, - next_page_header: None, - }; - Ok(result) - } -} - -impl Iterator for InMemoryColumnChunkReader { - type Item = Result<Page>; - - fn next(&mut self) -> Option<Self::Item> { - self.get_next_page().transpose() - } +enum ColumnChunkData { + /// Column chunk data representing only a subset of data pages + Sparse { + /// Length of the full column chunk + length: usize, + data: Vec<(usize, Bytes)>, + }, + /// Full column chunk + Dense { offset: usize, data: Bytes }, } -impl PageReader for InMemoryColumnChunkReader { - fn get_next_page(&mut self) -> Result<Option<Page>> { - while self.seen_num_values < self.chunk.num_values { - let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]); - let page_header = if let Some(page_header) = self.next_page_header.take() { - // The next page header has already been peeked, so use the cached value - page_header - } else { - let page_header = read_page_header(&mut cursor)?; - self.offset += cursor.position() as usize; - page_header - }; - - let compressed_size = page_header.compressed_page_size as usize; - - let start_offset = self.offset; - let end_offset = self.offset + compressed_size; - self.offset = end_offset; - - let buffer = self.chunk.data.slice(start_offset..end_offset); - - let result = match page_header.type_ { - PageType::DataPage | PageType::DataPageV2 => { - let decoded = decode_page( - page_header, - buffer.into(), - self.chunk.physical_type, - self.decompressor.as_mut(), - )?; - self.seen_num_values += decoded.num_values() as i64; - decoded - } - PageType::DictionaryPage => decode_page( - page_header, - buffer.into(), - self.chunk.physical_type, - self.decompressor.as_mut(), - )?, - _ => { - // For unknown page type (e.g., INDEX_PAGE), skip and read next. 
- continue; - } - }; - - return Ok(Some(result)); +impl Length for ColumnChunkData { + fn len(&self) -> u64 { + match &self { + ColumnChunkData::Sparse { length, .. } => *length as u64, + ColumnChunkData::Dense { data, .. } => data.len() as u64, } - - // We are at the end of this column chunk and no more page left. Return None. - Ok(None) } +} - fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> { - while self.seen_num_values < self.chunk.num_values { - return if let Some(buffered_header) = self.next_page_header.as_ref() { - if let Ok(page_metadata) = buffered_header.try_into() { - Ok(Some(page_metadata)) - } else { - // For unknown page type (e.g., INDEX_PAGE), skip and read next. - self.next_page_header = None; - continue; - } - } else { - let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]); - let page_header = read_page_header(&mut cursor)?; - self.offset += cursor.position() as usize; - - let page_metadata = if let Ok(page_metadata) = (&page_header).try_into() { - Ok(Some(page_metadata)) - } else { - // For unknown page type (e.g., INDEX_PAGE), skip and read next. - continue; - }; - - self.next_page_header = Some(page_header); - page_metadata - }; - } - - Ok(None) - } +impl ChunkReader for ColumnChunkData { + type T = bytes::buf::Reader<Bytes>; - fn skip_next_page(&mut self) -> Result<()> { - if let Some(buffered_header) = self.next_page_header.take() { - // The next page header has already been peeked, so just advance the offset - self.offset += buffered_header.compressed_page_size as usize; - } else { - let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]); - let page_header = read_page_header(&mut cursor)?; - self.offset += cursor.position() as usize; - self.offset += page_header.compressed_page_size as usize; + fn get_read(&self, start: u64, length: usize) -> Result<Self::T> { + match &self { + ColumnChunkData::Sparse { data, .. 
} => data + .iter() + .find(|(offset, bytes)| { + *offset <= start as usize && (start as usize - *offset) < bytes.len() + }) + .map(|(_, bytes)| bytes.slice(0..length).reader()) Review Comment: The line above allows offset to be less than start, but this won't return the correct slice in such a case? ########## parquet/src/arrow/async_reader.rs: ########## @@ -482,210 +480,170 @@ impl InMemoryRowGroup { async fn fetch<T: AsyncFileReader + Send>( &mut self, input: &mut T, - metadata: &RowGroupMetaData, projection: &ProjectionMask, - _selection: Option<&RowSelection>, + selection: Option<&RowSelection>, ) -> Result<()> { - // TODO: Use OffsetIndex and selection to prune pages - - let fetch_ranges = self - .column_chunks - .iter() - .enumerate() - .into_iter() - .filter_map(|(idx, chunk)| { - (chunk.is_none() && projection.leaf_included(idx)).then(|| { - let column = metadata.column(idx); - let (start, length) = column.byte_range(); - start as usize..(start + length) as usize + if let Some((selection, page_locations)) = + selection.zip(self.metadata.page_offset_index().as_ref()) + { + // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the + // `RowSelection` + let mut offsets: Vec<Vec<usize>> = vec![]; + + let fetch_ranges = self + .column_chunks + .iter() + .enumerate() + .into_iter() + .filter_map(|(idx, chunk)| { + (chunk.is_none() && projection.leaf_included(idx)).then(|| { + let (_mask, ranges) = selection.page_mask(&page_locations[idx]); + offsets.push(ranges.iter().map(|range| range.start).collect()); + ranges + }) }) - }) - .collect(); + .flatten() + .collect(); - let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter(); + let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter(); + let mut offsets = offsets.into_iter(); - for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { - if chunk.is_some() || !projection.leaf_included(idx) { - continue; + for (idx, chunk) in 
self.column_chunks.iter_mut().enumerate() { + if chunk.is_some() || !projection.leaf_included(idx) { + continue; + } + + if let Some(page_offsets) = offsets.next() { + let mut chunks = Vec::with_capacity(page_offsets.len()); + for _ in 0..page_offsets.len() { + chunks.push(chunk_data.next().unwrap()); + } + + *chunk = Some(ColumnChunkData::Sparse { + length: self.metadata.column(idx).byte_range().1 as usize, + data: page_offsets.into_iter().zip(chunks.into_iter()).collect(), + }) + } } + } else { + let fetch_ranges = self + .column_chunks + .iter() + .enumerate() + .into_iter() + .filter_map(|(idx, chunk)| { + (chunk.is_none() && projection.leaf_included(idx)).then(|| { + let column = self.metadata.column(idx); + let (start, length) = column.byte_range(); + start as usize..(start + length) as usize + }) + }) + .collect(); - let column = metadata.column(idx); + let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter(); - if let Some(data) = chunk_data.next() { - *chunk = Some(InMemoryColumnChunk { - num_values: column.num_values(), - compression: column.compression(), - physical_type: column.column_type(), - data, - }); + for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { + if chunk.is_some() || !projection.leaf_included(idx) { + continue; + } + + if let Some(data) = chunk_data.next() { + *chunk = Some(ColumnChunkData::Dense { + offset: self.metadata.column(idx).byte_range().0 as usize, + data, + }); + } } } + Ok(()) } } impl RowGroupCollection for InMemoryRowGroup { fn schema(&self) -> SchemaDescPtr { - self.schema.clone() + self.metadata.schema_descr_ptr() } fn num_rows(&self) -> usize { self.row_count } fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> { - let page_reader = self.column_chunks[i].as_ref().unwrap().pages(); - - Ok(Box::new(ColumnChunkIterator { - schema: self.schema.clone(), - column_schema: self.schema.columns()[i].clone(), - reader: Some(page_reader), - })) + match &self.column_chunks[i] { + None => 
Err(ParquetError::General(format!( + "Invalid column index {}, column was not fetched", + i + ))), + Some(data) => { + let page_locations = self + .metadata + .page_offset_index() + .as_ref() + .map(|index| index[i].clone()); + let page_reader: Box<dyn PageReader> = + Box::new(SerializedPageReader::new( + Arc::new(data.clone()), + self.metadata.column(i), + self.row_count, + page_locations, + )?); + + Ok(Box::new(ColumnChunkIterator { + schema: self.metadata.schema_descr_ptr(), + column_schema: self.metadata.schema_descr_ptr().columns()[i].clone(), + reader: Some(Ok(page_reader)), + })) + } + } } } -/// Data for a single column chunk +/// An in-memory column chunk #[derive(Clone)] -struct InMemoryColumnChunk { - num_values: i64, - compression: Compression, - physical_type: crate::basic::Type, - data: Bytes, -} - -impl InMemoryColumnChunk { - fn pages(&self) -> Result<Box<dyn PageReader>> { - let page_reader = InMemoryColumnChunkReader::new(self.clone())?; - Ok(Box::new(page_reader)) - } -} - -// A serialized implementation for Parquet [`PageReader`]. -struct InMemoryColumnChunkReader { - chunk: InMemoryColumnChunk, - decompressor: Option<Box<dyn Codec>>, - offset: usize, - seen_num_values: i64, - // If the next page header has already been "peeked", we will cache it here - next_page_header: Option<PageHeader>, -} - -impl InMemoryColumnChunkReader { - /// Creates a new serialized page reader from file source. 
- fn new(chunk: InMemoryColumnChunk) -> Result<Self> { - let decompressor = create_codec(chunk.compression)?; - let result = Self { - chunk, - decompressor, - offset: 0, - seen_num_values: 0, - next_page_header: None, - }; - Ok(result) - } -} - -impl Iterator for InMemoryColumnChunkReader { - type Item = Result<Page>; - - fn next(&mut self) -> Option<Self::Item> { - self.get_next_page().transpose() - } +enum ColumnChunkData { + /// Column chunk data representing only a subset of data pages + Sparse { + /// Length of the full column chunk + length: usize, + data: Vec<(usize, Bytes)>, + }, + /// Full column chunk + Dense { offset: usize, data: Bytes }, } -impl PageReader for InMemoryColumnChunkReader { - fn get_next_page(&mut self) -> Result<Option<Page>> { - while self.seen_num_values < self.chunk.num_values { - let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]); - let page_header = if let Some(page_header) = self.next_page_header.take() { - // The next page header has already been peeked, so use the cached value - page_header - } else { - let page_header = read_page_header(&mut cursor)?; - self.offset += cursor.position() as usize; - page_header - }; - - let compressed_size = page_header.compressed_page_size as usize; - - let start_offset = self.offset; - let end_offset = self.offset + compressed_size; - self.offset = end_offset; - - let buffer = self.chunk.data.slice(start_offset..end_offset); - - let result = match page_header.type_ { - PageType::DataPage | PageType::DataPageV2 => { - let decoded = decode_page( - page_header, - buffer.into(), - self.chunk.physical_type, - self.decompressor.as_mut(), - )?; - self.seen_num_values += decoded.num_values() as i64; - decoded - } - PageType::DictionaryPage => decode_page( - page_header, - buffer.into(), - self.chunk.physical_type, - self.decompressor.as_mut(), - )?, - _ => { - // For unknown page type (e.g., INDEX_PAGE), skip and read next. 
- continue; - } - }; - - return Ok(Some(result)); +impl Length for ColumnChunkData { + fn len(&self) -> u64 { + match &self { + ColumnChunkData::Sparse { length, .. } => *length as u64, + ColumnChunkData::Dense { data, .. } => data.len() as u64, } - - // We are at the end of this column chunk and no more page left. Return None. - Ok(None) } +} - fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> { - while self.seen_num_values < self.chunk.num_values { - return if let Some(buffered_header) = self.next_page_header.as_ref() { - if let Ok(page_metadata) = buffered_header.try_into() { - Ok(Some(page_metadata)) - } else { - // For unknown page type (e.g., INDEX_PAGE), skip and read next. - self.next_page_header = None; - continue; - } - } else { - let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]); - let page_header = read_page_header(&mut cursor)?; - self.offset += cursor.position() as usize; - - let page_metadata = if let Ok(page_metadata) = (&page_header).try_into() { - Ok(Some(page_metadata)) - } else { - // For unknown page type (e.g., INDEX_PAGE), skip and read next. - continue; - }; - - self.next_page_header = Some(page_header); - page_metadata - }; - } - - Ok(None) - } +impl ChunkReader for ColumnChunkData { + type T = bytes::buf::Reader<Bytes>; - fn skip_next_page(&mut self) -> Result<()> { - if let Some(buffered_header) = self.next_page_header.take() { - // The next page header has already been peeked, so just advance the offset - self.offset += buffered_header.compressed_page_size as usize; - } else { - let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]); - let page_header = read_page_header(&mut cursor)?; - self.offset += cursor.position() as usize; - self.offset += page_header.compressed_page_size as usize; + fn get_read(&self, start: u64, length: usize) -> Result<Self::T> { + match &self { + ColumnChunkData::Sparse { data, .. 
} => data + .iter() + .find(|(offset, bytes)| { + *offset <= start as usize && (start as usize - *offset) < bytes.len() Review Comment: Perhaps we should do an exact match? I _think_ this should work? ########## parquet/src/arrow/async_reader.rs: ########## @@ -286,7 +288,8 @@ where let meta = self.metadata.row_group(row_group_idx); let mut row_group = InMemoryRowGroup { - schema: meta.schema_descr_ptr(), + metadata: meta.clone(), Review Comment: You could add a lifetime to `InMemoryRowGroup` and borrow the metadata onto it. I don't think anything is requiring it to have a static lifetime ########## parquet/src/arrow/async_reader.rs: ########## @@ -482,210 +480,170 @@ impl InMemoryRowGroup { async fn fetch<T: AsyncFileReader + Send>( &mut self, input: &mut T, - metadata: &RowGroupMetaData, projection: &ProjectionMask, - _selection: Option<&RowSelection>, + selection: Option<&RowSelection>, ) -> Result<()> { - // TODO: Use OffsetIndex and selection to prune pages - - let fetch_ranges = self - .column_chunks - .iter() - .enumerate() - .into_iter() - .filter_map(|(idx, chunk)| { - (chunk.is_none() && projection.leaf_included(idx)).then(|| { - let column = metadata.column(idx); - let (start, length) = column.byte_range(); - start as usize..(start + length) as usize + if let Some((selection, page_locations)) = + selection.zip(self.metadata.page_offset_index().as_ref()) + { + // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the + // `RowSelection` + let mut offsets: Vec<Vec<usize>> = vec![]; Review Comment: ```suggestion let mut page_start_offsets: Vec<Vec<usize>> = vec![]; ``` Or something to make it clearer what they are ########## parquet/src/arrow/arrow_reader/selection.rs: ########## @@ -116,6 +118,62 @@ impl RowSelection { Self { selectors } } + /// Given an offset index, return a mask indicating which pages are selected along with their locations by `self` + pub fn page_mask( Review Comment: ```suggestion pub(crate) fn 
page_mask( ``` I don't think this is likely to be useful outside the crate ########## parquet/src/arrow/async_reader.rs: ########## @@ -482,210 +480,170 @@ impl InMemoryRowGroup { async fn fetch<T: AsyncFileReader + Send>( &mut self, input: &mut T, - metadata: &RowGroupMetaData, projection: &ProjectionMask, - _selection: Option<&RowSelection>, + selection: Option<&RowSelection>, ) -> Result<()> { - // TODO: Use OffsetIndex and selection to prune pages - - let fetch_ranges = self - .column_chunks - .iter() - .enumerate() - .into_iter() - .filter_map(|(idx, chunk)| { - (chunk.is_none() && projection.leaf_included(idx)).then(|| { - let column = metadata.column(idx); - let (start, length) = column.byte_range(); - start as usize..(start + length) as usize + if let Some((selection, page_locations)) = + selection.zip(self.metadata.page_offset_index().as_ref()) + { + // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the + // `RowSelection` + let mut offsets: Vec<Vec<usize>> = vec![]; + + let fetch_ranges = self + .column_chunks + .iter() + .enumerate() + .into_iter() + .filter_map(|(idx, chunk)| { + (chunk.is_none() && projection.leaf_included(idx)).then(|| { + let (_mask, ranges) = selection.page_mask(&page_locations[idx]); + offsets.push(ranges.iter().map(|range| range.start).collect()); + ranges + }) }) - }) - .collect(); + .flatten() + .collect(); - let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter(); + let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter(); + let mut offsets = offsets.into_iter(); - for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { - if chunk.is_some() || !projection.leaf_included(idx) { - continue; + for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { + if chunk.is_some() || !projection.leaf_included(idx) { + continue; + } + + if let Some(page_offsets) = offsets.next() { + let mut chunks = Vec::with_capacity(page_offsets.len()); + for _ in 
0..page_offsets.len() { + chunks.push(chunk_data.next().unwrap()); + } + + *chunk = Some(ColumnChunkData::Sparse { + length: self.metadata.column(idx).byte_range().1 as usize, + data: page_offsets.into_iter().zip(chunks.into_iter()).collect(), + }) + } } + } else { + let fetch_ranges = self + .column_chunks + .iter() + .enumerate() + .into_iter() + .filter_map(|(idx, chunk)| { + (chunk.is_none() && projection.leaf_included(idx)).then(|| { + let column = self.metadata.column(idx); + let (start, length) = column.byte_range(); + start as usize..(start + length) as usize + }) + }) + .collect(); - let column = metadata.column(idx); + let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter(); - if let Some(data) = chunk_data.next() { - *chunk = Some(InMemoryColumnChunk { - num_values: column.num_values(), - compression: column.compression(), - physical_type: column.column_type(), - data, - }); + for (idx, chunk) in self.column_chunks.iter_mut().enumerate() { + if chunk.is_some() || !projection.leaf_included(idx) { + continue; + } + + if let Some(data) = chunk_data.next() { + *chunk = Some(ColumnChunkData::Dense { + offset: self.metadata.column(idx).byte_range().0 as usize, + data, + }); + } } } + Ok(()) } } impl RowGroupCollection for InMemoryRowGroup { fn schema(&self) -> SchemaDescPtr { - self.schema.clone() + self.metadata.schema_descr_ptr() } fn num_rows(&self) -> usize { self.row_count } fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> { - let page_reader = self.column_chunks[i].as_ref().unwrap().pages(); - - Ok(Box::new(ColumnChunkIterator { - schema: self.schema.clone(), - column_schema: self.schema.columns()[i].clone(), - reader: Some(page_reader), - })) + match &self.column_chunks[i] { + None => Err(ParquetError::General(format!( + "Invalid column index {}, column was not fetched", + i + ))), + Some(data) => { + let page_locations = self + .metadata + .page_offset_index() + .as_ref() + .map(|index| index[i].clone()); + let 
page_reader: Box<dyn PageReader> = + Box::new(SerializedPageReader::new( + Arc::new(data.clone()), Review Comment: Could we store `Arc<ColumnChunkData>` and avoid this clone? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org