alamb commented on code in PR #10044:
URL: https://github.com/apache/arrow-rs/pull/10044#discussion_r3381759987
##########
arrow-ipc/src/writer.rs:
##########
@@ -2101,50 +2226,104 @@ fn write_array_data(
Ok(offset)
}
-/// Write a buffer into `arrow_data`, a vector of bytes, and adds its
-/// [`crate::Buffer`] to `buffers`. Returns the new offset in `arrow_data`
-///
-///
-/// From
<https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
-/// Each constituent buffer is first compressed with the indicated
-/// compressor, and then written with the uncompressed length in the first 8
-/// bytes as a 64-bit little-endian signed integer followed by the compressed
-/// buffer bytes (and then padding as required by the protocol). The
-/// uncompressed length may be set to -1 to indicate that the data that
-/// follows is not compressed, which can be useful for cases where
-/// compression does not yield appreciable savings.
-fn write_buffer(
- buffer: &[u8], // input
- buffers: &mut Vec<crate::Buffer>, // output buffer descriptors
- arrow_data: &mut Vec<u8>, // output stream
- offset: i64, // current output stream offset
+fn encode_sink_buffer(
Review Comment:
Similarly to @gabotechs above, I found the relationship between `buffer`,
`buffers` and `sink` confusing.
Could you perhaps (can be a follow-on PR) add documentation here
explaining what this is doing -- specifically that the `IpcBodySink` is used
for the actual arrow data and that `buffers` is an in-progress list that will
eventually become the IPC metadata?
##########
arrow-ipc/src/writer.rs:
##########
@@ -2101,50 +2226,104 @@ fn write_array_data(
Ok(offset)
}
-/// Write a buffer into `arrow_data`, a vector of bytes, and adds its
-/// [`crate::Buffer`] to `buffers`. Returns the new offset in `arrow_data`
-///
-///
-/// From
<https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
-/// Each constituent buffer is first compressed with the indicated
-/// compressor, and then written with the uncompressed length in the first 8
-/// bytes as a 64-bit little-endian signed integer followed by the compressed
-/// buffer bytes (and then padding as required by the protocol). The
-/// uncompressed length may be set to -1 to indicate that the data that
-/// follows is not compressed, which can be useful for cases where
-/// compression does not yield appreciable savings.
-fn write_buffer(
- buffer: &[u8], // input
- buffers: &mut Vec<crate::Buffer>, // output buffer descriptors
- arrow_data: &mut Vec<u8>, // output stream
- offset: i64, // current output stream offset
+fn encode_sink_buffer(
+ buffer: Buffer,
+ buffers: &mut Vec<crate::Buffer>,
+ sink: &mut IpcBodySink<'_>,
+ offset: i64,
compression_codec: Option<CompressionCodec>,
compression_context: &mut CompressionContext,
alignment: u8,
) -> Result<i64, ArrowError> {
- let len: i64 = match compression_codec {
- Some(compressor) => compressor.compress_to_vec(buffer, arrow_data,
compression_context)?,
+ let (encoded, len) = match compression_codec {
None => {
- arrow_data.extend_from_slice(buffer);
- buffer.len()
+ let len = buffer.len() as i64;
+ (EncodedBuffer::Raw(buffer), len)
}
- }
- .try_into()
- .map_err(|e| {
- ArrowError::InvalidArgumentError(format!("Could not convert compressed
size to i64: {e}"))
- })?;
+ Some(codec) => {
+ let mut scratch = Vec::new();
Review Comment:
I think you could avoid this new allocation (and then copy below) if you
write directly into arrow_data.
I started playing around with this locally. Let me know what you think.
```diff
diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs
index 0f69fec5ba4..d94d528aeaa 100644
--- a/arrow-ipc/src/writer.rs
+++ b/arrow-ipc/src/writer.rs
@@ -105,6 +105,17 @@ enum IpcBodySink<'a> {
Collect(&'a mut Vec<EncodedBuffer>),
}
+impl <'a> IpcBodySink<'a> {
+ /// Adds the content of the buffer to this sink
+ fn write_buffer(&mut self, buffer: Buffer) {
+ match self {
+ IpcBodySink::Write(vec) =>
vec.extend_from_slice(buffer.as_slice()),
+ IpcBodySink::Collect(buffers) =>
buffers.push(EncodedBuffer::Raw(buffer)),
+ }
+
+ }
+}
+
/// Per-message sizes produced by [`IpcDataGenerator::write`].
///
/// [`FileWriter`] uses these to build the Block index entries required by
the IPC footer for
@@ -2007,7 +2018,7 @@ fn write_array_data(
)?;
for buffer in array_data.buffers().iter().skip(1) {
- offset = encode_sink_buffer(
+ offset = _buffer(
buffer.clone(),
buffers,
sink,
@@ -2235,18 +2246,19 @@ fn encode_sink_buffer(
compression_context: &mut CompressionContext,
alignment: u8,
) -> Result<i64, ArrowError> {
- let (encoded, len) = match compression_codec {
+ let len = match compression_codec {
None => {
let len = buffer.len() as i64;
- (EncodedBuffer::Raw(buffer), len)
+ sink.write_buffer(buffer);
+ len
}
Some(codec) => {
let mut scratch = Vec::new();
let written =
codec.compress_to_vec(buffer.as_slice(), &mut scratch,
compression_context)?;
- let len = i64::try_from(written)
- .map_err(|e|
ArrowError::InvalidArgumentError(format!("{e}")))?;
- (EncodedBuffer::Compressed(scratch), len)
+ sink.write_buffer(Buffer::from(scratch));
+ i64::try_from(written)
+ .map_err(|e|
ArrowError::InvalidArgumentError(format!("{e}")))?
}
};
```
##########
arrow-ipc/src/writer.rs:
##########
@@ -2101,50 +2226,104 @@ fn write_array_data(
Ok(offset)
}
-/// Write a buffer into `arrow_data`, a vector of bytes, and adds its
-/// [`crate::Buffer`] to `buffers`. Returns the new offset in `arrow_data`
-///
-///
-/// From
<https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
-/// Each constituent buffer is first compressed with the indicated
-/// compressor, and then written with the uncompressed length in the first 8
-/// bytes as a 64-bit little-endian signed integer followed by the compressed
-/// buffer bytes (and then padding as required by the protocol). The
-/// uncompressed length may be set to -1 to indicate that the data that
-/// follows is not compressed, which can be useful for cases where
-/// compression does not yield appreciable savings.
-fn write_buffer(
- buffer: &[u8], // input
- buffers: &mut Vec<crate::Buffer>, // output buffer descriptors
- arrow_data: &mut Vec<u8>, // output stream
- offset: i64, // current output stream offset
+fn encode_sink_buffer(
Review Comment:
We could probably also make the code less verbose by making `buffers`,
`sink`, and `field` fields on the IPC writer 🤔
##########
arrow-ipc/src/writer.rs:
##########
@@ -2101,50 +2226,104 @@ fn write_array_data(
Ok(offset)
}
-/// Write a buffer into `arrow_data`, a vector of bytes, and adds its
-/// [`crate::Buffer`] to `buffers`. Returns the new offset in `arrow_data`
-///
-///
-/// From
<https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58>
-/// Each constituent buffer is first compressed with the indicated
-/// compressor, and then written with the uncompressed length in the first 8
-/// bytes as a 64-bit little-endian signed integer followed by the compressed
-/// buffer bytes (and then padding as required by the protocol). The
-/// uncompressed length may be set to -1 to indicate that the data that
-/// follows is not compressed, which can be useful for cases where
-/// compression does not yield appreciable savings.
-fn write_buffer(
- buffer: &[u8], // input
- buffers: &mut Vec<crate::Buffer>, // output buffer descriptors
- arrow_data: &mut Vec<u8>, // output stream
- offset: i64, // current output stream offset
+fn encode_sink_buffer(
Review Comment:
As a follow on, we might even be able to make this clearer by encapsulating
`buffers` in some sort of other struct
```rust
struct IpcMetadataBuilder {
buffers: Vec<crate::Buffer>,
nodes: Vec<crate::FieldNode>,
}
```
And then pass that through 🤔
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]