tustvold commented on code in PR #2473:
URL: https://github.com/apache/arrow-rs/pull/2473#discussion_r947646115


##########
object_store/src/local.rs:
##########
@@ -1068,6 +1068,7 @@ mod tests {
         integration.head(&path).await.unwrap();
     }
 
+    #[ignore]

Review Comment:
   ?



##########
parquet/src/arrow/async_reader.rs:
##########
@@ -482,210 +480,170 @@ impl InMemoryRowGroup {
     async fn fetch<T: AsyncFileReader + Send>(
         &mut self,
         input: &mut T,
-        metadata: &RowGroupMetaData,
         projection: &ProjectionMask,
-        _selection: Option<&RowSelection>,
+        selection: Option<&RowSelection>,
     ) -> Result<()> {
-        // TODO: Use OffsetIndex and selection to prune pages
-
-        let fetch_ranges = self
-            .column_chunks
-            .iter()
-            .enumerate()
-            .into_iter()
-            .filter_map(|(idx, chunk)| {
-                (chunk.is_none() && projection.leaf_included(idx)).then(|| {
-                    let column = metadata.column(idx);
-                    let (start, length) = column.byte_range();
-                    start as usize..(start + length) as usize
+        if let Some((selection, page_locations)) =
+            selection.zip(self.metadata.page_offset_index().as_ref())
+        {
+            // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
+            // `RowSelection`
+            let mut offsets: Vec<Vec<usize>> = vec![];
+
+            let fetch_ranges = self
+                .column_chunks
+                .iter()
+                .enumerate()
+                .into_iter()
+                .filter_map(|(idx, chunk)| {
+                    (chunk.is_none() && projection.leaf_included(idx)).then(|| {
+                        let (_mask, ranges) = selection.page_mask(&page_locations[idx]);
+                        offsets.push(ranges.iter().map(|range| range.start).collect());
+                        ranges
+                    })
                 })
-            })
-            .collect();
+                .flatten()
+                .collect();
 
-        let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
+            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
+            let mut offsets = offsets.into_iter();
 
-        for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
-            if chunk.is_some() || !projection.leaf_included(idx) {
-                continue;
+            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
+                if chunk.is_some() || !projection.leaf_included(idx) {
+                    continue;
+                }
+
+                if let Some(page_offsets) = offsets.next() {
+                    let mut chunks = Vec::with_capacity(page_offsets.len());
+                    for _ in 0..page_offsets.len() {
+                        chunks.push(chunk_data.next().unwrap());
+                    }
+
+                    *chunk = Some(ColumnChunkData::Sparse {
+                        length: self.metadata.column(idx).byte_range().1 as usize,
+                        data: page_offsets.into_iter().zip(chunks.into_iter()).collect(),
+                    })
+                }
             }
+        } else {
+            let fetch_ranges = self
+                .column_chunks
+                .iter()
+                .enumerate()
+                .into_iter()
+                .filter_map(|(idx, chunk)| {
+                    (chunk.is_none() && projection.leaf_included(idx)).then(|| {
+                        let column = self.metadata.column(idx);
+                        let (start, length) = column.byte_range();
+                        start as usize..(start + length) as usize
+                    })
+                })
+                .collect();
 
-            let column = metadata.column(idx);
+            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
 
-            if let Some(data) = chunk_data.next() {
-                *chunk = Some(InMemoryColumnChunk {
-                    num_values: column.num_values(),
-                    compression: column.compression(),
-                    physical_type: column.column_type(),
-                    data,
-                });
+            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
+                if chunk.is_some() || !projection.leaf_included(idx) {
+                    continue;
+                }
+
+                if let Some(data) = chunk_data.next() {
+                    *chunk = Some(ColumnChunkData::Dense {
+                        offset: self.metadata.column(idx).byte_range().0 as usize,
+                        data,
+                    });
+                }
             }
         }
+
         Ok(())
     }
 }
 
 impl RowGroupCollection for InMemoryRowGroup {
     fn schema(&self) -> SchemaDescPtr {
-        self.schema.clone()
+        self.metadata.schema_descr_ptr()
     }
 
     fn num_rows(&self) -> usize {
         self.row_count
     }
 
     fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
-        let page_reader = self.column_chunks[i].as_ref().unwrap().pages();
-
-        Ok(Box::new(ColumnChunkIterator {
-            schema: self.schema.clone(),
-            column_schema: self.schema.columns()[i].clone(),
-            reader: Some(page_reader),
-        }))
+        match &self.column_chunks[i] {
+            None => Err(ParquetError::General(format!(
+                "Invalid column index {}, column was not fetched",
+                i
+            ))),
+            Some(data) => {
+                let page_locations = self
+                    .metadata
+                    .page_offset_index()
+                    .as_ref()
+                    .map(|index| index[i].clone());
+                let page_reader: Box<dyn PageReader> =
+                    Box::new(SerializedPageReader::new(
+                        Arc::new(data.clone()),
+                        self.metadata.column(i),
+                        self.row_count,
+                        page_locations,
+                    )?);
+
+                Ok(Box::new(ColumnChunkIterator {
+                    schema: self.metadata.schema_descr_ptr(),
+                    column_schema: self.metadata.schema_descr_ptr().columns()[i].clone(),
+                    reader: Some(Ok(page_reader)),
+                }))
+            }
+        }
     }
 }
 
-/// Data for a single column chunk
+/// An in-memory column chunk
 #[derive(Clone)]
-struct InMemoryColumnChunk {
-    num_values: i64,
-    compression: Compression,
-    physical_type: crate::basic::Type,
-    data: Bytes,
-}
-
-impl InMemoryColumnChunk {
-    fn pages(&self) -> Result<Box<dyn PageReader>> {
-        let page_reader = InMemoryColumnChunkReader::new(self.clone())?;
-        Ok(Box::new(page_reader))
-    }
-}
-
-// A serialized implementation for Parquet [`PageReader`].
-struct InMemoryColumnChunkReader {
-    chunk: InMemoryColumnChunk,
-    decompressor: Option<Box<dyn Codec>>,
-    offset: usize,
-    seen_num_values: i64,
-    // If the next page header has already been "peeked", we will cache it here
-    next_page_header: Option<PageHeader>,
-}
-
-impl InMemoryColumnChunkReader {
-    /// Creates a new serialized page reader from file source.
-    fn new(chunk: InMemoryColumnChunk) -> Result<Self> {
-        let decompressor = create_codec(chunk.compression)?;
-        let result = Self {
-            chunk,
-            decompressor,
-            offset: 0,
-            seen_num_values: 0,
-            next_page_header: None,
-        };
-        Ok(result)
-    }
-}
-
-impl Iterator for InMemoryColumnChunkReader {
-    type Item = Result<Page>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        self.get_next_page().transpose()
-    }
+enum ColumnChunkData {
+    /// Column chunk data representing only a subset of data pages
+    Sparse {
+        /// Length of the full column chunk
+        length: usize,
+        data: Vec<(usize, Bytes)>,
+    },
+    /// Full column chunk
+    Dense { offset: usize, data: Bytes },
 }
 
-impl PageReader for InMemoryColumnChunkReader {
-    fn get_next_page(&mut self) -> Result<Option<Page>> {
-        while self.seen_num_values < self.chunk.num_values {
-            let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]);
-            let page_header = if let Some(page_header) = self.next_page_header.take() {
-                // The next page header has already been peeked, so use the cached value
-                page_header
-            } else {
-                let page_header = read_page_header(&mut cursor)?;
-                self.offset += cursor.position() as usize;
-                page_header
-            };
-
-            let compressed_size = page_header.compressed_page_size as usize;
-
-            let start_offset = self.offset;
-            let end_offset = self.offset + compressed_size;
-            self.offset = end_offset;
-
-            let buffer = self.chunk.data.slice(start_offset..end_offset);
-
-            let result = match page_header.type_ {
-                PageType::DataPage | PageType::DataPageV2 => {
-                    let decoded = decode_page(
-                        page_header,
-                        buffer.into(),
-                        self.chunk.physical_type,
-                        self.decompressor.as_mut(),
-                    )?;
-                    self.seen_num_values += decoded.num_values() as i64;
-                    decoded
-                }
-                PageType::DictionaryPage => decode_page(
-                    page_header,
-                    buffer.into(),
-                    self.chunk.physical_type,
-                    self.decompressor.as_mut(),
-                )?,
-                _ => {
-                    // For unknown page type (e.g., INDEX_PAGE), skip and read next.
-                    continue;
-                }
-            };
-
-            return Ok(Some(result));
+impl Length for ColumnChunkData {
+    fn len(&self) -> u64 {
+        match &self {
+            ColumnChunkData::Sparse { length, .. } => *length as u64,
+            ColumnChunkData::Dense { data, .. } => data.len() as u64,
         }
-
-        // We are at the end of this column chunk and no more page left. Return None.
-        Ok(None)
     }
+}
 
-    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
-        while self.seen_num_values < self.chunk.num_values {
-            return if let Some(buffered_header) = self.next_page_header.as_ref() {
-                if let Ok(page_metadata) = buffered_header.try_into() {
-                    Ok(Some(page_metadata))
-                } else {
-                    // For unknown page type (e.g., INDEX_PAGE), skip and read next.
-                    self.next_page_header = None;
-                    continue;
-                }
-            } else {
-                let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]);
-                let page_header = read_page_header(&mut cursor)?;
-                self.offset += cursor.position() as usize;
-
-                let page_metadata = if let Ok(page_metadata) = (&page_header).try_into() {
-                    Ok(Some(page_metadata))
-                } else {
-                    // For unknown page type (e.g., INDEX_PAGE), skip and read next.
-                    continue;
-                };
-
-                self.next_page_header = Some(page_header);
-                page_metadata
-            };
-        }
-
-        Ok(None)
-    }
+impl ChunkReader for ColumnChunkData {
+    type T = bytes::buf::Reader<Bytes>;
 
-    fn skip_next_page(&mut self) -> Result<()> {
-        if let Some(buffered_header) = self.next_page_header.take() {
-            // The next page header has already been peeked, so just advance the offset
-            self.offset += buffered_header.compressed_page_size as usize;
-        } else {
-            let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]);
-            let page_header = read_page_header(&mut cursor)?;
-            self.offset += cursor.position() as usize;
-            self.offset += page_header.compressed_page_size as usize;
+    fn get_read(&self, start: u64, length: usize) -> Result<Self::T> {

Review Comment:
   It is worth noting this will currently represent a performance regression, as the previous implementation avoided a copy - https://github.com/apache/arrow-rs/pull/2473/files#diff-f6b1a106d47a16504d4a16d57a6632872ddf596f337ac0640a13523dccc2d4d4L615
   
   I will add a get_bytes method to ChunkReader to avoid this
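   
   For illustration, a sketch of roughly what that might look like (the `get_bytes` name, the default body, and the simplified trait bounds here are my assumptions, not code from this PR):
   
   ```rust
   use std::io::Read;
   
   use bytes::Bytes;
   
   // Simplified stand-in for parquet's ChunkReader (the real trait also
   // requires Length and uses parquet's Result type)
   pub trait ChunkReader: Send + Sync {
       type T: Read + Send;
   
       fn get_read(&self, start: u64, length: usize) -> std::io::Result<Self::T>;
   
       /// Default implementation copies through the `Read`; implementations
       /// backed by `Bytes` can override it to return a cheap ref-counted
       /// slice and avoid the copy
       fn get_bytes(&self, start: u64, length: usize) -> std::io::Result<Bytes> {
           let mut buffer = Vec::with_capacity(length);
           self.get_read(start, length)?
               .take(length as u64)
               .read_to_end(&mut buffer)?;
           Ok(buffer.into())
       }
   }
   ```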



##########
parquet/src/arrow/arrow_reader/selection.rs:
##########
@@ -423,4 +481,62 @@ mod tests {
             assert_eq!(a.and_then(&b), expected);
         }
     }
+
+    #[test]
+    fn test_page_mask() {
+        let selection = RowSelection::from(vec![
+            RowSelector::skip(10),
+            RowSelector::select(3),
+            RowSelector::skip(3),
+            RowSelector::select(4),
+            RowSelector::skip(5),
+            RowSelector::select(5),
+            RowSelector::skip(12),
+            RowSelector::select(12),
+            RowSelector::skip(12),
+        ]);
+
+        let index = vec![
+            PageLocation {
+                offset: 0,
+                compressed_page_size: 10,
+                first_row_index: 0,
+            },
+            PageLocation {
+                offset: 10,
+                compressed_page_size: 10,
+                first_row_index: 10,
+            },
+            PageLocation {
+                offset: 20,
+                compressed_page_size: 10,
+                first_row_index: 20,
+            },
+            PageLocation {
+                offset: 30,
+                compressed_page_size: 10,
+                first_row_index: 30,
+            },
+            PageLocation {
+                offset: 40,
+                compressed_page_size: 10,
+                first_row_index: 40,
+            },
+            PageLocation {
+                offset: 50,
+                compressed_page_size: 10,
+                first_row_index: 50,
+            },
+            PageLocation {
+                offset: 60,
+                compressed_page_size: 10,
+                first_row_index: 60,
+            },
+        ];
+
+        let (mask, ranges) = selection.page_mask(&index);
+
+        assert_eq!(mask, vec![false, true, true, false, true, true, false]);
+        assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60]);

Review Comment:
   Could we get a test where the final PageLocation is selected?
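   
   For example, something like this (a sketch reusing the seven-page index above, assuming I'm reading the `page_mask` semantics correctly):
   
   ```rust
   // A selection whose final selector lands on the last PageLocation
   let selection = RowSelection::from(vec![
       RowSelector::skip(60),
       RowSelector::select(10),
   ]);
   
   let (mask, ranges) = selection.page_mask(&index);
   
   // Only the last page (rows 60..70, bytes 60..70) should be fetched
   assert_eq!(mask, vec![false, false, false, false, false, false, true]);
   assert_eq!(ranges, vec![60..70]);
   ```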



##########
parquet/src/arrow/arrow_reader/selection.rs:
##########
@@ -116,6 +118,62 @@ impl RowSelection {
         Self { selectors }
     }
 
+    /// Given an offset index, return a mask indicating which pages are selected by `self`, along with the byte ranges of those pages
+    pub fn page_mask(
+        &self,
+        page_locations: &[PageLocation],
+    ) -> (Vec<bool>, Vec<Range<usize>>) {
+        let mut mask = vec![false; page_locations.len()];
+        let mut ranges = vec![];
+        let mut row_offset = 0;
+
+        let mut pages = page_locations.iter().enumerate().peekable();
+        let mut selectors = self.selectors.iter().cloned();
+        let mut current_selector = selectors.next();
+        let mut current_page = pages.next();
+
+        let mut current_page_included = false;
+
+        while let Some((selector, (mut page_idx, page))) =
+            current_selector.as_mut().zip(current_page)
+        {
+            if !selector.skip && !current_page_included && !mask[page_idx] {
+                mask[page_idx] = true;
+                let start = page.offset as usize;
+                let end = start + page.compressed_page_size as usize;
+                ranges.push(start..end);
+            }
+
+            if let Some((_, next_page)) = pages.peek() {
+                if row_offset + selector.row_count > next_page.first_row_index as usize {
+                    let remaining_in_page =
+                        next_page.first_row_index as usize - row_offset;
+                    selector.row_count -= remaining_in_page;
+                    row_offset += remaining_in_page;
+                    current_page = pages.next();
+
+                    continue;
+                } else {
+                    if row_offset + selector.row_count
+                        == next_page.first_row_index as usize
+                    {
+                        current_page = pages.next();
+                    }
+                    row_offset += selector.row_count;
+                    current_selector = selectors.next();
+                }
+            } else {
+                break;
+            }
+        }
+
+        (mask, ranges)
+    }
+
+    pub fn selectors(&self) -> &[RowSelector] {

Review Comment:
   This doesn't appear to be used, and so I think it can go. I've been trying to avoid exposing the internal layout of this type externally



##########
parquet/src/arrow/arrow_reader/selection.rs:
##########
@@ -116,6 +118,62 @@ impl RowSelection {
         Self { selectors }
     }
 
+    /// Given an offset index, return a mask indicating which pages are selected by `self`, along with the byte ranges of those pages
+    pub fn page_mask(
+        &self,
+        page_locations: &[PageLocation],
+    ) -> (Vec<bool>, Vec<Range<usize>>) {

Review Comment:
   It seems strange to me that this method would return `Vec<Range<usize>>` when it is called page_mask, and the caller clearly already has `&[PageLocation]` that can easily be combined with the mask...
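   
   For example (a sketch of the alternative, assuming `page_mask` returned only the mask and `Range` is in scope):
   
   ```rust
   // The caller can recover the byte ranges from the mask and the
   // PageLocations it already holds
   let ranges: Vec<Range<usize>> = mask
       .iter()
       .zip(page_locations.iter())
       .filter(|(selected, _)| **selected)
       .map(|(_, location)| {
           let start = location.offset as usize;
           start..start + location.compressed_page_size as usize
       })
       .collect();
   ```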



##########
parquet/src/arrow/async_reader.rs:
##########
@@ -482,210 +480,170 @@ impl InMemoryRowGroup {
+/// An in-memory column chunk
 #[derive(Clone)]
+enum ColumnChunkData {
+    /// Column chunk data representing only a subset of data pages
+    Sparse {
+        /// Length of the full column chunk
+        length: usize,
+        data: Vec<(usize, Bytes)>,

Review Comment:
   A comment explaining what these are would go a long way
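   
   For example, my reading of the fields (to be confirmed), as doc comments on the same enum:
   
   ```rust
   /// An in-memory column chunk
   #[derive(Clone)]
   enum ColumnChunkData {
       /// Column chunk data representing only a subset of its data pages
       Sparse {
           /// Length of the full column chunk
           length: usize,
           /// Subset of the chunk's pages, as tuples of
           /// (page offset within the file, page bytes), sorted by offset
           data: Vec<(usize, Bytes)>,
       },
       /// The full column chunk, plus its offset within the file
       Dense { offset: usize, data: Bytes },
   }
   ```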



##########
parquet/src/arrow/async_reader.rs:
##########
@@ -482,210 +480,170 @@ impl InMemoryRowGroup {
+impl ChunkReader for ColumnChunkData {
+    type T = bytes::buf::Reader<Bytes>;
 
+    fn get_read(&self, start: u64, length: usize) -> Result<Self::T> {
+        match &self {
+            ColumnChunkData::Sparse { data, .. } => data

Review Comment:
   As data is sorted, you could consider https://doc.rust-lang.org/std/primitive.slice.html#method.binary_search or friends
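   
   For example (a sketch, relying on `data` being sorted by page offset):
   
   ```rust
   // binary_search_by_key returns Ok(i) on an exact page-start match;
   // Err(i) is the insertion point, so the candidate page is at i - 1
   let page = match data.binary_search_by_key(&(start as usize), |(offset, _)| *offset) {
       Ok(idx) => Some(&data[idx]),
       Err(0) => None,
       Err(idx) => Some(&data[idx - 1]),
   };
   // Still need to verify the requested offset falls within that page
   let page = page.filter(|(offset, bytes)| start as usize - *offset < bytes.len());
   ```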



##########
parquet/src/arrow/async_reader.rs:
##########
@@ -482,210 +480,170 @@ impl InMemoryRowGroup {
+impl ChunkReader for ColumnChunkData {
+    type T = bytes::buf::Reader<Bytes>;
 
+    fn get_read(&self, start: u64, length: usize) -> Result<Self::T> {
+        match &self {
+            ColumnChunkData::Sparse { data, .. } => data
+                .iter()
+                .find(|(offset, bytes)| {
+                    *offset <= start as usize && (start as usize - *offset) < bytes.len()
+                })
+                .map(|(_, bytes)| bytes.slice(0..length).reader())

Review Comment:
   The line above allows `offset` to be less than `start`, but this won't return the correct slice in such a case?
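   
   i.e. something like (a sketch, untested):
   
   ```rust
   ColumnChunkData::Sparse { data, .. } => data
       .iter()
       .find(|(offset, bytes)| {
           *offset <= start as usize && (start as usize - *offset) < bytes.len()
       })
       .map(|(offset, bytes)| {
           // Slice relative to the page's start, not always from 0
           let page_offset = start as usize - *offset;
           bytes.slice(page_offset..page_offset + length).reader()
       })
   ```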



##########
parquet/src/arrow/async_reader.rs:
##########
@@ -482,210 +480,170 @@ impl InMemoryRowGroup {
     async fn fetch<T: AsyncFileReader + Send>(
         &mut self,
         input: &mut T,
-        metadata: &RowGroupMetaData,
         projection: &ProjectionMask,
-        _selection: Option<&RowSelection>,
+        selection: Option<&RowSelection>,
     ) -> Result<()> {
-        // TODO: Use OffsetIndex and selection to prune pages
-
-        let fetch_ranges = self
-            .column_chunks
-            .iter()
-            .enumerate()
-            .into_iter()
-            .filter_map(|(idx, chunk)| {
-                (chunk.is_none() && projection.leaf_included(idx)).then(|| {
-                    let column = metadata.column(idx);
-                    let (start, length) = column.byte_range();
-                    start as usize..(start + length) as usize
+        if let Some((selection, page_locations)) =
+            selection.zip(self.metadata.page_offset_index().as_ref())
+        {
+            // If we have a `RowSelection` and an `OffsetIndex` then only 
fetch pages required for the
+            // `RowSelection`
+            let mut offsets: Vec<Vec<usize>> = vec![];
+
+            let fetch_ranges = self
+                .column_chunks
+                .iter()
+                .enumerate()
+                .into_iter()
+                .filter_map(|(idx, chunk)| {
+                    (chunk.is_none() && projection.leaf_included(idx)).then(|| 
{
+                        let (_mask, ranges) = 
selection.page_mask(&page_locations[idx]);
+                        offsets.push(ranges.iter().map(|range| 
range.start).collect());
+                        ranges
+                    })
                 })
-            })
-            .collect();
+                .flatten()
+                .collect();
 
-        let mut chunk_data = 
input.get_byte_ranges(fetch_ranges).await?.into_iter();
+            let mut chunk_data = 
input.get_byte_ranges(fetch_ranges).await?.into_iter();
+            let mut offsets = offsets.into_iter();
 
-        for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
-            if chunk.is_some() || !projection.leaf_included(idx) {
-                continue;
+            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
+                if chunk.is_some() || !projection.leaf_included(idx) {
+                    continue;
+                }
+
+                if let Some(page_offsets) = offsets.next() {
+                    let mut chunks = Vec::with_capacity(page_offsets.len());
+                    for _ in 0..page_offsets.len() {
+                        chunks.push(chunk_data.next().unwrap());
+                    }
+
+                    *chunk = Some(ColumnChunkData::Sparse {
+                        length: self.metadata.column(idx).byte_range().1 as 
usize,
+                        data: 
page_offsets.into_iter().zip(chunks.into_iter()).collect(),
+                    })
+                }
             }
+        } else {
+            let fetch_ranges = self
+                .column_chunks
+                .iter()
+                .enumerate()
+                .into_iter()
+                .filter_map(|(idx, chunk)| {
+                    (chunk.is_none() && projection.leaf_included(idx)).then(|| 
{
+                        let column = self.metadata.column(idx);
+                        let (start, length) = column.byte_range();
+                        start as usize..(start + length) as usize
+                    })
+                })
+                .collect();
 
-            let column = metadata.column(idx);
+            let mut chunk_data = 
input.get_byte_ranges(fetch_ranges).await?.into_iter();
 
-            if let Some(data) = chunk_data.next() {
-                *chunk = Some(InMemoryColumnChunk {
-                    num_values: column.num_values(),
-                    compression: column.compression(),
-                    physical_type: column.column_type(),
-                    data,
-                });
+            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
+                if chunk.is_some() || !projection.leaf_included(idx) {
+                    continue;
+                }
+
+                if let Some(data) = chunk_data.next() {
+                    *chunk = Some(ColumnChunkData::Dense {
+                        offset: self.metadata.column(idx).byte_range().0 as 
usize,
+                        data,
+                    });
+                }
             }
         }
+
         Ok(())
     }
 }
 
 impl RowGroupCollection for InMemoryRowGroup {
     fn schema(&self) -> SchemaDescPtr {
-        self.schema.clone()
+        self.metadata.schema_descr_ptr()
     }
 
     fn num_rows(&self) -> usize {
         self.row_count
     }
 
     fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
-        let page_reader = self.column_chunks[i].as_ref().unwrap().pages();
-
-        Ok(Box::new(ColumnChunkIterator {
-            schema: self.schema.clone(),
-            column_schema: self.schema.columns()[i].clone(),
-            reader: Some(page_reader),
-        }))
+        match &self.column_chunks[i] {
+            None => Err(ParquetError::General(format!(
+                "Invalid column index {}, column was not fetched",
+                i
+            ))),
+            Some(data) => {
+                let page_locations = self
+                    .metadata
+                    .page_offset_index()
+                    .as_ref()
+                    .map(|index| index[i].clone());
+                let page_reader: Box<dyn PageReader> =
+                    Box::new(SerializedPageReader::new(
+                        Arc::new(data.clone()),
+                        self.metadata.column(i),
+                        self.row_count,
+                        page_locations,
+                    )?);
+
+                Ok(Box::new(ColumnChunkIterator {
+                    schema: self.metadata.schema_descr_ptr(),
+                    column_schema: self.metadata.schema_descr_ptr().columns()[i].clone(),
+                    reader: Some(Ok(page_reader)),
+                }))
+            }
+        }
     }
 }
 
-/// Data for a single column chunk
+/// An in-memory column chunk
 #[derive(Clone)]
-struct InMemoryColumnChunk {
-    num_values: i64,
-    compression: Compression,
-    physical_type: crate::basic::Type,
-    data: Bytes,
-}
-
-impl InMemoryColumnChunk {
-    fn pages(&self) -> Result<Box<dyn PageReader>> {
-        let page_reader = InMemoryColumnChunkReader::new(self.clone())?;
-        Ok(Box::new(page_reader))
-    }
-}
-
-// A serialized implementation for Parquet [`PageReader`].
-struct InMemoryColumnChunkReader {
-    chunk: InMemoryColumnChunk,
-    decompressor: Option<Box<dyn Codec>>,
-    offset: usize,
-    seen_num_values: i64,
-    // If the next page header has already been "peeked", we will cache it here
-    next_page_header: Option<PageHeader>,
-}
-
-impl InMemoryColumnChunkReader {
-    /// Creates a new serialized page reader from file source.
-    fn new(chunk: InMemoryColumnChunk) -> Result<Self> {
-        let decompressor = create_codec(chunk.compression)?;
-        let result = Self {
-            chunk,
-            decompressor,
-            offset: 0,
-            seen_num_values: 0,
-            next_page_header: None,
-        };
-        Ok(result)
-    }
-}
-
-impl Iterator for InMemoryColumnChunkReader {
-    type Item = Result<Page>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        self.get_next_page().transpose()
-    }
+enum ColumnChunkData {
+    /// Column chunk data representing only a subset of data pages
+    Sparse {
+        /// Length of the full column chunk
+        length: usize,
+        data: Vec<(usize, Bytes)>,
+    },
+    /// Full column chunk
+    Dense { offset: usize, data: Bytes },
 }
 
-impl PageReader for InMemoryColumnChunkReader {
-    fn get_next_page(&mut self) -> Result<Option<Page>> {
-        while self.seen_num_values < self.chunk.num_values {
-            let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]);
-            let page_header = if let Some(page_header) = self.next_page_header.take() {
-                // The next page header has already been peeked, so use the cached value
-                page_header
-            } else {
-                let page_header = read_page_header(&mut cursor)?;
-                self.offset += cursor.position() as usize;
-                page_header
-            };
-
-            let compressed_size = page_header.compressed_page_size as usize;
-
-            let start_offset = self.offset;
-            let end_offset = self.offset + compressed_size;
-            self.offset = end_offset;
-
-            let buffer = self.chunk.data.slice(start_offset..end_offset);
-
-            let result = match page_header.type_ {
-                PageType::DataPage | PageType::DataPageV2 => {
-                    let decoded = decode_page(
-                        page_header,
-                        buffer.into(),
-                        self.chunk.physical_type,
-                        self.decompressor.as_mut(),
-                    )?;
-                    self.seen_num_values += decoded.num_values() as i64;
-                    decoded
-                }
-                PageType::DictionaryPage => decode_page(
-                    page_header,
-                    buffer.into(),
-                    self.chunk.physical_type,
-                    self.decompressor.as_mut(),
-                )?,
-                _ => {
-                    // For unknown page type (e.g., INDEX_PAGE), skip and read next.
-                    continue;
-                }
-            };
-
-            return Ok(Some(result));
+impl Length for ColumnChunkData {
+    fn len(&self) -> u64 {
+        match &self {
+            ColumnChunkData::Sparse { length, .. } => *length as u64,
+            ColumnChunkData::Dense { data, .. } => data.len() as u64,
         }
-
-        // We are at the end of this column chunk and no more page left. Return None.
-        Ok(None)
     }
+}
 
-    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
-        while self.seen_num_values < self.chunk.num_values {
-            return if let Some(buffered_header) = self.next_page_header.as_ref() {
-                if let Ok(page_metadata) = buffered_header.try_into() {
-                    Ok(Some(page_metadata))
-                } else {
-                    // For unknown page type (e.g., INDEX_PAGE), skip and read next.
-                    self.next_page_header = None;
-                    continue;
-                }
-            } else {
-                let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]);
-                let page_header = read_page_header(&mut cursor)?;
-                self.offset += cursor.position() as usize;
-
-                let page_metadata = if let Ok(page_metadata) = (&page_header).try_into() {
-                    Ok(Some(page_metadata))
-                } else {
-                    // For unknown page type (e.g., INDEX_PAGE), skip and read next.
-                    continue;
-                };
-
-                self.next_page_header = Some(page_header);
-                page_metadata
-            };
-        }
-
-        Ok(None)
-    }
+impl ChunkReader for ColumnChunkData {
+    type T = bytes::buf::Reader<Bytes>;
 
-    fn skip_next_page(&mut self) -> Result<()> {
-        if let Some(buffered_header) = self.next_page_header.take() {
-            // The next page header has already been peeked, so just advance the offset
-            self.offset += buffered_header.compressed_page_size as usize;
-        } else {
-            let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]);
-            let page_header = read_page_header(&mut cursor)?;
-            self.offset += cursor.position() as usize;
-            self.offset += page_header.compressed_page_size as usize;
+    fn get_read(&self, start: u64, length: usize) -> Result<Self::T> {
+        match &self {
+            ColumnChunkData::Sparse { data, .. } => data
+                .iter()
+                .find(|(offset, bytes)| {
+                    *offset <= start as usize && (start as usize - *offset) < bytes.len()

Review Comment:
   Perhaps we should do an exact match? I _think_ this should work?
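
   For illustration, a minimal sketch of the exact-match variant (a hypothetical free function; assumes the sparse `data` vec is sorted by page start offset, which holds since it is built from ascending page locations):

   ```rust
   use bytes::{Buf, Bytes};

   /// Exact-match lookup of a page by its start offset in the sparse
   /// representation, returning the page bytes as a reader.
   fn read_sparse_page(
       data: &[(usize, Bytes)],
       start: u64,
       length: usize,
   ) -> Option<bytes::buf::Reader<Bytes>> {
       data.binary_search_by_key(&(start as usize), |(offset, _)| *offset)
           .ok()
           .map(|idx| data[idx].1.slice(0..length).reader())
   }
   ```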



##########
parquet/src/arrow/async_reader.rs:
##########
@@ -286,7 +288,8 @@ where
 
         let meta = self.metadata.row_group(row_group_idx);
         let mut row_group = InMemoryRowGroup {
-            schema: meta.schema_descr_ptr(),
+            metadata: meta.clone(),

Review Comment:
   You could add a lifetime to `InMemoryRowGroup` and borrow the metadata into it. I don't think anything requires it to have a static lifetime.
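
   Roughly (a sketch, field set abbreviated; assumes the borrowed `RowGroupMetaData` outlives the fetch):

   ```rust
   struct InMemoryRowGroup<'a> {
       metadata: &'a RowGroupMetaData,
       column_chunks: Vec<Option<ColumnChunkData>>,
       row_count: usize,
   }
   ```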



##########
parquet/src/arrow/async_reader.rs:
##########
@@ -482,210 +480,170 @@ impl InMemoryRowGroup {
     async fn fetch<T: AsyncFileReader + Send>(
         &mut self,
         input: &mut T,
-        metadata: &RowGroupMetaData,
         projection: &ProjectionMask,
-        _selection: Option<&RowSelection>,
+        selection: Option<&RowSelection>,
     ) -> Result<()> {
-        // TODO: Use OffsetIndex and selection to prune pages
-
-        let fetch_ranges = self
-            .column_chunks
-            .iter()
-            .enumerate()
-            .into_iter()
-            .filter_map(|(idx, chunk)| {
-                (chunk.is_none() && projection.leaf_included(idx)).then(|| {
-                    let column = metadata.column(idx);
-                    let (start, length) = column.byte_range();
-                    start as usize..(start + length) as usize
+        if let Some((selection, page_locations)) =
+            selection.zip(self.metadata.page_offset_index().as_ref())
+        {
+            // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
+            // `RowSelection`
+            let mut offsets: Vec<Vec<usize>> = vec![];

Review Comment:
   ```suggestion
               let mut page_start_offsets: Vec<Vec<usize>> = vec![];
   ```
   
   Or something to make it clearer what they are



##########
parquet/src/arrow/arrow_reader/selection.rs:
##########
@@ -116,6 +118,62 @@ impl RowSelection {
         Self { selectors }
     }
 
+    /// Given an offset index, return a mask indicating which pages are selected along with their locations by `self`
+    pub fn page_mask(

Review Comment:
   ```suggestion
       pub(crate) fn page_mask(
   ```
   
   I don't think this is likely to be useful outside the crate.
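
   For context, the pruning this enables, as a self-contained sketch (hypothetical `select_pages` helper, not the PR's implementation): keep the pages whose row span overlaps any selected row range, where a page's rows run from its `first_row_index` up to the next page's.

   ```rust
   use std::ops::Range;

   /// `selected`: selected row ranges (what a `RowSelection` expands to).
   /// `pages`: (first_row_index, byte_range) per page, in row order.
   fn select_pages(
       selected: &[Range<usize>],
       pages: &[(usize, Range<usize>)],
       total_rows: usize,
   ) -> (Vec<bool>, Vec<Range<usize>>) {
       let mut mask = Vec::with_capacity(pages.len());
       let mut ranges = Vec::new();
       for (i, (first_row, byte_range)) in pages.iter().enumerate() {
           // Rows covered by this page: [first_row, end_row)
           let end_row = pages.get(i + 1).map(|(r, _)| *r).unwrap_or(total_rows);
           let wanted = selected.iter().any(|s| s.start < end_row && s.end > *first_row);
           mask.push(wanted);
           if wanted {
               ranges.push(byte_range.clone());
           }
       }
       (mask, ranges)
   }
   ```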



##########
parquet/src/arrow/async_reader.rs:
##########
@@ -482,210 +480,170 @@ impl InMemoryRowGroup {
     async fn fetch<T: AsyncFileReader + Send>(
         &mut self,
         input: &mut T,
-        metadata: &RowGroupMetaData,
         projection: &ProjectionMask,
-        _selection: Option<&RowSelection>,
+        selection: Option<&RowSelection>,
     ) -> Result<()> {
-        // TODO: Use OffsetIndex and selection to prune pages
-
-        let fetch_ranges = self
-            .column_chunks
-            .iter()
-            .enumerate()
-            .into_iter()
-            .filter_map(|(idx, chunk)| {
-                (chunk.is_none() && projection.leaf_included(idx)).then(|| {
-                    let column = metadata.column(idx);
-                    let (start, length) = column.byte_range();
-                    start as usize..(start + length) as usize
+        if let Some((selection, page_locations)) =
+            selection.zip(self.metadata.page_offset_index().as_ref())
+        {
+            // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
+            // `RowSelection`
+            let mut offsets: Vec<Vec<usize>> = vec![];
+
+            let fetch_ranges = self
+                .column_chunks
+                .iter()
+                .enumerate()
+                .into_iter()
+                .filter_map(|(idx, chunk)| {
+                    (chunk.is_none() && projection.leaf_included(idx)).then(|| {
+                        let (_mask, ranges) = selection.page_mask(&page_locations[idx]);
+                        offsets.push(ranges.iter().map(|range| range.start).collect());
+                        ranges
+                    })
                 })
-            })
-            .collect();
+                .flatten()
+                .collect();
 
-        let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
+            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
+            let mut offsets = offsets.into_iter();
 
-        for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
-            if chunk.is_some() || !projection.leaf_included(idx) {
-                continue;
+            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
+                if chunk.is_some() || !projection.leaf_included(idx) {
+                    continue;
+                }
+
+                if let Some(page_offsets) = offsets.next() {
+                    let mut chunks = Vec::with_capacity(page_offsets.len());
+                    for _ in 0..page_offsets.len() {
+                        chunks.push(chunk_data.next().unwrap());
+                    }
+
+                    *chunk = Some(ColumnChunkData::Sparse {
+                        length: self.metadata.column(idx).byte_range().1 as usize,
+                        data: page_offsets.into_iter().zip(chunks.into_iter()).collect(),
+                    })
+                }
             }
+        } else {
+            let fetch_ranges = self
+                .column_chunks
+                .iter()
+                .enumerate()
+                .into_iter()
+                .filter_map(|(idx, chunk)| {
+                    (chunk.is_none() && projection.leaf_included(idx)).then(|| {
+                        let column = self.metadata.column(idx);
+                        let (start, length) = column.byte_range();
+                        start as usize..(start + length) as usize
+                    })
+                })
+                .collect();
 
-            let column = metadata.column(idx);
+            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
 
-            if let Some(data) = chunk_data.next() {
-                *chunk = Some(InMemoryColumnChunk {
-                    num_values: column.num_values(),
-                    compression: column.compression(),
-                    physical_type: column.column_type(),
-                    data,
-                });
+            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
+                if chunk.is_some() || !projection.leaf_included(idx) {
+                    continue;
+                }
+
+                if let Some(data) = chunk_data.next() {
+                    *chunk = Some(ColumnChunkData::Dense {
+                        offset: self.metadata.column(idx).byte_range().0 as usize,
+                        data,
+                    });
+                }
             }
         }
+
         Ok(())
     }
 }
 
 impl RowGroupCollection for InMemoryRowGroup {
     fn schema(&self) -> SchemaDescPtr {
-        self.schema.clone()
+        self.metadata.schema_descr_ptr()
     }
 
     fn num_rows(&self) -> usize {
         self.row_count
     }
 
     fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
-        let page_reader = self.column_chunks[i].as_ref().unwrap().pages();
-
-        Ok(Box::new(ColumnChunkIterator {
-            schema: self.schema.clone(),
-            column_schema: self.schema.columns()[i].clone(),
-            reader: Some(page_reader),
-        }))
+        match &self.column_chunks[i] {
+            None => Err(ParquetError::General(format!(
+                "Invalid column index {}, column was not fetched",
+                i
+            ))),
+            Some(data) => {
+                let page_locations = self
+                    .metadata
+                    .page_offset_index()
+                    .as_ref()
+                    .map(|index| index[i].clone());
+                let page_reader: Box<dyn PageReader> =
+                    Box::new(SerializedPageReader::new(
+                        Arc::new(data.clone()),

Review Comment:
   Could we store `Arc<ColumnChunkData>` and avoid this clone?
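
   Something like the following sketch (assumes `SerializedPageReader::new` keeps taking the chunk as an `Arc`, as it does here):

   ```rust
   // Store `column_chunks: Vec<Option<Arc<ColumnChunkData>>>` on
   // `InMemoryRowGroup`; handing the chunk to the page reader is then a
   // cheap refcount bump instead of a deep copy of the page bytes:
   let page_reader: Box<dyn PageReader> = Box::new(SerializedPageReader::new(
       Arc::clone(data), // was: Arc::new(data.clone())
       self.metadata.column(i),
       self.row_count,
       page_locations,
   )?);
   ```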


