gabotechs commented on code in PR #10044:
URL: https://github.com/apache/arrow-rs/pull/10044#discussion_r3341395598


##########
arrow-ipc/src/writer.rs:
##########
@@ -2111,33 +2294,33 @@ fn write_array_data(
 /// follows is not compressed, which can be useful for cases where
 /// compression does not yield appreciable savings.
 fn write_buffer(
-    buffer: &[u8],                    // input
-    buffers: &mut Vec<crate::Buffer>, // output buffer descriptors
-    arrow_data: &mut Vec<u8>,         // output stream
-    offset: i64,                      // current output stream offset
+    buffer: Buffer,                           // input array buffer to encode
+    buffers: &mut Vec<crate::Buffer>, // IPC buffer metadata (offset + length) 
for the FlatBuffer message
+    encoded_buffers: &mut Vec<EncodedBuffer>, // accumulated encoded segments, 
written to output after the message header
+    offset: i64,                              // current output stream offset
     compression_codec: Option<CompressionCodec>,

Review Comment:
   I see this function is no longer writing the buffers, instead of writing 
them, it's collecting them into `encoded_buffers`. Given this new scenario, I 
think we need to do two things:
   1. change the name of this function to something like 
`collect_encoded_buffers`
   2. Update the doc comment, which still mentions writing to `arrow_data`



##########
arrow-ipc/src/writer.rs:
##########
@@ -1165,24 +1364,24 @@ impl<W: Write> FileWriter<W> {
             ));
         }
 
-        let (encoded_dictionaries, encoded_message) = self.data_gen.encode(
+        let (dict_sizes, (meta, data)) = self.data_gen.write_batch_direct(

Review Comment:
   The two last `(meta, data)` fields returned by `write_batch_direct` are 
named (aligned_size, body_len) in that function.
   
   Is this correct? not sure if this is just a naming thing, but it's hard to 
know if this is correct given the different naming. Is `meta == aligned_size` 
and `data == body_len`? they sound like completely different things.



##########
arrow-ipc/src/writer.rs:
##########
@@ -69,7 +69,39 @@ pub struct IpcWriteOptions {
     /// How to handle updating dictionaries in IPC messages
     dictionary_handling: DictionaryHandling,
 }
+/// Return type for [`IpcDataGenerator::write_batch_direct`]: `(dict_sizes, 
batch_sizes)` where
+/// each element is `(ipc_metadata_bytes, body_bytes)`.
+///
+/// [`FileWriter`] uses these sizes to build the Block index entries required 
by the IPC
+/// footer for random-access reads.
+type IPCMetadata = Result<(Vec<(usize, usize)>, (usize, usize)), ArrowError>;

Review Comment:
   It's pretty hard to know what's all of this without a proper structure with 
proper fields. How about having a proper struct for this?
   
   For example:
   ```rust
   struct IPCMetadata {
       dict_sizes: Vec<(usize, usize)>,
       aligned_size: usize,
       body_len: usize,
   }
   ```
   
   Maybe there are better names for the different fields



##########
arrow-ipc/src/writer.rs:
##########
@@ -693,19 +734,177 @@ impl IpcDataGenerator {
             let mut message_builder = crate::MessageBuilder::new(&mut fbb);
             message_builder.add_version(write_options.metadata_version);
             
message_builder.add_header_type(crate::MessageHeader::DictionaryBatch);
-            message_builder.add_bodyLength(arrow_data.len() as i64);
+            message_builder.add_bodyLength(body_len as i64);
             message_builder.add_header(root);
             message_builder.finish()
         };
 
         fbb.finish(root, None);
         let finished_data = fbb.finished_data();
 
+        let mut arrow_data: Vec<u8> = Vec::with_capacity(body_len);
+        for encoded in &encoded_buffers {
+            arrow_data.extend_from_slice(encoded.as_slice());
+            arrow_data.extend_from_slice(
+                &PADDING[..pad_to_alignment(write_options.alignment, 
encoded.len())],
+            );
+        }
+        arrow_data.extend_from_slice(&PADDING[..tail_pad]);
+
         Ok(EncodedData {
             ipc_message: finished_data.to_vec(),
             arrow_data,
         })
     }
+
+    /// Write dictionaries and record batch's directly to `writer`, skipping 
the
+    /// intermediate `arrow_data: Vec<u8>` accumulator used by 
[`Self::record_batch_to_bytes`].
+    ///
+    /// For the uncompressed path each array buffer is held as an Arc-backed 
slice and
+    /// written straight to `writer`. one copy instead of two.  For the 
compressed path
+    /// each buffer is compressed into a per-buffer scratch `Vec<u8>` and 
written from
+    /// there, eliminating the extra copy that `write_buffer` -> `arrow_data` 
->
+    /// `write_body_buffers` would otherwise incur.
+    fn write_batch_direct<W: Write>(

Review Comment:
   I see most contents of this function are essentially copy-pastes from 
`record_batch_to_bytes`, duplication seems too much here. Is there any chance 
to:
   - Completely replace `record_batch_to_bytes` and keep just a single function 
for writing batches in IPC format
   - Factoring out some ergonomic helpers that could be reused in both 
functions?
   
   Also, it seems like the `.encode()` method and the new 
`.write_batch_direct()` are both doing the same thing with slightly different 
ergonomics. Do you see any opportunity to collapse them into just 1 method?
   
   This file is overall pretty bloated with complex logic and a relatively 
arbitrary separation of concerns between methods, the more we can do for 
debloating it the better it will be for future maintainers.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to