This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch 57_maintenance
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/57_maintenance by this push:
     new 9fc2fbb72e [57_maintenance] [Parquet] Provide only encrypted column stats in plaintext footer (#8305) (#9310)
9fc2fbb72e is described below

commit 9fc2fbb72e4a7e6dd1c339a69ff3a0f4ca5ec0c1
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon Feb 2 10:04:50 2026 -0500

    [57_maintenance] [Parquet] Provide only encrypted column stats in plaintext footer (#8305) (#9310)
    
    - Part of https://github.com/apache/arrow-rs/issues/9240
    - Related to https://github.com/apache/arrow-rs/issues/8304
    
    This is a backport of the following PR to the 57 line
    - https://github.com/apache/arrow-rs/pull/8305 from @rok
    
    Co-authored-by: Rok Mihevc <[email protected]>
    Co-authored-by: Adam Reeve <[email protected]>
---
 parquet/src/file/metadata/mod.rs               |   7 +
 parquet/src/file/metadata/thrift/encryption.rs |  12 +-
 parquet/src/file/metadata/thrift/mod.rs        | 116 +++++++------
 parquet/src/file/metadata/writer.rs            |  54 ++++---
 parquet/tests/encryption/encryption.rs         | 215 ++++++++++++++++++++++++-
 5 files changed, 331 insertions(+), 73 deletions(-)

diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index 6bd426ee67..046ee4ca61 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -836,6 +836,11 @@ pub struct ColumnChunkMetaData {
     column_crypto_metadata: Option<Box<ColumnCryptoMetaData>>,
     #[cfg(feature = "encryption")]
     encrypted_column_metadata: Option<Vec<u8>>,
+    /// When true, indicates the footer is plaintext (not encrypted).
+    /// This affects how column metadata is serialized when `encrypted_column_metadata` is present.
+    /// This field is only used at write time and is not needed when reading metadata.
+    #[cfg(feature = "encryption")]
+    plaintext_footer_mode: bool,
 }
 
 /// Histograms for repetition and definition levels.
@@ -1238,6 +1243,8 @@ impl ColumnChunkMetaDataBuilder {
             column_crypto_metadata: None,
             #[cfg(feature = "encryption")]
             encrypted_column_metadata: None,
+            #[cfg(feature = "encryption")]
+            plaintext_footer_mode: false,
         })
     }
 
