This is an automated email from the ASF dual-hosted git repository.
mneumann pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new b90a261c2b feat: add `Extensions` to object store `PutMultipartOpts`
(#7214)
b90a261c2b is described below
commit b90a261c2ba296a3263ec0ffb0df495365bb3b6c
Author: Marco Neumann <[email protected]>
AuthorDate: Thu Feb 27 18:47:09 2025 +0100
feat: add `Extensions` to object store `PutMultipartOpts` (#7214)
---
object_store/src/aws/client.rs | 11 +++++++++--
object_store/src/azure/client.rs | 11 +++++++++--
object_store/src/buffered.rs | 17 +++++++++++++++++
object_store/src/gcp/client.rs | 10 +++++++++-
object_store/src/lib.rs | 27 ++++++++++++++++++++++++++-
5 files changed, 70 insertions(+), 6 deletions(-)
diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index 6cf5540000..fb2a033c3b 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -633,6 +633,12 @@ impl S3Client {
location: &Path,
opts: PutMultipartOpts,
) -> Result<MultipartId> {
+ let PutMultipartOpts {
+ tags,
+ attributes,
+ extensions,
+ } = opts;
+
let mut request = self.request(Method::POST, location);
if let Some(algorithm) = self.config.checksum {
match algorithm {
@@ -644,8 +650,9 @@ impl S3Client {
let response = request
.query(&[("uploads", "")])
.with_encryption_headers()
- .with_attributes(opts.attributes)
- .with_tags(opts.tags)
+ .with_attributes(attributes)
+ .with_tags(tags)
+ .with_extensions(extensions)
.idempotent(true)
.send()
.await?
diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs
index c4d026bcb8..dbeae63460 100644
--- a/object_store/src/azure/client.rs
+++ b/object_store/src/azure/client.rs
@@ -599,6 +599,12 @@ impl AzureClient {
parts: Vec<PartId>,
opts: PutMultipartOpts,
) -> Result<PutResult> {
+ let PutMultipartOpts {
+ tags,
+ attributes,
+ extensions,
+ } = opts;
+
let blocks = parts
.into_iter()
.map(|part| BlockId::from(part.content_id))
@@ -607,8 +613,9 @@ impl AzureClient {
let payload = BlockList { blocks }.to_xml().into();
let response = self
.put_request(path, payload)
- .with_attributes(opts.attributes)
- .with_tags(opts.tags)
+ .with_attributes(attributes)
+ .with_tags(tags)
+ .with_extensions(extensions)
.query(&[("comp", "blocklist")])
.idempotent(true)
.send()
diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs
index fcd7e064e7..a767cb65c9 100644
--- a/object_store/src/buffered.rs
+++ b/object_store/src/buffered.rs
@@ -222,6 +222,7 @@ pub struct BufWriter {
max_concurrency: usize,
attributes: Option<Attributes>,
tags: Option<TagSet>,
+ extensions: Option<::http::Extensions>,
state: BufWriterState,
store: Arc<dyn ObjectStore>,
}
@@ -259,6 +260,7 @@ impl BufWriter {
max_concurrency: 8,
attributes: None,
tags: None,
+ extensions: None,
state: BufWriterState::Buffer(path, PutPayloadMut::new()),
}
}
@@ -289,6 +291,19 @@ impl BufWriter {
}
}
+ /// Set the extensions of the uploaded object
+ ///
+ /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
+ /// that need to pass context-specific information (like tracing spans) via trait methods.
+ ///
+ /// These extensions are ignored entirely by backends offered through this crate.
+ pub fn with_extensions(self, extensions: ::http::Extensions) -> Self {
+ Self {
+ extensions: Some(extensions),
+ ..self
+ }
+ }
+
/// Write data to the writer in [`Bytes`].
///
/// Unlike [`AsyncWrite::poll_write`], `put` can write data without extra copying.
@@ -325,6 +340,7 @@ impl BufWriter {
let opts = PutMultipartOpts {
attributes:
self.attributes.take().unwrap_or_default(),
tags: self.tags.take().unwrap_or_default(),
+ extensions:
self.extensions.take().unwrap_or_default(),
};
let upload = self.store.put_multipart_opts(&path,
opts).await?;
let mut chunked =
@@ -384,6 +400,7 @@ impl AsyncWrite for BufWriter {
let opts = PutMultipartOpts {
attributes:
self.attributes.take().unwrap_or_default(),
tags: self.tags.take().unwrap_or_default(),
+ extensions:
self.extensions.take().unwrap_or_default(),
};
let store = Arc::clone(&self.store);
self.state = BufWriterState::Prepare(Box::pin(async
move {
diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs
index e514624f8f..1cc72964f8 100644
--- a/object_store/src/gcp/client.rs
+++ b/object_store/src/gcp/client.rs
@@ -453,9 +453,17 @@ impl GoogleCloudStorageClient {
path: &Path,
opts: PutMultipartOpts,
) -> Result<MultipartId> {
+ let PutMultipartOpts {
+ // not supported by GCP
+ tags: _,
+ attributes,
+ extensions,
+ } = opts;
+
let response = self
.request(Method::POST, path)
- .with_attributes(opts.attributes)
+ .with_attributes(attributes)
+ .with_extensions(extensions)
.header(&CONTENT_LENGTH, "0")
.query(&[("uploads", "")])
.send()
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 8f05fb3911..5db7e01d7d 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -1223,7 +1223,7 @@ impl From<Attributes> for PutOptions {
}
/// Options for [`ObjectStore::put_multipart_opts`]
-#[derive(Debug, Clone, PartialEq, Eq, Default)]
+#[derive(Debug, Clone, Default)]
pub struct PutMultipartOpts {
/// Provide a [`TagSet`] for this object
///
@@ -1233,8 +1233,33 @@ pub struct PutMultipartOpts {
///
/// Implementations that don't support an attribute should return an error
pub attributes: Attributes,
+ /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
+ /// that need to pass context-specific information (like tracing spans) via trait methods.
+ ///
+ /// These extensions are ignored entirely by backends offered through this crate.
+ ///
+ /// They are also excluded from [`PartialEq`] and [`Eq`].
+ pub extensions: ::http::Extensions,
+}
+
+impl PartialEq<Self> for PutMultipartOpts {
+ fn eq(&self, other: &Self) -> bool {
+ let Self {
+ tags,
+ attributes,
+ extensions: _,
+ } = self;
+ let Self {
+ tags: other_tags,
+ attributes: other_attributes,
+ extensions: _,
+ } = other;
+ (tags == other_tags) && (attributes == other_attributes)
+ }
}
+impl Eq for PutMultipartOpts {}
+
impl From<TagSet> for PutMultipartOpts {
fn from(tags: TagSet) -> Self {
Self {