alamb commented on code in PR #9971:
URL: https://github.com/apache/arrow-rs/pull/9971#discussion_r3244185492


##########
arrow-ipc/src/reader.rs:
##########
@@ -72,6 +72,58 @@ fn read_buffer(
         }
     }
 }
+
+/// Source for IPC body buffers.
+///
+/// Most decode paths use an already materialized [`Buffer`] and slice into it
+/// using the offsets from the IPC metadata. Keeping this behind a small helper
+/// lets typed buffer reads share the same bounds handling as regular buffer
+/// reads.
+enum IpcBufferSource<'a> {
+    Buffer(&'a Buffer),
+}

Review Comment:
   Stylistically if you want to make a wrapper type I think a more common 
pattern is a an "new type" struct -- something like
   
   ```rust
   struct IpcBufferSource<'a>(&'a Buffer)
   ```
   
   That way you can still define methods on it, but you don't have to use 
`match` all over the place - you can just refer to the inner field as `.0`
   



##########
arrow-ipc/src/reader.rs:
##########
@@ -72,6 +72,58 @@ fn read_buffer(
         }
     }
 }
+
+/// Source for IPC body buffers.
+///
+/// Most decode paths use an already materialized [`Buffer`] and slice into it
+/// using the offsets from the IPC metadata. Keeping this behind a small helper
+/// lets typed buffer reads share the same bounds handling as regular buffer
+/// reads.
+enum IpcBufferSource<'a> {
+    Buffer(&'a Buffer),
+}
+
+impl<'a> IpcBufferSource<'a> {
+    fn read_buffer(
+        &self,
+        buf: &crate::Buffer,
+        compression: Option<CompressionCodec>,
+        decompression_context: &mut DecompressionContext,
+    ) -> Result<Buffer, ArrowError> {
+        match self {
+            Self::Buffer(data) => read_buffer(buf, data, compression, 
decompression_context),
+        }
+    }
+    /// Reads a physical IPC buffer that is expected to contain `len` values of
+    /// type `T`.
+    ///
+    /// The returned value is still a [`Buffer`], not a [`ScalarBuffer<T>`]. 
This
+    /// preserves the existing alignment behavior: properly aligned buffers 
remain
+    /// zero-copy, while unaligned buffers are still handled later by
+    /// `ArrayDataBuilder::align_buffers` according to `require_alignment`.
+    fn read_typed_buffer<T: ArrowNativeType>(
+        &self,
+        buf: &crate::Buffer,
+        len: usize,
+        compression: Option<CompressionCodec>,
+        decompression_context: &mut DecompressionContext,
+    ) -> Result<Buffer, ArrowError> {
+        let byte_len = len

Review Comment:
   I found the naming a little confusing here as there are three buffers
   
   I found this a little confusing as there three buffers -- `self::Buffer`, 
`buf` (the input buffer) and the output `buffer`
   
   Can we maybe name them something more specific to distinguish them? I am in 
particular confused about the buffer in self and the one passed in via the 
argument



##########
arrow-ipc/src/reader.rs:
##########
@@ -902,16 +1107,29 @@ fn get_dictionary_values(
     Ok(dictionary_values)
 }
 
-/// Read the data for a given block
+/// Read the data for a given IPC file block.
+///
+/// The returned buffer is fully initialized by the reader. This avoids first
+/// zero-filling the allocation and then immediately overwriting it with block
+/// data.
 fn read_block<R: Read + Seek>(mut reader: R, block: &Block) -> Result<Buffer, 
ArrowError> {
     reader.seek(SeekFrom::Start(block.offset() as u64))?;
     let body_len = block.bodyLength().to_usize().unwrap();
     let metadata_len = block.metaDataLength().to_usize().unwrap();
     let total_len = body_len.checked_add(metadata_len).unwrap();
 
-    let mut buf = MutableBuffer::from_len_zeroed(total_len);
-    reader.read_exact(&mut buf)?;
-    Ok(buf.into())
+    let mut buf = Vec::with_capacity(total_len);
+    reader
+        .by_ref()
+        .take(total_len as u64)
+        .read_to_end(&mut buf)?;
+    if buf.len() != total_len {
+        return Err(ArrowError::IpcError(format!(
+            "Expected IPC block of length {total_len}, got {}",
+            buf.len()
+        )));
+    }
+    Ok(Buffer::from_vec(buf))

Review Comment:
   I think this Vec and the one below (still) has no guaranteed alignment. 
   



##########
arrow-ipc/src/reader.rs:
##########
@@ -615,14 +758,76 @@ impl<'a> RecordBatchDecoder<'a> {
         let buffer = self.buffers.next().ok_or_else(|| {
             ArrowError::IpcError("Buffer count mismatched with 
metadata".to_string())
         })?;
-        read_buffer(
+        self.data
+            .read_buffer(buffer, self.compression, &mut 
self.decompression_context)
+    }
+    /// Advances to the next IPC buffer and trims it to the expected physical
+    /// length for `len` values of `T`.
+    ///
+    /// This keeps typed buffer length handling in one place while leaving 
final
+    /// array construction on the existing `ArrayDataBuilder` path.
+    fn next_typed_buffer<T: ArrowNativeType>(&mut self, len: usize) -> 
Result<Buffer, ArrowError> {
+        let buffer = self.buffers.next().ok_or_else(|| {
+            ArrowError::IpcError("Buffer count mismatched with 
metadata".to_string())
+        })?;
+
+        self.data.read_typed_buffer::<T>(

Review Comment:
   this part looks good, so far



##########
arrow-ipc/src/reader.rs:
##########
@@ -72,6 +72,58 @@ fn read_buffer(
         }
     }
 }
+
+/// Source for IPC body buffers.
+///
+/// Most decode paths use an already materialized [`Buffer`] and slice into it
+/// using the offsets from the IPC metadata. Keeping this behind a small helper
+/// lets typed buffer reads share the same bounds handling as regular buffer
+/// reads.
+enum IpcBufferSource<'a> {
+    Buffer(&'a Buffer),
+}
+
+impl<'a> IpcBufferSource<'a> {
+    fn read_buffer(
+        &self,
+        buf: &crate::Buffer,
+        compression: Option<CompressionCodec>,
+        decompression_context: &mut DecompressionContext,
+    ) -> Result<Buffer, ArrowError> {
+        match self {
+            Self::Buffer(data) => read_buffer(buf, data, compression, 
decompression_context),
+        }
+    }
+    /// Reads a physical IPC buffer that is expected to contain `len` values of
+    /// type `T`.
+    ///
+    /// The returned value is still a [`Buffer`], not a [`ScalarBuffer<T>`]. 
This
+    /// preserves the existing alignment behavior: properly aligned buffers 
remain
+    /// zero-copy, while unaligned buffers are still handled later by
+    /// `ArrayDataBuilder::align_buffers` according to `require_alignment`.
+    fn read_typed_buffer<T: ArrowNativeType>(
+        &self,
+        buf: &crate::Buffer,
+        len: usize,
+        compression: Option<CompressionCodec>,
+        decompression_context: &mut DecompressionContext,
+    ) -> Result<Buffer, ArrowError> {
+        let byte_len = len
+            .checked_mul(std::mem::size_of::<T>())
+            .ok_or_else(|| ArrowError::IpcError("Buffer length 
overflow".to_string()))?;
+
+        let buffer = self.read_buffer(buf, compression, 
decompression_context)?;
+        // Some invalid or legacy IPC inputs may contain shorter buffers than
+        // implied by the schema. Preserve the existing behavior and let array
+        // construction/validation report the error.
+        if buffer.len() <= byte_len || 
buffer.as_ptr().align_offset(std::mem::align_of::<T>()) != 0

Review Comment:
   This doens't seem right -- our change should ensure that the buffer is 
always aligned correctly to the otutput requirements. The fact you have to 
check afterwards suggests this is not the case



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to