This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new b30336da8c Encapsulate encryption code more in readers (#7337)
b30336da8c is described below

commit b30336da8c1318f3f45e22f0a377ca21830fddac
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Mar 27 16:43:59 2025 -0400

    Encapsulate encryption code more in readers (#7337)
---
 parquet/src/arrow/arrow_reader/mod.rs | 53 +++++++++++++++--------------------
 parquet/src/arrow/async_reader/mod.rs | 32 ++++++---------------
 parquet/src/file/serialized_reader.rs | 45 +++++++++++++++++++++++------
 3 files changed, 68 insertions(+), 62 deletions(-)

diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index d8c116f013..66780fcd60 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -33,7 +33,7 @@ use 
crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
 use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
 use crate::column::page::{PageIterator, PageReader};
 #[cfg(feature = "encryption")]
-use crate::encryption::decrypt::{CryptoContext, FileDecryptionProperties};
+use crate::encryption::decrypt::FileDecryptionProperties;
 use crate::errors::{ParquetError, Result};
 use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
 use crate::file::reader::{ChunkReader, SerializedPageReader};
@@ -682,13 +682,11 @@ struct ReaderPageIterator<T: ChunkReader> {
     metadata: Arc<ParquetMetaData>,
 }
 
-impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
-    type Item = Result<Box<dyn PageReader>>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        let rg_idx = self.row_groups.next()?;
+impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
+    /// Return the [`SerializedPageReader`] for this iterator's column in row group `rg_idx`
+    fn next_page_reader(&mut self, rg_idx: usize) -> 
Result<SerializedPageReader<T>> {
         let rg = self.metadata.row_group(rg_idx);
-        let meta = rg.column(self.column_idx);
+        let column_chunk_metadata = rg.column(self.column_idx);
         let offset_index = self.metadata.offset_index();
         // `offset_index` may not exist and `i[rg_idx]` will be empty.
         // To avoid `i[rg_idx][self.column_idx`] panic, we need to filter out 
empty `i[rg_idx]`.
@@ -698,32 +696,25 @@ impl<T: ChunkReader + 'static> Iterator for 
ReaderPageIterator<T> {
         let total_rows = rg.num_rows() as usize;
         let reader = self.reader.clone();
 
-        #[cfg(feature = "encryption")]
-        let crypto_context = if let Some(file_decryptor) = 
self.metadata.file_decryptor() {
-            match meta.crypto_metadata() {
-                Some(crypto_metadata) => {
-                    match CryptoContext::for_column(
-                        file_decryptor,
-                        crypto_metadata,
-                        rg_idx,
-                        self.column_idx,
-                    ) {
-                        Ok(context) => Some(Arc::new(context)),
-                        Err(err) => return Some(Err(err)),
-                    }
-                }
-                None => None,
-            }
-        } else {
-            None
-        };
-
-        let ret = SerializedPageReader::new(reader, meta, total_rows, 
page_locations);
+        SerializedPageReader::new(reader, column_chunk_metadata, total_rows, 
page_locations)?
+            .add_crypto_context(
+                rg_idx,
+                self.column_idx,
+                self.metadata.as_ref(),
+                column_chunk_metadata,
+            )
+    }
+}
 
-        #[cfg(feature = "encryption")]
-        let ret = ret.map(|reader| reader.with_crypto_context(crypto_context));
+impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
+    type Item = Result<Box<dyn PageReader>>;
 
-        Some(ret.map(|x| Box::new(x) as _))
+    fn next(&mut self) -> Option<Self::Item> {
+        let rg_idx = self.row_groups.next()?;
+        let page_reader = self
+            .next_page_reader(rg_idx)
+            .map(|page_reader| Box::new(page_reader) as _);
+        Some(page_reader)
     }
 }
 
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index dc8880830a..b0d5b0a710 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -59,9 +59,6 @@ use crate::format::{BloomFilterAlgorithm, 
BloomFilterCompression, BloomFilterHas
 mod metadata;
 pub use metadata::*;
 
-#[cfg(feature = "encryption")]
-use crate::encryption::decrypt::CryptoContext;
-
 #[cfg(feature = "object_store")]
 mod store;
 
@@ -1026,6 +1023,7 @@ impl RowGroups for InMemoryRowGroup<'_> {
         self.row_count
     }
 
+    /// Return chunks for column i
     fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
         match &self.column_chunks[i] {
             None => Err(ParquetError::General(format!(
@@ -1037,31 +1035,19 @@ impl RowGroups for InMemoryRowGroup<'_> {
                     // filter out empty offset indexes (old versions specified 
Some(vec![]) when no present)
                     .filter(|index| !index.is_empty())
                     .map(|index| index[i].page_locations.clone());
-                let column_metadata = 
self.metadata.row_group(self.row_group_idx).column(i);
+                let column_chunk_metadata = 
self.metadata.row_group(self.row_group_idx).column(i);
                 let page_reader = SerializedPageReader::new(
                     data.clone(),
-                    column_metadata,
+                    column_chunk_metadata,
                     self.row_count,
                     page_locations,
                 )?;
-
-                #[cfg(feature = "encryption")]
-                let crypto_context = if let Some(file_decryptor) = 
self.metadata.file_decryptor() {
-                    match column_metadata.crypto_metadata() {
-                        Some(crypto_metadata) => 
Some(Arc::new(CryptoContext::for_column(
-                            file_decryptor,
-                            crypto_metadata,
-                            self.row_group_idx,
-                            i,
-                        )?)),
-                        None => None,
-                    }
-                } else {
-                    None
-                };
-
-                #[cfg(feature = "encryption")]
-                let page_reader = 
page_reader.with_crypto_context(crypto_context);
+                let page_reader = page_reader.add_crypto_context(
+                    self.row_group_idx,
+                    i,
+                    self.metadata,
+                    column_chunk_metadata,
+                )?;
 
                 let page_reader: Box<dyn PageReader> = Box::new(page_reader);
 
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 2673f4ac52..183c481b24 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -571,22 +571,51 @@ impl<R: ChunkReader> SerializedPageReader<R> {
     /// Creates a new serialized page reader from a chunk reader and metadata
     pub fn new(
         reader: Arc<R>,
-        meta: &ColumnChunkMetaData,
+        column_chunk_metadata: &ColumnChunkMetaData,
         total_rows: usize,
         page_locations: Option<Vec<PageLocation>>,
     ) -> Result<Self> {
         let props = Arc::new(ReaderProperties::builder().build());
-        SerializedPageReader::new_with_properties(reader, meta, total_rows, 
page_locations, props)
+        SerializedPageReader::new_with_properties(
+            reader,
+            column_chunk_metadata,
+            total_rows,
+            page_locations,
+            props,
+        )
     }
 
-    /// Adds cryptographical information to the reader.
+    /// No-op stub used when the `encryption` feature is disabled; returns the reader unchanged.
+    #[cfg(all(feature = "arrow", not(feature = "encryption")))]
+    pub(crate) fn add_crypto_context(
+        self,
+        _rg_idx: usize,
+        _column_idx: usize,
+        _parquet_meta_data: &ParquetMetaData,
+        _column_chunk_metadata: &ColumnChunkMetaData,
+    ) -> Result<SerializedPageReader<R>> {
+        Ok(self)
+    }
+
+    /// Adds any necessary crypto context to this page reader, if encryption is enabled.
     #[cfg(feature = "encryption")]
-    pub(crate) fn with_crypto_context(
+    pub(crate) fn add_crypto_context(
         mut self,
-        crypto_context: Option<Arc<CryptoContext>>,
-    ) -> Self {
-        self.crypto_context = crypto_context;
-        self
+        rg_idx: usize,
+        column_idx: usize,
+        parquet_meta_data: &ParquetMetaData,
+        column_chunk_metadata: &ColumnChunkMetaData,
+    ) -> Result<SerializedPageReader<R>> {
+        let Some(file_decryptor) = parquet_meta_data.file_decryptor() else {
+            return Ok(self);
+        };
+        let Some(crypto_metadata) = column_chunk_metadata.crypto_metadata() 
else {
+            return Ok(self);
+        };
+        let crypto_context =
+            CryptoContext::for_column(file_decryptor, crypto_metadata, rg_idx, 
column_idx)?;
+        self.crypto_context = Some(Arc::new(crypto_context));
+        Ok(self)
     }
 
     /// Creates a new serialized page with custom options.

Reply via email to