This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 7efe6c20530 Add additional WriteMultipart tests (#5743) (#5746)
7efe6c20530 is described below
commit 7efe6c20530c1d73b90a2a0125e144f93ef3010f
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Sat May 11 09:34:56 2024 +0100
Add additional WriteMultipart tests (#5743) (#5746)
* Add additional WriteMultipart tests (#5743)
* Clippy
---
object_store/src/integration.rs | 12 +++++++
object_store/src/upload.rs | 69 +++++++++++++++++++++++++++++++++++++++++
2 files changed, 81 insertions(+)
diff --git a/object_store/src/integration.rs b/object_store/src/integration.rs
index d08c4509f36..31b074f4987 100644
--- a/object_store/src/integration.rs
+++ b/object_store/src/integration.rs
@@ -799,6 +799,18 @@ pub async fn stream_get(storage: &DynObjectStore) {
let meta = storage.head(&location).await.unwrap();
assert_eq!(meta.size, 6);
+ let location = Path::from("test_dir/test_put_part_mixed.txt");
+ let upload = storage.put_multipart(&location).await.unwrap();
+ let mut write = WriteMultipart::new(upload);
+ write.put(vec![0; 2].into());
+ write.write(&[1, 2, 3]);
+ write.put(vec![4, 5, 6, 7].into());
+ write.finish().await.unwrap();
+
+ let r = storage.get(&location).await.unwrap();
+ let r = r.bytes().await.unwrap();
+ assert_eq!(r.as_ref(), &[0, 0, 1, 2, 3, 4, 5, 6, 7]);
+
// We can abort an empty write
let location = Path::from("test_dir/test_abort_upload.txt");
let mut upload = storage.put_multipart(&location).await.unwrap();
diff --git a/object_store/src/upload.rs b/object_store/src/upload.rs
index 9805df0dda7..e5f683a034a 100644
--- a/object_store/src/upload.rs
+++ b/object_store/src/upload.rs
@@ -217,9 +217,13 @@ impl WriteMultipart {
#[cfg(test)]
mod tests {
+ use std::sync::Arc;
use std::time::Duration;
use futures::FutureExt;
+ use parking_lot::Mutex;
+ use rand::prelude::StdRng;
+ use rand::{Rng, SeedableRng};
use crate::memory::InMemory;
use crate::path::Path;
@@ -246,4 +250,69 @@ mod tests {
assert!(write.wait_for_capacity(10).now_or_never().is_none());
write.wait_for_capacity(10).await.unwrap()
}
+
+ #[derive(Debug, Default)]
+ struct InstrumentedUpload { // test double: records every part passed to put_part
+ chunks: Arc<Mutex<Vec<PutPayload>>>, // Arc so the test keeps a handle after the upload is moved
+ }
+
+ #[async_trait]
+ impl MultipartUpload for InstrumentedUpload {
+ fn put_part(&mut self, data: PutPayload) -> UploadPart {
+ self.chunks.lock().push(data); // parts are appended in the order put_part is called
+ futures::future::ready(Ok(())).boxed() // already-resolved future: every part "succeeds" at once
+ }
+
+ async fn complete(&mut self) -> Result<PutResult> {
+ Ok(PutResult { // no e_tag/version: callers here only inspect the recorded parts
+ e_tag: None,
+ version: None,
+ })
+ }
+
+ async fn abort(&mut self) -> Result<()> {
+ unimplemented!() // abort is not exercised by test_write_multipart
+ }
+ }
+
+    #[tokio::test]
+    async fn test_write_multipart() { // fuzz put/write mixes against several chunk sizes
+        let mut rng = StdRng::seed_from_u64(42);
+
+        for put_probability in [0.0, 0.5, 1.0] {
+            for _ in 0..10 {
+                for chunk_size in [1, 17, 23] {
+                    let upload = Box::<InstrumentedUpload>::default();
+                    let chunks = Arc::clone(&upload.chunks); // `upload` is moved below
+                    let mut write = WriteMultipart::new_with_chunk_size(upload, chunk_size);
+
+                    let mut expected = Vec::with_capacity(1024);
+
+                    for _ in 0..50 {
+                        let len = rng.gen_range(0..30); // not `chunk_size`: avoid shadowing
+                        let data: Vec<_> = (0..len).map(|_| rng.gen()).collect();
+                        expected.extend_from_slice(&data);
+
+                        match rng.gen_bool(put_probability) {
+                            true => write.put(data.into()),
+                            false => write.write(&data),
+                        }
+                    }
+                    write.finish().await.unwrap();
+
+                    let chunks = chunks.lock();
+
+                    let actual: Vec<_> = chunks.iter().flatten().flatten().copied().collect();
+                    assert_eq!(expected, actual);
+
+                    let (last, rest) = chunks.split_last().expect("at least one part uploaded");
+                    for chunk in rest {
+                        assert_eq!(chunk.content_length(), chunk_size);
+                    }
+
+                    assert!(last.content_length() <= chunk_size, "{chunk_size}");
+                }
+            }
+        }
+    }
}