alamb commented on code in PR #9836:
URL: https://github.com/apache/arrow-rs/pull/9836#discussion_r3204026164
##########
arrow-ipc/src/writer.rs:
##########
@@ -194,6 +194,26 @@ impl Default for IpcWriteOptions {
/// [Arrow IPC Format]: https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc
pub struct IpcDataGenerator {}
+struct IpcWriterScratch {
Review Comment:
I think some comments here explaining the rationale for this structure would
be useful (avoid allocations during IPC write)
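For example, the rationale could be documented roughly like this (a hypothetical, std-only sketch: the field types below are stand-ins for the real `FlatBufferBuilder`, `FieldNode`, and `Buffer`, and the wording of the comment is illustrative):

```rust
/// Reusable scratch space for encoding record batches.
///
/// Hypothetical rationale comment: the writer encodes many batches in a
/// row, so instead of allocating fresh vectors per batch we keep them
/// here, `clear()` them between batches, and let each allocation grow to
/// its high-water mark once.
struct WriterScratch {
    nodes: Vec<u64>,     // stand-in for Vec<FieldNode>
    buffers: Vec<u64>,   // stand-in for Vec<Buffer>
    arrow_data: Vec<u8>, // encoded body bytes, reused across batches
}

impl WriterScratch {
    fn new() -> Self {
        Self {
            nodes: Vec::new(),
            buffers: Vec::new(),
            arrow_data: Vec::new(),
        }
    }

    /// Clear contents but keep the allocations for the next batch.
    fn reset(&mut self) {
        self.nodes.clear();
        self.buffers.clear();
        self.arrow_data.clear();
    }
}

fn main() {
    let mut scratch = WriterScratch::new();
    scratch.arrow_data.extend_from_slice(b"batch-0");
    let cap = scratch.arrow_data.capacity();
    scratch.reset();
    // `Vec::clear` retains capacity, so the next batch reuses the allocation
    assert!(scratch.arrow_data.is_empty());
    assert_eq!(scratch.arrow_data.capacity(), cap);
}
```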
##########
arrow-ipc/src/writer.rs:
##########
@@ -194,6 +194,26 @@ impl Default for IpcWriteOptions {
/// [Arrow IPC Format]: https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc
pub struct IpcDataGenerator {}
+struct IpcWriterScratch {
+    fbb: FlatBufferBuilder<'static>,
+    nodes: Vec<crate::FieldNode>,
Review Comment:
nit: can you please use `use crate::FieldNode` in the above and then use
`FieldNode` directly (same for `Buffer` below) to avoid fully qualified names?
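That is, something like the following (the `ipc` module here is a dummy stand-in for the crate root, purely to illustrate the import style):

```rust
// Dummy module standing in for the crate root; illustrative only.
mod ipc {
    pub struct FieldNode(pub i64, pub i64);
    pub struct Buffer(pub i64, pub i64);
}

// Import once at the top of the file...
use ipc::{Buffer, FieldNode};

// ...so the struct definition reads without fully qualified paths.
struct Scratch {
    nodes: Vec<FieldNode>,
    buffers: Vec<Buffer>,
}

fn main() {
    let s = Scratch {
        nodes: vec![FieldNode(0, 1)],
        buffers: vec![Buffer(0, 8)],
    };
    assert_eq!(s.nodes.len(), 1);
    assert_eq!(s.buffers.len(), 1);
}
```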
##########
arrow-ipc/src/writer.rs:
##########
@@ -1550,6 +1596,163 @@ impl<W: Write> RecordBatchWriter for StreamWriter<W> {
}
}
+fn has_dictionary_batch(batch: &RecordBatch) -> bool {
+    batch
+        .schema()
+        .fields()
+        .iter()
+        .any(|field| has_dictionary_type(field.data_type()))
+}
+
+fn has_dictionary_type(data_type: &DataType) -> bool {
+    match data_type {
+        DataType::Dictionary(_, _) => true,
+        DataType::Struct(fields) => fields.iter().any(|f| has_dictionary_type(f.data_type())),
+        DataType::List(field)
+        | DataType::LargeList(field)
+        | DataType::ListView(field)
+        | DataType::LargeListView(field)
+        | DataType::FixedSizeList(field, _) => has_dictionary_type(field.data_type()),
+        DataType::Map(field, _) => has_dictionary_type(field.data_type()),
+        DataType::RunEndEncoded(_, field) => has_dictionary_type(field.data_type()),
+        DataType::Union(fields, _) => fields
+            .iter()
+            .any(|(_, f)| has_dictionary_type(f.data_type())),
+        _ => false,
+    }
+}
+
+fn write_record_batch_fast<W: Write>(
+    writer: &mut W,
+    batch: &RecordBatch,
+    write_options: &IpcWriteOptions,
+    compression_context: &mut CompressionContext,
+    scratch: &mut IpcWriterScratch,
+) -> Result<(usize, usize), ArrowError> {
+    scratch.fbb.reset();
+    scratch.nodes.clear();
+    scratch.buffers.clear();
+    scratch.arrow_data.clear();
+    scratch.variadic_buffer_counts.clear();
+
+    let batch_compression_type = write_options.batch_compression_type;
Review Comment:
this seems like a slight copy/paste/modify variation of
https://github.com/apache/arrow-rs/blob/8ce051e126cb09e4a688cb91d54bb3553780decc/arrow-ipc/src/writer.rs#L1764-L1763
##########
arrow-ipc/src/writer.rs:
##########
@@ -1550,6 +1596,163 @@ impl<W: Write> RecordBatchWriter for StreamWriter<W> {
}
}
+fn has_dictionary_batch(batch: &RecordBatch) -> bool {
+    batch
+        .schema()
+        .fields()
+        .iter()
+        .any(|field| has_dictionary_type(field.data_type()))
+}
+
+fn has_dictionary_type(data_type: &DataType) -> bool {
+    match data_type {
+        DataType::Dictionary(_, _) => true,
+        DataType::Struct(fields) => fields.iter().any(|f| has_dictionary_type(f.data_type())),
+        DataType::List(field)
+        | DataType::LargeList(field)
+        | DataType::ListView(field)
+        | DataType::LargeListView(field)
+        | DataType::FixedSizeList(field, _) => has_dictionary_type(field.data_type()),
+        DataType::Map(field, _) => has_dictionary_type(field.data_type()),
+        DataType::RunEndEncoded(_, field) => has_dictionary_type(field.data_type()),
+        DataType::Union(fields, _) => fields
+            .iter()
+            .any(|(_, f)| has_dictionary_type(f.data_type())),
+        _ => false,
+    }
+}
+
+fn write_record_batch_fast<W: Write>(
Review Comment:
Could you please add some documentation to this function explaining how it
differs from `write_record_batch` (I think the big change is that it doesn't
handle dictionaries)?
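A possible shape for that doc comment (hypothetical wording; the signature and body below are a std-only stub to keep the sketch self-contained, not the real function):

```rust
use std::io::Write;

/// Encode one record batch and write it to `writer`, reusing `scratch`.
///
/// Hypothetical doc sketch: unlike the general `write_record_batch` path,
/// this fast path does NOT encode or emit dictionary batches, so it must
/// only be called when the schema contains no dictionary-typed fields
/// (i.e. when `has_dictionary_batch` returned false). Returns the
/// (metadata_len, data_len) written.
fn write_record_batch_fast<W: Write>(
    writer: &mut W,
    data: &[u8],
    scratch: &mut Vec<u8>,
) -> std::io::Result<(usize, usize)> {
    scratch.clear(); // keep capacity from previous batches
    scratch.extend_from_slice(data); // stand-in for the real encoding step
    writer.write_all(scratch)?;
    Ok((0, scratch.len()))
}

fn main() {
    let mut out = Vec::new();
    let mut scratch = Vec::new();
    let (_, data_len) =
        write_record_batch_fast(&mut out, b"abc", &mut scratch).unwrap();
    assert_eq!(data_len, 3);
    assert_eq!(out, b"abc");
}
```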
##########
arrow-ipc/src/writer.rs:
##########
@@ -1165,23 +1188,33 @@ impl<W: Write> FileWriter<W> {
));
}
-        let (encoded_dictionaries, encoded_message) = self.data_gen.encode(
-            batch,
-            &mut self.dictionary_tracker,
-            &self.write_options,
-            &mut self.compression_context,
-        )?;
+        let (meta, data) = if has_dictionary_batch(batch) {
+            let (encoded_dictionaries, encoded_message) = self.data_gen.encode(
Review Comment:
If the goal of this PR is to avoid allocations in the IPC writer, I think we
should take a more holistic approach.
Rather than trying to fast path only the non-dictionary case, what do you
think about threading the writer somehow into `self.data_gen`?
It seems like the current code buffers the entire record batch's worth of
messages into memory, and then writes them all out.
It would be more memory efficient, I think, if the data was written to a
(reusable) buffer and then immediately to the writer (or maybe even avoid the
buffering and write to the writer directly).
You would likely have to do something about dictionary encoding -- but I
think it is allowed to write dictionaries a column at a time (you don't have
to write them all up front).
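Roughly the shape being suggested, as a std-only sketch (names like `StreamingEncoder` and `encode_into` are hypothetical; `encode_into` stands in for whatever the real per-message encoding would be):

```rust
use std::io::Write;

/// Hypothetical sketch of the "thread the writer through" idea: each batch
/// is encoded into one reusable buffer and flushed to the writer right
/// away, instead of collecting every encoded message in memory first.
struct StreamingEncoder {
    scratch: Vec<u8>, // reused across batches; grows to high-water mark once
}

impl StreamingEncoder {
    fn new() -> Self {
        Self { scratch: Vec::new() }
    }

    /// Stand-in for real encoding: in arrow-ipc this would build the
    /// message for one batch (or one dictionary, a column at a time).
    fn encode_into(buf: &mut Vec<u8>, batch: &[u8]) {
        buf.extend_from_slice(batch);
    }

    /// Encode one batch and immediately write it out, reusing the scratch
    /// buffer so no per-batch allocation is needed.
    fn write_batch<W: Write>(
        &mut self,
        writer: &mut W,
        batch: &[u8],
    ) -> std::io::Result<usize> {
        self.scratch.clear(); // keeps capacity between batches
        Self::encode_into(&mut self.scratch, batch);
        writer.write_all(&self.scratch)?;
        Ok(self.scratch.len())
    }
}

fn main() {
    let mut enc = StreamingEncoder::new();
    let mut sink = Vec::new();
    for batch in [&b"batch-a"[..], b"batch-b"] {
        enc.write_batch(&mut sink, batch).unwrap();
    }
    assert_eq!(sink, b"batch-abatch-b");
}
```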
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]