This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 7efe6c20530 Add additional WriteMultipart tests (#5743) (#5746)
7efe6c20530 is described below

commit 7efe6c20530c1d73b90a2a0125e144f93ef3010f
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Sat May 11 09:34:56 2024 +0100

    Add additional WriteMultipart tests (#5743) (#5746)
    
    * Add additional WriteMultipart tests (#5743)
    
    * Clippy
---
 object_store/src/integration.rs | 12 +++++++
 object_store/src/upload.rs      | 69 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 81 insertions(+)

diff --git a/object_store/src/integration.rs b/object_store/src/integration.rs
index d08c4509f36..31b074f4987 100644
--- a/object_store/src/integration.rs
+++ b/object_store/src/integration.rs
@@ -799,6 +799,18 @@ pub async fn stream_get(storage: &DynObjectStore) {
     let meta = storage.head(&location).await.unwrap();
     assert_eq!(meta.size, 6);
 
+    let location = Path::from("test_dir/test_put_part_mixed.txt");
+    let upload = storage.put_multipart(&location).await.unwrap();
+    let mut write = WriteMultipart::new(upload);
+    write.put(vec![0; 2].into());
+    write.write(&[1, 2, 3]);
+    write.put(vec![4, 5, 6, 7].into());
+    write.finish().await.unwrap();
+
+    let r = storage.get(&location).await.unwrap();
+    let r = r.bytes().await.unwrap();
+    assert_eq!(r.as_ref(), &[0, 0, 1, 2, 3, 4, 5, 6, 7]);
+
     // We can abort an empty write
     let location = Path::from("test_dir/test_abort_upload.txt");
     let mut upload = storage.put_multipart(&location).await.unwrap();
diff --git a/object_store/src/upload.rs b/object_store/src/upload.rs
index 9805df0dda7..e5f683a034a 100644
--- a/object_store/src/upload.rs
+++ b/object_store/src/upload.rs
@@ -217,9 +217,13 @@ impl WriteMultipart {
 
 #[cfg(test)]
 mod tests {
+    use std::sync::Arc;
     use std::time::Duration;
 
     use futures::FutureExt;
+    use parking_lot::Mutex;
+    use rand::prelude::StdRng;
+    use rand::{Rng, SeedableRng};
 
     use crate::memory::InMemory;
     use crate::path::Path;
@@ -246,4 +250,69 @@ mod tests {
         assert!(write.wait_for_capacity(10).now_or_never().is_none());
         write.wait_for_capacity(10).await.unwrap()
     }
+
+    #[derive(Debug, Default)]
+    struct InstrumentedUpload {
+        chunks: Arc<Mutex<Vec<PutPayload>>>,
+    }
+
+    #[async_trait]
+    impl MultipartUpload for InstrumentedUpload {
+        fn put_part(&mut self, data: PutPayload) -> UploadPart {
+            self.chunks.lock().push(data);
+            futures::future::ready(Ok(())).boxed()
+        }
+
+        async fn complete(&mut self) -> Result<PutResult> {
+            Ok(PutResult {
+                e_tag: None,
+                version: None,
+            })
+        }
+
+        async fn abort(&mut self) -> Result<()> {
+            unimplemented!()
+        }
+    }
+
+    #[tokio::test]
+    async fn test_write_multipart() {
+        let mut rng = StdRng::seed_from_u64(42);
+
+        for method in [0.0, 0.5, 1.0] {
+            for _ in 0..10 {
+                for chunk_size in [1, 17, 23] {
+                    let upload = Box::<InstrumentedUpload>::default();
+                    let chunks = Arc::clone(&upload.chunks);
+                    let mut write = WriteMultipart::new_with_chunk_size(upload, chunk_size);
+
+                    let mut expected = Vec::with_capacity(1024);
+
+                    for _ in 0..50 {
+                        let chunk_size = rng.gen_range(0..30);
+                        let data: Vec<_> = (0..chunk_size).map(|_| rng.gen()).collect();
+                        expected.extend_from_slice(&data);
+
+                        match rng.gen_bool(method) {
+                            true => write.put(data.into()),
+                            false => write.write(&data),
+                        }
+                    }
+                    write.finish().await.unwrap();
+
+                    let chunks = chunks.lock();
+
+                    let actual: Vec<_> = chunks.iter().flatten().flatten().copied().collect();
+                    assert_eq!(expected, actual);
+
+                    for chunk in chunks.iter().take(chunks.len() - 1) {
+                        assert_eq!(chunk.content_length(), chunk_size)
+                    }
+
+                    let last_chunk = chunks.last().unwrap().content_length();
+                    assert!(last_chunk <= chunk_size, "{chunk_size}");
+                }
+            }
+        }
+    }
 }

Reply via email to