viirya commented on code in PR #5249:
URL: https://github.com/apache/arrow-rs/pull/5249#discussion_r1438444328


##########
arrow-ipc/src/reader.rs:
##########
@@ -522,6 +522,174 @@ fn parse_message(buf: &[u8]) -> Result<Message, 
ArrowError> {
         .map_err(|err| ArrowError::ParseError(format!("Unable to get root as 
message: {err:?}")))
 }
 
+/// Read the footer length from the last 10 bytes of a file
+///
+/// Expects a 4 byte footer length followed by `b"ARROW1"`
+pub fn read_footer_length(buf: [u8; 10]) -> Result<usize, ArrowError> {
+    if buf[4..] != super::ARROW_MAGIC {
+        return Err(ArrowError::ParseError(
+            "Arrow file does not contain correct footer".to_string(),
+        ));
+    }
+
+    // read footer length
+    let footer_len = i32::from_le_bytes(buf[..4].try_into().unwrap());
+    footer_len
+        .try_into()
+        .map_err(|_| ArrowError::ParseError(format!("Invalid footer length: 
{footer_len}")))
+}
+
+/// A low-level, push-based interface for reading an IPC file
+///
+/// For a higher-level interface see [`FileReader`]
+///
+/// ```
+/// # use std::sync::Arc;
+/// # use arrow_array::*;
+/// # use arrow_array::types::Int32Type;
+/// # use arrow_buffer::Buffer;
+/// # use arrow_ipc::convert::fb_to_schema;
+/// # use arrow_ipc::reader::{FileDecoder, read_footer_length};
+/// # use arrow_ipc::root_as_footer;
+/// # use arrow_ipc::writer::FileWriter;
+/// // Write an IPC file
+///
+/// let batch = RecordBatch::try_from_iter([
+///     ("a", Arc::new(Int32Array::from(vec![1, 2, 3])) as _),
+///     ("b", Arc::new(Int32Array::from(vec![1, 2, 3])) as _),
+///     ("c", Arc::new(DictionaryArray::<Int32Type>::from_iter(["hello", 
"hello", "world"])) as _),
+/// ]).unwrap();
+///
+/// let schema = batch.schema();
+///
+/// let mut out = Vec::with_capacity(1024);
+/// let mut writer = FileWriter::try_new(&mut out, schema.as_ref()).unwrap();
+/// writer.write(&batch).unwrap();
+/// writer.finish().unwrap();
+///
+/// drop(writer);
+///
+/// // Read IPC file
+///
+/// let buffer = Buffer::from_vec(out);
+/// let trailer_start = buffer.len() - 10;
+/// let footer_len = 
read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
+/// let footer = root_as_footer(&buffer[trailer_start - 
footer_len..trailer_start]).unwrap();
+///
+/// let back = fb_to_schema(footer.schema().unwrap());
+/// assert_eq!(&back, schema.as_ref());
+///
+/// let mut decoder = FileDecoder::new(schema, footer.version());
+///
+/// // Read dictionaries
+/// for block in footer.dictionaries().iter().flatten() {
+///     let block_len = block.bodyLength() as usize + block.metaDataLength() 
as usize;
+///     let data = buffer.slice_with_length(block.offset() as _, block_len);
+///     decoder.read_dictionary(&block, &data).unwrap();
+/// }
+///
+/// // Read record batch
+/// let batches = footer.recordBatches().unwrap();
+/// assert_eq!(batches.len(), 1); // Only wrote a single batch
+///
+/// let block = batches.get(0);
+/// let block_len = block.bodyLength() as usize + block.metaDataLength() as 
usize;
+/// let data = buffer.slice_with_length(block.offset() as _, block_len);
+/// let back = decoder.read_record_batch(block, &data).unwrap().unwrap();
+///
+/// assert_eq!(batch, back);
+/// ```
+#[derive(Debug)]
+pub struct FileDecoder {
+    schema: SchemaRef,
+    dictionaries: HashMap<i64, ArrayRef>,
+    version: MetadataVersion,
+    projection: Option<Vec<usize>>,
+}
+
+impl FileDecoder {
+    /// Create a new [`FileDecoder`] with the given schema and version
+    pub fn new(schema: SchemaRef, version: MetadataVersion) -> Self {
+        Self {
+            schema,
+            version,
+            dictionaries: Default::default(),
+            projection: None,
+        }
+    }
+
+    /// Specify a projection
+    pub fn with_projection(mut self, projection: Vec<usize>) -> Self {
+        self.projection = Some(projection);
+        self
+    }
+
+    fn read_message<'a>(&self, buf: &'a [u8]) -> Result<Message<'a>, 
ArrowError> {
+        let message = parse_message(buf)?;
+
+        // some old test data's footer metadata is not set, so we account for 
that
+        if self.version != MetadataVersion::V1 && message.version() != 
self.version {
+            return Err(ArrowError::IpcError(
+                "Could not read IPC message as metadata versions 
mismatch".to_string(),
+            ));
+        }
+        Ok(message)
+    }
+
+    /// Read the dictionary with the given block and data buffer
+    pub fn read_dictionary(&mut self, block: &Block, buf: &Buffer) -> 
Result<(), ArrowError> {
+        let message = self.read_message(buf)?;

Review Comment:
   Not a big deal, but it seems we didn't check the metadata version for 
`DictionaryBatch` messages before. Maybe it was simply missed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to