etseidl commented on code in PR #8340:
URL: https://github.com/apache/arrow-rs/pull/8340#discussion_r2382839671
##########
parquet/src/file/metadata/push_decoder.rs:
##########
@@ -211,16 +232,38 @@ impl ParquetMetaDataPushDecoder {
)));
};
- let metadata_reader =
- ParquetMetaDataReader::new().with_page_index_policy(PageIndexPolicy::Optional);
-
Ok(Self {
- done: false,
- metadata_reader,
+ state: DecodeState::ReadingFooter,
+ column_index_policy: PageIndexPolicy::Optional,
+ offset_index_policy: PageIndexPolicy::Optional,
buffers: crate::util::push_buffers::PushBuffers::new(file_len),
+ metadata_parser: MetadataParser::new(),
})
}
+ /// Begin decoding from the given footer tail.
+ pub(crate) fn try_new_with_footer_tail(
+ file_len: u64,
+ footer_tail: FooterTail,
+ ) -> Result<Self, ParquetError> {
Review Comment:
Just curious, but do you dislike use of `crate::errors::Result`?
##########
parquet/src/file/metadata/parser.rs:
##########
@@ -79,7 +159,7 @@ pub(crate) fn decode_metadata(buf: &[u8]) -> crate::errors::Result<ParquetMetaDa
/// Parses column orders from Thrift definition.
/// If no column orders are defined, returns `None`.
-pub(crate) fn parse_column_orders(
+fn parse_column_orders(
Review Comment:
This will go away, btw.
##########
parquet/src/file/metadata/parser.rs:
##########
@@ -43,6 +43,86 @@ use crate::encryption::{
#[cfg(feature = "encryption")]
use crate::format::EncryptionAlgorithm;
+/// Helper struct for metadata parsing
+///
+/// This structure parses thrift-encoded bytes into the correct Rust structs,
+/// such as [`ParquetMetaData`], handling decryption if necessary.
+//
+// Note this structure is used to minimize the number of
+// places need to add `#[cfg(feature = "encryption")]` checks.
+pub(crate) use inner::MetadataParser;
+
+#[cfg(feature = "encryption")]
+mod inner {
+ use super::*;
+ use crate::encryption::decrypt::FileDecryptionProperties;
+ use crate::errors::Result;
+
+ /// API for decoding metadata that may be encrypted
+ #[derive(Debug, Default)]
+ pub(crate) struct MetadataParser {
+ // the credentials and keys needed to decrypt metadata
+ file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
+ }
+
+ impl MetadataParser {
+ pub(crate) fn new() -> Self {
+ MetadataParser::default()
+ }
+
+ pub(crate) fn with_file_decryption_properties(
+ mut self,
+ file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
+ ) -> Self {
+ self.file_decryption_properties = file_decryption_properties;
+ self
+ }
+
+ pub(crate) fn decode_metadata(
+ &self,
+ buf: &[u8],
+ encrypted_footer: bool,
+ ) -> Result<ParquetMetaData> {
+ decode_metadata_with_encryption(
+ buf,
+ encrypted_footer,
+ self.file_decryption_properties.as_deref(),
+ )
+ }
+ }
+}
+
+#[cfg(not(feature = "encryption"))]
+mod inner {
+ use super::*;
+ use crate::errors::Result;
+ /// parallel implementation when encryption feature is not enabled
+ ///
+ /// This has the same API as the encryption-enabled version
+ #[derive(Debug, Default)]
+ pub(crate) struct MetadataParser;
+
+ impl MetadataParser {
+ pub(crate) fn new() -> Self {
+ MetadataParser
+ }
+
+ pub(crate) fn decode_metadata(
+ &self,
+ buf: &[u8],
+ encrypted_footer: bool,
+ ) -> Result<ParquetMetaData> {
+ if encrypted_footer {
+ Err(general_err!(
+ "Parquet file has an encrypted footer but the encryption feature is disabled"
+ ))
+ } else {
+ decode_metadata(buf)
Review Comment:
This is the only problematic line for the merge. For my initial pass I
replaced this call with the thrift decode, but on second thought I should just
change the implementation of `decode_metadata` below to use the new structs.
##########
parquet/src/file/metadata/push_decoder.rs:
##########
@@ -259,70 +325,160 @@ impl ParquetMetaDataPushDecoder {
/// example on [`Self`]
pub fn push_ranges(
&mut self,
- ranges: Vec<std::ops::Range<u64>>,
- buffers: Vec<bytes::Bytes>,
- ) -> std::result::Result<(), String> {
- if self.done {
- return Err(
+ ranges: Vec<Range<u64>>,
+ buffers: Vec<Bytes>,
+ ) -> Result<(), ParquetError> {
+ if matches!(&self.state, DecodeState::Finished) {
+ return Err(general_err!(
"ParquetMetaDataPushDecoder: cannot push data after decoding is finished"
- .to_string(),
- );
+ ));
}
self.buffers.push_ranges(ranges, buffers);
Ok(())
}
- /// Try to decode the metadata from the pushed data, returning the
- /// decoded metadata or an error if not enough data is available.
- pub fn try_decode(
- &mut self,
- ) -> std::result::Result<DecodeResult<ParquetMetaData>, ParquetError> {
- if self.done {
- return Ok(DecodeResult::Finished);
+ /// Pushes a single range of data into the decoder's buffer.
+ pub fn push_range(&mut self, range: Range<u64>, buffer: Bytes) -> Result<(), ParquetError> {
+ if matches!(&self.state, DecodeState::Finished) {
+ return Err(general_err!(
+ "ParquetMetaDataPushDecoder: cannot push data after decoding is finished"
+ ));
}
+ self.buffers.push_range(range, buffer);
+ Ok(())
+ }
- // need to have the last 8 bytes of the file to decode the metadata
+ /// Try to decode the metadata from the pushed data, returning the
+ /// decoded metadata or an error if not enough data is available.
+ pub fn try_decode(&mut self) -> Result<DecodeResult<ParquetMetaData>, ParquetError> {
let file_len = self.buffers.file_len();
- if !self.buffers.has_range(&(file_len - 8..file_len)) {
- #[expect(clippy::single_range_in_vec_init)]
- return Ok(DecodeResult::NeedsData(vec![file_len - 8..file_len]));
+ let footer_len = FOOTER_SIZE as u64;
+ loop {
+ match std::mem::replace(&mut self.state, DecodeState::Intermediate) {
+ DecodeState::ReadingFooter => {
+ // need to have the last 8 bytes of the file to decode the metadata
+ let footer_start = file_len.saturating_sub(footer_len);
+ let footer_range = footer_start..file_len;
+
+ if !self.buffers.has_range(&footer_range) {
+ self.state = DecodeState::ReadingFooter;
+ return Ok(needs_range(footer_range));
+ }
+ let footer_bytes = self.get_bytes(&footer_range)?;
+ let footer_tail = FooterTail::try_from(footer_bytes.as_ref())?;
+
+ self.state = DecodeState::ReadingMetadata(footer_tail);
+ continue;
+ }
+
+ DecodeState::ReadingMetadata(footer_tail) => {
+ let metadata_len: u64 = footer_tail.metadata_length() as u64;
+ let metadata_start = file_len - footer_len - metadata_len;
+ let metadata_end = metadata_start + metadata_len;
+ let metadata_range = metadata_start..metadata_end;
+
+ if !self.buffers.has_range(&metadata_range) {
+ self.state = DecodeState::ReadingMetadata(footer_tail);
+ return Ok(needs_range(metadata_range));
+ }
+
+ let metadata = self.metadata_parser.decode_metadata(
+ &self.get_bytes(&metadata_range)?,
+ footer_tail.is_encrypted_footer(),
+ )?;
+ self.state = DecodeState::ReadingPageIndex(Box::new(metadata));
+ continue;
+ }
+
+ DecodeState::ReadingPageIndex(mut metadata) => {
Review Comment:
A little documentation of the state change would be nice here. It's not
immediately obvious that `range_for_page_index` will return `None` if page
indexes were not requested. I was thinking the check for the latter would be
done above while deciding the next state.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]