alamb commented on code in PR #9836:
URL: https://github.com/apache/arrow-rs/pull/9836#discussion_r3204026164


##########
arrow-ipc/src/writer.rs:
##########
@@ -194,6 +194,26 @@ impl Default for IpcWriteOptions {
 /// [Arrow IPC Format]: https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc
 pub struct IpcDataGenerator {}
 
+struct IpcWriterScratch {

Review Comment:
   I think some comments here explaining the rationale for this structure would be useful (avoid allocations during IPC write).
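   For example, something like this might capture it (a sketch only; the field set and types are inferred from the `clear()` calls later in this diff, and the wording is just a suggestion):

   ```rust
   use flatbuffers::FlatBufferBuilder;
   use crate::{Buffer, FieldNode};

   /// Reusable scratch space for encoding a `RecordBatch` into an IPC message.
   ///
   /// Building a fresh `FlatBufferBuilder` and fresh `Vec`s for every batch
   /// costs allocations per batch; keeping them here and clearing them between
   /// batches amortizes those allocations when writing many (small) batches.
   struct IpcWriterScratch {
       /// FlatBuffer builder, reset (not dropped) between batches.
       fbb: FlatBufferBuilder<'static>,
       /// Field nodes of the batch currently being encoded.
       nodes: Vec<FieldNode>,
       /// Buffer descriptors (offset/length) of the current batch.
       buffers: Vec<Buffer>,
       /// Concatenated body bytes of the current batch.
       arrow_data: Vec<u8>,
       /// Per-array counts of variadic buffers (e.g. for view types).
       variadic_buffer_counts: Vec<i64>,
   }
   ```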



##########
arrow-ipc/src/writer.rs:
##########
@@ -194,6 +194,26 @@ impl Default for IpcWriteOptions {
 /// [Arrow IPC Format]: https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc
 pub struct IpcDataGenerator {}
 
+struct IpcWriterScratch {
+    fbb: FlatBufferBuilder<'static>,
+    nodes: Vec<crate::FieldNode>,

Review Comment:
   nit: can you please add `use crate::FieldNode` above and then use `FieldNode` directly (same for `Buffer` below) to avoid fully qualified names?
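   For example (assuming, from "same for `Buffer` below", that the later field is `Vec<crate::Buffer>`):

   ```rust
   use crate::{Buffer, FieldNode};

   struct IpcWriterScratch {
       fbb: FlatBufferBuilder<'static>,
       nodes: Vec<FieldNode>, // was Vec<crate::FieldNode>
       buffers: Vec<Buffer>,  // was Vec<crate::Buffer>
       // ... remaining fields unchanged ...
   }
   ```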



##########
arrow-ipc/src/writer.rs:
##########
@@ -1550,6 +1596,163 @@ impl<W: Write> RecordBatchWriter for StreamWriter<W> {
     }
 }
 
+fn has_dictionary_batch(batch: &RecordBatch) -> bool {
+    batch
+        .schema()
+        .fields()
+        .iter()
+        .any(|field| has_dictionary_type(field.data_type()))
+}
+
+fn has_dictionary_type(data_type: &DataType) -> bool {
+    match data_type {
+        DataType::Dictionary(_, _) => true,
+        DataType::Struct(fields) => fields.iter().any(|f| has_dictionary_type(f.data_type())),
+        DataType::List(field)
+        | DataType::LargeList(field)
+        | DataType::ListView(field)
+        | DataType::LargeListView(field)
+        | DataType::FixedSizeList(field, _) => has_dictionary_type(field.data_type()),
+        DataType::Map(field, _) => has_dictionary_type(field.data_type()),
+        DataType::RunEndEncoded(_, field) => has_dictionary_type(field.data_type()),
+        DataType::Union(fields, _) => fields
+            .iter()
+            .any(|(_, f)| has_dictionary_type(f.data_type())),
+        _ => false,
+    }
+}
+
+fn write_record_batch_fast<W: Write>(
+    writer: &mut W,
+    batch: &RecordBatch,
+    write_options: &IpcWriteOptions,
+    compression_context: &mut CompressionContext,
+    scratch: &mut IpcWriterScratch,
+) -> Result<(usize, usize), ArrowError> {
+    scratch.fbb.reset();
+    scratch.nodes.clear();
+    scratch.buffers.clear();
+    scratch.arrow_data.clear();
+    scratch.variadic_buffer_counts.clear();
+
+    let batch_compression_type = write_options.batch_compression_type;

Review Comment:
   this seems like a slight copy/paste/modify variation of https://github.com/apache/arrow-rs/blob/8ce051e126cb09e4a688cb91d54bb3553780decc/arrow-ipc/src/writer.rs#L1764-L1763
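   To make the overlap concrete, one option (a hypothetical refactor, names not from the PR) is to hoist the duplicated body into a single helper that both paths call:

   ```rust
   /// Hypothetical shared helper: encodes `batch` into `scratch`, leaving the
   /// flatbuffer metadata and body bytes ready to write out. Both the existing
   /// path and `write_record_batch_fast` could call this instead of carrying
   /// their own copies of the encoding loop.
   fn encode_batch_into_scratch(
       batch: &RecordBatch,
       write_options: &IpcWriteOptions,
       compression_context: &mut CompressionContext,
       scratch: &mut IpcWriterScratch,
   ) -> Result<(), ArrowError> {
       scratch.fbb.reset();
       scratch.nodes.clear();
       scratch.buffers.clear();
       scratch.arrow_data.clear();
       scratch.variadic_buffer_counts.clear();
       // ... the single, shared copy of the encoding logic goes here ...
       Ok(())
   }
   ```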
   
   



##########
arrow-ipc/src/writer.rs:
##########
@@ -1550,6 +1596,163 @@ impl<W: Write> RecordBatchWriter for StreamWriter<W> {
     }
 }
 
+fn has_dictionary_batch(batch: &RecordBatch) -> bool {
+    batch
+        .schema()
+        .fields()
+        .iter()
+        .any(|field| has_dictionary_type(field.data_type()))
+}
+
+fn has_dictionary_type(data_type: &DataType) -> bool {
+    match data_type {
+        DataType::Dictionary(_, _) => true,
+        DataType::Struct(fields) => fields.iter().any(|f| has_dictionary_type(f.data_type())),
+        DataType::List(field)
+        | DataType::LargeList(field)
+        | DataType::ListView(field)
+        | DataType::LargeListView(field)
+        | DataType::FixedSizeList(field, _) => has_dictionary_type(field.data_type()),
+        DataType::Map(field, _) => has_dictionary_type(field.data_type()),
+        DataType::RunEndEncoded(_, field) => has_dictionary_type(field.data_type()),
+        DataType::Union(fields, _) => fields
+            .iter()
+            .any(|(_, f)| has_dictionary_type(f.data_type())),
+        _ => false,
+    }
+}
+
+fn write_record_batch_fast<W: Write>(

Review Comment:
   Could you please add some documentation to this function explaining how it differs from `write_record_batch` (I think the big change is that it doesn't handle dictionaries)?
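   For example, something along these lines (exact wording is only a suggestion):

   ```rust
   /// Encodes `batch` and writes it to `writer` as a single RecordBatch
   /// message, reusing `scratch` to avoid per-batch allocations.
   ///
   /// Unlike the general `write_record_batch` path, this function does *not*
   /// emit dictionary batches, so callers must check `has_dictionary_batch`
   /// first and fall back to `IpcDataGenerator::encode` for schemas that
   /// contain dictionary-encoded fields.
   ```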



##########
arrow-ipc/src/writer.rs:
##########
@@ -1165,23 +1188,33 @@ impl<W: Write> FileWriter<W> {
             ));
         }
 
-        let (encoded_dictionaries, encoded_message) = self.data_gen.encode(
-            batch,
-            &mut self.dictionary_tracker,
-            &self.write_options,
-            &mut self.compression_context,
-        )?;
+        let (meta, data) = if has_dictionary_batch(batch) {
+            let (encoded_dictionaries, encoded_message) = self.data_gen.encode(

Review Comment:
   If the goal of this PR is to avoid allocations in the IPC writer, I think we should take a more holistic approach.
   
   Rather than fast-pathing only the non-dictionary case, what do you think about threading the writer somehow into `self.data_gen`?
   
   It seems like the current code buffers the entire record batch's worth of messages into memory, and then writes them all out.
   
   It would be more memory efficient, I think, if the data was written to a (reusable) buffer and then immediately to the writer (or maybe even skip the buffering and write to the writer directly).
   
   You would likely have to do something about dictionary encoding -- but I think it is allowed to write dictionaries a column at a time (you don't have to write them all up front).
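   As a rough sketch of the shape this could take (`encode_to_writer` is a hypothetical name, not an existing API):

   ```rust
   fn encode_to_writer<W: Write>(
       data_gen: &IpcDataGenerator,
       writer: &mut W,
       batch: &RecordBatch,
       dictionary_tracker: &mut DictionaryTracker,
       write_options: &IpcWriteOptions,
       scratch: &mut IpcWriterScratch,
   ) -> Result<(), ArrowError> {
       // 1. For each dictionary-encoded column, encode its (new or delta)
       //    dictionary batch into `scratch` and flush it to `writer` right
       //    away -- the format permits interleaving dictionary batches with
       //    record batches, so they need not all be written up front.
       // 2. Encode the record batch message into `scratch` and flush it.
       // 3. Clear `scratch`, keeping its capacity for the next batch.
       //
       // Net effect: at most one encoded message is buffered at a time,
       // rather than a whole batch's worth of messages.
       todo!("sketch only; the real change would live inside IpcDataGenerator")
   }
   ```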



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
