This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 04ca2f2b0 feat: use exactly equal parts in multipart upload (#4305)
04ca2f2b0 is described below
commit 04ca2f2b0ad964ce8962b8b362da0df932c88091
Author: Will Jones <[email protected]>
AuthorDate: Tue May 30 13:52:45 2023 -0700
feat: use exactly equal parts in multipart upload (#4305)
* refactor: use exactly equal parts in multipart upload
* Improve test
* Apply suggestions from code review
Co-authored-by: Raphael Taylor-Davies <[email protected]>
* Fix lifetime
---------
Co-authored-by: Raphael Taylor-Davies <[email protected]>
---
object_store/src/lib.rs | 18 ++++++++++++---
object_store/src/multipart.rs | 52 +++++++++++++++++++++++++++++--------------
2 files changed, 50 insertions(+), 20 deletions(-)
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index c5bf40cc4..98bbb7adc 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -898,6 +898,8 @@ mod test_util {
mod tests {
use super::*;
use crate::test_util::flatten_list_stream;
+ use bytes::{BufMut, BytesMut};
+ use itertools::Itertools;
use tokio::io::AsyncWriteExt;
pub(crate) async fn put_get_delete_list(storage: &DynObjectStore) {
@@ -1308,8 +1310,18 @@ mod tests {
}
}
+ fn get_random_bytes(len: usize) -> Bytes {
+ use rand::Rng;
+ let mut rng = rand::thread_rng();
+ let mut bytes = BytesMut::with_capacity(len);
+ for _ in 0..len {
+ bytes.put_u8(rng.gen());
+ }
+ bytes.freeze()
+ }
+
fn get_vec_of_bytes(chunk_length: usize, num_chunks: usize) -> Vec<Bytes> {
- std::iter::repeat(Bytes::from_iter(std::iter::repeat(b'x').take(chunk_length)))
+ std::iter::repeat(get_random_bytes(chunk_length))
.take(num_chunks)
.collect()
}
@@ -1344,8 +1356,8 @@ mod tests {
assert_eq!(bytes_expected, bytes_written);
// Can overwrite some storage
- // Sizes carefully chosen to exactly hit min limit of 5 MiB
- let data = get_vec_of_bytes(242_880, 22);
+ // Sizes chosen to ensure we write three parts
+ let data = (0..7).map(|_| get_random_bytes(3_200_000)).collect_vec();
let bytes_expected = data.concat();
let (_, mut writer) = storage.put_multipart(&location).await.unwrap();
for chunk in &data {
diff --git a/object_store/src/multipart.rs b/object_store/src/multipart.rs
index 0606fb51e..265803070 100644
--- a/object_store/src/multipart.rs
+++ b/object_store/src/multipart.rs
@@ -60,8 +60,11 @@ where
max_concurrency: usize,
/// Buffer that will be sent in next upload.
current_buffer: Vec<u8>,
- /// Minimum size of a part in bytes
- min_part_size: usize,
+ /// Size of each part.
+ ///
+ /// While S3 and Minio support variable part sizes, R2 requires they all be
+ /// exactly the same size.
+ part_size: usize,
/// Index of current part
current_part_idx: usize,
/// The completion task
@@ -85,12 +88,21 @@ where
// Minimum size of 5 MiB
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
// https://cloud.google.com/storage/quotas#requests
- min_part_size: 5_242_880,
+ part_size: 10 * 1024 * 1024,
current_part_idx: 0,
completion_task: None,
}
}
+ // Add data to the current buffer, returning the number of bytes added
+ fn add_to_buffer(mut self: Pin<&mut Self>, buf: &[u8], offset: usize) -> usize {
+ let remaining_capacity = self.part_size - self.current_buffer.len();
+ let to_copy = std::cmp::min(remaining_capacity, buf.len() - offset);
+ self.current_buffer
+ .extend_from_slice(&buf[offset..offset + to_copy]);
+ to_copy
+ }
+
pub fn poll_tasks(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
@@ -158,15 +170,21 @@ where
// Poll current tasks
self.as_mut().poll_tasks(cx)?;
- // If adding buf to pending buffer would trigger send, check
- // whether we have capacity for another task.
- let enough_to_send =
- (buf.len() + self.current_buffer.len()) >= self.min_part_size;
- if enough_to_send && self.tasks.len() < self.max_concurrency {
- // If we do, copy into the buffer and submit the task, and return ready.
- self.current_buffer.extend_from_slice(buf);
+ let mut offset = 0;
+
+ loop {
+ // Fill up current buffer
+ offset += self.as_mut().add_to_buffer(buf, offset);
- let out_buffer = std::mem::take(&mut self.current_buffer);
+ // If we don't have a full buffer or we have too many tasks, break
+ if self.current_buffer.len() < self.part_size
+ || self.tasks.len() >= self.max_concurrency
+ {
+ break;
+ }
+
+ let new_buffer = Vec::with_capacity(self.part_size);
+ let out_buffer = std::mem::replace(&mut self.current_buffer, new_buffer);
let inner = Arc::clone(&self.inner);
let part_idx = self.current_part_idx;
self.tasks.push(Box::pin(async move {
@@ -177,14 +195,14 @@ where
// We need to poll immediately after adding to setup waker
self.as_mut().poll_tasks(cx)?;
+ }
- Poll::Ready(Ok(buf.len()))
- } else if !enough_to_send {
- self.current_buffer.extend_from_slice(buf);
- Poll::Ready(Ok(buf.len()))
- } else {
- // Waker registered by call to poll_tasks at beginning
+ // If offset is zero, then we didn't write anything because we didn't
+ // have capacity for more tasks and our buffer is full.
+ if offset == 0 && !buf.is_empty() {
Poll::Pending
+ } else {
+ Poll::Ready(Ok(offset))
}
}