This is an automated email from the ASF dual-hosted git repository.

mneumann pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new b90a261c2b feat: add `Extensions` to object store `PutMultipartOpts` (#7214)
b90a261c2b is described below

commit b90a261c2ba296a3263ec0ffb0df495365bb3b6c
Author: Marco Neumann <[email protected]>
AuthorDate: Thu Feb 27 18:47:09 2025 +0100

    feat: add `Extensions` to object store `PutMultipartOpts` (#7214)
---
 object_store/src/aws/client.rs   | 11 +++++++++--
 object_store/src/azure/client.rs | 11 +++++++++--
 object_store/src/buffered.rs     | 17 +++++++++++++++++
 object_store/src/gcp/client.rs   | 10 +++++++++-
 object_store/src/lib.rs          | 27 ++++++++++++++++++++++++++-
 5 files changed, 70 insertions(+), 6 deletions(-)

diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index 6cf5540000..fb2a033c3b 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -633,6 +633,12 @@ impl S3Client {
         location: &Path,
         opts: PutMultipartOpts,
     ) -> Result<MultipartId> {
+        let PutMultipartOpts {
+            tags,
+            attributes,
+            extensions,
+        } = opts;
+
         let mut request = self.request(Method::POST, location);
         if let Some(algorithm) = self.config.checksum {
             match algorithm {
@@ -644,8 +650,9 @@ impl S3Client {
         let response = request
             .query(&[("uploads", "")])
             .with_encryption_headers()
-            .with_attributes(opts.attributes)
-            .with_tags(opts.tags)
+            .with_attributes(attributes)
+            .with_tags(tags)
+            .with_extensions(extensions)
             .idempotent(true)
             .send()
             .await?
diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs
index c4d026bcb8..dbeae63460 100644
--- a/object_store/src/azure/client.rs
+++ b/object_store/src/azure/client.rs
@@ -599,6 +599,12 @@ impl AzureClient {
         parts: Vec<PartId>,
         opts: PutMultipartOpts,
     ) -> Result<PutResult> {
+        let PutMultipartOpts {
+            tags,
+            attributes,
+            extensions,
+        } = opts;
+
         let blocks = parts
             .into_iter()
             .map(|part| BlockId::from(part.content_id))
@@ -607,8 +613,9 @@ impl AzureClient {
         let payload = BlockList { blocks }.to_xml().into();
         let response = self
             .put_request(path, payload)
-            .with_attributes(opts.attributes)
-            .with_tags(opts.tags)
+            .with_attributes(attributes)
+            .with_tags(tags)
+            .with_extensions(extensions)
             .query(&[("comp", "blocklist")])
             .idempotent(true)
             .send()
diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs
index fcd7e064e7..a767cb65c9 100644
--- a/object_store/src/buffered.rs
+++ b/object_store/src/buffered.rs
@@ -222,6 +222,7 @@ pub struct BufWriter {
     max_concurrency: usize,
     attributes: Option<Attributes>,
     tags: Option<TagSet>,
+    extensions: Option<::http::Extensions>,
     state: BufWriterState,
     store: Arc<dyn ObjectStore>,
 }
@@ -259,6 +260,7 @@ impl BufWriter {
             max_concurrency: 8,
             attributes: None,
             tags: None,
+            extensions: None,
             state: BufWriterState::Buffer(path, PutPayloadMut::new()),
         }
     }
@@ -289,6 +291,19 @@ impl BufWriter {
         }
     }
 
+    /// Set the extensions of the uploaded object
+    ///
+    /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
+    /// that need to pass context-specific information (like tracing spans) via trait methods.
+    ///
+    /// These extensions are ignored entirely by backends offered through this crate.
+    pub fn with_extensions(self, extensions: ::http::Extensions) -> Self {
+        Self {
+            extensions: Some(extensions),
+            ..self
+        }
+    }
+
     /// Write data to the writer in [`Bytes`].
     ///
     /// Unlike [`AsyncWrite::poll_write`], `put` can write data without extra 
copying.
@@ -325,6 +340,7 @@ impl BufWriter {
                         let opts = PutMultipartOpts {
                             attributes: 
self.attributes.take().unwrap_or_default(),
                             tags: self.tags.take().unwrap_or_default(),
+                            extensions: 
self.extensions.take().unwrap_or_default(),
                         };
                         let upload = self.store.put_multipart_opts(&path, 
opts).await?;
                         let mut chunked =
@@ -384,6 +400,7 @@ impl AsyncWrite for BufWriter {
                         let opts = PutMultipartOpts {
                             attributes: 
self.attributes.take().unwrap_or_default(),
                             tags: self.tags.take().unwrap_or_default(),
+                            extensions: 
self.extensions.take().unwrap_or_default(),
                         };
                         let store = Arc::clone(&self.store);
                         self.state = BufWriterState::Prepare(Box::pin(async 
move {
diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs
index e514624f8f..1cc72964f8 100644
--- a/object_store/src/gcp/client.rs
+++ b/object_store/src/gcp/client.rs
@@ -453,9 +453,17 @@ impl GoogleCloudStorageClient {
         path: &Path,
         opts: PutMultipartOpts,
     ) -> Result<MultipartId> {
+        let PutMultipartOpts {
+            // not supported by GCP
+            tags: _,
+            attributes,
+            extensions,
+        } = opts;
+
         let response = self
             .request(Method::POST, path)
-            .with_attributes(opts.attributes)
+            .with_attributes(attributes)
+            .with_extensions(extensions)
             .header(&CONTENT_LENGTH, "0")
             .query(&[("uploads", "")])
             .send()
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 8f05fb3911..5db7e01d7d 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -1223,7 +1223,7 @@ impl From<Attributes> for PutOptions {
 }
 
 /// Options for [`ObjectStore::put_multipart_opts`]
-#[derive(Debug, Clone, PartialEq, Eq, Default)]
+#[derive(Debug, Clone, Default)]
 pub struct PutMultipartOpts {
     /// Provide a [`TagSet`] for this object
     ///
@@ -1233,8 +1233,33 @@ pub struct PutMultipartOpts {
     ///
     /// Implementations that don't support an attribute should return an error
     pub attributes: Attributes,
+    /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
+    /// that need to pass context-specific information (like tracing spans) via trait methods.
+    ///
+    /// These extensions are ignored entirely by backends offered through this crate.
+    ///
+    /// They are also excluded from [`PartialEq`] and [`Eq`].
+    pub extensions: ::http::Extensions,
+}
+
+impl PartialEq<Self> for PutMultipartOpts {
+    fn eq(&self, other: &Self) -> bool {
+        let Self {
+            tags,
+            attributes,
+            extensions: _,
+        } = self;
+        let Self {
+            tags: other_tags,
+            attributes: other_attributes,
+            extensions: _,
+        } = other;
+        (tags == other_tags) && (attributes == other_attributes)
+    }
 }
 
+impl Eq for PutMultipartOpts {}
+
 impl From<TagSet> for PutMultipartOpts {
     fn from(tags: TagSet) -> Self {
         Self {

Reply via email to