gabotechs commented on code in PR #10044:
URL: https://github.com/apache/arrow-rs/pull/10044#discussion_r3378522327


##########
arrow-ipc/src/writer.rs:
##########
@@ -474,27 +523,116 @@ impl IpcDataGenerator {
         write_options: &IpcWriteOptions,
         compression_context: &mut CompressionContext,
     ) -> Result<(Vec<EncodedData>, EncodedData), ArrowError> {
+        let encoded_dictionaries = self.encode_all_dicts(
+            batch,
+            dictionary_tracker,
+            write_options,
+            compression_context,
+        )?;
+        let mut arrow_data = Vec::new();
+        let (ipc_message, _, tail_pad) = self.record_batch_to_bytes(
+            batch,
+            write_options,
+            compression_context,
+            &mut BufferSink::Write(&mut arrow_data),
+        )?;
+        arrow_data.extend_from_slice(&PADDING[..tail_pad]);
+        Ok((
+            encoded_dictionaries,
+            EncodedData {
+                ipc_message,
+                arrow_data,
+            },
+        ))
+    }
+
+    /// Encode dictionary batches for all columns in `batch`.
+    fn encode_all_dicts(
+        &self,
+        batch: &RecordBatch,
+        dictionary_tracker: &mut DictionaryTracker,
+        write_options: &IpcWriteOptions,
+        compression_context: &mut CompressionContext,
+    ) -> Result<Vec<EncodedData>, ArrowError> {
         let schema = batch.schema();
         let mut encoded_dictionaries = 
Vec::with_capacity(schema.flattened_fields().len());
-
         let mut dict_id = dictionary_tracker.dict_ids.clone().into_iter();
-
         for (i, field) in schema.fields().iter().enumerate() {
-            let column = batch.column(i);
             self.encode_dictionaries(
                 field,
-                column,
+                batch.column(i),
                 &mut encoded_dictionaries,
                 dictionary_tracker,
                 write_options,
                 &mut dict_id,
                 compression_context,
             )?;
         }
+        Ok(encoded_dictionaries)
+    }
 
-        let encoded_message =
-            self.record_batch_to_bytes(batch, write_options, 
compression_context)?;
-        Ok((encoded_dictionaries, encoded_message))
+    /// Write dictionary batches and the record batch directly to `writer`, 
skipping the
+    /// intermediate body `Vec<u8>` allocations
+    /// Returns [`IpcWriteMetadata`] with the sizes needed to build footer 
blocks.
+    fn write_direct<W: Write>(

Review Comment:
   The fact that this is named `write_direct` makes me think that it's a 
variant of another method called `write()` also present in this struct, but it 
does not seem to be the case.
   
   Do you see any drawbacks in calling this just `.write()`?



##########
arrow-ipc/src/writer.rs:
##########
@@ -515,22 +653,20 @@ impl IpcDataGenerator {
         )
     }
 
-    /// Write a `RecordBatch` into two sets of bytes, one for the header 
(crate::Message) and the
-    /// other for the batch's data
+    /// Encodes a `RecordBatch` into a flatbuffer IPC message and fills `sink` 
with the
+    /// serialised buffer data.
     fn record_batch_to_bytes(
         &self,
         batch: &RecordBatch,
         write_options: &IpcWriteOptions,
         compression_context: &mut CompressionContext,
-    ) -> Result<EncodedData, ArrowError> {
+        sink: &mut BufferSink<'_>,
+    ) -> Result<(Vec<u8>, usize, usize), ArrowError> {

Review Comment:
   It's a bit hard to infer what are all these just from reading ath the 
function signature.
   
   If you think it's not worth it to encapsulate the `(Vec<u8>, usize, usize)` 
return value into a struct with nice field names, I'd at least extend the doc 
comments of the function stating what each position in the tuple is supposed to 
represent.



##########
arrow-ipc/src/writer.rs:
##########
@@ -1807,7 +1930,7 @@ fn get_or_truncate_buffer(array_data: &ArrayData) -> 
&[u8] {
 fn write_array_data(
     array_data: &ArrayData,
     buffers: &mut Vec<crate::Buffer>,
-    arrow_data: &mut Vec<u8>,
+    sink: &mut BufferSink<'_>,
     nodes: &mut Vec<crate::FieldNode>,

Review Comment:
   This is looking very good!
   
   There's one thing that might confuse readers though:
   
   Here, we are passing these two bits:
   
   ```rust
       buffers: &mut Vec<crate::Buffer>,
       sink: &mut BufferSink<'_>,
   ```
   
   Which at first sight, it raises some questions:
   - Why do we need to pass two "buffer sinks"? isn't `buffers` already a sink 
for `crate::Buffer` objects?
   - Why do we need to dump the buffers in two places, one for `buffer` and 
other for `sink`, are we unnecessarily copying data because of that?
   
   After reading a bit more carefully, one notices that there are two kinds of 
buffers:
   - `crate::Buffer`: an autogenerated struct from the Arrow IPC spec
   - `arrow_buffer::Buffer`: The classical in-memory Arrow buffer
   
   Maybe there's a way of making this distinction a bit clearer with some more 
descriptive names for `BufferSink`? like naming it `ArrowDataSink` maybe?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to