This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new add8f567c8 Add FileReaderBuilder for arrow-ipc to allow reading large
no. of column files (#5136)
add8f567c8 is described below
commit add8f567c8331effc0ade02917cbaccda30a830d
Author: Jeffrey <[email protected]>
AuthorDate: Wed Dec 27 00:04:18 2023 +1100
Add FileReaderBuilder for arrow-ipc to allow reading large no. of column
files (#5136)
* Add FileReaderBuilder for arrow-ipc
* Switch parameter to not expose flatbuffer types
---
arrow-ipc/src/reader.rs | 238 ++++++++++++++++++++++++++++++++++++++----------
1 file changed, 188 insertions(+), 50 deletions(-)
diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs
index 06e53505fc..39365b7829 100644
--- a/arrow-ipc/src/reader.rs
+++ b/arrow-ipc/src/reader.rs
@@ -20,7 +20,7 @@
//! The `FileReader` and `StreamReader` have similar interfaces,
//! however the `FileReader` expects a reader that supports `Seek`ing
-use flatbuffers::VectorIter;
+use flatbuffers::{VectorIter, VerifierOptions};
use std::collections::HashMap;
use std::fmt;
use std::io::{BufReader, Read, Seek, SeekFrom};
@@ -522,59 +522,78 @@ fn parse_message(buf: &[u8]) -> Result<Message,
ArrowError> {
.map_err(|err| ArrowError::ParseError(format!("Unable to get root as
message: {err:?}")))
}
-/// Arrow File reader
-pub struct FileReader<R: Read + Seek> {
- /// Buffered file reader that supports reading and seeking
- reader: R,
+/// Build an Arrow [`FileReader`] with custom options.
+#[derive(Debug)]
+pub struct FileReaderBuilder {
+ /// Optional projection for which columns to load (zero-based column
indices)
+ projection: Option<Vec<usize>>,
+ /// Passed through to construct [`VerifierOptions`]
+ max_footer_fb_tables: usize,
+ /// Passed through to construct [`VerifierOptions`]
+ max_footer_fb_depth: usize,
+}
- /// The schema that is read from the file header
- schema: SchemaRef,
+impl Default for FileReaderBuilder {
+ fn default() -> Self {
+ let verifier_options = VerifierOptions::default();
+ Self {
+ max_footer_fb_tables: verifier_options.max_tables,
+ max_footer_fb_depth: verifier_options.max_depth,
+ projection: None,
+ }
+ }
+}
- /// The blocks in the file
+impl FileReaderBuilder {
+ /// Options for creating a new [`FileReader`].
///
- /// A block indicates the regions in the file to read to get data
- blocks: Vec<crate::Block>,
-
- /// A counter to keep track of the current block that should be read
- current_block: usize,
+ /// To convert a builder into a reader, call [`FileReaderBuilder::build`].
+ pub fn new() -> Self {
+ Self::default()
+ }
- /// The total number of blocks, which may contain record batches and other
types
- total_blocks: usize,
+ /// Optional projection for which columns to load (zero-based column
indices).
+ pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
+ self.projection = Some(projection);
+ self
+ }
- /// Optional dictionaries for each schema field.
+ /// Flatbuffers option for parsing the footer. Controls the max number of
fields and
+ /// metadata key-value pairs that can be parsed from the schema of the
footer.
///
- /// Dictionaries may be appended to in the streaming format.
- dictionaries_by_id: HashMap<i64, ArrayRef>,
-
- /// Metadata version
- metadata_version: crate::MetadataVersion,
-
- /// User defined metadata
- custom_metadata: HashMap<String, String>,
-
- /// Optional projection and projected_schema
- projection: Option<(Vec<usize>, Schema)>,
-}
-
-impl<R: Read + Seek> fmt::Debug for FileReader<R> {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(),
fmt::Error> {
- f.debug_struct("FileReader<R>")
- .field("schema", &self.schema)
- .field("blocks", &self.blocks)
- .field("current_block", &self.current_block)
- .field("total_blocks", &self.total_blocks)
- .field("dictionaries_by_id", &self.dictionaries_by_id)
- .field("metadata_version", &self.metadata_version)
- .field("projection", &self.projection)
- .finish_non_exhaustive()
+ /// By default this is set to `1_000_000` which roughly translates to a
schema with
+ /// no metadata key-value pairs but 499,999 fields.
+ ///
+ /// This default limit is enforced to protect against malicious files with
a massive
+ /// number of flatbuffer tables which could cause a denial of service
attack.
+ ///
+ /// If you need to ingest a trusted file with a massive number of fields
and/or
+ /// metadata key-value pairs and are facing the error `"Unable to get root
as
+ /// footer: TooManyTables"` then increase this parameter as necessary.
+ pub fn with_max_footer_fb_tables(mut self, max_footer_fb_tables: usize) ->
Self {
+ self.max_footer_fb_tables = max_footer_fb_tables;
+ self
}
-}
-impl<R: Read + Seek> FileReader<R> {
- /// Try to create a new file reader
+ /// Flatbuffers option for parsing the footer. Controls the max depth for
schemas with
+ /// nested fields parsed from the footer.
///
- /// Returns errors if the file does not meet the Arrow Format footer
requirements
- pub fn try_new(mut reader: R, projection: Option<Vec<usize>>) ->
Result<Self, ArrowError> {
+ /// By default this is set to `64` which roughly translates to a schema
with
+ /// a field nested 60 levels down through other struct fields.
+ ///
+ /// This default limit is enforced to protect against malicious files with
an extremely
+ /// deep flatbuffer structure which could cause a denial of service attack.
+ ///
+ /// If you need to ingest a trusted file with a deeply nested field and
are facing the
+ /// error `"Unable to get root as footer: DepthLimitReached"` then
increase this
+ /// parameter as necessary.
+ pub fn with_max_footer_fb_depth(mut self, max_footer_fb_depth: usize) ->
Self {
+ self.max_footer_fb_depth = max_footer_fb_depth;
+ self
+ }
+
+ /// Build [`FileReader`] with given reader.
+ pub fn build<R: Read + Seek>(self, mut reader: R) -> Result<FileReader<R>,
ArrowError> {
// Space for ARROW_MAGIC (6 bytes) and length (4 bytes)
let mut buffer = [0; 10];
reader.seek(SeekFrom::End(-10))?;
@@ -594,9 +613,14 @@ impl<R: Read + Seek> FileReader<R> {
reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
reader.read_exact(&mut footer_data)?;
- let footer = crate::root_as_footer(&footer_data[..]).map_err(|err| {
- ArrowError::ParseError(format!("Unable to get root as footer:
{err:?}"))
- })?;
+ let verifier_options = VerifierOptions {
+ max_tables: self.max_footer_fb_tables,
+ max_depth: self.max_footer_fb_depth,
+ ..Default::default()
+ };
+ let footer = crate::root_as_footer_with_opts(&verifier_options,
&footer_data[..]).map_err(
+ |err| ArrowError::ParseError(format!("Unable to get root as
footer: {err:?}")),
+ )?;
let blocks = footer.recordBatches().ok_or_else(|| {
ArrowError::ParseError("Unable to get record batches from IPC
Footer".to_string())
@@ -643,7 +667,7 @@ impl<R: Read + Seek> FileReader<R> {
}
}
}
- let projection = match projection {
+ let projection = match self.projection {
Some(projection_indices) => {
let schema = schema.project(&projection_indices)?;
Some((projection_indices, schema))
@@ -651,7 +675,7 @@ impl<R: Read + Seek> FileReader<R> {
_ => None,
};
- Ok(Self {
+ Ok(FileReader {
reader,
schema: Arc::new(schema),
blocks: blocks.iter().copied().collect(),
@@ -663,6 +687,67 @@ impl<R: Read + Seek> FileReader<R> {
projection,
})
}
+}
+
+/// Arrow File reader
+pub struct FileReader<R: Read + Seek> {
+ /// Buffered file reader that supports reading and seeking
+ reader: R,
+
+ /// The schema that is read from the file header
+ schema: SchemaRef,
+
+ /// The blocks in the file
+ ///
+ /// A block indicates the regions in the file to read to get data
+ blocks: Vec<crate::Block>,
+
+ /// A counter to keep track of the current block that should be read
+ current_block: usize,
+
+ /// The total number of blocks, which may contain record batches and other
types
+ total_blocks: usize,
+
+ /// Optional dictionaries for each schema field.
+ ///
+ /// Dictionaries may be appended to in the streaming format.
+ dictionaries_by_id: HashMap<i64, ArrayRef>,
+
+ /// Metadata version
+ metadata_version: crate::MetadataVersion,
+
+ /// User defined metadata
+ custom_metadata: HashMap<String, String>,
+
+ /// Optional projection and projected_schema
+ projection: Option<(Vec<usize>, Schema)>,
+}
+
+impl<R: Read + Seek> fmt::Debug for FileReader<R> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(),
fmt::Error> {
+ f.debug_struct("FileReader<R>")
+ .field("schema", &self.schema)
+ .field("blocks", &self.blocks)
+ .field("current_block", &self.current_block)
+ .field("total_blocks", &self.total_blocks)
+ .field("dictionaries_by_id", &self.dictionaries_by_id)
+ .field("metadata_version", &self.metadata_version)
+ .field("projection", &self.projection)
+ .finish_non_exhaustive()
+ }
+}
+
+impl<R: Read + Seek> FileReader<R> {
+ /// Try to create a new file reader
+ ///
+ /// Returns errors if the file does not meet the Arrow Format footer
requirements
+ pub fn try_new(reader: R, projection: Option<Vec<usize>>) -> Result<Self,
ArrowError> {
+ let builder = FileReaderBuilder {
+ projection,
+ ..Default::default()
+ };
+ builder.build(reader)
+ }
/// Return user defined customized metadata
pub fn custom_metadata(&self) -> &HashMap<String, String> {
@@ -1622,4 +1707,57 @@ mod tests {
.unwrap();
assert_eq!(batch, roundtrip);
}
+
+ #[test]
+ fn test_file_with_massive_column_count() {
+ // 499_999 is upper limit for default settings (1_000_000)
+ let limit = 600_000;
+
+ let fields = (0..limit)
+ .map(|i| Field::new(format!("{i}"), DataType::Boolean, false))
+ .collect::<Vec<_>>();
+ let schema = Arc::new(Schema::new(fields));
+ let batch = RecordBatch::new_empty(schema);
+
+ let mut buf = Vec::new();
+ let mut writer = crate::writer::FileWriter::try_new(&mut buf,
&batch.schema()).unwrap();
+ writer.write(&batch).unwrap();
+ writer.finish().unwrap();
+ drop(writer);
+
+ let mut reader = FileReaderBuilder::new()
+ .with_max_footer_fb_tables(1_500_000)
+ .build(std::io::Cursor::new(buf))
+ .unwrap();
+ let roundtrip_batch = reader.next().unwrap().unwrap();
+
+ assert_eq!(batch, roundtrip_batch);
+ }
+
+ #[test]
+ fn test_file_with_deeply_nested_columns() {
+ // 60 is upper limit for default settings (64)
+ let limit = 61;
+
+ let fields = (0..limit).fold(
+ vec![Field::new("leaf", DataType::Boolean, false)],
+ |field, index| vec![Field::new_struct(format!("{index}"), field,
false)],
+ );
+ let schema = Arc::new(Schema::new(fields));
+ let batch = RecordBatch::new_empty(schema);
+
+ let mut buf = Vec::new();
+ let mut writer = crate::writer::FileWriter::try_new(&mut buf,
&batch.schema()).unwrap();
+ writer.write(&batch).unwrap();
+ writer.finish().unwrap();
+ drop(writer);
+
+ let mut reader = FileReaderBuilder::new()
+ .with_max_footer_fb_depth(65)
+ .build(std::io::Cursor::new(buf))
+ .unwrap();
+ let roundtrip_batch = reader.next().unwrap().unwrap();
+
+ assert_eq!(batch, roundtrip_batch);
+ }
}