gabotechs commented on code in PR #10044:
URL: https://github.com/apache/arrow-rs/pull/10044#discussion_r3341395598
##########
arrow-ipc/src/writer.rs:
##########
@@ -2111,33 +2294,33 @@ fn write_array_data(
/// follows is not compressed, which can be useful for cases where
/// compression does not yield appreciable savings.
fn write_buffer(
- buffer: &[u8], // input
- buffers: &mut Vec<crate::Buffer>, // output buffer descriptors
- arrow_data: &mut Vec<u8>, // output stream
- offset: i64, // current output stream offset
+ buffer: Buffer, // input array buffer to encode
+ buffers: &mut Vec<crate::Buffer>, // IPC buffer metadata (offset + length) for the FlatBuffer message
+ encoded_buffers: &mut Vec<EncodedBuffer>, // accumulated encoded segments, written to output after the message header
+ offset: i64, // current output stream offset
compression_codec: Option<CompressionCodec>,
Review Comment:
I see this function no longer writes the buffers; instead, it collects them
into `encoded_buffers`. Given this change, I think we need to do two things:
1. Rename this function to something like `collect_encoded_buffers`.
2. Update the doc comment, which still mentions writing to `arrow_data`.
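A minimal, self-contained sketch of what a collect-only function could look like. `BufferMeta` is a hypothetical stand-in for the flatbuffer `crate::Buffer` descriptor, encoded segments are modelled as owned `Vec<u8>` values, compression is left out, and `pad_to_alignment` is re-implemented locally; none of this is the PR's actual code.

```rust
/// Hypothetical stand-in for the IPC `crate::Buffer` descriptor (offset + length).
#[derive(Debug, Clone, Copy, PartialEq)]
struct BufferMeta {
    offset: i64,
    length: i64,
}

/// Local re-implementation (assumption): padding bytes needed to round `len`
/// up to a multiple of `alignment`, which must be a power of two (e.g. 8 or 64).
fn pad_to_alignment(alignment: u8, len: usize) -> usize {
    let a = usize::from(alignment - 1);
    ((len + a) & !a) - len
}

/// Hypothetical collect-only variant: records the metadata for one buffer and
/// stashes its bytes for later output. Nothing is written here; the caller
/// emits `encoded_buffers` after the message header.
/// Returns the stream offset at which the next buffer starts.
fn collect_encoded_buffers(
    buffer: Vec<u8>,                    // input buffer (uncompressed in this sketch)
    buffers: &mut Vec<BufferMeta>,      // IPC buffer metadata for the FlatBuffer message
    encoded_buffers: &mut Vec<Vec<u8>>, // accumulated segments, written later
    offset: i64,                        // current output stream offset
    alignment: u8,
) -> i64 {
    let len = buffer.len();
    buffers.push(BufferMeta { offset, length: len as i64 });
    encoded_buffers.push(buffer);
    // The next buffer starts after this one plus its alignment padding.
    offset + (len + pad_to_alignment(alignment, len)) as i64
}
```

With an alignment of 8, collecting a 3-byte buffer at offset 0 records `{ offset: 0, length: 3 }` and returns 8, so a following 8-byte buffer is recorded at offset 8. The name then matches what the function does: it only appends to two accumulators.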
##########
arrow-ipc/src/writer.rs:
##########
@@ -1165,24 +1364,24 @@ impl<W: Write> FileWriter<W> {
));
}
- let (encoded_dictionaries, encoded_message) = self.data_gen.encode(
+ let (dict_sizes, (meta, data)) = self.data_gen.write_batch_direct(
Review Comment:
The last two fields returned by `write_batch_direct`, bound here as
`(meta, data)`, are named `(aligned_size, body_len)` inside that function.
Is this correct? It may just be a naming issue, but the mismatch makes it
hard to verify. Is `meta == aligned_size` and `data == body_len`? They
sound like completely different things.
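Why the naming matters: the pair ends up in a footer `Block` entry, which the Arrow IPC file format defines with an `offset`, a `metaDataLength` and a `bodyLength`. The sketch below mirrors that with a hypothetical local `Block` struct and `push_block` helper (not `crate::Block` or the PR's code) to show that the two numbers land in different fields and that both advance the running offset; if they were swapped, the offsets would still add up, but every entry would report the wrong metadata/body split.

```rust
/// Hypothetical local mirror of the footer `Block` from the Arrow IPC file format.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Block {
    offset: i64,           // where this message starts in the file
    meta_data_length: i32, // size of the message metadata
    body_length: i64,      // size of the message body
}

/// Hypothetical helper: appends a footer entry for one message and returns
/// the file offset at which the next message starts.
fn push_block(blocks: &mut Vec<Block>, offset: i64, metadata_len: usize, body_len: usize) -> i64 {
    blocks.push(Block {
        offset,
        // The footer stores the metadata length as a 32-bit integer.
        meta_data_length: i32::try_from(metadata_len).expect("metadata larger than i32::MAX"),
        body_length: body_len as i64,
    });
    offset + (metadata_len + body_len) as i64
}
```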
##########
arrow-ipc/src/writer.rs:
##########
@@ -69,7 +69,39 @@ pub struct IpcWriteOptions {
/// How to handle updating dictionaries in IPC messages
dictionary_handling: DictionaryHandling,
}
+/// Return type for [`IpcDataGenerator::write_batch_direct`]: `(dict_sizes, batch_sizes)` where
+/// each element is `(ipc_metadata_bytes, body_bytes)`.
+///
+/// [`FileWriter`] uses these sizes to build the Block index entries required by the IPC
+/// footer for random-access reads.
+type IPCMetadata = Result<(Vec<(usize, usize)>, (usize, usize)), ArrowError>;
Review Comment:
It's hard to tell what all of this means without a struct with named fields.
How about defining a proper struct for this?
For example:
```rust
struct IPCMetadata {
    dict_sizes: Vec<(usize, usize)>,
    aligned_size: usize,
    body_len: usize,
}
```
Maybe there are better names for the different fields.
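One possible shape, sketched with hypothetical names (`MessageSizes`, `IpcWriteSizes`, `describe`): a small size pair reused for both dictionary and record batch messages, wrapped in a struct that callers destructure by field name. With named fields, a mismatch like `(meta, data)` versus `(aligned_size, body_len)` is visible at the call site, and renaming a field becomes a compile error wherever it is destructured.

```rust
/// Hypothetical: sizes of one encapsulated IPC message.
#[derive(Debug, Clone, Copy, PartialEq, Default)]
struct MessageSizes {
    /// Bytes of IPC metadata for the message.
    metadata_len: usize,
    /// Bytes of the message body.
    body_len: usize,
}

impl MessageSizes {
    /// Total bytes this message occupies in the output stream.
    fn total(&self) -> usize {
        self.metadata_len + self.body_len
    }
}

/// Hypothetical replacement for the tuple-based `IPCMetadata` alias.
#[derive(Debug, Clone, PartialEq, Default)]
struct IpcWriteSizes {
    /// One entry per dictionary batch written before the record batch.
    dictionaries: Vec<MessageSizes>,
    /// Sizes of the record batch message itself.
    batch: MessageSizes,
}

impl IpcWriteSizes {
    /// Total bytes written for the dictionaries and the batch together.
    fn total(&self) -> usize {
        self.dictionaries.iter().map(MessageSizes::total).sum::<usize>() + self.batch.total()
    }
}

/// Example caller: destructuring by field name instead of tuple position.
fn describe(sizes: &IpcWriteSizes) -> String {
    let IpcWriteSizes { dictionaries, batch } = sizes;
    let MessageSizes { metadata_len, body_len } = *batch;
    format!(
        "dictionaries={}, batch metadata={} body={}",
        dictionaries.len(),
        metadata_len,
        body_len
    )
}
```

The function would then return `Result<IpcWriteSizes, ArrowError>` instead of the nested tuple.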
##########
arrow-ipc/src/writer.rs:
##########
@@ -693,19 +734,177 @@ impl IpcDataGenerator {
let mut message_builder = crate::MessageBuilder::new(&mut fbb);
message_builder.add_version(write_options.metadata_version);
message_builder.add_header_type(crate::MessageHeader::DictionaryBatch);
- message_builder.add_bodyLength(arrow_data.len() as i64);
+ message_builder.add_bodyLength(body_len as i64);
message_builder.add_header(root);
message_builder.finish()
};
fbb.finish(root, None);
let finished_data = fbb.finished_data();
+ let mut arrow_data: Vec<u8> = Vec::with_capacity(body_len);
+ for encoded in &encoded_buffers {
+ arrow_data.extend_from_slice(encoded.as_slice());
+ arrow_data.extend_from_slice(
+ &PADDING[..pad_to_alignment(write_options.alignment, encoded.len())],
+ );
+ }
+ arrow_data.extend_from_slice(&PADDING[..tail_pad]);
+
Ok(EncodedData {
ipc_message: finished_data.to_vec(),
arrow_data,
})
}
+
+ /// Write dictionaries and record batch's directly to `writer`, skipping the
+ /// intermediate `arrow_data: Vec<u8>` accumulator used by [`Self::record_batch_to_bytes`].
+ ///
+ /// For the uncompressed path each array buffer is held as an Arc-backed slice and
+ /// written straight to `writer`. one copy instead of two. For the compressed path
+ /// each buffer is compressed into a per-buffer scratch `Vec<u8>` and written from
+ /// there, eliminating the extra copy that `write_buffer` -> `arrow_data` ->
+ /// `write_body_buffers` would otherwise incur.
+ fn write_batch_direct<W: Write>(
Review Comment:
Most of this function is essentially copy-pasted from `record_batch_to_bytes`;
that is too much duplication. Is there any chance to:
- Completely replace `record_batch_to_bytes` and keep a single function for writing batches in IPC format, or
- Factor out some ergonomic helpers that could be reused in both functions?
Also, the `.encode()` method and the new `.write_batch_direct()` seem to do
the same thing with slightly different ergonomics. Do you see any opportunity
to collapse them into a single method?
Overall, this file is pretty bloated, with complex logic and a relatively
arbitrary separation of concerns between methods; the more we can do to
debloat it, the better for future maintainers.
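One way to factor out the shared part, sketched under assumptions: because `Vec<u8>` implements `std::io::Write`, a single body-writing helper generic over `W: Write` could serve both the buffered `record_batch_to_bytes` path (pass a `Vec<u8>`) and the direct path (pass the file writer). `write_body`, its parameters, and the local `PADDING`/`pad_to_alignment` are hypothetical; the loop mirrors the padding logic in the dictionary hunk above.

```rust
use std::io::{self, Write};

/// Zero bytes used for padding (assumes alignment and `tail_pad` never exceed 64).
const PADDING: [u8; 64] = [0; 64];

/// Padding bytes needed to round `len` up to a multiple of `alignment` (a power of two).
fn pad_to_alignment(alignment: u8, len: usize) -> usize {
    let a = usize::from(alignment - 1);
    ((len + a) & !a) - len
}

/// Hypothetical shared helper: writes each encoded segment followed by its
/// alignment padding, then `tail_pad` extra zero bytes. Because it accepts any
/// `Write` sink, the in-memory and direct-to-writer paths can both call it.
/// Returns the number of bytes written.
fn write_body<W: Write>(
    writer: &mut W,
    encoded_buffers: &[Vec<u8>],
    alignment: u8,
    tail_pad: usize,
) -> io::Result<usize> {
    let mut written = 0;
    for encoded in encoded_buffers {
        writer.write_all(encoded)?;
        let pad = pad_to_alignment(alignment, encoded.len());
        writer.write_all(&PADDING[..pad])?;
        written += encoded.len() + pad;
    }
    writer.write_all(&PADDING[..tail_pad])?;
    Ok(written + tail_pad)
}
```

The buffered path would call `write_body(&mut arrow_data, ...)` and the direct path `write_body(&mut self.writer, ...)`, so the padding rules live in one place instead of being duplicated across `record_batch_to_bytes` and `write_batch_direct`.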
--
This is an automated message from the Apache Git Service.