This is an automated email from the ASF dual-hosted git repository.
etseidl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new a0db1985c3 Add options to control various aspects of Parquet metadata decoding (#8763)
a0db1985c3 is described below
commit a0db1985c3a0f3190cfc5166b428933a28c740f9
Author: Ed Seidl <[email protected]>
AuthorDate: Tue Nov 11 07:02:59 2025 -0800
Add options to control various aspects of Parquet metadata decoding (#8763)
---
parquet/benches/metadata.rs | 21 ++++-
parquet/src/arrow/arrow_reader/mod.rs | 42 +++++++++-
parquet/src/arrow/async_reader/mod.rs | 9 ++-
parquet/src/arrow/async_reader/store.rs | 2 +
parquet/src/file/metadata/mod.rs | 2 +
parquet/src/file/metadata/options.rs | 108 +++++++++++++++++++++++++
parquet/src/file/metadata/parser.rs | 45 +++++++++--
parquet/src/file/metadata/push_decoder.rs | 9 ++-
parquet/src/file/metadata/reader.rs | 44 ++++++++--
parquet/src/file/metadata/thrift/encryption.rs | 5 +-
parquet/src/file/metadata/thrift/mod.rs | 53 ++++++++++--
parquet/src/file/serialized_reader.rs | 35 +++++++-
12 files changed, 342 insertions(+), 33 deletions(-)
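
In short: the new ParquetMetaDataOptions struct lets callers supply an already-parsed Parquet schema so the footer decoder can skip re-parsing the schema elements. For reference, a minimal sketch of the core decode path (mirroring the benchmark below; `footer_bytes` is a stand-in for the raw footer metadata bytes, inside a function returning parquet::errors::Result):

    use parquet::file::metadata::{ParquetMetaDataOptions, ParquetMetaDataReader};

    // Decode the schema once from the footer bytes...
    let schema = ParquetMetaDataReader::decode_schema(&footer_bytes)?;
    // ...then hand it back so subsequent decodes skip schema parsing.
    let options = ParquetMetaDataOptions::new().with_schema(schema);
    let metadata =
        ParquetMetaDataReader::decode_metadata_with_options(&footer_bytes, Some(&options))?;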
diff --git a/parquet/benches/metadata.rs b/parquet/benches/metadata.rs
index 04feb0d900..43b08e6b26 100644
--- a/parquet/benches/metadata.rs
+++ b/parquet/benches/metadata.rs
@@ -20,8 +20,8 @@ use std::sync::Arc;
use parquet::basic::{Encoding, PageType, Type as PhysicalType};
use parquet::file::metadata::{
- ColumnChunkMetaData, FileMetaData, PageEncodingStats, ParquetMetaData, ParquetMetaDataReader,
- ParquetMetaDataWriter, RowGroupMetaData,
+ ColumnChunkMetaData, FileMetaData, PageEncodingStats, ParquetMetaData, ParquetMetaDataOptions,
+ ParquetMetaDataReader, ParquetMetaDataWriter, RowGroupMetaData,
};
use parquet::file::statistics::Statistics;
use parquet::file::writer::TrackedWrite;
@@ -164,12 +164,29 @@ fn criterion_benchmark(c: &mut Criterion) {
})
});
+ let schema = ParquetMetaDataReader::decode_schema(&meta_data).unwrap();
+ let options = ParquetMetaDataOptions::new().with_schema(schema);
+ c.bench_function("decode metadata with schema", |b| {
+ b.iter(|| {
+ ParquetMetaDataReader::decode_metadata_with_options(&meta_data, Some(&options))
+ .unwrap();
+ })
+ });
+
let buf: Bytes = black_box(encoded_meta()).into();
c.bench_function("decode parquet metadata (wide)", |b| {
b.iter(|| {
ParquetMetaDataReader::decode_metadata(&buf).unwrap();
})
});
+
+ let schema = ParquetMetaDataReader::decode_schema(&buf).unwrap();
+ let options = ParquetMetaDataOptions::new().with_schema(schema);
+ c.bench_function("decode metadata (wide) with schema", |b| {
+ b.iter(|| {
+ ParquetMetaDataReader::decode_metadata_with_options(&buf, Some(&options)).unwrap();
+ })
+ });
}
criterion_group!(benches, criterion_benchmark);
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 4e12c55c9f..673e9d9d7f 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -38,7 +38,9 @@ use crate::column::page::{PageIterator, PageReader};
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::FileDecryptionProperties;
use crate::errors::{ParquetError, Result};
-use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
+use crate::file::metadata::{
+ PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader,
+};
use crate::file::reader::{ChunkReader, SerializedPageReader};
use crate::schema::types::SchemaDescriptor;
@@ -404,6 +406,8 @@ pub struct ArrowReaderOptions {
supplied_schema: Option<SchemaRef>,
/// Policy for reading offset and column indexes.
pub(crate) page_index_policy: PageIndexPolicy,
+ /// Options to control reading of Parquet metadata
+ metadata_options: ParquetMetaDataOptions,
/// If encryption is enabled, the file decryption properties can be provided
#[cfg(feature = "encryption")]
pub(crate) file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
@@ -523,6 +527,16 @@ impl ArrowReaderOptions {
}
}
+ /// Provide a Parquet schema to use when decoding the metadata. The schema in the Parquet
+ /// footer will be skipped.
+ ///
+ /// This can be used to avoid reparsing the schema from the file when it is
+ /// already known.
+ pub fn with_parquet_schema(mut self, schema: Arc<SchemaDescriptor>) -> Self {
+ self.metadata_options.set_schema(schema);
+ self
+ }
+
/// Provide the file decryption properties to use when reading encrypted parquet files.
///
/// If encryption is enabled and the file is encrypted, the `file_decryption_properties` must be provided.
@@ -544,6 +558,11 @@ impl ArrowReaderOptions {
self.page_index_policy != PageIndexPolicy::Skip
}
+ /// Retrieve the currently set metadata decoding options.
+ pub fn metadata_options(&self) -> &ParquetMetaDataOptions {
+ &self.metadata_options
+ }
+
/// Retrieve the currently set file decryption properties.
///
/// This can be set via
@@ -591,8 +610,9 @@ impl ArrowReaderMetadata {
/// `Self::metadata` is missing the page index, this function will attempt
/// to load the page index by making an object store request.
pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
- let metadata =
- ParquetMetaDataReader::new().with_page_index_policy(options.page_index_policy);
+ let metadata = ParquetMetaDataReader::new()
+ .with_page_index_policy(options.page_index_policy)
+ .with_metadata_options(Some(options.metadata_options.clone()));
#[cfg(feature = "encryption")]
let metadata = metadata.with_decryption_properties(
options.file_decryption_properties.as_ref().map(Arc::clone),
@@ -1246,6 +1266,22 @@ mod tests {
assert_eq!(original_schema.fields(), reader.schema().fields());
}
+ #[test]
+ fn test_reuse_schema() {
+ let file = get_test_file("parquet/alltypes-java.parquet");
+
+ let builder = ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
+ let expected = builder.metadata;
+ let schema = expected.file_metadata().schema_descr_ptr();
+
+ let arrow_options = ArrowReaderOptions::new().with_parquet_schema(schema.clone());
+ let builder =
+ ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();
+
+ // Verify that the metadata matches
+ assert_eq!(expected.as_ref(), builder.metadata.as_ref());
+ }
+
#[test]
fn test_arrow_reader_single_column() {
let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 44c5465202..becc698abe 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -164,9 +164,12 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
options: Option<&'a ArrowReaderOptions>,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
async move {
- let metadata_reader = ParquetMetaDataReader::new().with_page_index_policy(
- PageIndexPolicy::from(options.is_some_and(|o| o.page_index())),
- );
+ let metadata_opts = options.map(|o| o.metadata_options().clone());
+ let metadata_reader = ParquetMetaDataReader::new()
+ .with_page_index_policy(PageIndexPolicy::from(
+ options.is_some_and(|o| o.page_index()),
+ ))
+ .with_metadata_options(metadata_opts);
#[cfg(feature = "encryption")]
let metadata_reader = metadata_reader.with_decryption_properties(
diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs
index 5ac21567a1..efb3de0f22 100644
--- a/parquet/src/arrow/async_reader/store.rs
+++ b/parquet/src/arrow/async_reader/store.rs
@@ -199,7 +199,9 @@ impl AsyncFileReader for ParquetObjectReader {
options: Option<&'a ArrowReaderOptions>,
) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
Box::pin(async move {
+ let metadata_opts = options.map(|o| o.metadata_options().clone());
let mut metadata = ParquetMetaDataReader::new()
+ .with_metadata_options(metadata_opts)
.with_column_index_policy(PageIndexPolicy::from(self.preload_column_index))
.with_offset_index_policy(PageIndexPolicy::from(self.preload_offset_index))
.with_prefetch_hint(self.metadata_size_hint);
diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index 1c8f3e9c69..45b69a6679 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -88,6 +88,7 @@
//! ```
mod footer_tail;
mod memory;
+mod options;
mod parser;
mod push_decoder;
pub(crate) mod reader;
@@ -127,6 +128,7 @@ use crate::{
};
pub use footer_tail::FooterTail;
+pub use options::ParquetMetaDataOptions;
pub use push_decoder::ParquetMetaDataPushDecoder;
pub use reader::{PageIndexPolicy, ParquetMetaDataReader};
use std::io::Write;
diff --git a/parquet/src/file/metadata/options.rs b/parquet/src/file/metadata/options.rs
new file mode 100644
index 0000000000..bbc5314d3a
--- /dev/null
+++ b/parquet/src/file/metadata/options.rs
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Options used to control metadata parsing
+
+use crate::schema::types::SchemaDescPtr;
+
+/// Options that can be set to control what parts of the Parquet file footer
+/// metadata will be decoded and made present in the [`ParquetMetaData`] returned
+/// by [`ParquetMetaDataReader`] and [`ParquetMetaDataPushDecoder`].
+///
+/// [`ParquetMetaData`]: crate::file::metadata::ParquetMetaData
+/// [`ParquetMetaDataReader`]: crate::file::metadata::ParquetMetaDataReader
+/// [`ParquetMetaDataPushDecoder`]: crate::file::metadata::ParquetMetaDataPushDecoder
+#[derive(Default, Debug, Clone)]
+pub struct ParquetMetaDataOptions {
+ schema_descr: Option<SchemaDescPtr>,
+}
+
+impl ParquetMetaDataOptions {
+ /// Return a new default [`ParquetMetaDataOptions`].
+ pub fn new() -> Self {
+ Default::default()
+ }
+
+ /// Returns an optional [`SchemaDescPtr`] to use when decoding. If this is not `None` then
+ /// the schema in the footer will be skipped.
+ pub fn schema(&self) -> Option<&SchemaDescPtr> {
+ self.schema_descr.as_ref()
+ }
+
+ /// Provide a schema to use when decoding the metadata.
+ pub fn set_schema(&mut self, val: SchemaDescPtr) {
+ self.schema_descr = Some(val);
+ }
+
+ /// Provide a schema to use when decoding the metadata. Returns `Self` for chaining.
+ pub fn with_schema(mut self, val: SchemaDescPtr) -> Self {
+ self.schema_descr = Some(val);
+ self
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use bytes::Bytes;
+
+ use crate::{
+ DecodeResult,
+ file::metadata::{ParquetMetaDataOptions, ParquetMetaDataPushDecoder},
+ util::test_common::file_util::get_test_file,
+ };
+ use std::{io::Read, sync::Arc};
+
+ #[test]
+ fn test_provide_schema() {
+ let mut buf: Vec<u8> = Vec::new();
+ get_test_file("alltypes_plain.parquet")
+ .read_to_end(&mut buf)
+ .unwrap();
+
+ let data = Bytes::from(buf);
+ let mut decoder = ParquetMetaDataPushDecoder::try_new(data.len() as u64).unwrap();
+ decoder
+ .push_range(0..data.len() as u64, data.clone())
+ .unwrap();
+
+ let expected = match decoder.try_decode().unwrap() {
+ DecodeResult::Data(m) => m,
+ _ => panic!("could not parse metadata"),
+ };
+ let expected_schema = expected.file_metadata().schema_descr_ptr();
+
+ let mut options = ParquetMetaDataOptions::new();
+ options.set_schema(expected_schema);
+ let options = Arc::new(options);
+
+ let mut decoder = ParquetMetaDataPushDecoder::try_new(data.len() as u64)
+ .unwrap()
+ .with_metadata_options(Some(options));
+ decoder.push_range(0..data.len() as u64, data).unwrap();
+ let metadata = match decoder.try_decode().unwrap() {
+ DecodeResult::Data(m) => m,
+ _ => panic!("could not parse metadata"),
+ };
+
+ assert_eq!(expected, metadata);
+ // the schema pointers should be the same
+ assert!(Arc::ptr_eq(
+ &expected.file_metadata().schema_descr_ptr(),
+ &metadata.file_metadata().schema_descr_ptr()
+ ));
+ }
+}
diff --git a/parquet/src/file/metadata/parser.rs b/parquet/src/file/metadata/parser.rs
index cb7c67e9bf..9df6bcdd71 100644
--- a/parquet/src/file/metadata/parser.rs
+++ b/parquet/src/file/metadata/parser.rs
@@ -22,7 +22,9 @@
use crate::errors::ParquetError;
use crate::file::metadata::thrift::parquet_metadata_from_bytes;
-use crate::file::metadata::{ColumnChunkMetaData, PageIndexPolicy, ParquetMetaData};
+use crate::file::metadata::{
+ ColumnChunkMetaData, PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions,
+};
use crate::file::page_index::column_index::ColumnIndexMetaData;
use crate::file::page_index::index_reader::{decode_column_index, decode_offset_index};
@@ -51,6 +53,8 @@ mod inner {
pub(crate) struct MetadataParser {
// the credentials and keys needed to decrypt metadata
file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
+ // metadata parsing options
+ metadata_options: Option<Arc<ParquetMetaDataOptions>>,
}
impl MetadataParser {
@@ -66,6 +70,16 @@ mod inner {
self
}
+ pub(crate) fn with_metadata_options(
+ self,
+ options: Option<Arc<ParquetMetaDataOptions>>,
+ ) -> Self {
+ Self {
+ metadata_options: options,
+ ..self
+ }
+ }
+
pub(crate) fn decode_metadata(
&self,
buf: &[u8],
@@ -76,9 +90,10 @@ mod inner {
self.file_decryption_properties.as_ref(),
encrypted_footer,
buf,
+ self.metadata_options.as_deref(),
)
} else {
- decode_metadata(buf)
+ decode_metadata(buf, self.metadata_options.as_deref())
}
}
}
@@ -144,15 +159,28 @@ mod inner {
mod inner {
use super::*;
use crate::errors::Result;
+ use std::sync::Arc;
/// parallel implementation when encryption feature is not enabled
///
/// This has the same API as the encryption-enabled version
#[derive(Debug, Default)]
- pub(crate) struct MetadataParser;
+ pub(crate) struct MetadataParser {
+ // metadata parsing options
+ metadata_options: Option<Arc<ParquetMetaDataOptions>>,
+ }
impl MetadataParser {
pub(crate) fn new() -> Self {
- MetadataParser
+ MetadataParser::default()
+ }
+
+ pub(crate) fn with_metadata_options(
+ self,
+ options: Option<Arc<ParquetMetaDataOptions>>,
+ ) -> Self {
+ Self {
+ metadata_options: options,
+ }
}
pub(crate) fn decode_metadata(
@@ -165,7 +193,7 @@ mod inner {
"Parquet file has an encrypted footer but the encryption
feature is disabled"
))
} else {
- decode_metadata(buf)
+ decode_metadata(buf, self.metadata_options.as_deref())
}
}
}
@@ -198,8 +226,11 @@ mod inner {
/// by the [Parquet Spec].
///
/// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
-pub(crate) fn decode_metadata(buf: &[u8]) -> crate::errors::Result<ParquetMetaData> {
- parquet_metadata_from_bytes(buf)
+pub(crate) fn decode_metadata(
+ buf: &[u8],
+ options: Option<&ParquetMetaDataOptions>,
+) -> crate::errors::Result<ParquetMetaData> {
+ parquet_metadata_from_bytes(buf, options)
}
/// Parses column index from the provided bytes and adds it to the metadata.
diff --git a/parquet/src/file/metadata/push_decoder.rs b/parquet/src/file/metadata/push_decoder.rs
index 3393007f64..34b7fec2c0 100644
--- a/parquet/src/file/metadata/push_decoder.rs
+++ b/parquet/src/file/metadata/push_decoder.rs
@@ -21,11 +21,12 @@ use crate::encryption::decrypt::FileDecryptionProperties;
use crate::errors::{ParquetError, Result};
use crate::file::FOOTER_SIZE;
use crate::file::metadata::parser::{MetadataParser, parse_column_index, parse_offset_index};
-use crate::file::metadata::{FooterTail, PageIndexPolicy, ParquetMetaData};
-use crate::file::metadata::{FooterTail, PageIndexPolicy, ParquetMetaData};
+use crate::file::metadata::{FooterTail, PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions};
use crate::file::page_index::index_reader::acc_range;
use crate::file::reader::ChunkReader;
use bytes::Bytes;
use std::ops::Range;
+use std::sync::Arc;
/// A push decoder for [`ParquetMetaData`].
///
@@ -299,6 +300,12 @@ impl ParquetMetaDataPushDecoder {
self
}
+ /// Set the options to use when decoding the Parquet metadata.
+ pub fn with_metadata_options(mut self, options: Option<Arc<ParquetMetaDataOptions>>) -> Self {
+ self.metadata_parser = self.metadata_parser.with_metadata_options(options);
+ self
+ }
+
#[cfg(feature = "encryption")]
/// Provide decryption properties for decoding encrypted Parquet files
pub(crate) fn with_file_decryption_properties(
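
When driving the push decoder directly, the options are shared behind an Arc, as in the options.rs test above. A sketch (`schema` is a previously obtained SchemaDescPtr, `file_len` the file size in bytes, inside a function returning parquet::errors::Result):

    use parquet::file::metadata::{ParquetMetaDataOptions, ParquetMetaDataPushDecoder};
    use std::sync::Arc;

    // The push decoder takes Option<Arc<ParquetMetaDataOptions>>.
    let options = Arc::new(ParquetMetaDataOptions::new().with_schema(schema));
    let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len)?
        .with_metadata_options(Some(options));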
diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs
index 091895e659..a18a5e68a9 100644
--- a/parquet/src/file/metadata/reader.rs
+++ b/parquet/src/file/metadata/reader.rs
@@ -20,9 +20,14 @@ use crate::encryption::decrypt::FileDecryptionProperties;
use crate::errors::{ParquetError, Result};
use crate::file::FOOTER_SIZE;
use crate::file::metadata::parser::decode_metadata;
-use crate::file::metadata::{FooterTail, ParquetMetaData, ParquetMetaDataPushDecoder};
+use crate::file::metadata::thrift::parquet_schema_from_bytes;
+use crate::file::metadata::{
+ FooterTail, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataPushDecoder,
+};
use crate::file::reader::ChunkReader;
+use crate::schema::types::SchemaDescriptor;
use bytes::Bytes;
+use std::sync::Arc;
use std::{io::Read, ops::Range};
use crate::DecodeResult;
@@ -68,11 +73,12 @@ pub struct ParquetMetaDataReader {
column_index: PageIndexPolicy,
offset_index: PageIndexPolicy,
prefetch_hint: Option<usize>,
+ metadata_options: Option<Arc<ParquetMetaDataOptions>>,
// Size of the serialized thrift metadata plus the 8 byte footer. Only set if
// `self.parse_metadata` is called.
metadata_size: Option<usize>,
#[cfg(feature = "encryption")]
- file_decryption_properties: Option<std::sync::Arc<FileDecryptionProperties>>,
+ file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
}
/// Describes the policy for reading page indexes
@@ -158,6 +164,12 @@ impl ParquetMetaDataReader {
self
}
+ /// Sets the [`ParquetMetaDataOptions`] to use when decoding
+ pub fn with_metadata_options(mut self, options: Option<ParquetMetaDataOptions>) -> Self {
+ self.metadata_options = options.map(Arc::new);
+ self
+ }
+
/// Provide a hint as to the number of bytes needed to fully parse the [`ParquetMetaData`].
/// Only used for the asynchronous [`Self::try_load()`] method.
///
@@ -355,7 +367,8 @@ impl ParquetMetaDataReader {
let push_decoder = ParquetMetaDataPushDecoder::try_new_with_metadata(file_size, metadata)?
.with_offset_index_policy(self.offset_index)
- .with_column_index_policy(self.column_index);
+ .with_column_index_policy(self.column_index)
+ .with_metadata_options(self.metadata_options.clone());
let mut push_decoder = self.prepare_push_decoder(push_decoder);
// Get bounds needed for page indexes (if any are present in the file).
@@ -501,7 +514,8 @@ impl ParquetMetaDataReader {
let file_size = u64::MAX;
let push_decoder = ParquetMetaDataPushDecoder::try_new_with_metadata(file_size, metadata)?
.with_offset_index_policy(self.offset_index)
- .with_column_index_policy(self.column_index);
+ .with_column_index_policy(self.column_index)
+ .with_metadata_options(self.metadata_options.clone());
let mut push_decoder = self.prepare_push_decoder(push_decoder);
// Get bounds needed for page indexes (if any are present in the file).
@@ -751,7 +765,8 @@ impl ParquetMetaDataReader {
let push_decoder =
ParquetMetaDataPushDecoder::try_new_with_footer_tail(file_size, footer_tail)?
// NOTE: DO NOT enable page indexes here, they are handled separately
- .with_page_index_policy(PageIndexPolicy::Skip);
+ .with_page_index_policy(PageIndexPolicy::Skip)
+ .with_metadata_options(self.metadata_options.clone());
let mut push_decoder = self.prepare_push_decoder(push_decoder);
push_decoder.push_range(range, buf)?;
@@ -795,7 +810,24 @@ impl ParquetMetaDataReader {
///
/// [Parquet Spec]: https://github.com/apache/parquet-format#metadata
pub fn decode_metadata(buf: &[u8]) -> Result<ParquetMetaData> {
- decode_metadata(buf)
+ decode_metadata(buf, None)
+ }
+
+ /// Decodes [`ParquetMetaData`] from the provided bytes.
+ ///
+ /// Like [`Self::decode_metadata`] but this also accepts
+ /// metadata parsing options.
+ pub fn decode_metadata_with_options(
+ buf: &[u8],
+ options: Option<&ParquetMetaDataOptions>,
+ ) -> Result<ParquetMetaData> {
+ decode_metadata(buf, options)
+ }
+
+ /// Decodes the schema from the Parquet footer in `buf`. Returned as
+ /// a [`SchemaDescriptor`].
+ pub fn decode_schema(buf: &[u8]) -> Result<Arc<SchemaDescriptor>> {
+ Ok(Arc::new(parquet_schema_from_bytes(buf)?))
}
}
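
ParquetMetaDataReader itself takes the options by value via the builder. A sketch (`schema` is a previously decoded SchemaDescPtr, `file` a ChunkReader, inside a function returning parquet::errors::Result):

    use parquet::file::metadata::{ParquetMetaDataOptions, ParquetMetaDataReader};

    // Attach the options before parsing; the footer schema is then skipped.
    let options = ParquetMetaDataOptions::new().with_schema(schema);
    let metadata = ParquetMetaDataReader::new()
        .with_metadata_options(Some(options))
        .parse_and_finish(&file)?;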
diff --git a/parquet/src/file/metadata/thrift/encryption.rs b/parquet/src/file/metadata/thrift/encryption.rs
index 9744f0f7a6..56c5a6a4b9 100644
--- a/parquet/src/file/metadata/thrift/encryption.rs
+++ b/parquet/src/file/metadata/thrift/encryption.rs
@@ -23,7 +23,7 @@ use crate::{
file::{
column_crypto_metadata::ColumnCryptoMetaData,
metadata::{
- HeapSize, ParquetMetaData, RowGroupMetaData,
+ HeapSize, ParquetMetaData, ParquetMetaDataOptions, RowGroupMetaData,
thrift::{parquet_metadata_from_bytes, read_column_metadata, validate_column_metadata},
},
},
@@ -213,6 +213,7 @@ pub(crate) fn parquet_metadata_with_encryption(
file_decryption_properties: Option<&Arc<FileDecryptionProperties>>,
encrypted_footer: bool,
buf: &[u8],
+ options: Option<&ParquetMetaDataOptions>,
) -> Result<ParquetMetaData> {
use crate::file::metadata::ParquetMetaDataBuilder;
@@ -262,7 +263,7 @@ pub(crate) fn parquet_metadata_with_encryption(
}
}
- let parquet_meta = parquet_metadata_from_bytes(buf)
+ let parquet_meta = parquet_metadata_from_bytes(buf, options)
.map_err(|e| general_err!("Could not parse metadata: {}", e))?;
let ParquetMetaData {
diff --git a/parquet/src/file/metadata/thrift/mod.rs b/parquet/src/file/metadata/thrift/mod.rs
index 1477491096..175a152839 100644
--- a/parquet/src/file/metadata/thrift/mod.rs
+++ b/parquet/src/file/metadata/thrift/mod.rs
@@ -43,8 +43,8 @@ use crate::{
file::{
metadata::{
ColumnChunkMetaData, ColumnChunkMetaDataBuilder, KeyValue, LevelHistogram,
- PageEncodingStats, ParquetMetaData, RowGroupMetaData, RowGroupMetaDataBuilder,
- SortingColumn,
+ PageEncodingStats, ParquetMetaData, ParquetMetaDataOptions, RowGroupMetaData,
+ RowGroupMetaDataBuilder, SortingColumn,
},
statistics::ValueStatistics,
},
@@ -669,9 +669,37 @@ fn read_row_group(
Ok(row_group)
}
+/// Create a [`SchemaDescriptor`] from thrift input. The input buffer must contain a complete
+/// Parquet footer.
+pub(crate) fn parquet_schema_from_bytes(buf: &[u8]) -> Result<SchemaDescriptor> {
+ let mut prot = ThriftSliceInputProtocol::new(buf);
+
+ let mut last_field_id = 0i16;
+ loop {
+ let field_ident = prot.read_field_begin(last_field_id)?;
+ if field_ident.field_type == FieldType::Stop {
+ break;
+ }
+ match field_ident.id {
+ 2 => {
+ // read schema and convert to SchemaDescriptor for use when reading row groups
+ let val = read_thrift_vec::<SchemaElement, ThriftSliceInputProtocol>(&mut prot)?;
+ let val = parquet_schema_from_array(val)?;
+ return Ok(SchemaDescriptor::new(val));
+ }
+ _ => prot.skip(field_ident.field_type)?,
+ }
+ last_field_id = field_ident.id;
+ }
+ Err(general_err!("Input does not contain a schema"))
+}
+
/// Create [`ParquetMetaData`] from thrift input. Note that this only decodes the file metadata in
/// the Parquet footer. Page indexes will need to be added later.
-pub(crate) fn parquet_metadata_from_bytes(buf: &[u8]) -> Result<ParquetMetaData> {
+pub(crate) fn parquet_metadata_from_bytes(
+ buf: &[u8],
+ options: Option<&ParquetMetaDataOptions>,
+) -> Result<ParquetMetaData> {
let mut prot = ThriftSliceInputProtocol::new(buf);
// begin reading the file metadata
@@ -689,6 +717,11 @@ pub(crate) fn parquet_metadata_from_bytes(buf: &[u8]) -> Result<ParquetMetaData>
// this will need to be set before parsing row groups
let mut schema_descr: Option<Arc<SchemaDescriptor>> = None;
+ // see if we already have a schema.
+ if let Some(options) = options {
+ schema_descr = options.schema().cloned();
+ }
+
// struct FileMetaData {
// 1: required i32 version
// 2: required list<SchemaElement> schema;
@@ -711,10 +744,16 @@ pub(crate) fn parquet_metadata_from_bytes(buf: &[u8]) -> Result<ParquetMetaData>
version = Some(i32::read_thrift(&mut prot)?);
}
2 => {
- // read schema and convert to SchemaDescriptor for use when reading row groups
- let val = read_thrift_vec::<SchemaElement, ThriftSliceInputProtocol>(&mut prot)?;
- let val = parquet_schema_from_array(val)?;
- schema_descr = Some(Arc::new(SchemaDescriptor::new(val)));
+ // If schema was passed in, skip parsing it
+ if schema_descr.is_some() {
+ prot.skip(field_ident.field_type)?;
+ } else {
+ // read schema and convert to SchemaDescriptor for use when reading row groups
+ let val =
+ read_thrift_vec::<SchemaElement, ThriftSliceInputProtocol>(&mut prot)?;
+ let val = parquet_schema_from_array(val)?;
+ schema_descr = Some(Arc::new(SchemaDescriptor::new(val)));
+ }
}
3 => {
num_rows = Some(i64::read_thrift(&mut prot)?);
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 22bb8ba465..990b2f4f16 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -38,7 +38,7 @@ use crate::parquet_thrift::ThriftSliceInputProtocol;
use crate::parquet_thrift::{ReadThrift, ThriftReadInputProtocol};
use crate::record::Row;
use crate::record::reader::RowIter;
-use crate::schema::types::Type as SchemaType;
+use crate::schema::types::{SchemaDescPtr, Type as SchemaType};
use bytes::Bytes;
use std::collections::VecDeque;
use std::{fs::File, io::Read, path::Path, sync::Arc};
@@ -110,6 +110,7 @@ pub struct ReadOptionsBuilder {
predicates: Vec<ReadGroupPredicate>,
enable_page_index: bool,
props: Option<ReaderProperties>,
+ metadata_options: ParquetMetaDataOptions,
}
impl ReadOptionsBuilder {
@@ -152,6 +153,13 @@ impl ReadOptionsBuilder {
self
}
+ /// Provide a Parquet schema to use when decoding the metadata. The schema in the Parquet
+ /// footer will be skipped.
+ pub fn with_parquet_schema(mut self, schema: SchemaDescPtr) -> Self {
+ self.metadata_options.set_schema(schema);
+ self
+ }
+
/// Seal the builder and return the read options
pub fn build(self) -> ReadOptions {
let props = self
@@ -161,18 +169,20 @@ impl ReadOptionsBuilder {
predicates: self.predicates,
enable_page_index: self.enable_page_index,
props,
+ metadata_options: self.metadata_options,
}
}
}
/// A collection of options for reading a Parquet file.
///
-/// Currently, only predicates on row group metadata are supported.
+/// Predicates are currently only supported on row group metadata.
/// All predicates will be chained using 'AND' to filter the row groups.
pub struct ReadOptions {
predicates: Vec<ReadGroupPredicate>,
enable_page_index: bool,
props: ReaderProperties,
+ metadata_options: ParquetMetaDataOptions,
}
impl<R: 'static + ChunkReader> SerializedFileReader<R> {
@@ -193,6 +203,7 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
#[allow(deprecated)]
pub fn new_with_options(chunk_reader: R, options: ReadOptions) -> Result<Self> {
let mut metadata_builder = ParquetMetaDataReader::new()
+ .with_metadata_options(Some(options.metadata_options.clone()))
.parse_and_finish(&chunk_reader)?
.into_builder();
let mut predicates = options.predicates;
@@ -2697,6 +2708,26 @@ mod tests {
}
}
+ #[test]
+ fn test_reuse_schema() {
+ let file = get_test_file("alltypes_plain.parquet");
+ let file_reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
+ let schema = file_reader.metadata().file_metadata().schema_descr_ptr();
+ let expected = file_reader.metadata;
+
+ let options = ReadOptionsBuilder::new()
+ .with_parquet_schema(schema)
+ .build();
+ let file_reader = SerializedFileReader::new_with_options(file, options).unwrap();
+
+ assert_eq!(expected.as_ref(), file_reader.metadata.as_ref());
+ // Should have used the same schema instance
+ assert!(Arc::ptr_eq(
+ &expected.file_metadata().schema_descr_ptr(),
+ &file_reader.metadata.file_metadata().schema_descr_ptr()
+ ));
+ }
+
#[test]
fn test_read_unknown_logical_type() {
let file = get_test_file("unknown-logical-type.parquet");