thinkharderdev commented on code in PR #2473:
URL: https://github.com/apache/arrow-rs/pull/2473#discussion_r947766617


##########
parquet/src/arrow/async_reader.rs:
##########
@@ -482,210 +480,170 @@ impl InMemoryRowGroup {
     async fn fetch<T: AsyncFileReader + Send>(
         &mut self,
         input: &mut T,
-        metadata: &RowGroupMetaData,
         projection: &ProjectionMask,
-        _selection: Option<&RowSelection>,
+        selection: Option<&RowSelection>,
     ) -> Result<()> {
-        // TODO: Use OffsetIndex and selection to prune pages
-
-        let fetch_ranges = self
-            .column_chunks
-            .iter()
-            .enumerate()
-            .into_iter()
-            .filter_map(|(idx, chunk)| {
-                (chunk.is_none() && projection.leaf_included(idx)).then(|| {
-                    let column = metadata.column(idx);
-                    let (start, length) = column.byte_range();
-                    start as usize..(start + length) as usize
+        if let Some((selection, page_locations)) =
+            selection.zip(self.metadata.page_offset_index().as_ref())
+        {
+            // If we have a `RowSelection` and an `OffsetIndex` then only fetch pages required for the
+            // `RowSelection`
+            let mut offsets: Vec<Vec<usize>> = vec![];
+
+            let fetch_ranges = self
+                .column_chunks
+                .iter()
+                .enumerate()
+                .into_iter()
+                .filter_map(|(idx, chunk)| {
+                    (chunk.is_none() && projection.leaf_included(idx)).then(|| {
+                        let (_mask, ranges) = selection.page_mask(&page_locations[idx]);
+                        offsets.push(ranges.iter().map(|range| range.start).collect());
+                        ranges
+                    })
                 })
-            })
-            .collect();
+                .flatten()
+                .collect();
 
-        let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
+            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
+            let mut offsets = offsets.into_iter();
 
-        for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
-            if chunk.is_some() || !projection.leaf_included(idx) {
-                continue;
+            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
+                if chunk.is_some() || !projection.leaf_included(idx) {
+                    continue;
+                }
+
+                if let Some(page_offsets) = offsets.next() {
+                    let mut chunks = Vec::with_capacity(page_offsets.len());
+                    for _ in 0..page_offsets.len() {
+                        chunks.push(chunk_data.next().unwrap());
+                    }
+
+                    *chunk = Some(ColumnChunkData::Sparse {
+                        length: self.metadata.column(idx).byte_range().1 as usize,
+                        data: page_offsets.into_iter().zip(chunks.into_iter()).collect(),
+                    })
+                }
             }
+        } else {
+            let fetch_ranges = self
+                .column_chunks
+                .iter()
+                .enumerate()
+                .into_iter()
+                .filter_map(|(idx, chunk)| {
+                    (chunk.is_none() && projection.leaf_included(idx)).then(|| {
+                        let column = self.metadata.column(idx);
+                        let (start, length) = column.byte_range();
+                        start as usize..(start + length) as usize
+                    })
+                })
+                .collect();
 
-            let column = metadata.column(idx);
+            let mut chunk_data = input.get_byte_ranges(fetch_ranges).await?.into_iter();
 
-            if let Some(data) = chunk_data.next() {
-                *chunk = Some(InMemoryColumnChunk {
-                    num_values: column.num_values(),
-                    compression: column.compression(),
-                    physical_type: column.column_type(),
-                    data,
-                });
+            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
+                if chunk.is_some() || !projection.leaf_included(idx) {
+                    continue;
+                }
+
+                if let Some(data) = chunk_data.next() {
+                    *chunk = Some(ColumnChunkData::Dense {
+                        offset: self.metadata.column(idx).byte_range().0 as usize,
+                        data,
+                    });
+                }
             }
         }
+
         Ok(())
     }
 }
 
 impl RowGroupCollection for InMemoryRowGroup {
     fn schema(&self) -> SchemaDescPtr {
-        self.schema.clone()
+        self.metadata.schema_descr_ptr()
     }
 
     fn num_rows(&self) -> usize {
         self.row_count
     }
 
     fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
-        let page_reader = self.column_chunks[i].as_ref().unwrap().pages();
-
-        Ok(Box::new(ColumnChunkIterator {
-            schema: self.schema.clone(),
-            column_schema: self.schema.columns()[i].clone(),
-            reader: Some(page_reader),
-        }))
+        match &self.column_chunks[i] {
+            None => Err(ParquetError::General(format!(
+                "Invalid column index {}, column was not fetched",
+                i
+            ))),
+            Some(data) => {
+                let page_locations = self
+                    .metadata
+                    .page_offset_index()
+                    .as_ref()
+                    .map(|index| index[i].clone());
+                let page_reader: Box<dyn PageReader> =
+                    Box::new(SerializedPageReader::new(
+                        Arc::new(data.clone()),
+                        self.metadata.column(i),
+                        self.row_count,
+                        page_locations,
+                    )?);
+
+                Ok(Box::new(ColumnChunkIterator {
+                    schema: self.metadata.schema_descr_ptr(),
+                    column_schema: self.metadata.schema_descr_ptr().columns()[i].clone(),
+                    reader: Some(Ok(page_reader)),
+                }))
+            }
+        }
     }
 }
 
-/// Data for a single column chunk
+/// An in-memory column chunk
 #[derive(Clone)]
-struct InMemoryColumnChunk {
-    num_values: i64,
-    compression: Compression,
-    physical_type: crate::basic::Type,
-    data: Bytes,
-}
-
-impl InMemoryColumnChunk {
-    fn pages(&self) -> Result<Box<dyn PageReader>> {
-        let page_reader = InMemoryColumnChunkReader::new(self.clone())?;
-        Ok(Box::new(page_reader))
-    }
-}
-
-// A serialized implementation for Parquet [`PageReader`].
-struct InMemoryColumnChunkReader {
-    chunk: InMemoryColumnChunk,
-    decompressor: Option<Box<dyn Codec>>,
-    offset: usize,
-    seen_num_values: i64,
-    // If the next page header has already been "peeked", we will cache it here
-    next_page_header: Option<PageHeader>,
-}
-
-impl InMemoryColumnChunkReader {
-    /// Creates a new serialized page reader from file source.
-    fn new(chunk: InMemoryColumnChunk) -> Result<Self> {
-        let decompressor = create_codec(chunk.compression)?;
-        let result = Self {
-            chunk,
-            decompressor,
-            offset: 0,
-            seen_num_values: 0,
-            next_page_header: None,
-        };
-        Ok(result)
-    }
-}
-
-impl Iterator for InMemoryColumnChunkReader {
-    type Item = Result<Page>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        self.get_next_page().transpose()
-    }
+enum ColumnChunkData {
+    /// Column chunk data representing only a subset of data pages
+    Sparse {
+        /// Length of the full column chunk
+        length: usize,
+        data: Vec<(usize, Bytes)>,
+    },
+    /// Full column chunk
+    Dense { offset: usize, data: Bytes },
 }
 
-impl PageReader for InMemoryColumnChunkReader {
-    fn get_next_page(&mut self) -> Result<Option<Page>> {
-        while self.seen_num_values < self.chunk.num_values {
-            let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]);
-            let page_header = if let Some(page_header) = self.next_page_header.take() {
-                // The next page header has already been peeked, so use the cached value
-                page_header
-            } else {
-                let page_header = read_page_header(&mut cursor)?;
-                self.offset += cursor.position() as usize;
-                page_header
-            };
-
-            let compressed_size = page_header.compressed_page_size as usize;
-
-            let start_offset = self.offset;
-            let end_offset = self.offset + compressed_size;
-            self.offset = end_offset;
-
-            let buffer = self.chunk.data.slice(start_offset..end_offset);
-
-            let result = match page_header.type_ {
-                PageType::DataPage | PageType::DataPageV2 => {
-                    let decoded = decode_page(
-                        page_header,
-                        buffer.into(),
-                        self.chunk.physical_type,
-                        self.decompressor.as_mut(),
-                    )?;
-                    self.seen_num_values += decoded.num_values() as i64;
-                    decoded
-                }
-                PageType::DictionaryPage => decode_page(
-                    page_header,
-                    buffer.into(),
-                    self.chunk.physical_type,
-                    self.decompressor.as_mut(),
-                )?,
-                _ => {
-                    // For unknown page type (e.g., INDEX_PAGE), skip and read next.
-                    continue;
-                }
-            };
-
-            return Ok(Some(result));
+impl Length for ColumnChunkData {
+    fn len(&self) -> u64 {
+        match &self {
+            ColumnChunkData::Sparse { length, .. } => *length as u64,
+            ColumnChunkData::Dense { data, .. } => data.len() as u64,
         }
-
-        // We are at the end of this column chunk and no more page left. Return None.
-        Ok(None)
     }
+}
 
-    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
-        while self.seen_num_values < self.chunk.num_values {
-            return if let Some(buffered_header) = self.next_page_header.as_ref() {
-                if let Ok(page_metadata) = buffered_header.try_into() {
-                    Ok(Some(page_metadata))
-                } else {
-                    // For unknown page type (e.g., INDEX_PAGE), skip and read next.
-                    self.next_page_header = None;
-                    continue;
-                }
-            } else {
-                let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]);
-                let page_header = read_page_header(&mut cursor)?;
-                self.offset += cursor.position() as usize;
-
-                let page_metadata = if let Ok(page_metadata) = (&page_header).try_into() {
-                    Ok(Some(page_metadata))
-                } else {
-                    // For unknown page type (e.g., INDEX_PAGE), skip and read next.
-                    continue;
-                };
-
-                self.next_page_header = Some(page_header);
-                page_metadata
-            };
-        }
-
-        Ok(None)
-    }
+impl ChunkReader for ColumnChunkData {
+    type T = bytes::buf::Reader<Bytes>;
 
-    fn skip_next_page(&mut self) -> Result<()> {
-        if let Some(buffered_header) = self.next_page_header.take() {
-            // The next page header has already been peeked, so just advance the offset
-            self.offset += buffered_header.compressed_page_size as usize;
-        } else {
-            let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]);
-            let page_header = read_page_header(&mut cursor)?;
-            self.offset += cursor.position() as usize;
-            self.offset += page_header.compressed_page_size as usize;
+    fn get_read(&self, start: u64, length: usize) -> Result<Self::T> {
+        match &self {
+            ColumnChunkData::Sparse { data, .. } => data
+                .iter()
+                .find(|(offset, bytes)| {
+                    *offset <= start as usize && (start as usize - *offset) < bytes.len()

Review Comment:
   I wasn't sure whether there is ever a case in which we fetch some subset of a page. Thinking about it more, I don't believe that would ever be a valid use case.
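
   To make that assumption concrete, here's a minimal sketch (a hypothetical `SparseChunk` type, not the PR's actual `ColumnChunkData`) of the containment check the `Sparse` lookup performs. It only resolves reads that start inside a fetched page; a read spanning two pages or landing in an unfetched gap finds nothing, which is fine as long as `SerializedPageReader` always reads whole pages at the offsets recorded in the `OffsetIndex`:

   ```rust
   use bytes::Bytes;

   /// Hypothetical stand-in for the `Sparse` variant: pages fetched at
   /// known absolute offsets within the column chunk.
   struct SparseChunk {
       data: Vec<(usize, Bytes)>,
   }

   impl SparseChunk {
       /// Find the fetched page containing `start`, assuming every read
       /// begins inside exactly one page and never spans two.
       fn find_page(&self, start: usize) -> Option<&Bytes> {
           self.data
               .iter()
               .find(|(offset, bytes)| *offset <= start && start - *offset < bytes.len())
               .map(|(_, bytes)| bytes)
       }
   }

   fn main() {
       let chunk = SparseChunk {
           // Two pages fetched at offsets 0 and 100 of the chunk.
           data: vec![
               (0, Bytes::from_static(b"page one bytes")),
               (100, Bytes::from_static(b"page two bytes")),
           ],
       };
       assert!(chunk.find_page(100).is_some()); // read starts at a fetched page
       assert!(chunk.find_page(50).is_none()); // offset falls in an unfetched gap
   }
   ```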


