This is an automated email from the ASF dual-hosted git repository.
mneumann pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 93842246c8 feat: add `Extensions` to object store `PutOptions` (#7213)
93842246c8 is described below
commit 93842246c84f8e14f4789c1a02b66c019438cef7
Author: Marco Neumann <[email protected]>
AuthorDate: Thu Feb 27 18:11:22 2025 +0100
feat: add `Extensions` to object store `PutOptions` (#7213)
Follow-up to #7170.
---
object_store/src/aws/client.rs | 5 +++++
object_store/src/aws/mod.rs | 14 +++++++++++---
object_store/src/azure/client.rs | 19 ++++++++++++++++---
object_store/src/gcp/client.rs | 20 +++++++++++++++++---
object_store/src/lib.rs | 29 ++++++++++++++++++++++++++++-
5 files changed, 77 insertions(+), 10 deletions(-)
diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index d8132d019a..6cf5540000 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -389,6 +389,11 @@ impl Request<'_> {
Self { builder, ..self }
}
+ pub(crate) fn with_extensions(self, extensions: ::http::Extensions) ->
Self {
+ let builder = self.builder.extensions(extensions);
+ Self { builder, ..self }
+ }
+
pub(crate) fn with_payload(mut self, payload: PutPayload) -> Self {
if (!self.config.skip_signature && self.config.sign_payload)
|| self.config.checksum.is_some()
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index 2c1852b4bb..76e298fe1b 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -159,15 +159,23 @@ impl ObjectStore for AmazonS3 {
payload: PutPayload,
opts: PutOptions,
) -> Result<PutResult> {
+ let PutOptions {
+ mode,
+ tags,
+ attributes,
+ extensions,
+ } = opts;
+
let request = self
.client
.request(Method::PUT, location)
.with_payload(payload)
- .with_attributes(opts.attributes)
- .with_tags(opts.tags)
+ .with_attributes(attributes)
+ .with_tags(tags)
+ .with_extensions(extensions)
.with_encryption_headers();
- match (opts.mode, &self.client.config.conditional_put) {
+ match (mode, &self.client.config.conditional_put) {
(PutMode::Overwrite, _) => request.idempotent(true).do_put().await,
(PutMode::Create, S3ConditionalPut::Disabled) =>
Err(Error::NotImplemented),
(PutMode::Create, S3ConditionalPut::ETagMatch) => {
diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs
index 13e40bbf96..c4d026bcb8 100644
--- a/object_store/src/azure/client.rs
+++ b/object_store/src/azure/client.rs
@@ -257,6 +257,11 @@ impl PutRequest<'_> {
Self { builder, ..self }
}
+ fn with_extensions(self, extensions: ::http::Extensions) -> Self {
+ let builder = self.builder.extensions(extensions);
+ Self { builder, ..self }
+ }
+
async fn send(self) -> Result<HttpResponse> {
let credential = self.config.get_credential().await?;
let sensitive = credential
@@ -540,12 +545,20 @@ impl AzureClient {
payload: PutPayload,
opts: PutOptions,
) -> Result<PutResult> {
+ let PutOptions {
+ mode,
+ tags,
+ attributes,
+ extensions,
+ } = opts;
+
let builder = self
.put_request(path, payload)
- .with_attributes(opts.attributes)
- .with_tags(opts.tags);
+ .with_attributes(attributes)
+ .with_extensions(extensions)
+ .with_tags(tags);
- let builder = match &opts.mode {
+ let builder = match &mode {
PutMode::Overwrite => builder.idempotent(true),
PutMode::Create => builder.header(&IF_NONE_MATCH, "*"),
PutMode::Update(v) => {
diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs
index a52ad3663f..e514624f8f 100644
--- a/object_store/src/gcp/client.rs
+++ b/object_store/src/gcp/client.rs
@@ -231,6 +231,11 @@ impl Request<'_> {
}
}
+ fn with_extensions(self, extensions: ::http::Extensions) -> Self {
+ let builder = self.builder.extensions(extensions);
+ Self { builder, ..self }
+ }
+
async fn send(self) -> Result<HttpResponse> {
let credential = self.config.credentials.get_credential().await?;
let resp = self
@@ -384,12 +389,21 @@ impl GoogleCloudStorageClient {
payload: PutPayload,
opts: PutOptions,
) -> Result<PutResult> {
+ let PutOptions {
+ mode,
+ // not supported by GCP
+ tags: _,
+ attributes,
+ extensions,
+ } = opts;
+
let builder = self
.request(Method::PUT, path)
.with_payload(payload)
- .with_attributes(opts.attributes);
+ .with_attributes(attributes)
+ .with_extensions(extensions);
- let builder = match &opts.mode {
+ let builder = match &mode {
PutMode::Overwrite => builder.idempotent(true),
PutMode::Create => builder.header(&VERSION_MATCH, "0"),
PutMode::Update(v) => {
@@ -398,7 +412,7 @@ impl GoogleCloudStorageClient {
}
};
- match (opts.mode, builder.do_put().await) {
+ match (mode, builder.do_put().await) {
(PutMode::Create, Err(crate::Error::Precondition { path, source
})) => {
Err(crate::Error::AlreadyExists { path, source })
}
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 21352f5761..8f05fb3911 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -1154,7 +1154,7 @@ impl From<PutResult> for UpdateVersion {
}
/// Options for a put request
-#[derive(Debug, Clone, PartialEq, Eq, Default)]
+#[derive(Debug, Clone, Default)]
pub struct PutOptions {
/// Configure the [`PutMode`] for this operation
pub mode: PutMode,
@@ -1166,8 +1166,35 @@ pub struct PutOptions {
///
/// Implementations that don't support an attribute should return an error
pub attributes: Attributes,
+ /// Implementation-specific extensions. Intended for use by
[`ObjectStore`] implementations
+ /// that need to pass context-specific information (like tracing spans)
via trait methods.
+ ///
+ /// These extensions are ignored entirely by backends offered through this
crate.
+ ///
+ /// They are also excluded from [`PartialEq`] and [`Eq`].
+ pub extensions: ::http::Extensions,
}
+impl PartialEq<Self> for PutOptions {
+ fn eq(&self, other: &Self) -> bool {
+ let Self {
+ mode,
+ tags,
+ attributes,
+ extensions: _,
+ } = self;
+ let Self {
+ mode: other_mode,
+ tags: other_tags,
+ attributes: other_attributes,
+ extensions: _,
+ } = other;
+ (mode == other_mode) && (tags == other_tags) && (attributes ==
other_attributes)
+ }
+}
+
+impl Eq for PutOptions {}
+
impl From<PutMode> for PutOptions {
fn from(mode: PutMode) -> Self {
Self {