This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 08af4710fcd Add `BufWriter::with_attributes` and `::with_tags` in `object_store` (#5693)
08af4710fcd is described below

commit 08af4710fcd2b56a7624db9b9d97e6715a952cb0
Author: nett_hier <[email protected]>
AuthorDate: Fri Apr 26 13:06:52 2024 +0200

    Add `BufWriter::with_attributes` and `::with_tags` in `object_store` (#5693)
    
    * Add `BufWriter::with_attributes`
    
    Signed-off-by: netthier <[email protected]>
    
    * Add `BufWriter::with_tags`
    
    Signed-off-by: netthier <[email protected]>
    
    ---------
    
    Signed-off-by: netthier <[email protected]>
---
 object_store/src/aws/mod.rs   | 14 +++++---
 object_store/src/azure/mod.rs | 14 +++++---
 object_store/src/buffered.rs  | 77 ++++++++++++++++++++++++++++++++++++++-----
 object_store/src/lib.rs       | 13 ++++++--
 4 files changed, 99 insertions(+), 19 deletions(-)

diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index 7f1edf12faf..5bc6d56e7c7 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -437,10 +437,16 @@ mod tests {
 
         // Object tagging is not supported by S3 Express One Zone
         if config.session_provider.is_none() {
-            tagging(&integration, !config.disable_tagging, |p| {
-                let client = Arc::clone(&integration.client);
-                async move { client.get_object_tagging(&p).await }
-            })
+            tagging(
+                Arc::new(AmazonS3 {
+                    client: Arc::clone(&integration.client),
+                }),
+                !config.disable_tagging,
+                |p| {
+                    let client = Arc::clone(&integration.client);
+                    async move { client.get_object_tagging(&p).await }
+                },
+            )
             .await;
         }
 
diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs
index 25ae6dda68a..755f3b1265f 100644
--- a/object_store/src/azure/mod.rs
+++ b/object_store/src/azure/mod.rs
@@ -296,10 +296,16 @@ mod tests {
         signing(&integration).await;
 
         let validate = !integration.client.config().disable_tagging;
-        tagging(&integration, validate, |p| {
-            let client = Arc::clone(&integration.client);
-            async move { client.get_blob_tagging(&p).await }
-        })
+        tagging(
+            Arc::new(MicrosoftAzure {
+                client: Arc::clone(&integration.client),
+            }),
+            validate,
+            |p| {
+                let client = Arc::clone(&integration.client);
+                async move { client.get_blob_tagging(&p).await }
+            },
+        )
         .await;
 
         // Azurite doesn't support attributes properly
diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs
index d41224177a3..feb84d4d0bc 100644
--- a/object_store/src/buffered.rs
+++ b/object_store/src/buffered.rs
@@ -18,7 +18,10 @@
 //! Utilities for performing tokio-style buffered IO
 
 use crate::path::Path;
-use crate::{ObjectMeta, ObjectStore, PutPayloadMut, WriteMultipart};
+use crate::{
+    Attributes, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, 
PutPayloadMut, TagSet,
+    WriteMultipart,
+};
 use bytes::Bytes;
 use futures::future::{BoxFuture, FutureExt};
 use futures::ready;
@@ -217,6 +220,8 @@ impl AsyncBufRead for BufReader {
 pub struct BufWriter {
     capacity: usize,
     max_concurrency: usize,
+    attributes: Option<Attributes>,
+    tags: Option<TagSet>,
     state: BufWriterState,
     store: Arc<dyn ObjectStore>,
 }
@@ -252,6 +257,8 @@ impl BufWriter {
             capacity,
             store,
             max_concurrency: 8,
+            attributes: None,
+            tags: None,
             state: BufWriterState::Buffer(path, PutPayloadMut::new()),
         }
     }
@@ -266,6 +273,22 @@ impl BufWriter {
         }
     }
 
+    /// Set the attributes of the uploaded object
+    pub fn with_attributes(self, attributes: Attributes) -> Self {
+        Self {
+            attributes: Some(attributes),
+            ..self
+        }
+    }
+
+    /// Set the tags of the uploaded object
+    pub fn with_tags(self, tags: TagSet) -> Self {
+        Self {
+            tags: Some(tags),
+            ..self
+        }
+    }
+
     /// Abort this writer, cleaning up any partially uploaded state
     ///
     /// # Panic
@@ -306,9 +329,13 @@ impl AsyncWrite for BufWriter {
                     if b.content_length().saturating_add(buf.len()) >= cap {
                         let buffer = std::mem::take(b);
                         let path = std::mem::take(path);
+                        let opts = PutMultipartOpts {
+                            attributes: 
self.attributes.take().unwrap_or_default(),
+                            tags: self.tags.take().unwrap_or_default(),
+                        };
                         let store = Arc::clone(&self.store);
                         self.state = BufWriterState::Prepare(Box::pin(async 
move {
-                            let upload = store.put_multipart(&path).await?;
+                            let upload = store.put_multipart_opts(&path, 
opts).await?;
                             let mut chunked = 
WriteMultipart::new_with_chunk_size(upload, cap);
                             for chunk in buffer.freeze() {
                                 chunked.put(chunk);
@@ -346,9 +373,14 @@ impl AsyncWrite for BufWriter {
                 BufWriterState::Buffer(p, b) => {
                     let buf = std::mem::take(b);
                     let path = std::mem::take(p);
+                    let opts = PutOptions {
+                        attributes: self.attributes.take().unwrap_or_default(),
+                        tags: self.tags.take().unwrap_or_default(),
+                        ..Default::default()
+                    };
                     let store = Arc::clone(&self.store);
                     self.state = BufWriterState::Flush(Box::pin(async move {
-                        store.put(&path, buf.into()).await?;
+                        store.put_opts(&path, buf.into(), opts).await?;
                         Ok(())
                     }));
                 }
@@ -383,6 +415,7 @@ mod tests {
     use super::*;
     use crate::memory::InMemory;
     use crate::path::Path;
+    use crate::{Attribute, GetOptions};
     use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, 
AsyncWriteExt};
 
     #[tokio::test]
@@ -464,26 +497,54 @@ mod tests {
         }
     }
 
+    // Note: `BufWriter::with_tags` functionality is tested in 
`crate::tests::tagging`
     #[tokio::test]
     async fn test_buf_writer() {
         let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
         let path = Path::from("file.txt");
+        let attributes = Attributes::from_iter([
+            (Attribute::ContentType, "text/html"),
+            (Attribute::CacheControl, "max-age=604800"),
+        ]);
 
         // Test put
-        let mut writer = BufWriter::with_capacity(Arc::clone(&store), 
path.clone(), 30);
+        let mut writer = BufWriter::with_capacity(Arc::clone(&store), 
path.clone(), 30)
+            .with_attributes(attributes.clone());
         writer.write_all(&[0; 20]).await.unwrap();
         writer.flush().await.unwrap();
         writer.write_all(&[0; 5]).await.unwrap();
         writer.shutdown().await.unwrap();
-        assert_eq!(store.head(&path).await.unwrap().size, 25);
+        let response = store
+            .get_opts(
+                &path,
+                GetOptions {
+                    head: true,
+                    ..Default::default()
+                },
+            )
+            .await
+            .unwrap();
+        assert_eq!(response.meta.size, 25);
+        assert_eq!(response.attributes, attributes);
 
         // Test multipart
-        let mut writer = BufWriter::with_capacity(Arc::clone(&store), 
path.clone(), 30);
+        let mut writer = BufWriter::with_capacity(Arc::clone(&store), 
path.clone(), 30)
+            .with_attributes(attributes.clone());
         writer.write_all(&[0; 20]).await.unwrap();
         writer.flush().await.unwrap();
         writer.write_all(&[0; 20]).await.unwrap();
         writer.shutdown().await.unwrap();
-
-        assert_eq!(store.head(&path).await.unwrap().size, 40);
+        let response = store
+            .get_opts(
+                &path,
+                GetOptions {
+                    head: true,
+                    ..Default::default()
+                },
+            )
+            .await
+            .unwrap();
+        assert_eq!(response.meta.size, 40);
+        assert_eq!(response.attributes, attributes);
     }
 }
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index c99e15a4933..9a8f77b4d82 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -1314,12 +1314,14 @@ mod test_util {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::buffered::BufWriter;
     use crate::multipart::MultipartStore;
     use crate::test_util::flatten_list_stream;
     use chrono::TimeZone;
     use futures::stream::FuturesUnordered;
     use rand::distributions::Alphanumeric;
     use rand::{thread_rng, Rng};
+    use tokio::io::AsyncWriteExt;
 
     pub(crate) async fn put_get_delete_list(storage: &DynObjectStore) {
         delete_fixtures(storage).await;
@@ -2365,7 +2367,7 @@ mod tests {
     }
 
     #[cfg(any(feature = "aws", feature = "azure"))]
-    pub(crate) async fn tagging<F, Fut>(storage: &dyn ObjectStore, validate: 
bool, get_tags: F)
+    pub(crate) async fn tagging<F, Fut>(storage: Arc<dyn ObjectStore>, 
validate: bool, get_tags: F)
     where
         F: Fn(Path) -> Fut + Send + Sync,
         Fut: std::future::Future<Output = Result<reqwest::Response>> + Send,
@@ -2415,19 +2417,24 @@ mod tests {
 
         let multi_path = Path::from("tag_test_multi");
         let mut write = storage
-            .put_multipart_opts(&multi_path, tag_set.into())
+            .put_multipart_opts(&multi_path, tag_set.clone().into())
             .await
             .unwrap();
 
         write.put_part("foo".into()).await.unwrap();
         write.complete().await.unwrap();
 
+        let buf_path = Path::from("tag_test_buf");
+        let mut buf = BufWriter::new(storage, 
buf_path.clone()).with_tags(tag_set);
+        buf.write_all(b"foo").await.unwrap();
+        buf.shutdown().await.unwrap();
+
         // Write should always succeed, but certain configurations may simply 
ignore tags
         if !validate {
             return;
         }
 
-        for path in [path, multi_path] {
+        for path in [path, multi_path, buf_path] {
             let resp = get_tags(path.clone()).await.unwrap();
             let body = resp.bytes().await.unwrap();
 

Reply via email to