This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new add8f567c8 Add FileReaderBuilder for arrow-ipc to allow reading large 
no. of column files (#5136)
add8f567c8 is described below

commit add8f567c8331effc0ade02917cbaccda30a830d
Author: Jeffrey <[email protected]>
AuthorDate: Wed Dec 27 00:04:18 2023 +1100

    Add FileReaderBuilder for arrow-ipc to allow reading large no. of column 
files (#5136)
    
    * Add FileReaderBuilder for arrow-ipc
    
    * Switch parameter to not expose flatbuffer types
---
 arrow-ipc/src/reader.rs | 238 ++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 188 insertions(+), 50 deletions(-)

diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs
index 06e53505fc..39365b7829 100644
--- a/arrow-ipc/src/reader.rs
+++ b/arrow-ipc/src/reader.rs
@@ -20,7 +20,7 @@
 //! The `FileReader` and `StreamReader` have similar interfaces,
 //! however the `FileReader` expects a reader that supports `Seek`ing
 
-use flatbuffers::VectorIter;
+use flatbuffers::{VectorIter, VerifierOptions};
 use std::collections::HashMap;
 use std::fmt;
 use std::io::{BufReader, Read, Seek, SeekFrom};
@@ -522,59 +522,78 @@ fn parse_message(buf: &[u8]) -> Result<Message, 
ArrowError> {
         .map_err(|err| ArrowError::ParseError(format!("Unable to get root as 
message: {err:?}")))
 }
 
-/// Arrow File reader
-pub struct FileReader<R: Read + Seek> {
-    /// Buffered file reader that supports reading and seeking
-    reader: R,
+/// Build an Arrow [`FileReader`] with custom options.
+#[derive(Debug)]
+pub struct FileReaderBuilder {
+    /// Optional projection for which columns to load (zero-based column 
indices)
+    projection: Option<Vec<usize>>,
+    /// Passed through to construct [`VerifierOptions`]
+    max_footer_fb_tables: usize,
+    /// Passed through to construct [`VerifierOptions`]
+    max_footer_fb_depth: usize,
+}
 
-    /// The schema that is read from the file header
-    schema: SchemaRef,
+impl Default for FileReaderBuilder {
+    fn default() -> Self {
+        let verifier_options = VerifierOptions::default();
+        Self {
+            max_footer_fb_tables: verifier_options.max_tables,
+            max_footer_fb_depth: verifier_options.max_depth,
+            projection: None,
+        }
+    }
+}
 
-    /// The blocks in the file
+impl FileReaderBuilder {
+    /// Options for creating a new [`FileReader`].
     ///
-    /// A block indicates the regions in the file to read to get data
-    blocks: Vec<crate::Block>,
-
-    /// A counter to keep track of the current block that should be read
-    current_block: usize,
+    /// To convert a builder into a reader, call [`FileReaderBuilder::build`].
+    pub fn new() -> Self {
+        Self::default()
+    }
 
-    /// The total number of blocks, which may contain record batches and other 
types
-    total_blocks: usize,
+    /// Optional projection for which columns to load (zero-based column 
indices).
+    pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
+        self.projection = Some(projection);
+        self
+    }
 
-    /// Optional dictionaries for each schema field.
+    /// Flatbuffers option for parsing the footer. Controls the max number of 
fields and
+    /// metadata key-value pairs that can be parsed from the schema of the 
footer.
     ///
-    /// Dictionaries may be appended to in the streaming format.
-    dictionaries_by_id: HashMap<i64, ArrayRef>,
-
-    /// Metadata version
-    metadata_version: crate::MetadataVersion,
-
-    /// User defined metadata
-    custom_metadata: HashMap<String, String>,
-
-    /// Optional projection and projected_schema
-    projection: Option<(Vec<usize>, Schema)>,
-}
-
-impl<R: Read + Seek> fmt::Debug for FileReader<R> {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), 
fmt::Error> {
-        f.debug_struct("FileReader<R>")
-            .field("schema", &self.schema)
-            .field("blocks", &self.blocks)
-            .field("current_block", &self.current_block)
-            .field("total_blocks", &self.total_blocks)
-            .field("dictionaries_by_id", &self.dictionaries_by_id)
-            .field("metadata_version", &self.metadata_version)
-            .field("projection", &self.projection)
-            .finish_non_exhaustive()
+    /// By default this is set to `1_000_000` which roughly translates to a 
schema with
+    /// no metadata key-value pairs but 499,999 fields.
+    ///
+    /// This default limit is enforced to protect against malicious files with 
a massive
+    /// number of flatbuffer tables which could cause a denial of service 
attack.
+    ///
+    /// If you need to ingest a trusted file with a massive number of fields 
and/or
+    /// metadata key-value pairs and are facing the error `"Unable to get root 
as
+    /// footer: TooManyTables"` then increase this parameter as necessary.
+    pub fn with_max_footer_fb_tables(mut self, max_footer_fb_tables: usize) -> 
Self {
+        self.max_footer_fb_tables = max_footer_fb_tables;
+        self
     }
-}
 
-impl<R: Read + Seek> FileReader<R> {
-    /// Try to create a new file reader
+    /// Flatbuffers option for parsing the footer. Controls the max depth for 
schemas with
+    /// nested fields parsed from the footer.
     ///
-    /// Returns errors if the file does not meet the Arrow Format footer 
requirements
-    pub fn try_new(mut reader: R, projection: Option<Vec<usize>>) -> 
Result<Self, ArrowError> {
+    /// By default this is set to `64` which roughly translates to a schema 
with
+    /// a field nested 60 levels down through other struct fields.
+    ///
+    /// This default limit is enforced to protect against malicious files with 
an extremely
+    /// deep flatbuffer structure which could cause a denial of service attack.
+    ///
+    /// If you need to ingest a trusted file with a deeply nested field and 
are facing the
+    /// error `"Unable to get root as footer: DepthLimitReached"` then 
increase this
+    /// parameter as necessary.
+    pub fn with_max_footer_fb_depth(mut self, max_footer_fb_depth: usize) -> 
Self {
+        self.max_footer_fb_depth = max_footer_fb_depth;
+        self
+    }
+
+    /// Build [`FileReader`] with given reader.
+    pub fn build<R: Read + Seek>(self, mut reader: R) -> Result<FileReader<R>, 
ArrowError> {
         // Space for ARROW_MAGIC (6 bytes) and length (4 bytes)
         let mut buffer = [0; 10];
         reader.seek(SeekFrom::End(-10))?;
@@ -594,9 +613,14 @@ impl<R: Read + Seek> FileReader<R> {
         reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
         reader.read_exact(&mut footer_data)?;
 
-        let footer = crate::root_as_footer(&footer_data[..]).map_err(|err| {
-            ArrowError::ParseError(format!("Unable to get root as footer: 
{err:?}"))
-        })?;
+        let verifier_options = VerifierOptions {
+            max_tables: self.max_footer_fb_tables,
+            max_depth: self.max_footer_fb_depth,
+            ..Default::default()
+        };
+        let footer = crate::root_as_footer_with_opts(&verifier_options, 
&footer_data[..]).map_err(
+            |err| ArrowError::ParseError(format!("Unable to get root as 
footer: {err:?}")),
+        )?;
 
         let blocks = footer.recordBatches().ok_or_else(|| {
             ArrowError::ParseError("Unable to get record batches from IPC 
Footer".to_string())
@@ -643,7 +667,7 @@ impl<R: Read + Seek> FileReader<R> {
                 }
             }
         }
-        let projection = match projection {
+        let projection = match self.projection {
             Some(projection_indices) => {
                 let schema = schema.project(&projection_indices)?;
                 Some((projection_indices, schema))
@@ -651,7 +675,7 @@ impl<R: Read + Seek> FileReader<R> {
             _ => None,
         };
 
-        Ok(Self {
+        Ok(FileReader {
             reader,
             schema: Arc::new(schema),
             blocks: blocks.iter().copied().collect(),
@@ -663,6 +687,67 @@ impl<R: Read + Seek> FileReader<R> {
             projection,
         })
     }
+}
+
+/// Arrow File reader
+pub struct FileReader<R: Read + Seek> {
+    /// Buffered file reader that supports reading and seeking
+    reader: R,
+
+    /// The schema that is read from the file header
+    schema: SchemaRef,
+
+    /// The blocks in the file
+    ///
+    /// A block indicates the regions in the file to read to get data
+    blocks: Vec<crate::Block>,
+
+    /// A counter to keep track of the current block that should be read
+    current_block: usize,
+
+    /// The total number of blocks, which may contain record batches and other 
types
+    total_blocks: usize,
+
+    /// Optional dictionaries for each schema field.
+    ///
+    /// Dictionaries may be appended to in the streaming format.
+    dictionaries_by_id: HashMap<i64, ArrayRef>,
+
+    /// Metadata version
+    metadata_version: crate::MetadataVersion,
+
+    /// User defined metadata
+    custom_metadata: HashMap<String, String>,
+
+    /// Optional projection and projected_schema
+    projection: Option<(Vec<usize>, Schema)>,
+}
+
+impl<R: Read + Seek> fmt::Debug for FileReader<R> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), 
fmt::Error> {
+        f.debug_struct("FileReader<R>")
+            .field("schema", &self.schema)
+            .field("blocks", &self.blocks)
+            .field("current_block", &self.current_block)
+            .field("total_blocks", &self.total_blocks)
+            .field("dictionaries_by_id", &self.dictionaries_by_id)
+            .field("metadata_version", &self.metadata_version)
+            .field("projection", &self.projection)
+            .finish_non_exhaustive()
+    }
+}
+
+impl<R: Read + Seek> FileReader<R> {
+    /// Try to create a new file reader
+    ///
+    /// Returns errors if the file does not meet the Arrow Format footer 
requirements
+    pub fn try_new(reader: R, projection: Option<Vec<usize>>) -> Result<Self, 
ArrowError> {
+        let builder = FileReaderBuilder {
+            projection,
+            ..Default::default()
+        };
+        builder.build(reader)
+    }
 
     /// Return user defined customized metadata
     pub fn custom_metadata(&self) -> &HashMap<String, String> {
@@ -1622,4 +1707,57 @@ mod tests {
         .unwrap();
         assert_eq!(batch, roundtrip);
     }
+
+    #[test]
+    fn test_file_with_massive_column_count() {
+        // 499_999 is upper limit for default settings (1_000_000)
+        let limit = 600_000;
+
+        let fields = (0..limit)
+            .map(|i| Field::new(format!("{i}"), DataType::Boolean, false))
+            .collect::<Vec<_>>();
+        let schema = Arc::new(Schema::new(fields));
+        let batch = RecordBatch::new_empty(schema);
+
+        let mut buf = Vec::new();
+        let mut writer = crate::writer::FileWriter::try_new(&mut buf, 
&batch.schema()).unwrap();
+        writer.write(&batch).unwrap();
+        writer.finish().unwrap();
+        drop(writer);
+
+        let mut reader = FileReaderBuilder::new()
+            .with_max_footer_fb_tables(1_500_000)
+            .build(std::io::Cursor::new(buf))
+            .unwrap();
+        let roundtrip_batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch, roundtrip_batch);
+    }
+
+    #[test]
+    fn test_file_with_deeply_nested_columns() {
+        // 60 is upper limit for default settings (64)
+        let limit = 61;
+
+        let fields = (0..limit).fold(
+            vec![Field::new("leaf", DataType::Boolean, false)],
+            |field, index| vec![Field::new_struct(format!("{index}"), field, 
false)],
+        );
+        let schema = Arc::new(Schema::new(fields));
+        let batch = RecordBatch::new_empty(schema);
+
+        let mut buf = Vec::new();
+        let mut writer = crate::writer::FileWriter::try_new(&mut buf, 
&batch.schema()).unwrap();
+        writer.write(&batch).unwrap();
+        writer.finish().unwrap();
+        drop(writer);
+
+        let mut reader = FileReaderBuilder::new()
+            .with_max_footer_fb_depth(65)
+            .build(std::io::Cursor::new(buf))
+            .unwrap();
+        let roundtrip_batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch, roundtrip_batch);
+    }
 }

Reply via email to