This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new efe7bfd8b3 Support decryption of Parquet column and offset indexes (#7399)
efe7bfd8b3 is described below

commit efe7bfd8b327ed0cef61908ce0875f1e6b489485
Author: Adam Reeve <[email protected]>
AuthorDate: Fri Apr 18 01:24:50 2025 +1200

    Support decryption of Parquet column and offset indexes (#7399)
    
    * Add initial test for applying decryption to page index.
    
    * Add initial code for column_index read
    
    * Handle non-uniform encryption
    
    * Verify column and offset index are read
    
    * Extend tests
    
    * Tidy ups
    
    ---------
    
    Co-authored-by: Corwin Joy <[email protected]>
---
 parquet/src/encryption/decrypt.rs            |  24 ++++++
 parquet/src/file/metadata/reader.rs          | 111 +++++++++++++++++++++++++--
 parquet/tests/encryption/encryption.rs       |  52 ++++++++++++-
 parquet/tests/encryption/encryption_async.rs |  56 +++++++++++++-
 parquet/tests/encryption/encryption_util.rs  |  32 ++++++++
 5 files changed, 266 insertions(+), 9 deletions(-)
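
The tests added below exercise this feature end to end through the public Arrow
reader API. A minimal sketch of that read path, assuming the parquet crate is
built with the "encryption" feature; the function name is illustrative, the file
path and key are the parquet-testing fixtures used by the new tests, and the
calls mirror the test code in this commit:

    use std::fs::File;
    use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
    use parquet::encryption::decrypt::FileDecryptionProperties;
    use parquet::errors::ParquetError;

    fn read_encrypted_page_index() -> Result<(), ParquetError> {
        // Footer key for the uniformly encrypted test file
        let key = b"0123456789012345".to_vec();
        let decryption_properties = FileDecryptionProperties::builder(key).build()?;

        // Requesting the page index now also decrypts the column and offset
        // indexes with the per-column metadata decryptor before decoding them
        let options = ArrowReaderOptions::default()
            .with_file_decryption_properties(decryption_properties)
            .with_page_index(true);

        let file = File::open("uniform_encryption.parquet.encrypted")?;
        let metadata = ArrowReaderMetadata::load(&file, options)?;

        // Both indexes are populated on the decoded metadata
        assert!(metadata.metadata().column_index().is_some());
        assert!(metadata.metadata().offset_index().is_some());
        Ok(())
    }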

diff --git a/parquet/src/encryption/decrypt.rs b/parquet/src/encryption/decrypt.rs
index 0927421344..6a51f1a657 100644
--- a/parquet/src/encryption/decrypt.rs
+++ b/parquet/src/encryption/decrypt.rs
@@ -220,6 +220,26 @@ impl CryptoContext {
         )
     }
 
+    pub(crate) fn create_column_index_aad(&self) -> Result<Vec<u8>> {
+        create_module_aad(
+            self.file_aad(),
+            ModuleType::ColumnIndex,
+            self.row_group_idx,
+            self.column_ordinal,
+            self.page_ordinal,
+        )
+    }
+
+    pub(crate) fn create_offset_index_aad(&self) -> Result<Vec<u8>> {
+        create_module_aad(
+            self.file_aad(),
+            ModuleType::OffsetIndex,
+            self.row_group_idx,
+            self.column_ordinal,
+            self.page_ordinal,
+        )
+    }
+
     pub(crate) fn for_dictionary_page(&self) -> Self {
         Self {
             row_group_idx: self.row_group_idx,
@@ -236,6 +256,10 @@ impl CryptoContext {
         &self.data_decryptor
     }
 
+    pub(crate) fn metadata_decryptor(&self) -> &Arc<dyn BlockDecryptor> {
+        &self.metadata_decryptor
+    }
+
     pub(crate) fn file_aad(&self) -> &Vec<u8> {
         &self.file_aad
     }
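
The decrypt.rs changes above add AAD builders for the ColumnIndex and OffsetIndex
module types. As background, Parquet modular encryption derives a distinct AAD for
every encrypted module by appending a short suffix to the file AAD. A rough sketch
of that layout for page index modules (illustrative only: the helper name and the
little-endian u16 encoding of the ordinals are assumptions based on the format
spec; the crate's internal create_module_aad is what actually runs):

    // file AAD || module type (1 byte) || row group ordinal || column ordinal;
    // page index modules carry no page ordinal.
    fn page_index_aad(file_aad: &[u8], module_type: u8, row_group: u16, column: u16) -> Vec<u8> {
        let mut aad = Vec::with_capacity(file_aad.len() + 5);
        aad.extend_from_slice(file_aad);
        aad.push(module_type);
        aad.extend_from_slice(&row_group.to_le_bytes());
        aad.extend_from_slice(&column.to_le_bytes());
        aad
    }
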
diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs
index aebf1a8906..00b6b2d4f5 100644
--- a/parquet/src/file/metadata/reader.rs
+++ b/parquet/src/file/metadata/reader.rs
@@ -27,7 +27,7 @@ use crate::encryption::{
 };
 
 use crate::errors::{ParquetError, Result};
-use crate::file::metadata::{FileMetaData, ParquetMetaData, RowGroupMetaData};
+use crate::file::metadata::{ColumnChunkMetaData, FileMetaData, ParquetMetaData, RowGroupMetaData};
 use crate::file::page_index::index::Index;
 use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
 use crate::file::reader::ChunkReader;
@@ -41,6 +41,9 @@ use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
 
 #[cfg(all(feature = "async", feature = "arrow"))]
 use crate::arrow::async_reader::{MetadataFetch, MetadataSuffixFetch};
+#[cfg(feature = "encryption")]
+use crate::encryption::decrypt::CryptoContext;
+use crate::file::page_index::offset_index::OffsetIndexMetaData;
 
 /// Reads the [`ParquetMetaData`] from a byte stream.
 ///
@@ -511,14 +514,22 @@ impl ParquetMetaDataReader {
             let index = metadata
                 .row_groups()
                 .iter()
-                .map(|x| {
+                .enumerate()
+                .map(|(rg_idx, x)| {
                     x.columns()
                         .iter()
-                        .map(|c| match c.column_index_range() {
+                        .enumerate()
+                        .map(|(col_idx, c)| match c.column_index_range() {
                             Some(r) => {
                                 let r_start = usize::try_from(r.start - start_offset)?;
                                 let r_end = usize::try_from(r.end - start_offset)?;
-                                decode_column_index(&bytes[r_start..r_end], c.column_type())
+                                Self::parse_single_column_index(
+                                    &bytes[r_start..r_end],
+                                    metadata,
+                                    c,
+                                    rg_idx,
+                                    col_idx,
+                                )
                             }
                             None => Ok(Index::NONE),
                         })
@@ -530,20 +541,67 @@ impl ParquetMetaDataReader {
         Ok(())
     }
 
+    #[cfg(feature = "encryption")]
+    fn parse_single_column_index(
+        bytes: &[u8],
+        metadata: &ParquetMetaData,
+        column: &ColumnChunkMetaData,
+        row_group_index: usize,
+        col_index: usize,
+    ) -> Result<Index> {
+        match &column.column_crypto_metadata {
+            Some(crypto_metadata) => {
+                let file_decryptor = metadata.file_decryptor.as_ref().ok_or_else(|| {
+                    general_err!("Cannot decrypt column index, no file decryptor set")
+                })?;
+                let crypto_context = CryptoContext::for_column(
+                    file_decryptor,
+                    crypto_metadata,
+                    row_group_index,
+                    col_index,
+                )?;
+                let column_decryptor = crypto_context.metadata_decryptor();
+                let aad = crypto_context.create_column_index_aad()?;
+                let plaintext = column_decryptor.decrypt(bytes, &aad)?;
+                decode_column_index(&plaintext, column.column_type())
+            }
+            None => decode_column_index(bytes, column.column_type()),
+        }
+    }
+
+    #[cfg(not(feature = "encryption"))]
+    fn parse_single_column_index(
+        bytes: &[u8],
+        _metadata: &ParquetMetaData,
+        column: &ColumnChunkMetaData,
+        _row_group_index: usize,
+        _col_index: usize,
+    ) -> Result<Index> {
+        decode_column_index(bytes, column.column_type())
+    }
+
     fn parse_offset_index(&mut self, bytes: &Bytes, start_offset: u64) -> Result<()> {
         let metadata = self.metadata.as_mut().unwrap();
         if self.offset_index {
             let index = metadata
                 .row_groups()
                 .iter()
-                .map(|x| {
+                .enumerate()
+                .map(|(rg_idx, x)| {
                     x.columns()
                         .iter()
-                        .map(|c| match c.offset_index_range() {
+                        .enumerate()
+                        .map(|(col_idx, c)| match c.offset_index_range() {
                             Some(r) => {
                                 let r_start = usize::try_from(r.start - start_offset)?;
                                 let r_end = usize::try_from(r.end - start_offset)?;
-                                decode_offset_index(&bytes[r_start..r_end])
+                                Self::parse_single_offset_index(
+                                    &bytes[r_start..r_end],
+                                    metadata,
+                                    c,
+                                    rg_idx,
+                                    col_idx,
+                                )
                             }
                             None => Err(general_err!("missing offset index")),
                         })
@@ -556,6 +614,45 @@ impl ParquetMetaDataReader {
         Ok(())
     }
 
+    #[cfg(feature = "encryption")]
+    fn parse_single_offset_index(
+        bytes: &[u8],
+        metadata: &ParquetMetaData,
+        column: &ColumnChunkMetaData,
+        row_group_index: usize,
+        col_index: usize,
+    ) -> Result<OffsetIndexMetaData> {
+        match &column.column_crypto_metadata {
+            Some(crypto_metadata) => {
+                let file_decryptor = metadata.file_decryptor.as_ref().ok_or_else(|| {
+                    general_err!("Cannot decrypt offset index, no file decryptor set")
+                })?;
+                let crypto_context = CryptoContext::for_column(
+                    file_decryptor,
+                    crypto_metadata,
+                    row_group_index,
+                    col_index,
+                )?;
+                let column_decryptor = crypto_context.metadata_decryptor();
+                let aad = crypto_context.create_offset_index_aad()?;
+                let plaintext = column_decryptor.decrypt(bytes, &aad)?;
+                decode_offset_index(&plaintext)
+            }
+            None => decode_offset_index(bytes),
+        }
+    }
+
+    #[cfg(not(feature = "encryption"))]
+    fn parse_single_offset_index(
+        bytes: &[u8],
+        _metadata: &ParquetMetaData,
+        _column: &ColumnChunkMetaData,
+        _row_group_index: usize,
+        _col_index: usize,
+    ) -> Result<OffsetIndexMetaData> {
+        decode_offset_index(bytes)
+    }
+
     fn range_for_page_index(&self) -> Option<Range<u64>> {
         // sanity check
         self.metadata.as_ref()?;
diff --git a/parquet/tests/encryption/encryption.rs b/parquet/tests/encryption/encryption.rs
index 86a148be2b..664850e507 100644
--- a/parquet/tests/encryption/encryption.rs
+++ b/parquet/tests/encryption/encryption.rs
@@ -17,7 +17,9 @@
 
 //! This module contains tests for reading encrypted Parquet files with the Arrow API
 
-use crate::encryption_util::{verify_encryption_test_data, TestKeyRetriever};
+use crate::encryption_util::{
+    verify_column_indexes, verify_encryption_test_data, TestKeyRetriever,
+};
 use arrow::array::*;
 use arrow::error::Result as ArrowResult;
 use arrow_array::{Int32Array, RecordBatch};
@@ -29,6 +31,7 @@ use parquet::arrow::ArrowWriter;
 use parquet::data_type::{ByteArray, ByteArrayType};
 use parquet::encryption::decrypt::FileDecryptionProperties;
 use parquet::encryption::encrypt::FileEncryptionProperties;
+use parquet::errors::ParquetError;
 use parquet::file::metadata::ParquetMetaData;
 use parquet::file::properties::WriterProperties;
 use parquet::file::writer::SerializedFileWriter;
@@ -726,6 +729,53 @@ pub fn test_retrieve_row_group_statistics_after_encrypted_write() {
     );
 }
 
+#[test]
+fn test_decrypt_page_index_uniform() {
+    let test_data = arrow::util::test_util::parquet_test_data();
+    let path = format!("{test_data}/uniform_encryption.parquet.encrypted");
+
+    let key_code: &[u8] = "0123456789012345".as_bytes();
+    let decryption_properties = FileDecryptionProperties::builder(key_code.to_vec())
+        .build()
+        .unwrap();
+
+    test_decrypt_page_index(&path, decryption_properties).unwrap();
+}
+
+#[test]
+fn test_decrypt_page_index_non_uniform() {
+    let test_data = arrow::util::test_util::parquet_test_data();
+    let path = format!("{test_data}/encrypt_columns_and_footer.parquet.encrypted");
+
+    let footer_key = "0123456789012345".as_bytes().to_vec();
+    let column_1_key = "1234567890123450".as_bytes().to_vec();
+    let column_2_key = "1234567890123451".as_bytes().to_vec();
+
+    let decryption_properties = FileDecryptionProperties::builder(footer_key.to_vec())
+        .with_column_key("double_field", column_1_key)
+        .with_column_key("float_field", column_2_key)
+        .build()
+        .unwrap();
+
+    test_decrypt_page_index(&path, decryption_properties).unwrap();
+}
+
+fn test_decrypt_page_index(
+    path: &str,
+    decryption_properties: FileDecryptionProperties,
+) -> Result<(), ParquetError> {
+    let file = File::open(path)?;
+    let options = ArrowReaderOptions::default()
+        .with_file_decryption_properties(decryption_properties)
+        .with_page_index(true);
+
+    let arrow_metadata = ArrowReaderMetadata::load(&file, options)?;
+
+    verify_column_indexes(arrow_metadata.metadata());
+
+    Ok(())
+}
+
 fn read_and_roundtrip_to_encrypted_file(
     path: &str,
     decryption_properties: FileDecryptionProperties,
diff --git a/parquet/tests/encryption/encryption_async.rs b/parquet/tests/encryption/encryption_async.rs
index 11448207c6..e0fbbcdfaf 100644
--- a/parquet/tests/encryption/encryption_async.rs
+++ b/parquet/tests/encryption/encryption_async.rs
@@ -17,7 +17,9 @@
 
 //! This module contains tests for reading encrypted Parquet files with the async Arrow API
 
-use crate::encryption_util::{verify_encryption_test_data, TestKeyRetriever};
+use crate::encryption_util::{
+    verify_column_indexes, verify_encryption_test_data, TestKeyRetriever,
+};
 use futures::TryStreamExt;
 use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
 use parquet::arrow::arrow_writer::ArrowWriterOptions;
@@ -382,6 +384,58 @@ async fn test_uniform_encryption_with_key_retriever() {
         .unwrap();
 }
 
+#[tokio::test]
+async fn test_decrypt_page_index_uniform() {
+    let test_data = arrow::util::test_util::parquet_test_data();
+    let path = format!("{test_data}/uniform_encryption.parquet.encrypted");
+
+    let key_code: &[u8] = "0123456789012345".as_bytes();
+    let decryption_properties = FileDecryptionProperties::builder(key_code.to_vec())
+        .build()
+        .unwrap();
+
+    test_decrypt_page_index(&path, decryption_properties)
+        .await
+        .unwrap();
+}
+
+#[tokio::test]
+async fn test_decrypt_page_index_non_uniform() {
+    let test_data = arrow::util::test_util::parquet_test_data();
+    let path = format!("{test_data}/encrypt_columns_and_footer.parquet.encrypted");
+
+    let footer_key = "0123456789012345".as_bytes().to_vec();
+    let column_1_key = "1234567890123450".as_bytes().to_vec();
+    let column_2_key = "1234567890123451".as_bytes().to_vec();
+
+    let decryption_properties = FileDecryptionProperties::builder(footer_key.to_vec())
+        .with_column_key("double_field", column_1_key)
+        .with_column_key("float_field", column_2_key)
+        .build()
+        .unwrap();
+
+    test_decrypt_page_index(&path, decryption_properties)
+        .await
+        .unwrap();
+}
+
+async fn test_decrypt_page_index(
+    path: &str,
+    decryption_properties: FileDecryptionProperties,
+) -> Result<(), ParquetError> {
+    let mut file = File::open(&path).await?;
+
+    let options = ArrowReaderOptions::new()
+        .with_file_decryption_properties(decryption_properties)
+        .with_page_index(true);
+
+    let arrow_metadata = ArrowReaderMetadata::load_async(&mut file, options).await?;
+
+    verify_column_indexes(arrow_metadata.metadata());
+
+    Ok(())
+}
+
 async fn verify_encryption_test_file_read_async(
     file: &mut tokio::fs::File,
     decryption_properties: FileDecryptionProperties,
diff --git a/parquet/tests/encryption/encryption_util.rs b/parquet/tests/encryption/encryption_util.rs
index f0d1152fb0..382193d258 100644
--- a/parquet/tests/encryption/encryption_util.rs
+++ b/parquet/tests/encryption/encryption_util.rs
@@ -88,6 +88,38 @@ pub fn verify_encryption_test_data(record_batches: Vec<RecordBatch>, metadata: &
     assert_eq!(row_count, file_metadata.num_rows() as usize);
 }
 
+/// Verifies that the column and offset indexes were successfully read from an
+/// encrypted test file.
+pub fn verify_column_indexes(metadata: &ParquetMetaData) {
+    let offset_index = metadata.offset_index().unwrap();
+    // 1 row group, 8 columns
+    assert_eq!(offset_index.len(), 1);
+    assert_eq!(offset_index[0].len(), 8);
+    // Check float column, which is encrypted in the non-uniform test file
+    let float_col_idx = 4;
+    let offset_index = &offset_index[0][float_col_idx];
+    assert_eq!(offset_index.page_locations.len(), 1);
+    assert!(offset_index.page_locations[0].offset > 0);
+
+    let column_index = metadata.column_index().unwrap();
+    assert_eq!(column_index.len(), 1);
+    assert_eq!(column_index[0].len(), 8);
+    let column_index = &column_index[0][float_col_idx];
+
+    match column_index {
+        parquet::file::page_index::index::Index::FLOAT(float_index) => {
+            assert_eq!(float_index.indexes.len(), 1);
+            assert_eq!(float_index.indexes[0].min, Some(0.0f32));
+            assert!(float_index.indexes[0]
+                .max
+                .is_some_and(|max| (max - 53.9).abs() < 1e-6));
+        }
+        _ => {
+            panic!("Expected a float column index for column {}", float_col_idx);
+        }
+    };
+}
+
 /// A KeyRetriever to use in Parquet encryption tests,
 /// which stores a map from key names/metadata to encryption key bytes.
 pub struct TestKeyRetriever {
