metesynnada commented on issue #3740:
URL: https://github.com/apache/arrow-rs/issues/3740#issuecomment-1441734069
I benchmarked record batch writing in three cases: the ordinary (sync) writer, the async writer, and the buffered async writer under discussion.

For a small batch size (10) and batch count (1000) with the usual schema:
```
Running benches/buffer_bench.rs (target/release/deps/buffer_bench-f7b991c50b44d14d)
async                   time:   [18.417 ms 18.942 ms 19.526 ms]
                        change: [-75.407% -74.625% -73.930%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high severe
async_buffered          time:   [17.519 ms 17.718 ms 18.016 ms]
                        change: [-75.844% -75.540% -75.034%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 15 outliers among 100 measurements (15.00%)
  8 (8.00%) high mild
  7 (7.00%) high severe
sync                    time:   [11.640 ms 12.062 ms 12.532 ms]
                        change: [-85.176% -84.617% -83.999%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 22 outliers among 100 measurements (22.00%)
  5 (5.00%) high mild
  17 (17.00%) high severe
```
For a larger batch size (1000) and batch count (100) with the usual schema:
```
Benchmarking async: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 7.7s, or reduce sample count to 60.
async                   time:   [74.173 ms 74.648 ms 75.173 ms]
                        change: [+490.63% +496.74% +502.77%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 3 outliers among 100 measurements (3.00%)
  2 (2.00%) high mild
  1 (1.00%) high severe
Benchmarking async_buffered: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 7.3s, or reduce sample count to 60.
async_buffered          time:   [72.195 ms 72.435 ms 72.695 ms]
                        change: [+623.27% +627.13% +630.93%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 13 outliers among 100 measurements (13.00%)
  1 (1.00%) low mild
  8 (8.00%) high mild
  4 (4.00%) high severe
Benchmarking sync: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 7.9s, or reduce sample count to 60.
sync                    time:   [78.149 ms 78.415 ms 78.772 ms]
                        change: [+730.79% +735.48% +740.48%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 11 outliers among 100 measurements (11.00%)
  4 (4.00%) low mild
  7 (7.00%) high severe
```
With what I believe is the usual batch size (4096) and a batch count of 100, with the usual schema:
```
Running benches/buffer_bench.rs (target/release/deps/buffer_bench-f7b991c50b44d14d)
Benchmarking async: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 30.4s, or reduce sample count to 10.
async                   time:   [300.46 ms 301.73 ms 303.03 ms]
                        change: [+1443.9% +1493.0% +1538.6%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 3 outliers among 100 measurements (3.00%)
  3 (3.00%) high mild
Benchmarking async_buffered: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 28.1s, or reduce sample count to 10.
async_buffered          time:   [279.85 ms 281.20 ms 282.96 ms]
                        change: [+1459.3% +1487.1% +1508.2%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 6 outliers among 100 measurements (6.00%)
  6 (6.00%) high severe
Benchmarking sync: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 31.1s, or reduce sample count to 10.
sync                    time:   [310.66 ms 311.43 ms 312.36 ms]
                        change: [+2385.1% +2481.8% +2576.3%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 7 outliers among 100 measurements (7.00%)
  3 (3.00%) high mild
  4 (4.00%) high severe
```
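To compare the three writers across configurations, the point estimates (the middle value of each `time:` interval) can be normalized per row and against the sync writer. This is a small standalone sketch: the figures are copied from the output above, rows are assumed to be batch size × batch count, and the `change:` lines are ignored because criterion reports those against a previously saved run, not against the other writers.

```rust
/// Point estimates (ms) copied from the criterion output above,
/// in the order (async, async_buffered, sync).
struct Run {
    batch_size: u64,
    batch_count: u64,
    estimate_ms: [f64; 3],
}

const LABELS: [&str; 3] = ["async", "async_buffered", "sync"];
const SYNC: usize = 2;

fn runs() -> Vec<Run> {
    vec![
        Run { batch_size: 10, batch_count: 1000, estimate_ms: [18.942, 17.718, 12.062] },
        Run { batch_size: 1000, batch_count: 100, estimate_ms: [74.648, 72.435, 78.415] },
        Run { batch_size: 4096, batch_count: 100, estimate_ms: [301.73, 281.20, 311.43] },
    ]
}

/// Cost per row in microseconds: total milliseconds * 1000 / rows.
fn micros_per_row(estimate_ms: f64, rows: u64) -> f64 {
    estimate_ms * 1000.0 / rows as f64
}

/// Time relative to the sync writer (1.0 = same, below 1.0 = faster than sync).
fn ratio_to_sync(run: &Run, idx: usize) -> f64 {
    run.estimate_ms[idx] / run.estimate_ms[SYNC]
}

fn main() {
    for run in runs() {
        // Assumption: every batch holds exactly `batch_size` rows.
        let rows = run.batch_size * run.batch_count;
        println!("batch_size={} batch_count={} rows={}", run.batch_size, run.batch_count, rows);
        for (i, label) in LABELS.iter().enumerate() {
            println!(
                "  {:<15} {:>6.3} µs/row  {:>5.2}x sync",
                label,
                micros_per_row(run.estimate_ms[i], rows),
                ratio_to_sync(&run, i)
            );
        }
    }
}
```

With these figures the sync writer is fastest for 10-row batches (async takes roughly 1.57x its time), while at 1000 and 4096 rows per batch both async variants finish ahead of sync, with `async_buffered` at about 0.92x and 0.90x of the sync time respectively.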
I think the buffered version could also be extended directly to JSON and Avro writers:
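```rust
use arrow::csv::Writer;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use tokio::io::{AsyncWrite, AsyncWriteExt, BufWriter};

/// Encodes each batch synchronously into an in-memory buffer, then hands
/// the encoded bytes to the async sink in a single `write_all`.
pub struct BufferWriter<W: AsyncWrite + Unpin + Send> {
    writer: BufWriter<W>,
    buffer: Vec<u8>,
}

impl<W: AsyncWrite + Unpin + Send> BufferWriter<W> {
    pub fn new(writer: W) -> Self {
        BufferWriter {
            writer: BufWriter::new(writer),
            buffer: Vec::with_capacity(4096),
        }
    }

    /// `header` controls whether a header row is emitted for this batch.
    pub async fn write(&mut self, batch: &RecordBatch, header: bool) -> Result<(), ArrowError> {
        {
            // Encode the batch as CSV into the reusable buffer; the inner
            // scope ends the mutable borrow of `self.buffer`.
            let mut inner_writer = Writer::new_with_header(&mut self.buffer, header);
            inner_writer.write(batch)?;
        }
        // std::io::Error converts into ArrowError via `?`.
        self.writer.write_all(&self.buffer).await?;
        self.writer.flush().await?;
        // Keep the allocation for the next batch.
        self.buffer.clear();
        Ok(())
    }
}
```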
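The buffering logic itself does not depend on CSV: it only needs something that encodes a batch into bytes. A minimal sketch of that idea, under stated assumptions: `BatchEncoder`, `CsvLines`, `JsonLines`, `Rows`, `FormatBufferWriter`, and `CountingSink` are all hypothetical stand-ins (not arrow APIs), and the sink is a blocking `std::io::Write` rather than tokio's `AsyncWrite`, so it runs with the standard library alone. `CountingSink` records how many writes reach the sink, showing that each batch arrives in a single call regardless of format.

```rust
use std::io::{self, Write};

/// Hypothetical stand-in for a record batch: rows of (name, value).
type Rows = [(&'static str, i64)];

/// Hypothetical per-format encoder; CSV, JSON, or Avro would each implement it.
trait BatchEncoder {
    fn encode(&self, rows: &Rows, out: &mut Vec<u8>) -> io::Result<()>;
}

struct CsvLines;
struct JsonLines;

impl BatchEncoder for CsvLines {
    fn encode(&self, rows: &Rows, out: &mut Vec<u8>) -> io::Result<()> {
        for (name, value) in rows {
            writeln!(out, "{},{}", name, value)?;
        }
        Ok(())
    }
}

impl BatchEncoder for JsonLines {
    fn encode(&self, rows: &Rows, out: &mut Vec<u8>) -> io::Result<()> {
        for (name, value) in rows {
            // Assumes `name` needs no JSON escaping.
            writeln!(out, "{{\"name\":\"{}\",\"value\":{}}}", name, value)?;
        }
        Ok(())
    }
}

/// Format-agnostic buffered writer: encode into a reusable Vec,
/// then pass the whole batch to the sink with one `write_all`.
struct FormatBufferWriter<W: Write, E: BatchEncoder> {
    sink: W,
    encoder: E,
    buffer: Vec<u8>,
}

impl<W: Write, E: BatchEncoder> FormatBufferWriter<W, E> {
    fn new(sink: W, encoder: E) -> Self {
        Self { sink, encoder, buffer: Vec::with_capacity(4096) }
    }

    fn write(&mut self, rows: &Rows) -> io::Result<()> {
        self.encoder.encode(rows, &mut self.buffer)?;
        self.sink.write_all(&self.buffer)?;
        self.sink.flush()?;
        self.buffer.clear(); // keep the allocation for the next batch
        Ok(())
    }

    fn into_inner(self) -> W {
        self.sink
    }
}

/// Test sink that counts `write` calls and keeps the bytes it receives.
#[derive(Default)]
struct CountingSink {
    data: Vec<u8>,
    writes: usize,
}

impl Write for CountingSink {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.writes += 1;
        self.data.extend_from_slice(buf);
        Ok(buf.len())
    }
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

fn main() -> io::Result<()> {
    let batch: &Rows = &[("a", 1), ("b", 2)];
    let mut csv = FormatBufferWriter::new(CountingSink::default(), CsvLines);
    csv.write(batch)?;
    csv.write(batch)?;
    let sink = csv.into_inner();
    println!("{} writes: {:?}", sink.writes, String::from_utf8_lossy(&sink.data));

    let mut json = FormatBufferWriter::new(CountingSink::default(), JsonLines);
    json.write(batch)?;
    print!("{}", String::from_utf8_lossy(&json.into_inner().data));
    Ok(())
}
```

Swapping the encoder is the only change needed per format; in the real async version the `write_all`/`flush` calls would be `.await`ed on a tokio writer instead.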