This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 04ca2f2b0 feat: use exactly equal parts in multipart upload (#4305)
04ca2f2b0 is described below

commit 04ca2f2b0ad964ce8962b8b362da0df932c88091
Author: Will Jones <[email protected]>
AuthorDate: Tue May 30 13:52:45 2023 -0700

    feat: use exactly equal parts in multipart upload (#4305)
    
    * refactor: use exactly equal parts in multipart upload
    
    * Improve test
    
    * Apply suggestions from code review
    
    Co-authored-by: Raphael Taylor-Davies <[email protected]>
    
    * Fix lifetime
    
    ---------
    
    Co-authored-by: Raphael Taylor-Davies <[email protected]>
---
 object_store/src/lib.rs       | 18 ++++++++++++---
 object_store/src/multipart.rs | 52 +++++++++++++++++++++++++++++--------------
 2 files changed, 50 insertions(+), 20 deletions(-)

diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index c5bf40cc4..98bbb7adc 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -898,6 +898,8 @@ mod test_util {
 mod tests {
     use super::*;
     use crate::test_util::flatten_list_stream;
+    use bytes::{BufMut, BytesMut};
+    use itertools::Itertools;
     use tokio::io::AsyncWriteExt;
 
     pub(crate) async fn put_get_delete_list(storage: &DynObjectStore) {
@@ -1308,8 +1310,18 @@ mod tests {
         }
     }
 
+    fn get_random_bytes(len: usize) -> Bytes {
+        use rand::Rng;
+        let mut rng = rand::thread_rng();
+        let mut bytes = BytesMut::with_capacity(len);
+        for _ in 0..len {
+            bytes.put_u8(rng.gen());
+        }
+        bytes.freeze()
+    }
+
     fn get_vec_of_bytes(chunk_length: usize, num_chunks: usize) -> Vec<Bytes> {
-        std::iter::repeat(Bytes::from_iter(std::iter::repeat(b'x').take(chunk_length)))
+        std::iter::repeat(get_random_bytes(chunk_length))
             .take(num_chunks)
             .collect()
     }
@@ -1344,8 +1356,8 @@ mod tests {
         assert_eq!(bytes_expected, bytes_written);
 
         // Can overwrite some storage
-        // Sizes carefully chosen to exactly hit min limit of 5 MiB
-        let data = get_vec_of_bytes(242_880, 22);
+        // Sizes chosen to ensure we write three parts
+        let data = (0..7).map(|_| get_random_bytes(3_200_000)).collect_vec();
         let bytes_expected = data.concat();
         let (_, mut writer) = storage.put_multipart(&location).await.unwrap();
         for chunk in &data {
diff --git a/object_store/src/multipart.rs b/object_store/src/multipart.rs
index 0606fb51e..265803070 100644
--- a/object_store/src/multipart.rs
+++ b/object_store/src/multipart.rs
@@ -60,8 +60,11 @@ where
     max_concurrency: usize,
     /// Buffer that will be sent in next upload.
     current_buffer: Vec<u8>,
-    /// Minimum size of a part in bytes
-    min_part_size: usize,
+    /// Size of each part.
+    ///
+    /// While S3 and Minio support variable part sizes, R2 requires they all be
+    /// exactly the same size.
+    part_size: usize,
     /// Index of current part
     current_part_idx: usize,
     /// The completion task
@@ -85,12 +88,21 @@ where
             // Minimum size of 5 MiB
             // https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
             // https://cloud.google.com/storage/quotas#requests
-            min_part_size: 5_242_880,
+            part_size: 10 * 1024 * 1024,
             current_part_idx: 0,
             completion_task: None,
         }
     }
 
+    // Add data to the current buffer, returning the number of bytes added
+    fn add_to_buffer(mut self: Pin<&mut Self>, buf: &[u8], offset: usize) -> usize {
+        let remaining_capacity = self.part_size - self.current_buffer.len();
+        let to_copy = std::cmp::min(remaining_capacity, buf.len() - offset);
+        self.current_buffer
+            .extend_from_slice(&buf[offset..offset + to_copy]);
+        to_copy
+    }
+
     pub fn poll_tasks(
         mut self: Pin<&mut Self>,
         cx: &mut std::task::Context<'_>,
@@ -158,15 +170,21 @@ where
         // Poll current tasks
         self.as_mut().poll_tasks(cx)?;
 
-        // If adding buf to pending buffer would trigger send, check
-        // whether we have capacity for another task.
-        let enough_to_send =
-            (buf.len() + self.current_buffer.len()) >= self.min_part_size;
-        if enough_to_send && self.tasks.len() < self.max_concurrency {
-            // If we do, copy into the buffer and submit the task, and return ready.
-            self.current_buffer.extend_from_slice(buf);
+        let mut offset = 0;
+
+        loop {
+            // Fill up current buffer
+            offset += self.as_mut().add_to_buffer(buf, offset);
 
-            let out_buffer = std::mem::take(&mut self.current_buffer);
+            // If we don't have a full buffer or we have too many tasks, break
+            if self.current_buffer.len() < self.part_size
+                || self.tasks.len() >= self.max_concurrency
+            {
+                break;
+            }
+
+            let new_buffer = Vec::with_capacity(self.part_size);
+            let out_buffer = std::mem::replace(&mut self.current_buffer, new_buffer);
             let inner = Arc::clone(&self.inner);
             let part_idx = self.current_part_idx;
             self.tasks.push(Box::pin(async move {
@@ -177,14 +195,14 @@ where
 
             // We need to poll immediately after adding to setup waker
             self.as_mut().poll_tasks(cx)?;
+        }
 
-            Poll::Ready(Ok(buf.len()))
-        } else if !enough_to_send {
-            self.current_buffer.extend_from_slice(buf);
-            Poll::Ready(Ok(buf.len()))
-        } else {
-            // Waker registered by call to poll_tasks at beginning
+        // If offset is zero, then we didn't write anything because we didn't
+        // have capacity for more tasks and our buffer is full.
+        if offset == 0 && !buf.is_empty() {
             Poll::Pending
+        } else {
+            Poll::Ready(Ok(offset))
         }
     }
 

Reply via email to