This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new b30336da8c Encapsulate encryption code more in readers (#7337)
b30336da8c is described below
commit b30336da8c1318f3f45e22f0a377ca21830fddac
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Mar 27 16:43:59 2025 -0400
Encapsulate encryption code more in readers (#7337)
---
parquet/src/arrow/arrow_reader/mod.rs | 53 +++++++++++++++--------------------
parquet/src/arrow/async_reader/mod.rs | 32 ++++++---------------
parquet/src/file/serialized_reader.rs | 45 +++++++++++++++++++++++------
3 files changed, 68 insertions(+), 62 deletions(-)
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index d8c116f013..66780fcd60 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -33,7 +33,7 @@ use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
use crate::column::page::{PageIterator, PageReader};
#[cfg(feature = "encryption")]
-use crate::encryption::decrypt::{CryptoContext, FileDecryptionProperties};
+use crate::encryption::decrypt::FileDecryptionProperties;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use crate::file::reader::{ChunkReader, SerializedPageReader};
@@ -682,13 +682,11 @@ struct ReaderPageIterator<T: ChunkReader> {
metadata: Arc<ParquetMetaData>,
}
-impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
- type Item = Result<Box<dyn PageReader>>;
-
- fn next(&mut self) -> Option<Self::Item> {
- let rg_idx = self.row_groups.next()?;
+impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
+ /// Return the next SerializedPageReader
+ fn next_page_reader(&mut self, rg_idx: usize) ->
Result<SerializedPageReader<T>> {
let rg = self.metadata.row_group(rg_idx);
- let meta = rg.column(self.column_idx);
+ let column_chunk_metadata = rg.column(self.column_idx);
let offset_index = self.metadata.offset_index();
// `offset_index` may not exist and `i[rg_idx]` will be empty.
// To avoid `i[rg_idx][self.column_idx`] panic, we need to filter out empty `i[rg_idx]`.
@@ -698,32 +696,25 @@ impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
let total_rows = rg.num_rows() as usize;
let reader = self.reader.clone();
- #[cfg(feature = "encryption")]
- let crypto_context = if let Some(file_decryptor) = self.metadata.file_decryptor() {
- match meta.crypto_metadata() {
- Some(crypto_metadata) => {
- match CryptoContext::for_column(
- file_decryptor,
- crypto_metadata,
- rg_idx,
- self.column_idx,
- ) {
- Ok(context) => Some(Arc::new(context)),
- Err(err) => return Some(Err(err)),
- }
- }
- None => None,
- }
- } else {
- None
- };
-
- let ret = SerializedPageReader::new(reader, meta, total_rows, page_locations);
+ SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
+ .add_crypto_context(
+ rg_idx,
+ self.column_idx,
+ self.metadata.as_ref(),
+ column_chunk_metadata,
+ )
+ }
+}
- #[cfg(feature = "encryption")]
- let ret = ret.map(|reader| reader.with_crypto_context(crypto_context));
+impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
+ type Item = Result<Box<dyn PageReader>>;
- Some(ret.map(|x| Box::new(x) as _))
+ fn next(&mut self) -> Option<Self::Item> {
+ let rg_idx = self.row_groups.next()?;
+ let page_reader = self
+ .next_page_reader(rg_idx)
+ .map(|page_reader| Box::new(page_reader) as _);
+ Some(page_reader)
}
}
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index dc8880830a..b0d5b0a710 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -59,9 +59,6 @@ use crate::format::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHas
mod metadata;
pub use metadata::*;
-#[cfg(feature = "encryption")]
-use crate::encryption::decrypt::CryptoContext;
-
#[cfg(feature = "object_store")]
mod store;
@@ -1026,6 +1023,7 @@ impl RowGroups for InMemoryRowGroup<'_> {
self.row_count
}
+ /// Return chunks for column i
fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
match &self.column_chunks[i] {
None => Err(ParquetError::General(format!(
@@ -1037,31 +1035,19 @@ impl RowGroups for InMemoryRowGroup<'_> {
// filter out empty offset indexes (old versions specified Some(vec![]) when no present)
.filter(|index| !index.is_empty())
.map(|index| index[i].page_locations.clone());
- let column_metadata = self.metadata.row_group(self.row_group_idx).column(i);
+ let column_chunk_metadata = self.metadata.row_group(self.row_group_idx).column(i);
let page_reader = SerializedPageReader::new(
data.clone(),
- column_metadata,
+ column_chunk_metadata,
self.row_count,
page_locations,
)?;
-
- #[cfg(feature = "encryption")]
- let crypto_context = if let Some(file_decryptor) = self.metadata.file_decryptor() {
- match column_metadata.crypto_metadata() {
- Some(crypto_metadata) => Some(Arc::new(CryptoContext::for_column(
- file_decryptor,
- crypto_metadata,
- self.row_group_idx,
- i,
- )?)),
- None => None,
- }
- } else {
- None
- };
-
- #[cfg(feature = "encryption")]
- let page_reader = page_reader.with_crypto_context(crypto_context);
+ let page_reader = page_reader.add_crypto_context(
+ self.row_group_idx,
+ i,
+ self.metadata,
+ column_chunk_metadata,
+ )?;
let page_reader: Box<dyn PageReader> = Box::new(page_reader);
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 2673f4ac52..183c481b24 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -571,22 +571,51 @@ impl<R: ChunkReader> SerializedPageReader<R> {
/// Creates a new serialized page reader from a chunk reader and metadata
pub fn new(
reader: Arc<R>,
- meta: &ColumnChunkMetaData,
+ column_chunk_metadata: &ColumnChunkMetaData,
total_rows: usize,
page_locations: Option<Vec<PageLocation>>,
) -> Result<Self> {
let props = Arc::new(ReaderProperties::builder().build());
- SerializedPageReader::new_with_properties(reader, meta, total_rows, page_locations, props)
+ SerializedPageReader::new_with_properties(
+ reader,
+ column_chunk_metadata,
+ total_rows,
+ page_locations,
+ props,
+ )
}
- /// Adds cryptographical information to the reader.
+ /// Stub No-op implementation when encryption is disabled.
+ #[cfg(all(feature = "arrow", not(feature = "encryption")))]
+ pub(crate) fn add_crypto_context(
+ self,
+ _rg_idx: usize,
+ _column_idx: usize,
+ _parquet_meta_data: &ParquetMetaData,
+ _column_chunk_metadata: &ColumnChunkMetaData,
+ ) -> Result<SerializedPageReader<R>> {
+ Ok(self)
+ }
+
+ /// Adds any necessary crypto context to this page reader, if encryption is enabled.
#[cfg(feature = "encryption")]
- pub(crate) fn with_crypto_context(
+ pub(crate) fn add_crypto_context(
mut self,
- crypto_context: Option<Arc<CryptoContext>>,
- ) -> Self {
- self.crypto_context = crypto_context;
- self
+ rg_idx: usize,
+ column_idx: usize,
+ parquet_meta_data: &ParquetMetaData,
+ column_chunk_metadata: &ColumnChunkMetaData,
+ ) -> Result<SerializedPageReader<R>> {
+ let Some(file_decryptor) = parquet_meta_data.file_decryptor() else {
+ return Ok(self);
+ };
+ let Some(crypto_metadata) = column_chunk_metadata.crypto_metadata() else {
+ return Ok(self);
+ };
+ let crypto_context =
+ CryptoContext::for_column(file_decryptor, crypto_metadata, rg_idx, column_idx)?;
+ self.crypto_context = Some(Arc::new(crypto_context));
+ Ok(self)
}
/// Creates a new serialized page with custom options.