diff --git a/parquet/src/file/metadata/thrift/encryption.rs b/parquet/src/file/metadata/thrift/encryption.rs
index 9713cf936d..37e91ba99c 100644
--- a/parquet/src/file/metadata/thrift/encryption.rs
+++ b/parquet/src/file/metadata/thrift/encryption.rs
@@ -145,10 +145,18 @@ fn row_group_from_encrypted_thrift(
                 }
                 Some(ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(crypto_metadata)) => {
                     let column_name = crypto_metadata.path_in_schema.join(".");
-                    decryptor.get_column_metadata_decryptor(
+                    // Try to get the decryptor - if it fails, we don't have the key
+                    match decryptor.get_column_metadata_decryptor(
                         column_name.as_str(),
                         crypto_metadata.key_metadata.as_deref(),
-                    )?
+                    ) {
+                        Ok(dec) => dec,
+                        Err(_) => {
+                            // We don't have the key for this column, so we can't decrypt its metadata.
+                            columns.push(c);
+                            continue;
+                        }
+                    }
                 }
                 Some(ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY) => {
                     decryptor.get_footer_decryptor()?
diff --git a/parquet/src/file/metadata/thrift/mod.rs b/parquet/src/file/metadata/thrift/mod.rs
index 95ad67da6d..e9b950f2b6 100644
--- a/parquet/src/file/metadata/thrift/mod.rs
+++ b/parquet/src/file/metadata/thrift/mod.rs
@@ -1281,6 +1281,19 @@ impl PageHeader {
 /////////////////////////////////////////////////
 // helper functions for writing file meta data
 
+#[cfg(feature = "encryption")]
+fn should_write_column_stats(column_chunk: &ColumnChunkMetaData) -> bool {
+    // If there is encrypted column metadata present,
+    // the column is encrypted with a different key to the footer or a plaintext footer is used,
+    // so the statistics are sensitive and shouldn't be written.
+    column_chunk.encrypted_column_metadata.is_none()
+}
+
+#[cfg(not(feature = "encryption"))]
+fn should_write_column_stats(_column_chunk: &ColumnChunkMetaData) -> bool {
+    true
+}
+
 // serialize the bits of the column chunk needed for a thrift ColumnMetaData
 // struct ColumnMetaData {
 //   1: required Type type
@@ -1331,48 +1344,51 @@ pub(super) fn serialize_column_meta_data<W: Write>(
     if let Some(dictionary_page_offset) = column_chunk.dictionary_page_offset {
         last_field_id = dictionary_page_offset.write_thrift_field(w, 11, last_field_id)?;
     }
-    // PageStatistics is the same as thrift Statistics, but writable
-    let stats = page_stats_to_thrift(column_chunk.statistics());
-    if let Some(stats) = stats {
-        last_field_id = stats.write_thrift_field(w, 12, last_field_id)?;
-    }
-    if let Some(page_encoding_stats) = column_chunk.page_encoding_stats() {
-        last_field_id = page_encoding_stats.write_thrift_field(w, 13, last_field_id)?;
-    }
-    if let Some(bloom_filter_offset) = column_chunk.bloom_filter_offset {
-        last_field_id = bloom_filter_offset.write_thrift_field(w, 14, last_field_id)?;
-    }
-    if let Some(bloom_filter_length) = column_chunk.bloom_filter_length {
-        last_field_id = bloom_filter_length.write_thrift_field(w, 15, last_field_id)?;
-    }
 
-    // SizeStatistics
-    let size_stats = if column_chunk.unencoded_byte_array_data_bytes.is_some()
-        || column_chunk.repetition_level_histogram.is_some()
-        || column_chunk.definition_level_histogram.is_some()
-    {
-        let repetition_level_histogram = column_chunk
-            .repetition_level_histogram()
-            .map(|hist| hist.clone().into_inner());
-
-        let definition_level_histogram = column_chunk
-            .definition_level_histogram()
-            .map(|hist| hist.clone().into_inner());
-
-        Some(SizeStatistics {
-            unencoded_byte_array_data_bytes: column_chunk.unencoded_byte_array_data_bytes,
-            repetition_level_histogram,
-            definition_level_histogram,
-        })
-    } else {
-        None
-    };
-    if let Some(size_stats) = size_stats {
-        last_field_id = size_stats.write_thrift_field(w, 16, last_field_id)?;
-    }
+    if should_write_column_stats(column_chunk) {
+        // PageStatistics is the same as thrift Statistics, but writable
+        let stats = page_stats_to_thrift(column_chunk.statistics());
+        if let Some(stats) = stats {
+            last_field_id = stats.write_thrift_field(w, 12, last_field_id)?;
+        }
+        if let Some(page_encoding_stats) = column_chunk.page_encoding_stats() {
+            last_field_id = page_encoding_stats.write_thrift_field(w, 13, last_field_id)?;
+        }
+        if let Some(bloom_filter_offset) = column_chunk.bloom_filter_offset {
+            last_field_id = bloom_filter_offset.write_thrift_field(w, 14, last_field_id)?;
+        }
+        if let Some(bloom_filter_length) = column_chunk.bloom_filter_length {
+            last_field_id = bloom_filter_length.write_thrift_field(w, 15, last_field_id)?;
+        }
 
-    if let Some(geo_stats) = column_chunk.geo_statistics() {
-        geo_stats.write_thrift_field(w, 17, last_field_id)?;
+        // SizeStatistics
+        let size_stats = if column_chunk.unencoded_byte_array_data_bytes.is_some()
+            || column_chunk.repetition_level_histogram.is_some()
+            || column_chunk.definition_level_histogram.is_some()
+        {
+            let repetition_level_histogram = column_chunk
+                .repetition_level_histogram()
+                .map(|hist| hist.clone().into_inner());
+
+            let definition_level_histogram = column_chunk
+                .definition_level_histogram()
+                .map(|hist| hist.clone().into_inner());
+
+            Some(SizeStatistics {
+                unencoded_byte_array_data_bytes: column_chunk.unencoded_byte_array_data_bytes,
+                repetition_level_histogram,
+                definition_level_histogram,
+            })
+        } else {
+            None
+        };
+        if let Some(size_stats) = size_stats {
+            last_field_id = size_stats.write_thrift_field(w, 16, last_field_id)?;
+        }
+
+        if let Some(geo_stats) = column_chunk.geo_statistics() {
+            geo_stats.write_thrift_field(w, 17, last_field_id)?;
+        }
     }
 
     w.write_struct_end()
@@ -1592,17 +1608,17 @@ impl WriteThrift for ColumnChunkMetaData {
             .write_thrift_field(writer, 2, last_field_id)?;
 
         #[cfg(feature = "encryption")]
-        {
-            // only write the ColumnMetaData if we haven't already encrypted it
-            if self.encrypted_column_metadata.is_none() {
-                writer.write_field_begin(FieldType::Struct, 3, last_field_id)?;
-                serialize_column_meta_data(self, writer)?;
-                last_field_id = 3;
-            }
-        }
+        let write_meta_data =
+            self.encrypted_column_metadata.is_none() || self.plaintext_footer_mode;
         #[cfg(not(feature = "encryption"))]
-        {
-            // always write the ColumnMetaData
+        let write_meta_data = true;
+
+        // When the footer is encrypted and encrypted_column_metadata is present,
+        // skip writing the plaintext meta_data field to reduce footer size.
+        // When the footer is plaintext (plaintext_footer_mode=true), we still write
+        // meta_data for backward compatibility with readers that expect it, but with
+        // sensitive fields (statistics, bloom filter info, etc.) stripped out.
+        if write_meta_data {
             writer.write_field_begin(FieldType::Struct, 3, last_field_id)?;
             serialize_column_meta_data(self, writer)?;
             last_field_id = 3;
diff --git a/parquet/src/file/metadata/writer.rs b/parquet/src/file/metadata/writer.rs
index 38215f5ecd..275b4ff28e 100644
--- a/parquet/src/file/metadata/writer.rs
+++ b/parquet/src/file/metadata/writer.rs
@@ -818,34 +818,48 @@ impl MetadataObjectWriter {
     ) -> Result<ColumnChunkMetaData> {
         // Column crypto metadata should have already been set when the column was created.
         // Here we apply the encryption by encrypting the column metadata if required.
-        match column_chunk.column_crypto_metadata.as_deref() {
-            None => {}
+        let encryptor = match column_chunk.column_crypto_metadata.as_deref() {
+            None => None,
             Some(ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY) => {
+                let is_footer_encrypted = file_encryptor.properties().encrypt_footer();
+
                 // When uniform encryption is used the footer is already encrypted,
                 // so the column chunk does not need additional encryption.
+                // Except if we're in plaintext footer mode, then we need to encrypt
+                // the column metadata here.
+                if !is_footer_encrypted {
+                    Some(file_encryptor.get_footer_encryptor()?)
+                } else {
+                    None
+                }
             }
             Some(ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(col_key)) => {
-                use crate::file::metadata::thrift::serialize_column_meta_data;
-
                 let column_path = col_key.path_in_schema.join(".");
-                let mut column_encryptor = file_encryptor.get_column_encryptor(&column_path)?;
-                let aad = create_module_aad(
-                    file_encryptor.file_aad(),
-                    ModuleType::ColumnMetaData,
-                    row_group_index,
-                    column_index,
-                    None,
-                )?;
-                // create temp ColumnMetaData that we can encrypt
-                let mut buffer: Vec<u8> = vec![];
-                {
-                    let mut prot = ThriftCompactOutputProtocol::new(&mut buffer);
-                    serialize_column_meta_data(&column_chunk, &mut prot)?;
-                }
-                let ciphertext = column_encryptor.encrypt(&buffer, &aad)?;
+                Some(file_encryptor.get_column_encryptor(&column_path)?)
+            }
+        };
+
+        if let Some(mut encryptor) = encryptor {
+            use crate::file::metadata::thrift::serialize_column_meta_data;
 
-                column_chunk.encrypted_column_metadata = Some(ciphertext);
+            let aad = create_module_aad(
+                file_encryptor.file_aad(),
+                ModuleType::ColumnMetaData,
+                row_group_index,
+                column_index,
+                None,
+            )?;
+            // create temp ColumnMetaData that we can encrypt
+            let mut buffer: Vec<u8> = vec![];
+            {
+                let mut prot = ThriftCompactOutputProtocol::new(&mut buffer);
+                serialize_column_meta_data(&column_chunk, &mut prot)?;
             }
+            let ciphertext = encryptor.encrypt(&buffer, &aad)?;
+            column_chunk.encrypted_column_metadata = Some(ciphertext);
+            // Track whether the footer is plaintext, which affects how we serialize
+            // the column metadata (we need to write stripped metadata for backward compatibility)
+            column_chunk.plaintext_footer_mode = !file_encryptor.properties().encrypt_footer();
         }
 
         Ok(column_chunk)
diff --git a/parquet/tests/encryption/encryption.rs b/parquet/tests/encryption/encryption.rs
index f999abab95..5ea49d8663 100644
--- a/parquet/tests/encryption/encryption.rs
+++ b/parquet/tests/encryption/encryption.rs
@@ -34,7 +34,7 @@ use parquet::data_type::{ByteArray, ByteArrayType};
 use parquet::encryption::decrypt::FileDecryptionProperties;
 use parquet::encryption::encrypt::FileEncryptionProperties;
 use parquet::errors::ParquetError;
-use parquet::file::metadata::ParquetMetaData;
+use parquet::file::metadata::{ColumnChunkMetaData, ParquetMetaData};
 use parquet::file::properties::WriterProperties;
 use parquet::file::writer::SerializedFileWriter;
 use parquet::schema::parser::parse_message_type;
@@ -719,6 +719,219 @@ fn test_write_uniform_encryption_plaintext_footer() {
     );
 }
 
+#[test]
+pub fn test_column_statistics_with_plaintext_footer() {
+    let footer_key = b"0123456789012345".to_vec();
+    let column_key = b"1234567890123450".to_vec();
+
+    // Encrypt with a plaintext footer and column-specific keys
+    let encryption_properties = FileEncryptionProperties::builder(footer_key.clone())
+        .with_plaintext_footer(true)
+        .with_column_key("x", column_key.clone())
+        .with_column_key("y", column_key.clone())
+        .with_column_key("s", column_key.clone())
+        .build()
+        .unwrap();
+
+    // Read with only the footer key and the key for one column
+    let decryption_properties = FileDecryptionProperties::builder(footer_key.clone())
+        .with_column_key("x", column_key.clone())
+        .build()
+        .unwrap();
+
+    // Reader can read plaintext stats from the unencrypted column z
+    // and column x for which the key is provided, but not columns y and s
+    // for which no key is provided.
+    write_and_read_stats(
+        Arc::clone(&encryption_properties),
+        Some(decryption_properties),
+        &[true, false, true, false],
+    );
+
+    // Read without any decryption properties.
+    // Reader can only read plaintext stats from the unencrypted column z.
+    write_and_read_stats(encryption_properties, None, &[false, false, true, false]);
+}
+
+#[test]
+pub fn test_column_statistics_with_plaintext_footer_and_uniform_encryption() {
+    let footer_key = b"0123456789012345".to_vec();
+
+    // Write with uniform encryption and a plaintext footer.
+    let encryption_properties = FileEncryptionProperties::builder(footer_key.clone())
+        .with_plaintext_footer(true)
+        .build()
+        .unwrap();
+
+    let decryption_properties = FileDecryptionProperties::builder(footer_key.clone())
+        .build()
+        .unwrap();
+
+    // Reader can read stats from plaintext footer metadata if a footer key is provided
+    write_and_read_stats(
+        Arc::clone(&encryption_properties),
+        Some(decryption_properties),
+        &[true, true, true, true],
+    );
+
+    // Reader can not read stats from plaintext footer metadata if no key is provided
+    write_and_read_stats(encryption_properties, None, &[false, false, false, false]);
+}
+
+#[test]
+pub fn test_column_statistics_with_encrypted_footer() {
+    let footer_key = b"0123456789012345".to_vec();
+    let column_key = b"1234567890123450".to_vec();
+
+    // Encrypt with an encrypted footer and column-specific keys
+    let encryption_properties = FileEncryptionProperties::builder(footer_key.clone())
+        .with_plaintext_footer(false)
+        .with_column_key("x", column_key.clone())
+        .with_column_key("y", column_key.clone())
+        .with_column_key("s", column_key.clone())
+        .build()
+        .unwrap();
+
+    // Read with only the footer key and the key for one column
+    let decryption_properties = FileDecryptionProperties::builder(footer_key.clone())
+        .with_column_key("x", column_key.clone())
+        .build()
+        .unwrap();
+
+    // Reader can read plaintext stats from the unencrypted column z
+    // and column x for which the key is provided, but not columns y and s
+    // for which no key is provided.
+    write_and_read_stats(
+        encryption_properties,
+        Some(decryption_properties),
+        &[true, false, true, false],
+    );
+}
+
+#[test]
+pub fn test_column_statistics_with_encrypted_footer_and_uniform_encryption() {
+    let footer_key = b"0123456789012345".to_vec();
+
+    // Encrypt with an encrypted footer and uniform encryption
+    let encryption_properties = FileEncryptionProperties::builder(footer_key.clone())
+        .with_plaintext_footer(false)
+        .build()
+        .unwrap();
+
+    // Read with the footer key
+    let decryption_properties = FileDecryptionProperties::builder(footer_key.clone())
+        .build()
+        .unwrap();
+
+    // Reader can read stats for all columns.
+    write_and_read_stats(
+        encryption_properties,
+        Some(decryption_properties),
+        &[true, true, true, true],
+    );
+}
+
+/// Write a file with encryption and then verify whether statistics are readable with the provided decryption properties.
+fn write_and_read_stats(
+    encryption_properties: Arc<FileEncryptionProperties>,
+    decryption_properties: Option<Arc<FileDecryptionProperties>>,
+    expect_stats: &[bool],
+) {
+    use parquet::basic::Type;
+
+    let int_values = Int32Array::from(vec![8, 3, 4, 19, 5]);
+    let int_values = Arc::new(int_values);
+    let string_values: StringArray = vec![
+        None,
+        Some("parquet"),
+        Some("encryption"),
+        Some("test"),
+        None,
+    ]
+    .into();
+    let string_values = Arc::new(string_values);
+
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("x", int_values.data_type().clone(), true),
+        Field::new("y", int_values.data_type().clone(), true),
+        Field::new("z", int_values.data_type().clone(), true),
+        Field::new("s", string_values.data_type().clone(), true),
+    ]));
+    let record_batches = vec![
+        RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                int_values.clone(),
+                int_values.clone(),
+                int_values.clone(),
+                string_values.clone(),
+            ],
+        )
+        .unwrap(),
+    ];
+
+    let props = WriterProperties::builder()
+        .with_file_encryption_properties(encryption_properties)
+        .set_bloom_filter_enabled(true)
+        .build();
+
+    let temp_file = tempfile::tempfile().unwrap();
+    let mut writer = ArrowWriter::try_new(&temp_file, schema.clone(), Some(props)).unwrap();
+    for batch in record_batches.clone() {
+        writer.write(&batch).unwrap();
+    }
+    let metadata = writer.close().unwrap();
+
+    let expected_min = 3i32.to_le_bytes();
+    let expected_max = 19i32.to_le_bytes();
+
+    let check_column_stats = |column: &ColumnChunkMetaData, expect_stats: bool| {
+        let is_byte_array = column.column_type() == Type::BYTE_ARRAY;
+        if expect_stats {
+            assert!(column.page_encoding_stats().is_some());
+            assert!(column.statistics().is_some());
+            if is_byte_array {
+                // Size statistics for BYTE_ARRAY columns
+                assert!(column.unencoded_byte_array_data_bytes().is_some());
+            } else {
+                let column_stats = column.statistics().unwrap();
+                assert_eq!(column_stats.min_bytes_opt(), Some(expected_min.as_slice()));
+                assert_eq!(column_stats.max_bytes_opt(), Some(expected_max.as_slice()));
+            }
+            assert!(column.bloom_filter_offset().is_some());
+            assert!(column.bloom_filter_length().is_some());
+        } else {
+            assert!(column.statistics().is_none());
+            assert!(column.page_encoding_stats().is_none());
+            assert!(column.bloom_filter_offset().is_none());
+            assert!(column.bloom_filter_length().is_none());
+            // Size statistics should also be stripped
+            if is_byte_array {
+                assert!(column.unencoded_byte_array_data_bytes().is_none());
+            }
+        }
+    };
+
+    // Check column statistics produced at write time are available in full
+    let row_group = metadata.row_group(0);
+    for column in row_group.columns().iter() {
+        check_column_stats(column, true);
+    }
+
+    // Verify the presence or not of statistics per-column when reading with the provided decryption properties
+    let mut options = ArrowReaderOptions::default().with_encoding_stats_as_mask(false);
+    if let Some(decryption_properties) = decryption_properties {
+        options = options.with_file_decryption_properties(decryption_properties);
+    }
+    let reader_metadata = ArrowReaderMetadata::load(&temp_file, options).unwrap();
+    let metadata = reader_metadata.metadata();
+    let row_group = metadata.row_group(0);
+
+    for (column, stats_expected) in row_group.columns().iter().zip(expect_stats) {
+        check_column_stats(column, *stats_expected);
+    }
+}
+
 #[test]
 fn test_write_uniform_encryption() {
     let testdata = arrow::util::test_util::parquet_test_data();

Reply via email to