This is an automated email from the ASF dual-hosted git repository.
etseidl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 891d31d280 [thrift-remodel] Refactor Thrift encryption and store
encodings as bitmask (#8587)
891d31d280 is described below
commit 891d31d280a4403a25c3158968584e9a678ba5b7
Author: Ed Seidl <[email protected]>
AuthorDate: Mon Oct 13 12:36:04 2025 -0700
[thrift-remodel] Refactor Thrift encryption and store encodings as bitmask
(#8587)
# Which issue does this PR close?
- Part of #5853.
- Part of #8518.
- Closes #8588.
# Rationale for this change
Reduce code clutter (too many `#[cfg(feature = "encryption")]` attributes).
Also reduces the number of vector allocations using a strategy proposed
by @jhorstmann
(https://github.com/apache/arrow-rs/issues/8518#issuecomment-3355199067).
# What changes are included in this PR?
Moves most of the encryption-related Thrift decoding into a new module
(`parquet/src/file/metadata/encryption.rs`).
Replaces the `encodings` vector in `ColumnChunkMetaData` with an
`EncodingMask` bitmask.
# Are these changes tested?
Covered by existing and new tests.
# Are there any user-facing changes?
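Yes. This changes the return type of `ColumnChunkMetaData::encodings`
from `&Vec<Encoding>` to `impl Iterator<Item = Encoding>` (the encodings
are now stored as an `EncodingMask` bitmask and produced on demand rather
than stored as a vector). The bitmask itself is exposed via the new
`ColumnChunkMetaData::encodings_mask` accessor, and can be set with the new
`ColumnChunkMetaDataBuilder::set_encodings_mask` method.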
---
parquet/src/basic.rs | 182 +++++++++++
parquet/src/column/writer/mod.rs | 19 +-
parquet/src/file/metadata/encryption.rs | 544 ++++++++++++++++++++++++++++++++
parquet/src/file/metadata/mod.rs | 38 ++-
parquet/src/file/metadata/parser.rs | 2 +-
parquet/src/file/metadata/thrift_gen.rs | 404 +-----------------------
parquet/src/file/metadata/writer.rs | 2 +-
parquet/src/file/writer.rs | 3 +-
parquet/src/schema/printer.rs | 6 +-
9 files changed, 784 insertions(+), 416 deletions(-)
diff --git a/parquet/src/basic.rs b/parquet/src/basic.rs
index dc6402e886..0220b1b55d 100644
--- a/parquet/src/basic.rs
+++ b/parquet/src/basic.rs
@@ -25,6 +25,7 @@ use std::str::FromStr;
use std::{fmt, str};
pub use crate::compression::{BrotliLevel, GzipLevel, ZstdLevel};
+use crate::file::metadata::HeapSize;
use crate::parquet_thrift::{
ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
ThriftCompactOutputProtocol,
WriteThrift, WriteThriftField,
@@ -724,6 +725,132 @@ impl FromStr for Encoding {
}
}
+/// A bitmask representing the [`Encoding`]s employed while encoding a Parquet
column chunk.
+///
+/// The Parquet [`ColumnMetaData`] struct contains an array that indicates
what encodings were
+/// used when writing that column chunk. For memory and performance reasons,
this crate reduces
+/// that array to a bitmask, where each bit position represents a different
[`Encoding`]. This
+/// struct contains that bitmask, and provides methods to interact with the
data.
+///
+/// # Example
+/// ```no_run
+/// # use parquet::file::metadata::ParquetMetaDataReader;
+/// # use parquet::basic::Encoding;
+/// # fn open_parquet_file(path: &str) -> std::fs::File { unimplemented!(); }
+/// // read parquet metadata from a file
+/// let file = open_parquet_file("some_path.parquet");
+/// let mut reader = ParquetMetaDataReader::new();
+/// reader.try_parse(&file).unwrap();
+/// let metadata = reader.finish().unwrap();
+///
+/// // find the encodings used by the first column chunk in the first row group
+/// let col_meta = metadata.row_group(0).column(0);
+/// let encodings = col_meta.encodings_mask();
+///
+/// // check to see if a particular encoding was used
+/// let used_rle = encodings.is_set(Encoding::RLE);
+///
+/// // check to see if all of a set of encodings were used
+/// let used_all = encodings.all_set([Encoding::RLE, Encoding::PLAIN].iter());
+///
+/// // convert mask to a Vec<Encoding>
+/// let encodings_vec = encodings.encodings().collect::<Vec<_>>();
+/// ```
+///
+/// [`ColumnMetaData`]:
https://github.com/apache/parquet-format/blob/9fd57b59e0ce1a82a69237dcf8977d3e72a2965d/src/main/thrift/parquet.thrift#L875
+#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
+pub struct EncodingMask(i32);
+
+impl EncodingMask {
+ /// Highest valued discriminant in the [`Encoding`] enum
+ const MAX_ENCODING: i32 = Encoding::BYTE_STREAM_SPLIT as i32;
+ /// A mask of the bit positions that must NOT be set, used for validation: every position
+ /// above `MAX_ENCODING`, plus the never used GROUP_VAR_INT encoding value of `1`.
+ const ALLOWED_MASK: u32 =
+ !(1u32 << (EncodingMask::MAX_ENCODING as u32 + 1)).wrapping_sub(1) | 1
<< 1;
+
+ /// Attempt to create a new `EncodingMask` from an integer.
+ ///
+ /// This will return an error if a bit outside the allowable range is set.
+ pub fn try_new(val: i32) -> Result<Self> {
+ if val as u32 & Self::ALLOWED_MASK != 0 {
+ return Err(general_err!("Attempt to create invalid mask: 0x{:x}",
val));
+ }
+ Ok(Self(val))
+ }
+
+ /// Return an integer representation of this `EncodingMask`.
+ pub fn as_i32(&self) -> i32 {
+ self.0
+ }
+
+ /// Create a new `EncodingMask` from a collection of [`Encoding`]s.
+ pub fn new_from_encodings<'a>(encodings: impl Iterator<Item = &'a
Encoding>) -> Self {
+ let mut mask = 0;
+ for &e in encodings {
+ mask |= 1 << (e as i32);
+ }
+ Self(mask)
+ }
+
+ /// Test if a given [`Encoding`] is present in this mask.
+ pub fn is_set(&self, val: Encoding) -> bool {
+ self.0 & (1 << (val as i32)) != 0
+ }
+
+ /// Test if all [`Encoding`]s in a given set are present in this mask.
+ pub fn all_set<'a>(&self, mut encodings: impl Iterator<Item = &'a
Encoding>) -> bool {
+ encodings.all(|&e| self.is_set(e))
+ }
+
+ /// Return an iterator over all [`Encoding`]s present in this mask.
+ pub fn encodings(&self) -> impl Iterator<Item = Encoding> {
+ Self::mask_to_encodings_iter(self.0)
+ }
+
+ fn mask_to_encodings_iter(mask: i32) -> impl Iterator<Item = Encoding> {
+ (0..=Self::MAX_ENCODING)
+ .filter(move |i| mask & (1 << i) != 0)
+ .map(i32_to_encoding)
+ }
+}
+
+impl HeapSize for EncodingMask {
+ fn heap_size(&self) -> usize {
+ 0 // no heap allocations
+ }
+}
+
+impl<'a, R: ThriftCompactInputProtocol<'a>> ReadThrift<'a, R> for EncodingMask
{
+ fn read_thrift(prot: &mut R) -> Result<Self> {
+ let mut mask = 0;
+
+ // This reads a Thrift `list<Encoding>` and turns it into a bitmask
+ let list_ident = prot.read_list_begin()?;
+ for _ in 0..list_ident.size {
+ let val = Encoding::read_thrift(prot)?;
+ mask |= 1 << val as i32;
+ }
+ Ok(Self(mask))
+ }
+}
+
+#[allow(deprecated)]
+fn i32_to_encoding(val: i32) -> Encoding {
+ match val {
+ 0 => Encoding::PLAIN,
+ 2 => Encoding::PLAIN_DICTIONARY,
+ 3 => Encoding::RLE,
+ 4 => Encoding::BIT_PACKED,
+ 5 => Encoding::DELTA_BINARY_PACKED,
+ 6 => Encoding::DELTA_LENGTH_BYTE_ARRAY,
+ 7 => Encoding::DELTA_BYTE_ARRAY,
+ 8 => Encoding::RLE_DICTIONARY,
+ 9 => Encoding::BYTE_STREAM_SPLIT,
+ _ => panic!("Impossible encoding {val}"),
+ }
+}
+
// ----------------------------------------------------------------------
// Mirrors thrift enum `CompressionCodec`
@@ -2409,4 +2536,59 @@ mod tests {
assert_eq!(EdgeInterpolationAlgorithm::ANDOYER.to_string(), "ANDOYER");
assert_eq!(EdgeInterpolationAlgorithm::KARNEY.to_string(), "KARNEY");
}
+
+ fn encodings_roundtrip(mut encodings: Vec<Encoding>) {
+ encodings.sort();
+ let mask = EncodingMask::new_from_encodings(encodings.iter());
+ assert!(mask.all_set(encodings.iter()));
+ let v = mask.encodings().collect::<Vec<_>>();
+ assert_eq!(v, encodings);
+ }
+
+ #[test]
+ fn test_encoding_roundtrip() {
+ encodings_roundtrip(
+ [
+ Encoding::RLE,
+ Encoding::PLAIN,
+ Encoding::DELTA_BINARY_PACKED,
+ ]
+ .into(),
+ );
+ encodings_roundtrip([Encoding::RLE_DICTIONARY,
Encoding::PLAIN_DICTIONARY].into());
+ encodings_roundtrip([].into());
+ let encodings = [
+ Encoding::PLAIN,
+ Encoding::BIT_PACKED,
+ Encoding::RLE,
+ Encoding::DELTA_BINARY_PACKED,
+ Encoding::DELTA_BYTE_ARRAY,
+ Encoding::DELTA_LENGTH_BYTE_ARRAY,
+ Encoding::PLAIN_DICTIONARY,
+ Encoding::RLE_DICTIONARY,
+ Encoding::BYTE_STREAM_SPLIT,
+ ];
+ encodings_roundtrip(encodings.into());
+ }
+
+ #[test]
+ fn test_invalid_encoding_mask() {
+ // any set bits higher than the max should trigger an error
+ let res = EncodingMask::try_new(-1);
+ assert!(res.is_err());
+ let err = res.unwrap_err();
+ assert_eq!(
+ err.to_string(),
+ "Parquet error: Attempt to create invalid mask: 0xffffffff"
+ );
+
+ // test that GROUP_VAR_INT is disallowed
+ let res = EncodingMask::try_new(2);
+ assert!(res.is_err());
+ let err = res.unwrap_err();
+ assert_eq!(
+ err.to_string(),
+ "Parquet error: Attempt to create invalid mask: 0x2"
+ );
+ }
}
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 298a0633a9..fdb94962b6 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -27,7 +27,7 @@ use std::collections::{BTreeSet, VecDeque};
use std::str;
use crate::basic::{
- BoundaryOrder, Compression, ConvertedType, Encoding, LogicalType,
PageType, Type,
+ BoundaryOrder, Compression, ConvertedType, Encoding, EncodingMask,
LogicalType, PageType, Type,
};
use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
use crate::column::writer::encoder::{ColumnValueEncoder,
ColumnValueEncoderImpl, ColumnValues};
@@ -1190,7 +1190,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a,
E> {
let mut builder = ColumnChunkMetaData::builder(self.descr.clone())
.set_compression(self.codec)
- .set_encodings(self.encodings.iter().cloned().collect())
+
.set_encodings_mask(EncodingMask::new_from_encodings(self.encodings.iter()))
.set_page_encoding_stats(self.encoding_stats.clone())
.set_total_compressed_size(total_compressed_size)
.set_total_uncompressed_size(total_uncompressed_size)
@@ -1734,7 +1734,10 @@ mod tests {
assert_eq!(r.rows_written, 4);
let metadata = r.metadata;
- assert_eq!(metadata.encodings(), &vec![Encoding::PLAIN,
Encoding::RLE]);
+ assert_eq!(
+ metadata.encodings().collect::<Vec<_>>(),
+ vec![Encoding::PLAIN, Encoding::RLE]
+ );
assert_eq!(metadata.num_values(), 4); // just values
assert_eq!(metadata.dictionary_page_offset(), None);
}
@@ -2095,8 +2098,8 @@ mod tests {
let metadata = r.metadata;
assert_eq!(
- metadata.encodings(),
- &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
+ metadata.encodings().collect::<Vec<_>>(),
+ vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
);
assert_eq!(metadata.num_values(), 4);
assert_eq!(metadata.compressed_size(), 20);
@@ -2221,8 +2224,8 @@ mod tests {
let metadata = r.metadata;
assert_eq!(
- metadata.encodings(),
- &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
+ metadata.encodings().collect::<Vec<_>>(),
+ vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
);
assert_eq!(metadata.num_values(), 4);
assert_eq!(metadata.compressed_size(), 20);
@@ -4100,7 +4103,7 @@ mod tests {
.build();
let meta = column_write_and_get_metadata::<T>(props, data);
assert_eq!(meta.dictionary_page_offset(), dictionary_page_offset);
- assert_eq!(meta.encodings(), encodings);
+ assert_eq!(meta.encodings().collect::<Vec<_>>(), encodings);
assert_eq!(meta.page_encoding_stats().unwrap(), page_encoding_stats);
}
diff --git a/parquet/src/file/metadata/encryption.rs
b/parquet/src/file/metadata/encryption.rs
new file mode 100644
index 0000000000..4c97640b67
--- /dev/null
+++ b/parquet/src/file/metadata/encryption.rs
@@ -0,0 +1,544 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Thrift structs and helpers used to decrypt and decode encrypted Parquet metadata
+
+use std::io::Write;
+
+use crate::{
+ basic::{Compression, EncodingMask},
+ encryption::decrypt::{FileDecryptionProperties, FileDecryptor},
+ errors::{ParquetError, Result},
+ file::{
+ column_crypto_metadata::ColumnCryptoMetaData,
+ metadata::{
+ HeapSize, LevelHistogram, PageEncodingStats, ParquetMetaData,
RowGroupMetaData,
+ thrift_gen::{
+ GeospatialStatistics, SizeStatistics, Statistics,
convert_geo_stats, convert_stats,
+ parquet_metadata_from_bytes,
+ },
+ },
+ },
+ parquet_thrift::{
+ ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
+ ThriftCompactOutputProtocol, ThriftSliceInputProtocol, WriteThrift,
WriteThriftField,
+ read_thrift_vec,
+ },
+ thrift_struct, thrift_union,
+};
+
+thrift_struct!(
+pub(crate) struct AesGcmV1 {
+ /// AAD prefix
+ 1: optional binary aad_prefix
+
+ /// Unique file identifier part of AAD suffix
+ 2: optional binary aad_file_unique
+
+ /// In files encrypted with AAD prefix without storing it,
+ /// readers must supply the prefix
+ 3: optional bool supply_aad_prefix
+}
+);
+
+impl HeapSize for AesGcmV1 {
+ fn heap_size(&self) -> usize {
+ self.aad_prefix.heap_size()
+ + self.aad_file_unique.heap_size()
+ + self.supply_aad_prefix.heap_size()
+ }
+}
+
+thrift_struct!(
+pub(crate) struct AesGcmCtrV1 {
+ /// AAD prefix
+ 1: optional binary aad_prefix
+
+ /// Unique file identifier part of AAD suffix
+ 2: optional binary aad_file_unique
+
+ /// In files encrypted with AAD prefix without storing it,
+ /// readers must supply the prefix
+ 3: optional bool supply_aad_prefix
+}
+);
+
+impl HeapSize for AesGcmCtrV1 {
+ fn heap_size(&self) -> usize {
+ self.aad_prefix.heap_size()
+ + self.aad_file_unique.heap_size()
+ + self.supply_aad_prefix.heap_size()
+ }
+}
+
+thrift_union!(
+union EncryptionAlgorithm {
+ 1: (AesGcmV1) AES_GCM_V1
+ 2: (AesGcmCtrV1) AES_GCM_CTR_V1
+}
+);
+
+impl HeapSize for EncryptionAlgorithm {
+ fn heap_size(&self) -> usize {
+ match self {
+ Self::AES_GCM_V1(gcm) => gcm.heap_size(),
+ Self::AES_GCM_CTR_V1(gcm_ctr) => gcm_ctr.heap_size(),
+ }
+ }
+}
+
+thrift_struct!(
+/// Crypto metadata for files with encrypted footer
+pub(crate) struct FileCryptoMetaData<'a> {
+ /// Encryption algorithm. This field is only used for files
+ /// with encrypted footer. Files with plaintext footer store algorithm id
+ /// inside footer (FileMetaData structure).
+ 1: required EncryptionAlgorithm encryption_algorithm
+
+ /// Retrieval metadata of key used for encryption of footer,
+ /// and (possibly) columns.
+ 2: optional binary<'a> key_metadata
+}
+);
+
+fn row_group_from_encrypted_thrift(
+ mut rg: RowGroupMetaData,
+ decryptor: Option<&FileDecryptor>,
+) -> Result<RowGroupMetaData> {
+ let schema_descr = rg.schema_descr;
+
+ if schema_descr.num_columns() != rg.columns.len() {
+ return Err(general_err!(
+ "Column count mismatch. Schema has {} columns while Row Group has
{}",
+ schema_descr.num_columns(),
+ rg.columns.len()
+ ));
+ }
+ let total_byte_size = rg.total_byte_size;
+ let num_rows = rg.num_rows;
+ let mut columns = vec![];
+
+ for (i, (mut c, d)) in rg
+ .columns
+ .drain(0..)
+ .zip(schema_descr.columns())
+ .enumerate()
+ {
+ // Read encrypted metadata if it's present and we have a decryptor.
+ if let (true, Some(decryptor)) =
(c.encrypted_column_metadata.is_some(), decryptor) {
+ let column_decryptor = match c.crypto_metadata() {
+ None => {
+ return Err(general_err!(
+ "No crypto_metadata is set for column '{}', which has
encrypted metadata",
+ d.path().string()
+ ));
+ }
+
Some(ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(crypto_metadata)) => {
+ let column_name = crypto_metadata.path_in_schema.join(".");
+ decryptor.get_column_metadata_decryptor(
+ column_name.as_str(),
+ crypto_metadata.key_metadata.as_deref(),
+ )?
+ }
+ Some(ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY) => {
+ decryptor.get_footer_decryptor()?
+ }
+ };
+
+ let column_aad = crate::encryption::modules::create_module_aad(
+ decryptor.file_aad(),
+ crate::encryption::modules::ModuleType::ColumnMetaData,
+ rg.ordinal.unwrap() as usize,
+ i,
+ None,
+ )?;
+
+ // Take the encrypted column metadata as it is no longer needed.
+ let encrypted_column_metadata = c.encrypted_column_metadata.take();
+ let buf = encrypted_column_metadata.unwrap();
+ let decrypted_cc_buf = column_decryptor
+ .decrypt(&buf, column_aad.as_ref())
+ .map_err(|_| {
+ general_err!(
+ "Unable to decrypt column '{}', perhaps the column key
is wrong?",
+ d.path().string()
+ )
+ })?;
+
+ // parse decrypted buffer and then replace fields in 'c'
+ let col_meta = read_column_metadata(decrypted_cc_buf.as_slice())?;
+
+ let (
+ unencoded_byte_array_data_bytes,
+ repetition_level_histogram,
+ definition_level_histogram,
+ ) = if let Some(size_stats) = col_meta.size_statistics {
+ (
+ size_stats.unencoded_byte_array_data_bytes,
+ size_stats.repetition_level_histogram,
+ size_stats.definition_level_histogram,
+ )
+ } else {
+ (None, None, None)
+ };
+
+ let repetition_level_histogram =
repetition_level_histogram.map(LevelHistogram::from);
+ let definition_level_histogram =
definition_level_histogram.map(LevelHistogram::from);
+
+ c.encodings = col_meta.encodings;
+ c.compression = col_meta.codec;
+ c.num_values = col_meta.num_values;
+ c.total_uncompressed_size = col_meta.total_uncompressed_size;
+ c.total_compressed_size = col_meta.total_compressed_size;
+ c.data_page_offset = col_meta.data_page_offset;
+ c.index_page_offset = col_meta.index_page_offset;
+ c.dictionary_page_offset = col_meta.dictionary_page_offset;
+ c.statistics = convert_stats(d.physical_type(),
col_meta.statistics)?;
+ c.encoding_stats = col_meta.encoding_stats;
+ c.bloom_filter_offset = col_meta.bloom_filter_offset;
+ c.bloom_filter_length = col_meta.bloom_filter_length;
+ c.unencoded_byte_array_data_bytes =
unencoded_byte_array_data_bytes;
+ c.repetition_level_histogram = repetition_level_histogram;
+ c.definition_level_histogram = definition_level_histogram;
+ c.geo_statistics =
convert_geo_stats(col_meta.geospatial_statistics);
+
+ columns.push(c);
+ } else {
+ columns.push(c);
+ }
+ }
+
+ let sorting_columns = rg.sorting_columns;
+ let file_offset = rg.file_offset;
+ let ordinal = rg.ordinal;
+
+ Ok(RowGroupMetaData {
+ columns,
+ num_rows,
+ sorting_columns,
+ total_byte_size,
+ schema_descr,
+ file_offset,
+ ordinal,
+ })
+}
+
+/// Decodes [`ParquetMetaData`] from the provided bytes, handling metadata
that may be encrypted.
+///
+/// Typically this is used to decode the metadata from the end of a parquet
+/// file. The format of `buf` is the Thrift compact binary protocol, as
specified
+/// by the [Parquet Spec]. The buffer may be encrypted with AES GCM ciphers as
+/// specified in the [Parquet Encryption Spec] (AES GCM CTR is not yet supported).
+///
+/// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
+/// [Parquet Encryption Spec]:
https://parquet.apache.org/docs/file-format/data-pages/encryption/
+pub(crate) fn parquet_metadata_with_encryption(
+ file_decryption_properties: Option<&FileDecryptionProperties>,
+ encrypted_footer: bool,
+ buf: &[u8],
+) -> Result<ParquetMetaData> {
+ use crate::file::metadata::ParquetMetaDataBuilder;
+
+ let mut buf = buf;
+ let mut file_decryptor = None;
+ let decrypted_fmd_buf;
+
+ if encrypted_footer {
+ let mut prot = ThriftSliceInputProtocol::new(buf);
+ if let Some(file_decryption_properties) = file_decryption_properties {
+ let t_file_crypto_metadata: FileCryptoMetaData =
+ FileCryptoMetaData::read_thrift(&mut prot)
+ .map_err(|e| general_err!("Could not parse crypto
metadata: {}", e))?;
+ let supply_aad_prefix = match
&t_file_crypto_metadata.encryption_algorithm {
+ EncryptionAlgorithm::AES_GCM_V1(algo) =>
algo.supply_aad_prefix,
+ _ => Some(false),
+ }
+ .unwrap_or(false);
+ if supply_aad_prefix &&
file_decryption_properties.aad_prefix().is_none() {
+ return Err(general_err!(
+ "Parquet file was encrypted with an AAD prefix that is not
stored in the file, \
+ but no AAD prefix was provided in the file decryption
properties"
+ ));
+ }
+ let decryptor = get_file_decryptor(
+ t_file_crypto_metadata.encryption_algorithm,
+ t_file_crypto_metadata.key_metadata,
+ file_decryption_properties,
+ )?;
+ let footer_decryptor = decryptor.get_footer_decryptor();
+ let aad_footer =
crate::encryption::modules::create_footer_aad(decryptor.file_aad())?;
+
+ decrypted_fmd_buf = footer_decryptor?
+ .decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())
+ .map_err(|_| {
+ general_err!(
+ "Provided footer key and AAD were unable to decrypt
parquet footer"
+ )
+ })?;
+
+ buf = &decrypted_fmd_buf;
+ file_decryptor = Some(decryptor);
+ } else {
+ return Err(general_err!(
+ "Parquet file has an encrypted footer but decryption
properties were not provided"
+ ));
+ }
+ }
+
+ let parquet_meta = parquet_metadata_from_bytes(buf)
+ .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
+
+ let ParquetMetaData {
+ mut file_metadata,
+ row_groups,
+ column_index: _,
+ offset_index: _,
+ file_decryptor: _,
+ } = parquet_meta;
+
+ // Take the encryption algorithm and footer signing key metadata as they
are no longer
+ // needed after this.
+ if let (Some(algo), Some(file_decryption_properties)) = (
+ file_metadata.encryption_algorithm.take(),
+ file_decryption_properties,
+ ) {
+ let footer_signing_key_metadata =
file_metadata.footer_signing_key_metadata.take();
+
+ // File has a plaintext footer but encryption algorithm is set
+ let file_decryptor_value = get_file_decryptor(
+ *algo,
+ footer_signing_key_metadata.as_deref(),
+ file_decryption_properties,
+ )?;
+ if file_decryption_properties.check_plaintext_footer_integrity() &&
!encrypted_footer {
+ file_decryptor_value.verify_plaintext_footer_signature(buf)?;
+ }
+ file_decryptor = Some(file_decryptor_value);
+ }
+
+ // decrypt column chunk info
+ let row_groups = row_groups
+ .into_iter()
+ .map(|rg| row_group_from_encrypted_thrift(rg, file_decryptor.as_ref()))
+ .collect::<Result<Vec<_>>>()?;
+
+ let metadata = ParquetMetaDataBuilder::new(file_metadata)
+ .set_row_groups(row_groups)
+ .set_file_decryptor(file_decryptor)
+ .build();
+
+ Ok(metadata)
+}
+
+fn get_file_decryptor(
+ encryption_algorithm: EncryptionAlgorithm,
+ footer_key_metadata: Option<&[u8]>,
+ file_decryption_properties: &FileDecryptionProperties,
+) -> Result<FileDecryptor> {
+ match encryption_algorithm {
+ EncryptionAlgorithm::AES_GCM_V1(algo) => {
+ let aad_file_unique = algo
+ .aad_file_unique
+ .ok_or_else(|| general_err!("AAD unique file identifier is not
set"))?;
+ let aad_prefix = if let Some(aad_prefix) =
file_decryption_properties.aad_prefix() {
+ aad_prefix.clone()
+ } else {
+ algo.aad_prefix.map(|v| v.to_vec()).unwrap_or_default()
+ };
+ let aad_file_unique = aad_file_unique.to_vec();
+
+ FileDecryptor::new(
+ file_decryption_properties,
+ footer_key_metadata,
+ aad_file_unique,
+ aad_prefix,
+ )
+ }
+ EncryptionAlgorithm::AES_GCM_CTR_V1(_) => Err(nyi_err!(
+ "The AES_GCM_CTR_V1 encryption algorithm is not yet supported"
+ )),
+ }
+}
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+struct ColumnMetaData<'a> {
+ encodings: EncodingMask,
+ codec: Compression,
+ num_values: i64,
+ total_uncompressed_size: i64,
+ total_compressed_size: i64,
+ data_page_offset: i64,
+ index_page_offset: Option<i64>,
+ dictionary_page_offset: Option<i64>,
+ statistics: Option<Statistics<'a>>,
+ encoding_stats: Option<Vec<PageEncodingStats>>,
+ bloom_filter_offset: Option<i64>,
+ bloom_filter_length: Option<i32>,
+ size_statistics: Option<SizeStatistics>,
+ geospatial_statistics: Option<GeospatialStatistics>,
+}
+
+fn read_column_metadata<'a>(buf: &'a [u8]) -> Result<ColumnMetaData<'a>> {
+ let mut prot = ThriftSliceInputProtocol::new(buf);
+
+ let mut encodings: Option<EncodingMask> = None;
+ let mut codec: Option<Compression> = None;
+ let mut num_values: Option<i64> = None;
+ let mut total_uncompressed_size: Option<i64> = None;
+ let mut total_compressed_size: Option<i64> = None;
+ let mut data_page_offset: Option<i64> = None;
+ let mut index_page_offset: Option<i64> = None;
+ let mut dictionary_page_offset: Option<i64> = None;
+ let mut statistics: Option<Statistics> = None;
+ let mut encoding_stats: Option<Vec<PageEncodingStats>> = None;
+ let mut bloom_filter_offset: Option<i64> = None;
+ let mut bloom_filter_length: Option<i32> = None;
+ let mut size_statistics: Option<SizeStatistics> = None;
+ let mut geospatial_statistics: Option<GeospatialStatistics> = None;
+
+ // `ColumnMetaData`. Read inline for performance sake.
+ // struct ColumnMetaData {
+ // 1: required Type type
+ // 2: required list<Encoding> encodings
+ // 3: required list<string> path_in_schema
+ // 4: required CompressionCodec codec
+ // 5: required i64 num_values
+ // 6: required i64 total_uncompressed_size
+ // 7: required i64 total_compressed_size
+ // 8: optional list<KeyValue> key_value_metadata
+ // 9: required i64 data_page_offset
+ // 10: optional i64 index_page_offset
+ // 11: optional i64 dictionary_page_offset
+ // 12: optional Statistics statistics;
+ // 13: optional list<PageEncodingStats> encoding_stats;
+ // 14: optional i64 bloom_filter_offset;
+ // 15: optional i32 bloom_filter_length;
+ // 16: optional SizeStatistics size_statistics;
+ // 17: optional GeospatialStatistics geospatial_statistics;
+ // }
+ let mut last_field_id = 0i16;
+ loop {
+ let field_ident = prot.read_field_begin(last_field_id)?;
+ if field_ident.field_type == FieldType::Stop {
+ break;
+ }
+ match field_ident.id {
+ // 1: type is never used, we can use the column descriptor
+ 2 => {
+ let val = EncodingMask::read_thrift(&mut prot)?;
+ encodings = Some(val);
+ }
+ // 3: path_in_schema is redundant
+ 4 => {
+ codec = Some(Compression::read_thrift(&mut prot)?);
+ }
+ 5 => {
+ num_values = Some(i64::read_thrift(&mut prot)?);
+ }
+ 6 => {
+ total_uncompressed_size = Some(i64::read_thrift(&mut prot)?);
+ }
+ 7 => {
+ total_compressed_size = Some(i64::read_thrift(&mut prot)?);
+ }
+ // 8: we don't expose this key value
+ 9 => {
+ data_page_offset = Some(i64::read_thrift(&mut prot)?);
+ }
+ 10 => {
+ index_page_offset = Some(i64::read_thrift(&mut prot)?);
+ }
+ 11 => {
+ dictionary_page_offset = Some(i64::read_thrift(&mut prot)?);
+ }
+ 12 => {
+ statistics = Some(Statistics::read_thrift(&mut prot)?);
+ }
+ 13 => {
+ let val =
+ read_thrift_vec::<PageEncodingStats,
ThriftSliceInputProtocol>(&mut prot)?;
+ encoding_stats = Some(val);
+ }
+ 14 => {
+ bloom_filter_offset = Some(i64::read_thrift(&mut prot)?);
+ }
+ 15 => {
+ bloom_filter_length = Some(i32::read_thrift(&mut prot)?);
+ }
+ 16 => {
+ let val = SizeStatistics::read_thrift(&mut prot)?;
+ size_statistics = Some(val);
+ }
+ 17 => {
+ let val = GeospatialStatistics::read_thrift(&mut prot)?;
+ geospatial_statistics = Some(val);
+ }
+ _ => {
+ prot.skip(field_ident.field_type)?;
+ }
+ };
+ last_field_id = field_ident.id;
+ }
+
+ let Some(encodings) = encodings else {
+ return Err(ParquetError::General(
+ "Required field encodings is missing".to_owned(),
+ ));
+ };
+ let Some(codec) = codec else {
+ return Err(ParquetError::General(
+ "Required field codec is missing".to_owned(),
+ ));
+ };
+ let Some(num_values) = num_values else {
+ return Err(ParquetError::General(
+ "Required field num_values is missing".to_owned(),
+ ));
+ };
+ let Some(total_uncompressed_size) = total_uncompressed_size else {
+ return Err(ParquetError::General(
+ "Required field total_uncompressed_size is missing".to_owned(),
+ ));
+ };
+ let Some(total_compressed_size) = total_compressed_size else {
+ return Err(ParquetError::General(
+ "Required field total_compressed_size is missing".to_owned(),
+ ));
+ };
+ let Some(data_page_offset) = data_page_offset else {
+ return Err(ParquetError::General(
+ "Required field data_page_offset is missing".to_owned(),
+ ));
+ };
+
+ Ok(ColumnMetaData {
+ encodings,
+ num_values,
+ codec,
+ total_uncompressed_size,
+ total_compressed_size,
+ data_page_offset,
+ index_page_offset,
+ dictionary_page_offset,
+ statistics,
+ encoding_stats,
+ bloom_filter_offset,
+ bloom_filter_length,
+ size_statistics,
+ geospatial_statistics,
+ })
+}
diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index 87585f2d71..6c6e980b86 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -87,6 +87,8 @@
//!
//! * Same name, different struct
//! ```
+#[cfg(feature = "encryption")]
+mod encryption;
mod footer_tail;
mod memory;
mod parser;
@@ -95,14 +97,14 @@ pub(crate) mod reader;
pub(crate) mod thrift_gen;
mod writer;
-use crate::basic::PageType;
+use crate::basic::{EncodingMask, PageType};
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::FileDecryptor;
#[cfg(feature = "encryption")]
use crate::file::column_crypto_metadata::ColumnCryptoMetaData;
-pub(crate) use crate::file::metadata::memory::HeapSize;
#[cfg(feature = "encryption")]
-use crate::file::metadata::thrift_gen::EncryptionAlgorithm;
+use crate::file::metadata::encryption::EncryptionAlgorithm;
+pub(crate) use crate::file::metadata::memory::HeapSize;
use crate::file::page_index::column_index::{ByteArrayColumnIndex,
PrimitiveColumnIndex};
use crate::file::page_index::{column_index::ColumnIndexMetaData,
offset_index::PageLocation};
use crate::file::statistics::Statistics;
@@ -788,7 +790,7 @@ impl RowGroupMetaDataBuilder {
#[derive(Debug, Clone, PartialEq)]
pub struct ColumnChunkMetaData {
column_descr: ColumnDescPtr,
- encodings: Vec<Encoding>,
+ encodings: EncodingMask,
file_path: Option<String>,
file_offset: i64,
num_values: i64,
@@ -968,7 +970,12 @@ impl ColumnChunkMetaData {
}
/// All encodings used for this column.
- pub fn encodings(&self) -> &Vec<Encoding> {
+ pub fn encodings(&self) -> impl Iterator<Item = Encoding> {
+ self.encodings.encodings()
+ }
+
+ /// All encodings used for this column, returned as a bitmask.
+ pub fn encodings_mask(&self) -> &EncodingMask {
&self.encodings
}
@@ -1148,7 +1155,7 @@ impl ColumnChunkMetaDataBuilder {
fn new(column_descr: ColumnDescPtr) -> Self {
Self(ColumnChunkMetaData {
column_descr,
- encodings: Vec::new(),
+ encodings: Default::default(),
file_path: None,
file_offset: 0,
num_values: 0,
@@ -1179,6 +1186,12 @@ impl ColumnChunkMetaDataBuilder {
/// Sets list of encodings for this column chunk.
pub fn set_encodings(mut self, encodings: Vec<Encoding>) -> Self {
+ self.0.encodings = EncodingMask::new_from_encodings(encodings.iter());
+ self
+ }
+
+ /// Sets the encodings mask for this column chunk.
+ pub fn set_encodings_mask(mut self, encodings: EncodingMask) -> Self {
self.0.encodings = encodings;
self
}
@@ -1704,9 +1717,10 @@ mod tests {
#[test]
fn test_column_chunk_metadata_thrift_conversion() {
let column_descr = get_test_schema_descr().column(0);
-
let col_metadata = ColumnChunkMetaData::builder(column_descr.clone())
- .set_encodings(vec![Encoding::PLAIN, Encoding::RLE])
+ .set_encodings_mask(EncodingMask::new_from_encodings(
+ [Encoding::PLAIN, Encoding::RLE].iter(),
+ ))
.set_file_path("file_path".to_owned())
.set_num_values(1000)
.set_compression(Compression::SNAPPY)
@@ -1858,9 +1872,9 @@ mod tests {
.build();
#[cfg(not(feature = "encryption"))]
- let base_expected_size = 2312;
+ let base_expected_size = 2248;
#[cfg(feature = "encryption")]
- let base_expected_size = 2480;
+ let base_expected_size = 2416;
assert_eq!(parquet_meta.memory_size(), base_expected_size);
@@ -1889,9 +1903,9 @@ mod tests {
.build();
#[cfg(not(feature = "encryption"))]
- let bigger_expected_size = 2738;
+ let bigger_expected_size = 2674;
#[cfg(feature = "encryption")]
- let bigger_expected_size = 2906;
+ let bigger_expected_size = 2842;
// more set fields means more memory usage
assert!(bigger_expected_size > base_expected_size);
diff --git a/parquet/src/file/metadata/parser.rs
b/parquet/src/file/metadata/parser.rs
index 223ea3484c..1e30093909 100644
--- a/parquet/src/file/metadata/parser.rs
+++ b/parquet/src/file/metadata/parser.rs
@@ -72,7 +72,7 @@ mod inner {
encrypted_footer: bool,
) -> Result<ParquetMetaData> {
if encrypted_footer || self.file_decryption_properties.is_some() {
- crate::file::metadata::thrift_gen::parquet_metadata_with_encryption(
+ crate::file::metadata::encryption::parquet_metadata_with_encryption(
self.file_decryption_properties.as_deref(),
encrypted_footer,
buf,
diff --git a/parquet/src/file/metadata/thrift_gen.rs b/parquet/src/file/metadata/thrift_gen.rs
index a107acfad8..36fbe42a90 100644
--- a/parquet/src/file/metadata/thrift_gen.rs
+++ b/parquet/src/file/metadata/thrift_gen.rs
@@ -20,9 +20,14 @@
use std::io::Write;
use std::sync::Arc;
+#[cfg(feature = "encryption")]
+use crate::file::{
+ column_crypto_metadata::ColumnCryptoMetaData, metadata::encryption::EncryptionAlgorithm,
+};
use crate::{
basic::{
- ColumnOrder, Compression, ConvertedType, Encoding, LogicalType, PageType, Repetition, Type,
+ ColumnOrder, Compression, ConvertedType, Encoding, EncodingMask, LogicalType, PageType,
+ Repetition, Type,
},
data_type::{ByteArray, FixedLenByteArray, Int96},
errors::{ParquetError, Result},
@@ -44,12 +49,6 @@ use crate::{
thrift_struct,
util::bit_util::FromBytes,
};
-#[cfg(feature = "encryption")]
-use crate::{
- encryption::decrypt::{FileDecryptionProperties, FileDecryptor},
- file::{column_crypto_metadata::ColumnCryptoMetaData, metadata::HeapSize},
- thrift_union,
-};
// this needs to be visible to the schema conversion code
thrift_struct!(
@@ -106,118 +105,8 @@ pub(crate) struct Statistics<'a> {
}
);
-// TODO(ets): move a lot of the encryption stuff to its own module
-#[cfg(feature = "encryption")]
-thrift_struct!(
-pub(crate) struct AesGcmV1 {
- /// AAD prefix
- 1: optional binary aad_prefix
-
- /// Unique file identifier part of AAD suffix
- 2: optional binary aad_file_unique
-
- /// In files encrypted with AAD prefix without storing it,
- /// readers must supply the prefix
- 3: optional bool supply_aad_prefix
-}
-);
-
-#[cfg(feature = "encryption")]
-impl HeapSize for AesGcmV1 {
- fn heap_size(&self) -> usize {
- self.aad_prefix.heap_size()
- + self.aad_file_unique.heap_size()
- + self.supply_aad_prefix.heap_size()
- }
-}
-
-#[cfg(feature = "encryption")]
thrift_struct!(
-pub(crate) struct AesGcmCtrV1 {
- /// AAD prefix
- 1: optional binary aad_prefix
-
- /// Unique file identifier part of AAD suffix
- 2: optional binary aad_file_unique
-
- /// In files encrypted with AAD prefix without storing it,
- /// readers must supply the prefix
- 3: optional bool supply_aad_prefix
-}
-);
-
-#[cfg(feature = "encryption")]
-impl HeapSize for AesGcmCtrV1 {
- fn heap_size(&self) -> usize {
- self.aad_prefix.heap_size()
- + self.aad_file_unique.heap_size()
- + self.supply_aad_prefix.heap_size()
- }
-}
-
-#[cfg(feature = "encryption")]
-thrift_union!(
-union EncryptionAlgorithm {
- 1: (AesGcmV1) AES_GCM_V1
- 2: (AesGcmCtrV1) AES_GCM_CTR_V1
-}
-);
-
-#[cfg(feature = "encryption")]
-impl HeapSize for EncryptionAlgorithm {
- fn heap_size(&self) -> usize {
- match self {
- Self::AES_GCM_V1(gcm) => gcm.heap_size(),
- Self::AES_GCM_CTR_V1(gcm_ctr) => gcm_ctr.heap_size(),
- }
- }
-}
-
-#[cfg(feature = "encryption")]
-thrift_struct!(
-/// Crypto metadata for files with encrypted footer
-pub(crate) struct FileCryptoMetaData<'a> {
- /// Encryption algorithm. This field is only used for files
- /// with encrypted footer. Files with plaintext footer store algorithm id
- /// inside footer (FileMetaData structure).
- 1: required EncryptionAlgorithm encryption_algorithm
-
- /// Retrieval metadata of key used for encryption of footer,
- /// and (possibly) columns.
- 2: optional binary<'a> key_metadata
-}
-);
-
-// the following are only used internally so are private
-#[cfg(feature = "encryption")]
-type CompressionCodec = Compression;
-#[cfg(feature = "encryption")]
-thrift_struct!(
-struct ColumnMetaData<'a> {
- 1: required Type r#type
- 2: required list<Encoding> encodings
- // we don't expose path_in_schema so skip
- //3: required list<string> path_in_schema
- 4: required CompressionCodec codec
- 5: required i64 num_values
- 6: required i64 total_uncompressed_size
- 7: required i64 total_compressed_size
- // we don't expose key_value_metadata so skip
- //8: optional list<KeyValue> key_value_metadata
- 9: required i64 data_page_offset
- 10: optional i64 index_page_offset
- 11: optional i64 dictionary_page_offset
- 12: optional Statistics<'a> statistics
- 13: optional list<PageEncodingStats> encoding_stats;
- 14: optional i64 bloom_filter_offset;
- 15: optional i32 bloom_filter_length;
- 16: optional SizeStatistics size_statistics;
- 17: optional GeospatialStatistics geospatial_statistics;
-}
-);
-
-thrift_struct!(
-struct BoundingBox {
+pub(super) struct BoundingBox {
1: required double xmin;
2: required double xmax;
3: required double ymin;
@@ -230,21 +119,21 @@ struct BoundingBox {
);
thrift_struct!(
-struct GeospatialStatistics {
+pub(super) struct GeospatialStatistics {
1: optional BoundingBox bbox;
2: optional list<i32> geospatial_types;
}
);
thrift_struct!(
-struct SizeStatistics {
+pub(super) struct SizeStatistics {
1: optional i64 unencoded_byte_array_data_bytes;
2: optional list<i64> repetition_level_histogram;
3: optional list<i64> definition_level_histogram;
}
);
-fn convert_geo_stats(
+pub(super) fn convert_geo_stats(
stats: Option<GeospatialStatistics>,
) -> Option<Box<crate::geospatial::statistics::GeospatialStatistics>> {
stats.map(|st| {
@@ -434,269 +323,6 @@ pub(crate) fn convert_stats(
})
}
-#[cfg(feature = "encryption")]
-fn row_group_from_encrypted_thrift(
- mut rg: RowGroupMetaData,
- decryptor: Option<&FileDecryptor>,
-) -> Result<RowGroupMetaData> {
- let schema_descr = rg.schema_descr;
-
- if schema_descr.num_columns() != rg.columns.len() {
- return Err(general_err!(
- "Column count mismatch. Schema has {} columns while Row Group has {}",
- schema_descr.num_columns(),
- rg.columns.len()
- ));
- }
- let total_byte_size = rg.total_byte_size;
- let num_rows = rg.num_rows;
- let mut columns = vec![];
-
- for (i, (mut c, d)) in rg
- .columns
- .drain(0..)
- .zip(schema_descr.columns())
- .enumerate()
- {
- // Read encrypted metadata if it's present and we have a decryptor.
- if let (true, Some(decryptor)) = (c.encrypted_column_metadata.is_some(), decryptor) {
- let column_decryptor = match c.crypto_metadata() {
- None => {
- return Err(general_err!(
- "No crypto_metadata is set for column '{}', which has encrypted metadata",
- d.path().string()
- ));
- }
- Some(ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(crypto_metadata)) => {
- let column_name = crypto_metadata.path_in_schema.join(".");
- decryptor.get_column_metadata_decryptor(
- column_name.as_str(),
- crypto_metadata.key_metadata.as_deref(),
- )?
- }
- Some(ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY) => {
- decryptor.get_footer_decryptor()?
- }
- };
-
- let column_aad = crate::encryption::modules::create_module_aad(
- decryptor.file_aad(),
- crate::encryption::modules::ModuleType::ColumnMetaData,
- rg.ordinal.unwrap() as usize,
- i,
- None,
- )?;
-
- // Take the encrypted column metadata as it is no longer needed.
- let encrypted_column_metadata = c.encrypted_column_metadata.take();
- let buf = encrypted_column_metadata.unwrap();
- let decrypted_cc_buf = column_decryptor
- .decrypt(&buf, column_aad.as_ref())
- .map_err(|_| {
- general_err!(
- "Unable to decrypt column '{}', perhaps the column key is wrong?",
- d.path().string()
- )
- })?;
-
- // parse decrypted buffer and then replace fields in 'c'
- let mut prot = ThriftSliceInputProtocol::new(decrypted_cc_buf.as_slice());
- let col_meta = ColumnMetaData::read_thrift(&mut prot)?;
-
- let (
- unencoded_byte_array_data_bytes,
- repetition_level_histogram,
- definition_level_histogram,
- ) = if let Some(size_stats) = col_meta.size_statistics {
- (
- size_stats.unencoded_byte_array_data_bytes,
- size_stats.repetition_level_histogram,
- size_stats.definition_level_histogram,
- )
- } else {
- (None, None, None)
- };
-
- let repetition_level_histogram = repetition_level_histogram.map(LevelHistogram::from);
- let definition_level_histogram = definition_level_histogram.map(LevelHistogram::from);
-
- c.encodings = col_meta.encodings;
- c.compression = col_meta.codec;
- c.num_values = col_meta.num_values;
- c.total_uncompressed_size = col_meta.total_uncompressed_size;
- c.total_compressed_size = col_meta.total_compressed_size;
- c.data_page_offset = col_meta.data_page_offset;
- c.index_page_offset = col_meta.index_page_offset;
- c.dictionary_page_offset = col_meta.dictionary_page_offset;
- c.statistics = convert_stats(col_meta.r#type, col_meta.statistics)?;
- c.encoding_stats = col_meta.encoding_stats;
- c.bloom_filter_offset = col_meta.bloom_filter_offset;
- c.bloom_filter_length = col_meta.bloom_filter_length;
- c.unencoded_byte_array_data_bytes = unencoded_byte_array_data_bytes;
- c.repetition_level_histogram = repetition_level_histogram;
- c.definition_level_histogram = definition_level_histogram;
- c.geo_statistics = convert_geo_stats(col_meta.geospatial_statistics);
-
- columns.push(c);
- } else {
- columns.push(c);
- }
- }
-
- let sorting_columns = rg.sorting_columns;
- let file_offset = rg.file_offset;
- let ordinal = rg.ordinal;
-
- Ok(RowGroupMetaData {
- columns,
- num_rows,
- sorting_columns,
- total_byte_size,
- schema_descr,
- file_offset,
- ordinal,
- })
-}
-
-#[cfg(feature = "encryption")]
-/// Decodes [`ParquetMetaData`] from the provided bytes, handling metadata that may be encrypted.
-///
-/// Typically this is used to decode the metadata from the end of a parquet
-/// file. The format of `buf` is the Thrift compact binary protocol, as specified
-/// by the [Parquet Spec]. Buffer can be encrypted with AES GCM or AES CTR
-/// ciphers as specfied in the [Parquet Encryption Spec].
-///
-/// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
-/// [Parquet Encryption Spec]: https://parquet.apache.org/docs/file-format/data-pages/encryption/
-pub(crate) fn parquet_metadata_with_encryption(
- file_decryption_properties: Option<&FileDecryptionProperties>,
- encrypted_footer: bool,
- buf: &[u8],
-) -> Result<ParquetMetaData> {
- use crate::file::metadata::ParquetMetaDataBuilder;
-
- let mut buf = buf;
- let mut file_decryptor = None;
- let decrypted_fmd_buf;
-
- if encrypted_footer {
- let mut prot = ThriftSliceInputProtocol::new(buf);
- if let Some(file_decryption_properties) = file_decryption_properties {
- let t_file_crypto_metadata: FileCryptoMetaData =
- FileCryptoMetaData::read_thrift(&mut prot)
- .map_err(|e| general_err!("Could not parse crypto metadata: {}", e))?;
- let supply_aad_prefix = match &t_file_crypto_metadata.encryption_algorithm {
- EncryptionAlgorithm::AES_GCM_V1(algo) => algo.supply_aad_prefix,
- _ => Some(false),
- }
- .unwrap_or(false);
- if supply_aad_prefix && file_decryption_properties.aad_prefix().is_none() {
- return Err(general_err!(
- "Parquet file was encrypted with an AAD prefix that is not stored in the file, \
- but no AAD prefix was provided in the file decryption properties"
- ));
- }
- let decryptor = get_file_decryptor(
- t_file_crypto_metadata.encryption_algorithm,
- t_file_crypto_metadata.key_metadata,
- file_decryption_properties,
- )?;
- let footer_decryptor = decryptor.get_footer_decryptor();
- let aad_footer = crate::encryption::modules::create_footer_aad(decryptor.file_aad())?;
-
- decrypted_fmd_buf = footer_decryptor?
- .decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())
- .map_err(|_| {
- general_err!(
- "Provided footer key and AAD were unable to decrypt parquet footer"
- )
- })?;
-
- buf = &decrypted_fmd_buf;
- file_decryptor = Some(decryptor);
- } else {
- return Err(general_err!(
- "Parquet file has an encrypted footer but decryption properties were not provided"
- ));
- }
- }
-
- let parquet_meta = parquet_metadata_from_bytes(buf)
- .map_err(|e| general_err!("Could not parse metadata: {}", e))?;
-
- let ParquetMetaData {
- mut file_metadata,
- row_groups,
- column_index: _,
- offset_index: _,
- file_decryptor: _,
- } = parquet_meta;
-
- // Take the encryption algorithm and footer signing key metadata as they are no longer
- // needed after this.
- if let (Some(algo), Some(file_decryption_properties)) = (
- file_metadata.encryption_algorithm.take(),
- file_decryption_properties,
- ) {
- let footer_signing_key_metadata = file_metadata.footer_signing_key_metadata.take();
-
- // File has a plaintext footer but encryption algorithm is set
- let file_decryptor_value = get_file_decryptor(
- *algo,
- footer_signing_key_metadata.as_deref(),
- file_decryption_properties,
- )?;
- if file_decryption_properties.check_plaintext_footer_integrity() && !encrypted_footer {
- file_decryptor_value.verify_plaintext_footer_signature(buf)?;
- }
- file_decryptor = Some(file_decryptor_value);
- }
-
- // decrypt column chunk info
- let row_groups = row_groups
- .into_iter()
- .map(|rg| row_group_from_encrypted_thrift(rg, file_decryptor.as_ref()))
- .collect::<Result<Vec<_>>>()?;
-
- let metadata = ParquetMetaDataBuilder::new(file_metadata)
- .set_row_groups(row_groups)
- .set_file_decryptor(file_decryptor)
- .build();
-
- Ok(metadata)
-}
-
-#[cfg(feature = "encryption")]
-fn get_file_decryptor(
- encryption_algorithm: EncryptionAlgorithm,
- footer_key_metadata: Option<&[u8]>,
- file_decryption_properties: &FileDecryptionProperties,
-) -> Result<FileDecryptor> {
- match encryption_algorithm {
- EncryptionAlgorithm::AES_GCM_V1(algo) => {
- let aad_file_unique = algo
- .aad_file_unique
- .ok_or_else(|| general_err!("AAD unique file identifier is not set"))?;
- let aad_prefix = if let Some(aad_prefix) = file_decryption_properties.aad_prefix() {
- aad_prefix.clone()
- } else {
- algo.aad_prefix.map(|v| v.to_vec()).unwrap_or_default()
- };
- let aad_file_unique = aad_file_unique.to_vec();
-
- FileDecryptor::new(
- file_decryption_properties,
- footer_key_metadata,
- aad_file_unique,
- aad_prefix,
- )
- }
- EncryptionAlgorithm::AES_GCM_CTR_V1(_) => Err(nyi_err!(
- "The AES_GCM_CTR_V1 encryption algorithm is not yet supported"
- )),
- }
-}
-
// using ThriftSliceInputProtocol rather than ThriftCompactInputProtocl trait because
// these are all internal and operate on slices.
fn read_column_chunk<'a>(
@@ -716,7 +342,7 @@ fn read_column_chunk<'a>(
let mut encrypted_column_metadata: Option<&[u8]> = None;
// ColumnMetaData
- let mut encodings: Option<Vec<Encoding>> = None;
+ let mut encodings: Option<EncodingMask> = None;
let mut codec: Option<Compression> = None;
let mut num_values: Option<i64> = None;
let mut total_uncompressed_size: Option<i64> = None;
@@ -785,8 +411,7 @@ fn read_column_chunk<'a>(
match field_ident.id {
// 1: type is never used, we can use the column descriptor
2 => {
- let val =
- read_thrift_vec::<Encoding, ThriftSliceInputProtocol>(&mut *prot)?;
+ let val = EncodingMask::read_thrift(&mut *prot)?;
encodings = Some(val);
}
// 3: path_in_schema is redundant
@@ -1637,7 +1262,10 @@ pub(crate) fn serialize_column_meta_data<W: Write>(
use crate::file::statistics::page_stats_to_thrift;
column_chunk.column_type().write_thrift_field(w, 1, 0)?;
- column_chunk.encodings.write_thrift_field(w, 2, 1)?;
+ column_chunk
+ .encodings()
+ .collect::<Vec<_>>()
+ .write_thrift_field(w, 2, 1)?;
let path = column_chunk.column_descr.path().parts();
let path: Vec<&str> = path.iter().map(|v| v.as_str()).collect();
path.write_thrift_field(w, 3, 2)?;
diff --git a/parquet/src/file/metadata/writer.rs b/parquet/src/file/metadata/writer.rs
index 35c69935a8..398f5419c6 100644
--- a/parquet/src/file/metadata/writer.rs
+++ b/parquet/src/file/metadata/writer.rs
@@ -31,7 +31,7 @@ use crate::{
modules::{ModuleType, create_footer_aad, create_module_aad},
},
file::column_crypto_metadata::ColumnCryptoMetaData,
- file::metadata::thrift_gen::{AesGcmV1, EncryptionAlgorithm, FileCryptoMetaData},
+ file::metadata::encryption::{AesGcmV1, EncryptionAlgorithm, FileCryptoMetaData},
};
use crate::{errors::Result, file::page_index::column_index::ColumnIndexMetaData};
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index a1008c9509..d261d9ec65 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -706,7 +706,7 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
let map_offset = |x| x - src_offset + write_offset as i64;
let mut builder =
ColumnChunkMetaData::builder(metadata.column_descr_ptr())
.set_compression(metadata.compression())
- .set_encodings(metadata.encodings().clone())
+ .set_encodings_mask(*metadata.encodings_mask())
.set_total_compressed_size(metadata.compressed_size())
.set_total_uncompressed_size(metadata.uncompressed_size())
.set_num_values(metadata.num_values())
@@ -2390,6 +2390,7 @@ mod tests {
.row_group(0)
.column(x)
.encodings()
+ .collect::<Vec<_>>()
.contains(&Encoding::BYTE_STREAM_SPLIT)
);
};
diff --git a/parquet/src/schema/printer.rs b/parquet/src/schema/printer.rs
index 0cc5df59f3..838d84a1b1 100644
--- a/parquet/src/schema/printer.rs
+++ b/parquet/src/schema/printer.rs
@@ -171,11 +171,7 @@ fn print_row_group_metadata(out: &mut dyn io::Write, rg_metadata: &RowGroupMetaD
fn print_column_chunk_metadata(out: &mut dyn io::Write, cc_metadata: &ColumnChunkMetaData) {
writeln!(out, "column type: {}", cc_metadata.column_type());
writeln!(out, "column path: {}", cc_metadata.column_path());
- let encoding_strs: Vec<_> = cc_metadata
- .encodings()
- .iter()
- .map(|e| format!("{e}"))
- .collect();
+ let encoding_strs: Vec<_> = cc_metadata.encodings().map(|e| format!("{e}")).collect();
writeln!(out, "encodings: {}", encoding_strs.join(" "));
let file_path_str = cc_metadata.file_path().unwrap_or("N/A");
writeln!(out, "file path: {file_path_str}");