This is an automated email from the ASF dual-hosted git repository.
etseidl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new c94698c561 [thrift-remodel] Refactor Parquet Thrift code into new `thrift` module (#8599)
c94698c561 is described below
commit c94698c561bb76a1d5bd0532c4143718a40f9e86
Author: Ed Seidl <[email protected]>
AuthorDate: Tue Oct 14 10:12:22 2025 -0700
[thrift-remodel] Refactor Parquet Thrift code into new `thrift` module (#8599)
# Which issue does this PR close?
- Part of #5853.
# Rationale for this change
Earlier work introduced some code duplication in the decoding of the
`ColumnMetaData` Thrift struct. This PR removes that duplication and also
addresses earlier review comments
(https://github.com/apache/arrow-rs/pull/8587#discussion_r2421703047).
# What changes are included in this PR?
This PR changes how some metadata structures are parsed, using a bitmask of
required fields rather than relying on `Option::is_some`. This allows
partially initialized `ColumnChunkMetaData` structs to be passed around,
which in turn lets the encrypted and unencrypted code paths share the
`ColumnMetaData` parsing code, roughly as sketched below.
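For illustration, the pattern looks roughly like this minimal sketch (the
`ColumnMeta` struct, field ids, and `SEEN_*` constants here are hypothetical,
heavily simplified from the actual `ColumnChunkMetaData` code):

```rust
// Minimal sketch of the required-field bitmask pattern. `ColumnMeta`,
// the field ids, and the `SEEN_*` constants are hypothetical.
#[derive(Debug, Default)]
struct ColumnMeta {
    codec: i32,
    num_values: i64,
    data_page_offset: i64,
}

const SEEN_CODEC: u16 = 1 << 0;
const SEEN_NUM_VALUES: u16 = 1 << 1;
const SEEN_DATA_PAGE_OFFSET: u16 = 1 << 2;

// Write each decoded field straight into the partially initialized
// struct and return the bit recording that it was observed.
fn decode_field(target: &mut ColumnMeta, field_id: i16, value: i64) -> u16 {
    match field_id {
        4 => {
            target.codec = value as i32;
            SEEN_CODEC
        }
        5 => {
            target.num_values = value;
            SEEN_NUM_VALUES
        }
        9 => {
            target.data_page_offset = value;
            SEEN_DATA_PAGE_OFFSET
        }
        _ => 0, // optional or unknown fields set no bit
    }
}

// One mask check replaces a chain of per-field `Option::is_some` tests.
fn validate(mask: u16) -> Result<(), String> {
    if mask & SEEN_CODEC == 0 {
        return Err("Required field codec is missing".into());
    }
    if mask & SEEN_NUM_VALUES == 0 {
        return Err("Required field num_values is missing".into());
    }
    if mask & SEEN_DATA_PAGE_OFFSET == 0 {
        return Err("Required field data_page_offset is missing".into());
    }
    Ok(())
}

fn main() {
    let mut meta = ColumnMeta::default();
    let mut seen = 0u16;
    // both the encrypted and unencrypted paths can run the same decode
    // loop, then validate the accumulated mask once at the end
    for (id, value) in [(4i16, 1i64), (5, 100), (9, 4096)] {
        seen |= decode_field(&mut meta, id, value);
    }
    assert!(validate(seen).is_ok());
    println!("{meta:?}");
}
```

Because each field is written directly into the partially initialized struct,
the same decode loop can run against either the plain footer bytes or a
decrypted `ColumnMetaData` buffer, with a single mask validation at the end.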
This PR also moves the `file/metadata/{encryption,thrift_gen}.rs` files
to a new `file::metadata::thrift` module.
# Are these changes tested?
Covered by existing tests.
# Are there any user-facing changes?
No, this PR only changes private APIs.
---
parquet/src/arrow/arrow_writer/mod.rs | 2 +-
parquet/src/column/page.rs | 6 +-
parquet/src/column/page_encryption.rs | 2 +-
parquet/src/column/page_encryption_disabled.rs | 2 +-
parquet/src/file/metadata/mod.rs | 15 +-
parquet/src/file/metadata/parser.rs | 4 +-
.../src/file/metadata/{ => thrift}/encryption.rs | 218 +------
.../file/metadata/{thrift_gen.rs => thrift/mod.rs} | 640 +++++++++------------
parquet/src/file/metadata/writer.rs | 6 +-
parquet/src/file/serialized_reader.rs | 4 +-
parquet/src/file/statistics.rs | 2 +-
parquet/src/file/writer.rs | 2 +-
parquet/src/schema/types.rs | 4 +-
13 files changed, 307 insertions(+), 600 deletions(-)
diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 3625c3e844..66801d2d38 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -1532,7 +1532,7 @@ mod tests {
use crate::arrow::ARROW_SCHEMA_META_KEY;
use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
use crate::column::page::{Page, PageReader};
- use crate::file::metadata::thrift_gen::PageHeader;
+ use crate::file::metadata::thrift::PageHeader;
use crate::file::page_index::column_index::ColumnIndexMetaData;
use crate::file::reader::SerializedPageReader;
use crate::parquet_thrift::{ReadThrift, ThriftSliceInputProtocol};
diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs
index abce04bb5e..23517f05df 100644
--- a/parquet/src/column/page.rs
+++ b/parquet/src/column/page.rs
@@ -21,7 +21,7 @@ use bytes::Bytes;
use crate::basic::{Encoding, PageType};
use crate::errors::{ParquetError, Result};
-use crate::file::metadata::thrift_gen::{
+use crate::file::metadata::thrift::{
DataPageHeader, DataPageHeaderV2, DictionaryPageHeader, PageHeader,
};
use crate::file::statistics::{Statistics, page_stats_to_thrift};
@@ -345,11 +345,11 @@ pub struct PageMetadata {
pub is_dict: bool,
}
-impl TryFrom<&crate::file::metadata::thrift_gen::PageHeader> for PageMetadata {
+impl TryFrom<&crate::file::metadata::thrift::PageHeader> for PageMetadata {
type Error = ParquetError;
fn try_from(
- value: &crate::file::metadata::thrift_gen::PageHeader,
+ value: &crate::file::metadata::thrift::PageHeader,
) -> std::result::Result<Self, Self::Error> {
match value.r#type {
PageType::DATA_PAGE => {
diff --git a/parquet/src/column/page_encryption.rs b/parquet/src/column/page_encryption.rs
index 032579ab63..26df75900c 100644
--- a/parquet/src/column/page_encryption.rs
+++ b/parquet/src/column/page_encryption.rs
@@ -22,7 +22,7 @@ use crate::encryption::encrypt::{FileEncryptor, encrypt_thrift_object};
use crate::encryption::modules::{ModuleType, create_module_aad};
use crate::errors::ParquetError;
use crate::errors::Result;
-use crate::file::metadata::thrift_gen::PageHeader;
+use crate::file::metadata::thrift::PageHeader;
use bytes::Bytes;
use std::io::Write;
use std::sync::Arc;
diff --git a/parquet/src/column/page_encryption_disabled.rs b/parquet/src/column/page_encryption_disabled.rs
index 347024f7f2..71f25862cc 100644
--- a/parquet/src/column/page_encryption_disabled.rs
+++ b/parquet/src/column/page_encryption_disabled.rs
@@ -17,7 +17,7 @@
use crate::column::page::CompressedPage;
use crate::errors::Result;
-use crate::file::metadata::thrift_gen::PageHeader;
+use crate::file::metadata::thrift::PageHeader;
use std::io::Write;
#[derive(Debug)]
diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index 6c6e980b86..03975d6394 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -87,14 +87,12 @@
//!
//! * Same name, different struct
//! ```
-#[cfg(feature = "encryption")]
-mod encryption;
mod footer_tail;
mod memory;
mod parser;
mod push_decoder;
pub(crate) mod reader;
-pub(crate) mod thrift_gen;
+pub(crate) mod thrift;
mod writer;
use crate::basic::{EncodingMask, PageType};
@@ -102,9 +100,9 @@ use crate::basic::{EncodingMask, PageType};
use crate::encryption::decrypt::FileDecryptor;
#[cfg(feature = "encryption")]
use crate::file::column_crypto_metadata::ColumnCryptoMetaData;
-#[cfg(feature = "encryption")]
-use crate::file::metadata::encryption::EncryptionAlgorithm;
pub(crate) use crate::file::metadata::memory::HeapSize;
+#[cfg(feature = "encryption")]
+use crate::file::metadata::thrift::encryption::EncryptionAlgorithm;
use crate::file::page_index::column_index::{ByteArrayColumnIndex, PrimitiveColumnIndex};
use crate::file::page_index::{column_index::ColumnIndexMetaData, offset_index::PageLocation};
use crate::file::statistics::Statistics;
@@ -784,6 +782,11 @@ impl RowGroupMetaDataBuilder {
Ok(self.0)
}
+
+ /// Build row group metadata without validation.
+ pub(super) fn build_unchecked(self) -> RowGroupMetaData {
+ self.0
+ }
}
/// Metadata for a column chunk.
@@ -1600,7 +1603,7 @@ impl OffsetIndexBuilder {
mod tests {
use super::*;
use crate::basic::{PageType, SortOrder};
- use crate::file::metadata::thrift_gen::tests::{read_column_chunk, read_row_group};
+ use crate::file::metadata::thrift::tests::{read_column_chunk, read_row_group};
#[test]
fn test_row_group_metadata_thrift_conversion() {
diff --git a/parquet/src/file/metadata/parser.rs b/parquet/src/file/metadata/parser.rs
index 1e30093909..5929e6440c 100644
--- a/parquet/src/file/metadata/parser.rs
+++ b/parquet/src/file/metadata/parser.rs
@@ -21,7 +21,7 @@
//! into the corresponding Rust structures
use crate::errors::ParquetError;
-use crate::file::metadata::thrift_gen::parquet_metadata_from_bytes;
+use crate::file::metadata::thrift::parquet_metadata_from_bytes;
use crate::file::metadata::{ColumnChunkMetaData, PageIndexPolicy, ParquetMetaData};
use crate::file::page_index::column_index::ColumnIndexMetaData;
@@ -72,7 +72,7 @@ mod inner {
encrypted_footer: bool,
) -> Result<ParquetMetaData> {
if encrypted_footer || self.file_decryption_properties.is_some() {
- crate::file::metadata::encryption::parquet_metadata_with_encryption(
+ crate::file::metadata::thrift::encryption::parquet_metadata_with_encryption(
self.file_decryption_properties.as_deref(),
encrypted_footer,
buf,
diff --git a/parquet/src/file/metadata/encryption.rs b/parquet/src/file/metadata/thrift/encryption.rs
similarity index 58%
rename from parquet/src/file/metadata/encryption.rs
rename to parquet/src/file/metadata/thrift/encryption.rs
index 4c97640b67..ae7d5bbc2b 100644
--- a/parquet/src/file/metadata/encryption.rs
+++ b/parquet/src/file/metadata/thrift/encryption.rs
@@ -15,28 +15,23 @@
// specific language governing permissions and limitations
// under the License.
-// a collection of generated structs used to parse thrift metadata
+//! Encryption support for Thrift serialization
use std::io::Write;
use crate::{
- basic::{Compression, EncodingMask},
encryption::decrypt::{FileDecryptionProperties, FileDecryptor},
errors::{ParquetError, Result},
file::{
column_crypto_metadata::ColumnCryptoMetaData,
metadata::{
- HeapSize, LevelHistogram, PageEncodingStats, ParquetMetaData, RowGroupMetaData,
- thrift_gen::{
- GeospatialStatistics, SizeStatistics, Statistics, convert_geo_stats, convert_stats,
- parquet_metadata_from_bytes,
- },
+ HeapSize, ParquetMetaData, RowGroupMetaData,
+ thrift::{parquet_metadata_from_bytes, read_column_metadata, validate_column_metadata},
},
},
parquet_thrift::{
ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol,
ThriftCompactOutputProtocol, ThriftSliceInputProtocol, WriteThrift,
WriteThriftField,
- read_thrift_vec,
},
thrift_struct, thrift_union,
};
@@ -180,41 +175,9 @@ fn row_group_from_encrypted_thrift(
})?;
// parse decrypted buffer and then replace fields in 'c'
- let col_meta = read_column_metadata(decrypted_cc_buf.as_slice())?;
-
- let (
- unencoded_byte_array_data_bytes,
- repetition_level_histogram,
- definition_level_histogram,
- ) = if let Some(size_stats) = col_meta.size_statistics {
- (
- size_stats.unencoded_byte_array_data_bytes,
- size_stats.repetition_level_histogram,
- size_stats.definition_level_histogram,
- )
- } else {
- (None, None, None)
- };
-
- let repetition_level_histogram = repetition_level_histogram.map(LevelHistogram::from);
- let definition_level_histogram = definition_level_histogram.map(LevelHistogram::from);
-
- c.encodings = col_meta.encodings;
- c.compression = col_meta.codec;
- c.num_values = col_meta.num_values;
- c.total_uncompressed_size = col_meta.total_uncompressed_size;
- c.total_compressed_size = col_meta.total_compressed_size;
- c.data_page_offset = col_meta.data_page_offset;
- c.index_page_offset = col_meta.index_page_offset;
- c.dictionary_page_offset = col_meta.dictionary_page_offset;
- c.statistics = convert_stats(d.physical_type(), col_meta.statistics)?;
- c.encoding_stats = col_meta.encoding_stats;
- c.bloom_filter_offset = col_meta.bloom_filter_offset;
- c.bloom_filter_length = col_meta.bloom_filter_length;
- c.unencoded_byte_array_data_bytes = unencoded_byte_array_data_bytes;
- c.repetition_level_histogram = repetition_level_histogram;
- c.definition_level_histogram = definition_level_histogram;
- c.geo_statistics = convert_geo_stats(col_meta.geospatial_statistics);
+ let mut prot = ThriftSliceInputProtocol::new(&decrypted_cc_buf);
+ let mask = read_column_metadata(&mut prot, &mut c)?;
+ validate_column_metadata(mask)?;
columns.push(c);
} else {
@@ -373,172 +336,3 @@ fn get_file_decryptor(
)),
}
}
-
-#[derive(Clone, Debug, Eq, PartialEq)]
-struct ColumnMetaData<'a> {
- encodings: EncodingMask,
- codec: Compression,
- num_values: i64,
- total_uncompressed_size: i64,
- total_compressed_size: i64,
- data_page_offset: i64,
- index_page_offset: Option<i64>,
- dictionary_page_offset: Option<i64>,
- statistics: Option<Statistics<'a>>,
- encoding_stats: Option<Vec<PageEncodingStats>>,
- bloom_filter_offset: Option<i64>,
- bloom_filter_length: Option<i32>,
- size_statistics: Option<SizeStatistics>,
- geospatial_statistics: Option<GeospatialStatistics>,
-}
-
-fn read_column_metadata<'a>(buf: &'a [u8]) -> Result<ColumnMetaData<'a>> {
- let mut prot = ThriftSliceInputProtocol::new(buf);
-
- let mut encodings: Option<EncodingMask> = None;
- let mut codec: Option<Compression> = None;
- let mut num_values: Option<i64> = None;
- let mut total_uncompressed_size: Option<i64> = None;
- let mut total_compressed_size: Option<i64> = None;
- let mut data_page_offset: Option<i64> = None;
- let mut index_page_offset: Option<i64> = None;
- let mut dictionary_page_offset: Option<i64> = None;
- let mut statistics: Option<Statistics> = None;
- let mut encoding_stats: Option<Vec<PageEncodingStats>> = None;
- let mut bloom_filter_offset: Option<i64> = None;
- let mut bloom_filter_length: Option<i32> = None;
- let mut size_statistics: Option<SizeStatistics> = None;
- let mut geospatial_statistics: Option<GeospatialStatistics> = None;
-
- // `ColumnMetaData`. Read inline for performance sake.
- // struct ColumnMetaData {
- // 1: required Type type
- // 2: required list<Encoding> encodings
- // 3: required list<string> path_in_schema
- // 4: required CompressionCodec codec
- // 5: required i64 num_values
- // 6: required i64 total_uncompressed_size
- // 7: required i64 total_compressed_size
- // 8: optional list<KeyValue> key_value_metadata
- // 9: required i64 data_page_offset
- // 10: optional i64 index_page_offset
- // 11: optional i64 dictionary_page_offset
- // 12: optional Statistics statistics;
- // 13: optional list<PageEncodingStats> encoding_stats;
- // 14: optional i64 bloom_filter_offset;
- // 15: optional i32 bloom_filter_length;
- // 16: optional SizeStatistics size_statistics;
- // 17: optional GeospatialStatistics geospatial_statistics;
- // }
- let mut last_field_id = 0i16;
- loop {
- let field_ident = prot.read_field_begin(last_field_id)?;
- if field_ident.field_type == FieldType::Stop {
- break;
- }
- match field_ident.id {
- // 1: type is never used, we can use the column descriptor
- 2 => {
- let val = EncodingMask::read_thrift(&mut prot)?;
- encodings = Some(val);
- }
- // 3: path_in_schema is redundant
- 4 => {
- codec = Some(Compression::read_thrift(&mut prot)?);
- }
- 5 => {
- num_values = Some(i64::read_thrift(&mut prot)?);
- }
- 6 => {
- total_uncompressed_size = Some(i64::read_thrift(&mut prot)?);
- }
- 7 => {
- total_compressed_size = Some(i64::read_thrift(&mut prot)?);
- }
- // 8: we don't expose this key value
- 9 => {
- data_page_offset = Some(i64::read_thrift(&mut prot)?);
- }
- 10 => {
- index_page_offset = Some(i64::read_thrift(&mut prot)?);
- }
- 11 => {
- dictionary_page_offset = Some(i64::read_thrift(&mut prot)?);
- }
- 12 => {
- statistics = Some(Statistics::read_thrift(&mut prot)?);
- }
- 13 => {
- let val =
- read_thrift_vec::<PageEncodingStats, ThriftSliceInputProtocol>(&mut prot)?;
- encoding_stats = Some(val);
- }
- 14 => {
- bloom_filter_offset = Some(i64::read_thrift(&mut prot)?);
- }
- 15 => {
- bloom_filter_length = Some(i32::read_thrift(&mut prot)?);
- }
- 16 => {
- let val = SizeStatistics::read_thrift(&mut prot)?;
- size_statistics = Some(val);
- }
- 17 => {
- let val = GeospatialStatistics::read_thrift(&mut prot)?;
- geospatial_statistics = Some(val);
- }
- _ => {
- prot.skip(field_ident.field_type)?;
- }
- };
- last_field_id = field_ident.id;
- }
-
- let Some(encodings) = encodings else {
- return Err(ParquetError::General(
- "Required field encodings is missing".to_owned(),
- ));
- };
- let Some(codec) = codec else {
- return Err(ParquetError::General(
- "Required field codec is missing".to_owned(),
- ));
- };
- let Some(num_values) = num_values else {
- return Err(ParquetError::General(
- "Required field num_values is missing".to_owned(),
- ));
- };
- let Some(total_uncompressed_size) = total_uncompressed_size else {
- return Err(ParquetError::General(
- "Required field total_uncompressed_size is missing".to_owned(),
- ));
- };
- let Some(total_compressed_size) = total_compressed_size else {
- return Err(ParquetError::General(
- "Required field total_compressed_size is missing".to_owned(),
- ));
- };
- let Some(data_page_offset) = data_page_offset else {
- return Err(ParquetError::General(
- "Required field data_page_offset is missing".to_owned(),
- ));
- };
-
- Ok(ColumnMetaData {
- encodings,
- num_values,
- codec,
- total_uncompressed_size,
- total_compressed_size,
- data_page_offset,
- index_page_offset,
- dictionary_page_offset,
- statistics,
- encoding_stats,
- bloom_filter_offset,
- bloom_filter_length,
- size_statistics,
- geospatial_statistics,
- })
-}
diff --git a/parquet/src/file/metadata/thrift_gen.rs b/parquet/src/file/metadata/thrift/mod.rs
similarity index 76%
rename from parquet/src/file/metadata/thrift_gen.rs
rename to parquet/src/file/metadata/thrift/mod.rs
index 36fbe42a90..332f768991 100644
--- a/parquet/src/file/metadata/thrift_gen.rs
+++ b/parquet/src/file/metadata/thrift/mod.rs
@@ -15,14 +15,23 @@
// specific language governing permissions and limitations
// under the License.
-// a collection of generated structs used to parse thrift metadata
+//! This module is the bridge between a Parquet file's thrift encoded metadata
+//! and this crate's [Parquet metadata API]. It contains objects and functions used
+//! to serialize/deserialize metadata objects into/from the Thrift compact protocol
+//! format as defined by the [Parquet specification].
+//!
+//! [Parquet metadata API]: crate::file::metadata
+//! [Parquet specification]: https://github.com/apache/parquet-format/tree/master
use std::io::Write;
use std::sync::Arc;
+#[cfg(feature = "encryption")]
+pub(crate) mod encryption;
+
#[cfg(feature = "encryption")]
use crate::file::{
- column_crypto_metadata::ColumnCryptoMetaData, metadata::encryption::EncryptionAlgorithm,
+ column_crypto_metadata::ColumnCryptoMetaData, metadata::thrift::encryption::EncryptionAlgorithm,
};
use crate::{
basic::{
@@ -33,8 +42,9 @@ use crate::{
errors::{ParquetError, Result},
file::{
metadata::{
- ColumnChunkMetaData, KeyValue, LevelHistogram, PageEncodingStats, ParquetMetaData,
- RowGroupMetaData, SortingColumn,
+ ColumnChunkMetaData, ColumnChunkMetaDataBuilder, KeyValue, LevelHistogram,
+ PageEncodingStats, ParquetMetaData, RowGroupMetaData, RowGroupMetaDataBuilder,
+ SortingColumn,
},
statistics::ValueStatistics,
},
@@ -93,7 +103,7 @@ pub(crate) struct SchemaElement<'a> {
);
thrift_struct!(
-pub(crate) struct Statistics<'a> {
+struct Statistics<'a> {
1: optional binary<'a> max;
2: optional binary<'a> min;
3: optional i64 null_count;
@@ -106,7 +116,7 @@ pub(crate) struct Statistics<'a> {
);
thrift_struct!(
-pub(super) struct BoundingBox {
+struct BoundingBox {
1: required double xmin;
2: required double xmax;
3: required double ymin;
@@ -119,21 +129,21 @@ pub(super) struct BoundingBox {
);
thrift_struct!(
-pub(super) struct GeospatialStatistics {
+struct GeospatialStatistics {
1: optional BoundingBox bbox;
2: optional list<i32> geospatial_types;
}
);
thrift_struct!(
-pub(super) struct SizeStatistics {
+struct SizeStatistics {
1: optional i64 unencoded_byte_array_data_bytes;
2: optional list<i64> repetition_level_histogram;
3: optional list<i64> definition_level_histogram;
}
);
-pub(super) fn convert_geo_stats(
+fn convert_geo_stats(
stats: Option<GeospatialStatistics>,
) -> Option<Box<crate::geospatial::statistics::GeospatialStatistics>> {
stats.map(|st| {
@@ -174,8 +184,8 @@ fn convert_bounding_box(
}
/// Create a [`crate::file::statistics::Statistics`] from a thrift [`Statistics`] object.
-pub(crate) fn convert_stats(
- physical_type: Type,
+fn convert_stats(
+ column_descr: &Arc<ColumnDescriptor>,
thrift_stats: Option<Statistics>,
) -> Result<Option<crate::file::statistics::Statistics>> {
use crate::file::statistics::Statistics as FStatistics;
@@ -187,9 +197,10 @@ pub(crate) fn convert_stats(
let null_count = stats.null_count.unwrap_or(0);
if null_count < 0 {
- return Err(ParquetError::General(format!(
- "Statistics null count is negative {null_count}",
- )));
+ return Err(general_err!(
+ "Statistics null count is negative {}",
+ null_count
+ ));
}
// Generic null count.
@@ -214,21 +225,18 @@ pub(crate) fn convert_stats(
fn check_len(min: &Option<&[u8]>, max: &Option<&[u8]>, len: usize) -> Result<()> {
if let Some(min) = min {
if min.len() < len {
- return Err(ParquetError::General(
- "Insufficient bytes to parse min statistic".to_string(),
- ));
+ return Err(general_err!("Insufficient bytes to parse min statistic",));
}
}
if let Some(max) = max {
if max.len() < len {
- return Err(ParquetError::General(
- "Insufficient bytes to parse max statistic".to_string(),
- ));
+ return Err(general_err!("Insufficient bytes to parse max statistic",));
}
}
Ok(())
}
+ let physical_type = column_descr.physical_type();
match physical_type {
Type::BOOLEAN => check_len(&min, &max, 1),
Type::INT32 | Type::FLOAT => check_len(&min, &max, 4),
@@ -323,39 +331,181 @@ pub(crate) fn convert_stats(
})
}
+// bit positions for required fields in the Thrift ColumnMetaData struct
+const COL_META_TYPE: u16 = 1 << 1;
+const COL_META_ENCODINGS: u16 = 1 << 2;
+const COL_META_CODEC: u16 = 1 << 4;
+const COL_META_NUM_VALUES: u16 = 1 << 5;
+const COL_META_TOTAL_UNCOMP_SZ: u16 = 1 << 6;
+const COL_META_TOTAL_COMP_SZ: u16 = 1 << 7;
+const COL_META_DATA_PAGE_OFFSET: u16 = 1 << 9;
+
+// a mask where all required fields' bits are set
+const COL_META_ALL_REQUIRED: u16 = COL_META_TYPE
+ | COL_META_ENCODINGS
+ | COL_META_CODEC
+ | COL_META_NUM_VALUES
+ | COL_META_TOTAL_UNCOMP_SZ
+ | COL_META_TOTAL_COMP_SZ
+ | COL_META_DATA_PAGE_OFFSET;
+
+// check mask to see if all required fields are set. return an appropriate error if
+// any are missing.
+fn validate_column_metadata(mask: u16) -> Result<()> {
+ if mask != COL_META_ALL_REQUIRED {
+ if mask & COL_META_ENCODINGS == 0 {
+ return Err(general_err!("Required field encodings is missing"));
+ }
+ if mask & COL_META_CODEC == 0 {
+ return Err(general_err!("Required field codec is missing"));
+ }
+ if mask & COL_META_NUM_VALUES == 0 {
+ return Err(general_err!("Required field num_values is missing"));
+ }
+ if mask & COL_META_TOTAL_UNCOMP_SZ == 0 {
+ return Err(general_err!(
+ "Required field total_uncompressed_size is missing"
+ ));
+ }
+ if mask & COL_META_TOTAL_COMP_SZ == 0 {
+ return Err(general_err!(
+ "Required field total_compressed_size is missing"
+ ));
+ }
+ if mask & COL_META_DATA_PAGE_OFFSET == 0 {
+ return Err(general_err!("Required field data_page_offset is
missing"));
+ }
+ }
+
+ Ok(())
+}
+
+// Decode `ColumnMetaData`. Returns a mask of all required fields that were observed.
+// This mask can be passed to `validate_column_metadata`.
+fn read_column_metadata<'a>(
+ prot: &mut ThriftSliceInputProtocol<'a>,
+ column: &mut ColumnChunkMetaData,
+) -> Result<u16> {
+ // mask for seen required fields in ColumnMetaData
+ let mut seen_mask = 0u16;
+
+ // struct ColumnMetaData {
+ // 1: required Type type
+ // 2: required list<Encoding> encodings
+ // 3: required list<string> path_in_schema
+ // 4: required CompressionCodec codec
+ // 5: required i64 num_values
+ // 6: required i64 total_uncompressed_size
+ // 7: required i64 total_compressed_size
+ // 8: optional list<KeyValue> key_value_metadata
+ // 9: required i64 data_page_offset
+ // 10: optional i64 index_page_offset
+ // 11: optional i64 dictionary_page_offset
+ // 12: optional Statistics statistics;
+ // 13: optional list<PageEncodingStats> encoding_stats;
+ // 14: optional i64 bloom_filter_offset;
+ // 15: optional i32 bloom_filter_length;
+ // 16: optional SizeStatistics size_statistics;
+ // 17: optional GeospatialStatistics geospatial_statistics;
+ // }
+ let column_descr = &column.column_descr;
+
+ let mut last_field_id = 0i16;
+ loop {
+ let field_ident = prot.read_field_begin(last_field_id)?;
+ if field_ident.field_type == FieldType::Stop {
+ break;
+ }
+ match field_ident.id {
+ // 1: type is never used, we can use the column descriptor
+ 1 => {
+ // read for error handling
+ Type::read_thrift(&mut *prot)?;
+ seen_mask |= COL_META_TYPE;
+ }
+ 2 => {
+ column.encodings = EncodingMask::read_thrift(&mut *prot)?;
+ seen_mask |= COL_META_ENCODINGS;
+ }
+ // 3: path_in_schema is redundant
+ 4 => {
+ column.compression = Compression::read_thrift(&mut *prot)?;
+ seen_mask |= COL_META_CODEC;
+ }
+ 5 => {
+ column.num_values = i64::read_thrift(&mut *prot)?;
+ seen_mask |= COL_META_NUM_VALUES;
+ }
+ 6 => {
+ column.total_uncompressed_size = i64::read_thrift(&mut *prot)?;
+ seen_mask |= COL_META_TOTAL_UNCOMP_SZ;
+ }
+ 7 => {
+ column.total_compressed_size = i64::read_thrift(&mut *prot)?;
+ seen_mask |= COL_META_TOTAL_COMP_SZ;
+ }
+ // 8: we don't expose this key value
+ 9 => {
+ column.data_page_offset = i64::read_thrift(&mut *prot)?;
+ seen_mask |= COL_META_DATA_PAGE_OFFSET;
+ }
+ 10 => {
+ column.index_page_offset = Some(i64::read_thrift(&mut *prot)?);
+ }
+ 11 => {
+ column.dictionary_page_offset = Some(i64::read_thrift(&mut *prot)?);
+ }
+ 12 => {
+ column.statistics =
+ convert_stats(column_descr, Some(Statistics::read_thrift(&mut *prot)?))?;
+ }
+ 13 => {
+ let val =
+ read_thrift_vec::<PageEncodingStats, ThriftSliceInputProtocol>(&mut *prot)?;
+ column.encoding_stats = Some(val);
+ }
+ 14 => {
+ column.bloom_filter_offset = Some(i64::read_thrift(&mut *prot)?);
+ }
+ 15 => {
+ column.bloom_filter_length = Some(i32::read_thrift(&mut *prot)?);
+ }
+ 16 => {
+ let val = SizeStatistics::read_thrift(&mut *prot)?;
+ column.unencoded_byte_array_data_bytes = val.unencoded_byte_array_data_bytes;
+ column.repetition_level_histogram =
+ val.repetition_level_histogram.map(LevelHistogram::from);
+ column.definition_level_histogram =
+ val.definition_level_histogram.map(LevelHistogram::from);
+ }
+ 17 => {
+ let val = GeospatialStatistics::read_thrift(&mut *prot)?;
+ column.geo_statistics = convert_geo_stats(Some(val));
+ }
+ _ => {
+ prot.skip(field_ident.field_type)?;
+ }
+ };
+ last_field_id = field_ident.id;
+ }
+
+ Ok(seen_mask)
+}
+
// using ThriftSliceInputProtocol rather than the ThriftCompactInputProtocol trait because
// these are all internal and operate on slices.
fn read_column_chunk<'a>(
prot: &mut ThriftSliceInputProtocol<'a>,
column_descr: &Arc<ColumnDescriptor>,
) -> Result<ColumnChunkMetaData> {
- // ColumnChunk fields
- let mut file_path: Option<&str> = None;
- let mut file_offset: Option<i64> = None;
- let mut offset_index_offset: Option<i64> = None;
- let mut offset_index_length: Option<i32> = None;
- let mut column_index_offset: Option<i64> = None;
- let mut column_index_length: Option<i32> = None;
- #[cfg(feature = "encryption")]
- let mut column_crypto_metadata: Option<Box<ColumnCryptoMetaData>> = None;
- #[cfg(feature = "encryption")]
- let mut encrypted_column_metadata: Option<&[u8]> = None;
-
- // ColumnMetaData
- let mut encodings: Option<EncodingMask> = None;
- let mut codec: Option<Compression> = None;
- let mut num_values: Option<i64> = None;
- let mut total_uncompressed_size: Option<i64> = None;
- let mut total_compressed_size: Option<i64> = None;
- let mut data_page_offset: Option<i64> = None;
- let mut index_page_offset: Option<i64> = None;
- let mut dictionary_page_offset: Option<i64> = None;
- let mut statistics: Option<Statistics> = None;
- let mut encoding_stats: Option<Vec<PageEncodingStats>> = None;
- let mut bloom_filter_offset: Option<i64> = None;
- let mut bloom_filter_length: Option<i32> = None;
- let mut size_statistics: Option<SizeStatistics> = None;
- let mut geospatial_statistics: Option<GeospatialStatistics> = None;
+ // create a default initialized ColumnMetaData
+ let mut col = ColumnChunkMetaDataBuilder::new(column_descr.clone()).build()?;
+
+ // seen flag for file_offset
+ let mut has_file_offset = false;
+
+ // mask of seen flags for ColumnMetaData
+ let mut col_meta_mask = 0u16;
// struct ColumnChunk {
// 1: optional string file_path
@@ -376,117 +526,35 @@ fn read_column_chunk<'a>(
}
match field_ident.id {
1 => {
- file_path = Some(<&str>::read_thrift(&mut *prot)?);
+ col.file_path = Some(String::read_thrift(&mut *prot)?);
}
2 => {
- file_offset = Some(i64::read_thrift(&mut *prot)?);
+ col.file_offset = i64::read_thrift(&mut *prot)?;
+ has_file_offset = true;
}
3 => {
- // `ColumnMetaData`. Read inline for performance sake.
- // struct ColumnMetaData {
- // 1: required Type type
- // 2: required list<Encoding> encodings
- // 3: required list<string> path_in_schema
- // 4: required CompressionCodec codec
- // 5: required i64 num_values
- // 6: required i64 total_uncompressed_size
- // 7: required i64 total_compressed_size
- // 8: optional list<KeyValue> key_value_metadata
- // 9: required i64 data_page_offset
- // 10: optional i64 index_page_offset
- // 11: optional i64 dictionary_page_offset
- // 12: optional Statistics statistics;
- // 13: optional list<PageEncodingStats> encoding_stats;
- // 14: optional i64 bloom_filter_offset;
- // 15: optional i32 bloom_filter_length;
- // 16: optional SizeStatistics size_statistics;
- // 17: optional GeospatialStatistics geospatial_statistics;
- // }
- let mut last_field_id = 0i16;
- loop {
- let field_ident = prot.read_field_begin(last_field_id)?;
- if field_ident.field_type == FieldType::Stop {
- break;
- }
- match field_ident.id {
- // 1: type is never used, we can use the column descriptor
- 2 => {
- let val = EncodingMask::read_thrift(&mut *prot)?;
- encodings = Some(val);
- }
- // 3: path_in_schema is redundant
- 4 => {
- codec = Some(Compression::read_thrift(&mut *prot)?);
- }
- 5 => {
- num_values = Some(i64::read_thrift(&mut *prot)?);
- }
- 6 => {
- total_uncompressed_size = Some(i64::read_thrift(&mut *prot)?);
- }
- 7 => {
- total_compressed_size = Some(i64::read_thrift(&mut *prot)?);
- }
- // 8: we don't expose this key value
- 9 => {
- data_page_offset = Some(i64::read_thrift(&mut *prot)?);
- }
- 10 => {
- index_page_offset = Some(i64::read_thrift(&mut *prot)?);
- }
- 11 => {
- dictionary_page_offset = Some(i64::read_thrift(&mut *prot)?);
- }
- 12 => {
- statistics = Some(Statistics::read_thrift(&mut *prot)?);
- }
- 13 => {
- let val = read_thrift_vec::<PageEncodingStats, ThriftSliceInputProtocol>(
- &mut *prot,
- )?;
- encoding_stats = Some(val);
- }
- 14 => {
- bloom_filter_offset = Some(i64::read_thrift(&mut *prot)?);
- }
- 15 => {
- bloom_filter_length = Some(i32::read_thrift(&mut *prot)?);
- }
- 16 => {
- let val = SizeStatistics::read_thrift(&mut *prot)?;
- size_statistics = Some(val);
- }
- 17 => {
- let val = GeospatialStatistics::read_thrift(&mut *prot)?;
- geospatial_statistics = Some(val);
- }
- _ => {
- prot.skip(field_ident.field_type)?;
- }
- };
- last_field_id = field_ident.id;
- }
+ col_meta_mask = read_column_metadata(&mut *prot, &mut col)?;
}
4 => {
- offset_index_offset = Some(i64::read_thrift(&mut *prot)?);
+ col.offset_index_offset = Some(i64::read_thrift(&mut *prot)?);
}
5 => {
- offset_index_length = Some(i32::read_thrift(&mut *prot)?);
+ col.offset_index_length = Some(i32::read_thrift(&mut *prot)?);
}
6 => {
- column_index_offset = Some(i64::read_thrift(&mut *prot)?);
+ col.column_index_offset = Some(i64::read_thrift(&mut *prot)?);
}
7 => {
- column_index_length = Some(i32::read_thrift(&mut *prot)?);
+ col.column_index_length = Some(i32::read_thrift(&mut *prot)?);
}
#[cfg(feature = "encryption")]
8 => {
let val = ColumnCryptoMetaData::read_thrift(&mut *prot)?;
- column_crypto_metadata = Some(Box::new(val));
+ col.column_crypto_metadata = Some(Box::new(val));
}
#[cfg(feature = "encryption")]
9 => {
- encrypted_column_metadata = Some(<&[u8]>::read_thrift(&mut *prot)?);
+ col.encrypted_column_metadata = Some(<&[u8]>::read_thrift(&mut *prot)?.to_vec());
}
_ => {
prot.skip(field_ident.field_type)?;
@@ -496,164 +564,36 @@ fn read_column_chunk<'a>(
}
// the only required field from ColumnChunk
- let Some(file_offset) = file_offset else {
+ if !has_file_offset {
return Err(general_err!("Required field file_offset is missing"));
};
- // transform optional fields
- let file_path = file_path.map(|f| f.to_owned());
- let (unencoded_byte_array_data_bytes, repetition_level_histogram, definition_level_histogram) =
- if let Some(size_stats) = size_statistics {
- (
- size_stats.unencoded_byte_array_data_bytes,
- size_stats.repetition_level_histogram,
- size_stats.definition_level_histogram,
- )
- } else {
- (None, None, None)
- };
-
- let repetition_level_histogram = repetition_level_histogram.map(LevelHistogram::from);
- let definition_level_histogram = definition_level_histogram.map(LevelHistogram::from);
-
- let statistics = convert_stats(column_descr.physical_type(), statistics)?;
- let geo_statistics = convert_geo_stats(geospatial_statistics);
- let column_descr = column_descr.clone();
-
- // if encrypted, set the encrypted column metadata and return. we'll decrypt after finishing
- // the footer and populate the rest.
+ // if encrypted just return. we'll decrypt after finishing the footer and populate the rest.
#[cfg(feature = "encryption")]
- if encrypted_column_metadata.is_some() {
- use crate::file::metadata::ColumnChunkMetaDataBuilder;
-
- let encrypted_column_metadata = encrypted_column_metadata.map(|s| s.to_vec());
-
- // use builder to get uninitialized ColumnChunkMetaData
- let mut col = ColumnChunkMetaDataBuilder::new(column_descr).build()?;
-
- // set ColumnChunk fields
- col.file_path = file_path;
- col.file_offset = file_offset;
- col.offset_index_offset = offset_index_offset;
- col.offset_index_length = offset_index_length;
- col.column_index_offset = column_index_offset;
- col.column_index_length = column_index_length;
- col.column_crypto_metadata = column_crypto_metadata;
- col.encrypted_column_metadata = encrypted_column_metadata;
-
- // check for ColumnMetaData fields that might be present
- // first required fields
- if let Some(encodings) = encodings {
- col.encodings = encodings;
- }
- if let Some(codec) = codec {
- col.compression = codec;
- }
- if let Some(num_values) = num_values {
- col.num_values = num_values;
- }
- if let Some(total_uncompressed_size) = total_uncompressed_size {
- col.total_uncompressed_size = total_uncompressed_size;
- }
- if let Some(total_compressed_size) = total_compressed_size {
- col.total_compressed_size = total_compressed_size;
- }
- if let Some(data_page_offset) = data_page_offset {
- col.data_page_offset = data_page_offset;
- }
-
- // then optional
- col.index_page_offset = index_page_offset;
- col.dictionary_page_offset = dictionary_page_offset;
- col.bloom_filter_offset = bloom_filter_offset;
- col.bloom_filter_length = bloom_filter_length;
- col.unencoded_byte_array_data_bytes = unencoded_byte_array_data_bytes;
- col.repetition_level_histogram = repetition_level_histogram;
- col.definition_level_histogram = definition_level_histogram;
- col.encoding_stats = encoding_stats;
- col.statistics = statistics;
- col.geo_statistics = geo_statistics;
-
+ if col.encrypted_column_metadata.is_some() {
return Ok(col);
}
- // not encrypted, so meta_data better exist
- let Some(encodings) = encodings else {
- return Err(ParquetError::General(
- "Required field encodings is missing".to_owned(),
- ));
- };
- let Some(codec) = codec else {
- return Err(ParquetError::General(
- "Required field codec is missing".to_owned(),
- ));
- };
- let Some(num_values) = num_values else {
- return Err(ParquetError::General(
- "Required field num_values is missing".to_owned(),
- ));
- };
- let Some(total_uncompressed_size) = total_uncompressed_size else {
- return Err(ParquetError::General(
- "Required field total_uncompressed_size is missing".to_owned(),
- ));
- };
- let Some(total_compressed_size) = total_compressed_size else {
- return Err(ParquetError::General(
- "Required field total_compressed_size is missing".to_owned(),
- ));
- };
- let Some(data_page_offset) = data_page_offset else {
- return Err(ParquetError::General(
- "Required field data_page_offset is missing".to_owned(),
- ));
- };
+ // not encrypted, so make sure all required fields were read
+ validate_column_metadata(col_meta_mask)?;
- let compression = codec;
-
- // NOTE: I tried using the builder for this, but it added 20% to the execution time
- let result = ColumnChunkMetaData {
- column_descr,
- encodings,
- file_path,
- file_offset,
- num_values,
- compression,
- total_compressed_size,
- total_uncompressed_size,
- data_page_offset,
- index_page_offset,
- dictionary_page_offset,
- statistics,
- geo_statistics,
- encoding_stats,
- bloom_filter_offset,
- bloom_filter_length,
- offset_index_offset,
- offset_index_length,
- column_index_offset,
- column_index_length,
- unencoded_byte_array_data_bytes,
- repetition_level_histogram,
- definition_level_histogram,
- #[cfg(feature = "encryption")]
- column_crypto_metadata,
- #[cfg(feature = "encryption")]
- encrypted_column_metadata: None, // tested is_some above
- };
- Ok(result)
+ Ok(col)
}
fn read_row_group(
prot: &mut ThriftSliceInputProtocol,
schema_descr: &Arc<SchemaDescriptor>,
) -> Result<RowGroupMetaData> {
- let mut columns: Option<Vec<ColumnChunkMetaData>> = None;
- let mut total_byte_size: Option<i64> = None;
- let mut num_rows: Option<i64> = None;
- let mut sorting_columns: Option<Vec<SortingColumn>> = None;
- let mut file_offset: Option<i64> = None;
- let mut ordinal: Option<i16> = None;
+ // create default initialized RowGroupMetaData
+ let mut row_group = RowGroupMetaDataBuilder::new(schema_descr.clone()).build_unchecked();
+
+ // mask values for required fields
+ const RG_COLUMNS: u8 = 1 << 1;
+ const RG_TOT_BYTE_SIZE: u8 = 1 << 2;
+ const RG_NUM_ROWS: u8 = 1 << 3;
+ const RG_ALL_REQUIRED: u8 = RG_COLUMNS | RG_TOT_BYTE_SIZE | RG_NUM_ROWS;
+
+ let mut mask = 0u8;
// struct RowGroup {
// 1: required list<ColumnChunk> columns
@@ -680,29 +620,30 @@ fn read_row_group(
list_ident.size
));
}
- let mut cols = Vec::with_capacity(list_ident.size as usize);
for i in 0..list_ident.size as usize {
let col = read_column_chunk(prot, &schema_descr.columns()[i])?;
- cols.push(col);
+ row_group.columns.push(col);
}
- columns = Some(cols);
+ mask |= RG_COLUMNS;
}
2 => {
- total_byte_size = Some(i64::read_thrift(&mut *prot)?);
+ row_group.total_byte_size = i64::read_thrift(&mut *prot)?;
+ mask |= RG_TOT_BYTE_SIZE;
}
3 => {
- num_rows = Some(i64::read_thrift(&mut *prot)?);
+ row_group.num_rows = i64::read_thrift(&mut *prot)?;
+ mask |= RG_NUM_ROWS;
}
4 => {
let val = read_thrift_vec::<SortingColumn, ThriftSliceInputProtocol>(&mut *prot)?;
- sorting_columns = Some(val);
+ row_group.sorting_columns = Some(val);
}
5 => {
- file_offset = Some(i64::read_thrift(&mut *prot)?);
+ row_group.file_offset = Some(i64::read_thrift(&mut *prot)?);
}
// 6: we don't expose total_compressed_size
7 => {
- ordinal = Some(i16::read_thrift(&mut *prot)?);
+ row_group.ordinal = Some(i16::read_thrift(&mut *prot)?);
}
_ => {
prot.skip(field_ident.field_type)?;
@@ -710,31 +651,20 @@ fn read_row_group(
};
last_field_id = field_ident.id;
}
- let Some(columns) = columns else {
- return Err(ParquetError::General(
- "Required field columns is missing".to_owned(),
- ));
- };
- let Some(total_byte_size) = total_byte_size else {
- return Err(ParquetError::General(
- "Required field total_byte_size is missing".to_owned(),
- ));
- };
- let Some(num_rows) = num_rows else {
- return Err(ParquetError::General(
- "Required field num_rows is missing".to_owned(),
- ));
- };
- Ok(RowGroupMetaData {
- columns,
- num_rows,
- sorting_columns,
- total_byte_size,
- schema_descr: schema_descr.clone(),
- file_offset,
- ordinal,
- })
+ if mask != RG_ALL_REQUIRED {
+ if mask & RG_COLUMNS == 0 {
+ return Err(general_err!("Required field columns is missing"));
+ }
+ if mask & RG_TOT_BYTE_SIZE == 0 {
+ return Err(general_err!("Required field total_byte_size is
missing"));
+ }
+ if mask & RG_NUM_ROWS == 0 {
+ return Err(general_err!("Required field num_rows is missing"));
+ }
+ }
+
+ Ok(row_group)
}
/// Create [`ParquetMetaData`] from thrift input. Note that this only decodes the file metadata in
@@ -826,19 +756,13 @@ pub(crate) fn parquet_metadata_from_bytes(buf: &[u8]) -> Result<ParquetMetaData>
last_field_id = field_ident.id;
}
let Some(version) = version else {
- return Err(ParquetError::General(
- "Required field version is missing".to_owned(),
- ));
+ return Err(general_err!("Required field version is missing"));
};
let Some(num_rows) = num_rows else {
- return Err(ParquetError::General(
- "Required field num_rows is missing".to_owned(),
- ));
+ return Err(general_err!("Required field num_rows is missing"));
};
let Some(row_groups) = row_groups else {
- return Err(ParquetError::General(
- "Required field row_groups is missing".to_owned(),
- ));
+ return Err(general_err!("Required field row_groups is missing"));
};
let created_by = created_by.map(|c| c.to_owned());
@@ -981,23 +905,19 @@ impl DataPageHeader {
last_field_id = field_ident.id;
}
let Some(num_values) = num_values else {
- return Err(ParquetError::General(
- "Required field num_values is missing".to_owned(),
- ));
+ return Err(general_err!("Required field num_values is missing"));
};
let Some(encoding) = encoding else {
- return Err(ParquetError::General(
- "Required field encoding is missing".to_owned(),
- ));
+ return Err(general_err!("Required field encoding is missing"));
};
let Some(definition_level_encoding) = definition_level_encoding else {
- return Err(ParquetError::General(
- "Required field definition_level_encoding is
missing".to_owned(),
+ return Err(general_err!(
+ "Required field definition_level_encoding is missing"
));
};
let Some(repetition_level_encoding) = repetition_level_encoding else {
- return Err(ParquetError::General(
- "Required field repetition_level_encoding is
missing".to_owned(),
+ return Err(general_err!(
+ "Required field repetition_level_encoding is missing"
));
};
Ok(Self {
@@ -1079,33 +999,25 @@ impl DataPageHeaderV2 {
last_field_id = field_ident.id;
}
let Some(num_values) = num_values else {
- return Err(ParquetError::General(
- "Required field num_values is missing".to_owned(),
- ));
+ return Err(general_err!("Required field num_values is missing"));
};
let Some(num_nulls) = num_nulls else {
- return Err(ParquetError::General(
- "Required field num_nulls is missing".to_owned(),
- ));
+ return Err(general_err!("Required field num_nulls is missing"));
};
let Some(num_rows) = num_rows else {
- return Err(ParquetError::General(
- "Required field num_rows is missing".to_owned(),
- ));
+ return Err(general_err!("Required field num_rows is missing"));
};
let Some(encoding) = encoding else {
- return Err(ParquetError::General(
- "Required field encoding is missing".to_owned(),
- ));
+ return Err(general_err!("Required field encoding is missing"));
};
let Some(definition_levels_byte_length) = definition_levels_byte_length else {
- return Err(ParquetError::General(
- "Required field definition_levels_byte_length is missing".to_owned(),
+ return Err(general_err!(
+ "Required field definition_levels_byte_length is missing"
));
};
let Some(repetition_levels_byte_length) = repetition_levels_byte_length else {
- return Err(ParquetError::General(
- "Required field repetition_levels_byte_length is missing".to_owned(),
+ return Err(general_err!(
+ "Required field repetition_levels_byte_length is missing"
));
};
Ok(Self {
@@ -1145,7 +1057,7 @@ pub(crate) struct PageHeader {
impl PageHeader {
// reader that skips reading page statistics. obtained by running
- // `cargo expand -p parquet --all-features --lib file::metadata::thrift_gen`
+ // `cargo expand -p parquet --all-features --lib file::metadata::thrift`
// and modifying the impl of `read_thrift`
pub(crate) fn read_thrift_without_stats<'a, R>(prot: &mut R) -> Result<Self>
where
@@ -1205,18 +1117,16 @@ impl PageHeader {
last_field_id = field_ident.id;
}
let Some(type_) = type_ else {
- return Err(ParquetError::General(
- "Required field type_ is missing".to_owned(),
- ));
+ return Err(general_err!("Required field type_ is missing"));
};
let Some(uncompressed_page_size) = uncompressed_page_size else {
- return Err(ParquetError::General(
- "Required field uncompressed_page_size is missing".to_owned(),
+ return Err(general_err!(
+ "Required field uncompressed_page_size is missing"
));
};
let Some(compressed_page_size) = compressed_page_size else {
- return Err(ParquetError::General(
- "Required field compressed_page_size is missing".to_owned(),
+ return Err(general_err!(
+ "Required field compressed_page_size is missing"
));
};
Ok(Self {
@@ -1255,7 +1165,7 @@ impl PageHeader {
// 16: optional SizeStatistics size_statistics;
// 17: optional GeospatialStatistics geospatial_statistics;
// }
-pub(crate) fn serialize_column_meta_data<W: Write>(
+pub(super) fn serialize_column_meta_data<W: Write>(
column_chunk: &ColumnChunkMetaData,
w: &mut ThriftCompactOutputProtocol<W>,
) -> Result<()> {
@@ -1333,9 +1243,9 @@ pub(crate) fn serialize_column_meta_data<W: Write>(
}
// temp struct used for writing
-pub(crate) struct FileMeta<'a> {
- pub(crate) file_metadata: &'a crate::file::metadata::FileMetaData,
- pub(crate) row_groups: &'a Vec<RowGroupMetaData>,
+pub(super) struct FileMeta<'a> {
+ pub(super) file_metadata: &'a crate::file::metadata::FileMetaData,
+ pub(super) row_groups: &'a Vec<RowGroupMetaData>,
}
// struct FileMetaData {
@@ -1675,7 +1585,7 @@ impl WriteThriftField for crate::geospatial::bounding_box::BoundingBox {
#[cfg(test)]
pub(crate) mod tests {
use crate::errors::Result;
- use crate::file::metadata::thrift_gen::{BoundingBox, SchemaElement, write_schema};
+ use crate::file::metadata::thrift::{BoundingBox, SchemaElement, write_schema};
use crate::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
use crate::parquet_thrift::tests::test_roundtrip;
use crate::parquet_thrift::{
@@ -1692,7 +1602,7 @@ pub(crate) mod tests {
schema_descr: Arc<SchemaDescriptor>,
) -> Result<RowGroupMetaData> {
let mut reader = ThriftSliceInputProtocol::new(buf);
- crate::file::metadata::thrift_gen::read_row_group(&mut reader, &schema_descr)
+ crate::file::metadata::thrift::read_row_group(&mut reader, &schema_descr)
}
pub(crate) fn read_column_chunk(
@@ -1700,7 +1610,7 @@ pub(crate) mod tests {
column_descr: Arc<ColumnDescriptor>,
) -> Result<ColumnChunkMetaData> {
let mut reader = ThriftSliceInputProtocol::new(buf);
- crate::file::metadata::thrift_gen::read_column_chunk(&mut reader, &column_descr)
+ crate::file::metadata::thrift::read_column_chunk(&mut reader, &column_descr)
}
pub(crate) fn roundtrip_schema(schema: TypePtr) -> Result<TypePtr> {
diff --git a/parquet/src/file/metadata/writer.rs b/parquet/src/file/metadata/writer.rs
index 398f5419c6..124bc11bdd 100644
--- a/parquet/src/file/metadata/writer.rs
+++ b/parquet/src/file/metadata/writer.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use crate::file::metadata::thrift_gen::FileMeta;
+use crate::file::metadata::thrift::FileMeta;
use crate::file::metadata::{
ColumnChunkMetaData, ParquetColumnIndex, ParquetOffsetIndex,
RowGroupMetaData,
};
@@ -31,7 +31,7 @@ use crate::{
modules::{ModuleType, create_footer_aad, create_module_aad},
},
file::column_crypto_metadata::ColumnCryptoMetaData,
- file::metadata::encryption::{AesGcmV1, EncryptionAlgorithm, FileCryptoMetaData},
+ file::metadata::thrift::encryption::{AesGcmV1, EncryptionAlgorithm, FileCryptoMetaData},
};
use crate::{errors::Result, file::page_index::column_index::ColumnIndexMetaData};
@@ -762,7 +762,7 @@ impl MetadataObjectWriter {
// so the column chunk does not need additional encryption.
}
Some(ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(col_key)) => {
- use crate::file::metadata::thrift_gen::serialize_column_meta_data;
+ use crate::file::metadata::thrift::serialize_column_meta_data;
let column_path = col_key.path_in_schema.join(".");
let mut column_encryptor = file_encryptor.get_column_encryptor(&column_path)?;
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 748a2df32f..6da5c39d74 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -25,7 +25,7 @@ use crate::compression::{Codec, create_codec};
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::{CryptoContext, read_and_decrypt};
use crate::errors::{ParquetError, Result};
-use crate::file::metadata::thrift_gen::PageHeader;
+use crate::file::metadata::thrift::PageHeader;
use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation};
use crate::file::statistics;
use crate::file::{
@@ -769,7 +769,7 @@ impl SerializedPageReaderContext {
if self.read_stats {
Ok(PageHeader::read_thrift(&mut prot)?)
} else {
- use crate::file::metadata::thrift_gen::PageHeader;
+ use crate::file::metadata::thrift::PageHeader;
Ok(PageHeader::read_thrift_without_stats(&mut prot)?)
}
diff --git a/parquet/src/file/statistics.rs b/parquet/src/file/statistics.rs
index 0c54940fac..4cb81bff65 100644
--- a/parquet/src/file/statistics.rs
+++ b/parquet/src/file/statistics.rs
@@ -45,7 +45,7 @@ use crate::basic::Type;
use crate::data_type::private::ParquetValueType;
use crate::data_type::*;
use crate::errors::{ParquetError, Result};
-use crate::file::metadata::thrift_gen::PageStatistics;
+use crate::file::metadata::thrift::PageStatistics;
use crate::util::bit_util::FromBytes;
pub(crate) mod private {
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 4972fbf79b..4533d25401 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -18,7 +18,7 @@
//! [`SerializedFileWriter`]: Low level Parquet writer API
use crate::bloom_filter::Sbbf;
-use crate::file::metadata::thrift_gen::PageHeader;
+use crate::file::metadata::thrift::PageHeader;
use crate::file::page_index::column_index::ColumnIndexMetaData;
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
diff --git a/parquet/src/schema/types.rs b/parquet/src/schema/types.rs
index a847611a75..1ae37d0a46 100644
--- a/parquet/src/schema/types.rs
+++ b/parquet/src/schema/types.rs
@@ -21,7 +21,7 @@ use std::vec::IntoIter;
use std::{collections::HashMap, fmt, sync::Arc};
use crate::file::metadata::HeapSize;
-use crate::file::metadata::thrift_gen::SchemaElement;
+use crate::file::metadata::thrift::SchemaElement;
use crate::basic::{
ColumnOrder, ConvertedType, LogicalType, Repetition, SortOrder, TimeUnit,
Type as PhysicalType,
@@ -1370,7 +1370,7 @@ mod tests {
use super::*;
use crate::{
- file::metadata::thrift_gen::tests::{buf_to_schema_list, roundtrip_schema, schema_to_buf},
+ file::metadata::thrift::tests::{buf_to_schema_list, roundtrip_schema, schema_to_buf},
schema::parser::parse_message_type,
};