This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 4b49c344406 Add put_multipart_opts (#5435) (#5652)
4b49c344406 is described below
commit 4b49c344406b63825ef41efbdc85cf09afe35966
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Wed Apr 17 11:51:56 2024 +0100
Add put_multipart_opts (#5435) (#5652)
* Add put_multipart_opts (#5435)
---
object_store/src/aws/client.rs | 203 +++++++++++++++++------------------
object_store/src/aws/mod.rs | 48 ++++++---
object_store/src/azure/client.rs | 93 +++++++++-------
object_store/src/azure/mod.rs | 16 ++-
object_store/src/chunked.rs | 10 +-
object_store/src/gcp/client.rs | 174 ++++++++++++++----------------
object_store/src/gcp/mod.rs | 15 ++-
object_store/src/http/mod.rs | 8 +-
object_store/src/lib.rs | 104 ++++++++++++++++--
object_store/src/limit.rs | 16 ++-
object_store/src/local.rs | 12 ++-
object_store/src/memory.rs | 21 ++--
object_store/src/prefix.rs | 13 ++-
object_store/src/throttle.rs | 14 ++-
object_store/tests/get_range_file.rs | 6 +-
15 files changed, 461 insertions(+), 292 deletions(-)
diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index e81ef6aa220..4a4dc178d5b 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -19,8 +19,8 @@ use crate::aws::builder::S3EncryptionHeaders;
use crate::aws::checksum::Checksum;
use crate::aws::credential::{AwsCredential, CredentialExt};
use crate::aws::{
- AwsAuthorizer, AwsCredentialProvider, S3ConditionalPut, S3CopyIfNotExists,
STORE,
- STRICT_PATH_ENCODE_SET,
+ AwsAuthorizer, AwsCredentialProvider, S3ConditionalPut, S3CopyIfNotExists,
COPY_SOURCE_HEADER,
+ STORE, STRICT_PATH_ENCODE_SET, TAGS_HEADER,
};
use crate::client::get::GetClient;
use crate::client::header::{get_etag, HeaderConfig};
@@ -35,16 +35,16 @@ use crate::client::GetOptionsExt;
use crate::multipart::PartId;
use crate::path::DELIMITER;
use crate::{
- Attribute, Attributes, ClientOptions, GetOptions, ListResult, MultipartId,
Path, PutPayload,
- PutResult, Result, RetryConfig,
+ Attribute, Attributes, ClientOptions, GetOptions, ListResult, MultipartId,
Path,
+ PutMultipartOpts, PutPayload, PutResult, Result, RetryConfig, TagSet,
};
use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::{Buf, Bytes};
use hyper::header::{CACHE_CONTROL, CONTENT_LENGTH};
-use hyper::http;
use hyper::http::HeaderName;
+use hyper::{http, HeaderMap};
use itertools::Itertools;
use md5::{Digest, Md5};
use percent_encoding::{utf8_percent_encode, PercentEncode};
@@ -98,9 +98,6 @@ pub(crate) enum Error {
#[snafu(display("Error getting list response body: {}", source))]
ListResponseBody { source: reqwest::Error },
- #[snafu(display("Error performing create multipart request: {}", source))]
- CreateMultipartRequest { source: crate::client::retry::Error },
-
#[snafu(display("Error getting create multipart response body: {}",
source))]
CreateMultipartResponseBody { source: reqwest::Error },
@@ -289,8 +286,75 @@ impl<'a> Request<'a> {
Self { builder, ..self }
}
- pub fn idempotent(mut self, idempotent: bool) -> Self {
- self.idempotent = idempotent;
+ pub fn headers(self, headers: HeaderMap) -> Self {
+ let builder = self.builder.headers(headers);
+ Self { builder, ..self }
+ }
+
+ pub fn idempotent(self, idempotent: bool) -> Self {
+ Self { idempotent, ..self }
+ }
+
+ pub fn with_encryption_headers(self) -> Self {
+ let headers = self.config.encryption_headers.clone().into();
+ let builder = self.builder.headers(headers);
+ Self { builder, ..self }
+ }
+
+ pub fn with_session_creds(self, use_session_creds: bool) -> Self {
+ Self {
+ use_session_creds,
+ ..self
+ }
+ }
+
+ pub fn with_tags(mut self, tags: TagSet) -> Self {
+ let tags = tags.encoded();
+ if !tags.is_empty() && !self.config.disable_tagging {
+ self.builder = self.builder.header(&TAGS_HEADER, tags);
+ }
+ self
+ }
+
+ pub fn with_attributes(self, attributes: Attributes) -> Self {
+ let mut has_content_type = false;
+ let mut builder = self.builder;
+ for (k, v) in &attributes {
+ builder = match k {
+ Attribute::CacheControl => builder.header(CACHE_CONTROL,
v.as_ref()),
+ Attribute::ContentType => {
+ has_content_type = true;
+ builder.header(CONTENT_TYPE, v.as_ref())
+ }
+ };
+ }
+
+ if !has_content_type {
+ if let Some(value) =
self.config.client_options.get_content_type(self.path) {
+ builder = builder.header(CONTENT_TYPE, value);
+ }
+ }
+ Self { builder, ..self }
+ }
+
+ pub fn with_payload(mut self, payload: PutPayload) -> Self {
+ if !self.config.skip_signature || self.config.checksum.is_some() {
+ let mut sha256 = Context::new(&digest::SHA256);
+ payload.iter().for_each(|x| sha256.update(x));
+ let payload_sha256 = sha256.finish();
+
+ if let Some(Checksum::SHA256) = self.config.checksum {
+ self.builder = self.builder.header(
+ "x-amz-checksum-sha256",
+ BASE64_STANDARD.encode(payload_sha256),
+ );
+ }
+ self.payload_sha256 = Some(payload_sha256);
+ }
+
+ let content_length = payload.content_length();
+ self.builder = self.builder.header(CONTENT_LENGTH, content_length);
+ self.payload = Some(payload);
self
}
@@ -335,81 +399,19 @@ impl S3Client {
Ok(Self { config, client })
}
- /// Make an S3 PUT request
<https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html>
- ///
- /// Returns the ETag
- pub fn put_request<'a>(
- &'a self,
- path: &'a Path,
- payload: PutPayload,
- attributes: Attributes,
- with_encryption_headers: bool,
- ) -> Request<'a> {
+ pub fn request<'a>(&'a self, method: Method, path: &'a Path) ->
Request<'a> {
let url = self.config.path_url(path);
- let mut builder = self.client.request(Method::PUT, url);
- if with_encryption_headers {
- builder =
builder.headers(self.config.encryption_headers.clone().into());
- }
-
- let mut sha256 = Context::new(&digest::SHA256);
- payload.iter().for_each(|x| sha256.update(x));
- let payload_sha256 = sha256.finish();
-
- if let Some(Checksum::SHA256) = self.config.checksum {
- builder = builder.header(
- "x-amz-checksum-sha256",
- BASE64_STANDARD.encode(payload_sha256),
- )
- }
-
- let mut has_content_type = false;
- for (k, v) in &attributes {
- builder = match k {
- Attribute::CacheControl => builder.header(CACHE_CONTROL,
v.as_ref()),
- Attribute::ContentType => {
- has_content_type = true;
- builder.header(CONTENT_TYPE, v.as_ref())
- }
- };
- }
-
- if !has_content_type {
- if let Some(value) =
self.config.client_options.get_content_type(path) {
- builder = builder.header(CONTENT_TYPE, value);
- }
- }
-
Request {
path,
- builder: builder.header(CONTENT_LENGTH, payload.content_length()),
- payload: Some(payload),
- payload_sha256: Some(payload_sha256),
+ builder: self.client.request(method, url),
+ payload: None,
+ payload_sha256: None,
config: &self.config,
use_session_creds: true,
idempotent: false,
}
}
- /// Make an S3 Delete request
<https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html>
- pub async fn delete_request<T: Serialize + ?Sized + Sync>(
- &self,
- path: &Path,
- query: &T,
- ) -> Result<()> {
- let credential = self.config.get_session_credential().await?;
- let url = self.config.path_url(path);
-
- self.client
- .request(Method::DELETE, url)
- .query(query)
- .with_aws_sigv4(credential.authorizer(), None)
- .send_retry(&self.config.retry_config)
- .await
- .map_err(|e| e.error(STORE, path.to_string()))?;
-
- Ok(())
- }
-
/// Make an S3 Delete Objects request
<https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html>
///
/// Produces a vector of results, one for each path in the input vector. If
@@ -513,41 +515,29 @@ impl S3Client {
}
/// Make an S3 Copy request
<https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html>
- pub fn copy_request<'a>(&'a self, from: &'a Path, to: &Path) ->
Request<'a> {
- let url = self.config.path_url(to);
+ pub fn copy_request<'a>(&'a self, from: &Path, to: &'a Path) ->
Request<'a> {
let source = format!("{}/{}", self.config.bucket, encode_path(from));
-
- let builder = self
- .client
- .request(Method::PUT, url)
- .header("x-amz-copy-source", source)
- .headers(self.config.encryption_headers.clone().into());
-
- Request {
- builder,
- path: from,
- config: &self.config,
- payload: None,
- payload_sha256: None,
- use_session_creds: false,
- idempotent: false,
- }
+ self.request(Method::PUT, to)
+ .idempotent(true)
+ .header(©_SOURCE_HEADER, &source)
+ .headers(self.config.encryption_headers.clone().into())
+ .with_session_creds(false)
}
- pub async fn create_multipart(&self, location: &Path) ->
Result<MultipartId> {
- let credential = self.config.get_session_credential().await?;
- let url = format!("{}?uploads=", self.config.path_url(location),);
-
+ pub async fn create_multipart(
+ &self,
+ location: &Path,
+ opts: PutMultipartOpts,
+ ) -> Result<MultipartId> {
let response = self
- .client
- .request(Method::POST, url)
- .headers(self.config.encryption_headers.clone().into())
- .with_aws_sigv4(credential.authorizer(), None)
- .retryable(&self.config.retry_config)
+ .request(Method::POST, location)
+ .query(&[("uploads", "")])
+ .with_encryption_headers()
+ .with_attributes(opts.attributes)
+ .with_tags(opts.tags)
.idempotent(true)
.send()
- .await
- .context(CreateMultipartRequestSnafu)?
+ .await?
.bytes()
.await
.context(CreateMultipartResponseBodySnafu)?;
@@ -568,7 +558,8 @@ impl S3Client {
let part = (part_idx + 1).to_string();
let response = self
- .put_request(path, data, Attributes::default(), false)
+ .request(Method::PUT, path)
+ .with_payload(data)
.query(&[("partNumber", &part), ("uploadId", upload_id)])
.idempotent(true)
.send()
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index 43bd38a6de2..7f1edf12faf 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -45,10 +45,12 @@ use crate::signer::Signer;
use crate::util::STRICT_ENCODE_SET;
use crate::{
Error, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload,
ObjectMeta,
- ObjectStore, Path, PutMode, PutOptions, PutPayload, PutResult, Result,
UploadPart,
+ ObjectStore, Path, PutMode, PutMultipartOpts, PutOptions, PutPayload,
PutResult, Result,
+ UploadPart,
};
static TAGS_HEADER: HeaderName = HeaderName::from_static("x-amz-tagging");
+static COPY_SOURCE_HEADER: HeaderName =
HeaderName::from_static("x-amz-copy-source");
mod builder;
mod checksum;
@@ -156,12 +158,13 @@ impl ObjectStore for AmazonS3 {
payload: PutPayload,
opts: PutOptions,
) -> Result<PutResult> {
- let attrs = opts.attributes;
- let mut request = self.client.put_request(location, payload, attrs,
true);
- let tags = opts.tags.encoded();
- if !tags.is_empty() && !self.client.config.disable_tagging {
- request = request.header(&TAGS_HEADER, tags);
- }
+ let request = self
+ .client
+ .request(Method::PUT, location)
+ .with_payload(payload)
+ .with_attributes(opts.attributes)
+ .with_tags(opts.tags)
+ .with_encryption_headers();
match (opts.mode, &self.client.config.conditional_put) {
(PutMode::Overwrite, _) => request.idempotent(true).do_put().await,
@@ -204,8 +207,12 @@ impl ObjectStore for AmazonS3 {
}
}
- async fn put_multipart(&self, location: &Path) -> Result<Box<dyn
MultipartUpload>> {
- let upload_id = self.client.create_multipart(location).await?;
+ async fn put_multipart_opts(
+ &self,
+ location: &Path,
+ opts: PutMultipartOpts,
+ ) -> Result<Box<dyn MultipartUpload>> {
+ let upload_id = self.client.create_multipart(location, opts).await?;
Ok(Box::new(S3MultiPartUpload {
part_idx: 0,
@@ -223,7 +230,8 @@ impl ObjectStore for AmazonS3 {
}
async fn delete(&self, location: &Path) -> Result<()> {
- self.client.delete_request(location, &()).await
+ self.client.request(Method::DELETE, location).send().await?;
+ Ok(())
}
fn delete_stream<'a>(
@@ -351,15 +359,22 @@ impl MultipartUpload for S3MultiPartUpload {
async fn abort(&mut self) -> Result<()> {
self.state
.client
- .delete_request(&self.state.location, &[("uploadId",
&self.state.upload_id)])
- .await
+ .request(Method::DELETE, &self.state.location)
+ .query(&[("uploadId", &self.state.upload_id)])
+ .idempotent(true)
+ .send()
+ .await?;
+
+ Ok(())
}
}
#[async_trait]
impl MultipartStore for AmazonS3 {
async fn create_multipart(&self, path: &Path) -> Result<MultipartId> {
- self.client.create_multipart(path).await
+ self.client
+ .create_multipart(path, PutMultipartOpts::default())
+ .await
}
async fn put_part(
@@ -382,7 +397,12 @@ impl MultipartStore for AmazonS3 {
}
async fn abort_multipart(&self, path: &Path, id: &MultipartId) ->
Result<()> {
- self.client.delete_request(path, &[("uploadId", id)]).await
+ self.client
+ .request(Method::DELETE, path)
+ .query(&[("uploadId", id)])
+ .send()
+ .await?;
+ Ok(())
}
}
diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs
index 134609eb262..918fcd047ae 100644
--- a/object_store/src/azure/client.rs
+++ b/object_store/src/azure/client.rs
@@ -28,16 +28,14 @@ use crate::path::DELIMITER;
use crate::util::{deserialize_rfc1123, GetRange};
use crate::{
Attribute, Attributes, ClientOptions, GetOptions, ListResult, ObjectMeta,
Path, PutMode,
- PutOptions, PutPayload, PutResult, Result, RetryConfig,
+ PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, RetryConfig,
TagSet,
};
use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::{Buf, Bytes};
use chrono::{DateTime, Utc};
-use hyper::header::CACHE_CONTROL;
use hyper::http::HeaderName;
-use reqwest::header::CONTENT_TYPE;
use reqwest::{
header::{HeaderValue, CONTENT_LENGTH, IF_MATCH, IF_NONE_MATCH},
Client as ReqwestClient, Method, RequestBuilder, Response,
@@ -50,6 +48,8 @@ use std::time::Duration;
use url::Url;
const VERSION_HEADER: &str = "x-ms-version-id";
+static MS_CACHE_CONTROL: HeaderName =
HeaderName::from_static("x-ms-blob-cache-control");
+static MS_CONTENT_TYPE: HeaderName =
HeaderName::from_static("x-ms-blob-content-type");
static TAGS_HEADER: HeaderName = HeaderName::from_static("x-ms-tags");
@@ -188,10 +188,39 @@ impl<'a> PutRequest<'a> {
Self { builder, ..self }
}
- fn set_idempotent(self, idempotent: bool) -> Self {
+ fn idempotent(self, idempotent: bool) -> Self {
Self { idempotent, ..self }
}
+ fn with_tags(mut self, tags: TagSet) -> Self {
+ let tags = tags.encoded();
+ if !tags.is_empty() && !self.config.disable_tagging {
+ self.builder = self.builder.header(&TAGS_HEADER, tags);
+ }
+ self
+ }
+
+ fn with_attributes(self, attributes: Attributes) -> Self {
+ let mut builder = self.builder;
+ let mut has_content_type = false;
+ for (k, v) in &attributes {
+ builder = match k {
+ Attribute::CacheControl => builder.header(&MS_CACHE_CONTROL,
v.as_ref()),
+ Attribute::ContentType => {
+ has_content_type = true;
+ builder.header(&MS_CONTENT_TYPE, v.as_ref())
+ }
+ };
+ }
+
+ if !has_content_type {
+ if let Some(value) =
self.config.client_options.get_content_type(self.path) {
+ builder = builder.header(&MS_CONTENT_TYPE, value);
+ }
+ }
+ Self { builder, ..self }
+ }
+
async fn send(self) -> Result<Response> {
let credential = self.config.get_credential().await?;
let response = self
@@ -233,32 +262,9 @@ impl AzureClient {
self.config.get_credential().await
}
- fn put_request<'a>(
- &'a self,
- path: &'a Path,
- payload: PutPayload,
- attributes: Attributes,
- ) -> PutRequest<'a> {
+ fn put_request<'a>(&'a self, path: &'a Path, payload: PutPayload) ->
PutRequest<'a> {
let url = self.config.path_url(path);
-
- let mut builder = self.client.request(Method::PUT, url);
-
- let mut has_content_type = false;
- for (k, v) in &attributes {
- builder = match k {
- Attribute::CacheControl => builder.header(CACHE_CONTROL,
v.as_ref()),
- Attribute::ContentType => {
- has_content_type = true;
- builder.header(CONTENT_TYPE, v.as_ref())
- }
- };
- }
-
- if !has_content_type {
- if let Some(value) =
self.config.client_options.get_content_type(path) {
- builder = builder.header(CONTENT_TYPE, value);
- }
- }
+ let builder = self.client.request(Method::PUT, url);
PutRequest {
path,
@@ -276,10 +282,13 @@ impl AzureClient {
payload: PutPayload,
opts: PutOptions,
) -> Result<PutResult> {
- let builder = self.put_request(path, payload, opts.attributes);
+ let builder = self
+ .put_request(path, payload)
+ .with_attributes(opts.attributes)
+ .with_tags(opts.tags);
let builder = match &opts.mode {
- PutMode::Overwrite => builder.set_idempotent(true),
+ PutMode::Overwrite => builder.idempotent(true),
PutMode::Create => builder.header(&IF_NONE_MATCH, "*"),
PutMode::Update(v) => {
let etag = v.e_tag.as_ref().context(MissingETagSnafu)?;
@@ -287,11 +296,6 @@ impl AzureClient {
}
};
- let builder = match (opts.tags.encoded(), self.config.disable_tagging)
{
- ("", _) | (_, true) => builder,
- (tags, false) => builder.header(&TAGS_HEADER, tags),
- };
-
let response = builder.header(&BLOB_TYPE, "BlockBlob").send().await?;
Ok(get_put_result(response.headers(),
VERSION_HEADER).context(MetadataSnafu)?)
}
@@ -306,9 +310,9 @@ impl AzureClient {
let content_id = format!("{part_idx:20}");
let block_id = BASE64_STANDARD.encode(&content_id);
- self.put_request(path, payload, Attributes::default())
+ self.put_request(path, payload)
.query(&[("comp", "block"), ("blockid", &block_id)])
- .set_idempotent(true)
+ .idempotent(true)
.send()
.await?;
@@ -316,7 +320,12 @@ impl AzureClient {
}
/// PUT a block list
<https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list>
- pub async fn put_block_list(&self, path: &Path, parts: Vec<PartId>) ->
Result<PutResult> {
+ pub async fn put_block_list(
+ &self,
+ path: &Path,
+ parts: Vec<PartId>,
+ opts: PutMultipartOpts,
+ ) -> Result<PutResult> {
let blocks = parts
.into_iter()
.map(|part| BlockId::from(part.content_id))
@@ -324,9 +333,11 @@ impl AzureClient {
let payload = BlockList { blocks }.to_xml().into();
let response = self
- .put_request(path, payload, Attributes::default())
+ .put_request(path, payload)
+ .with_attributes(opts.attributes)
+ .with_tags(opts.tags)
.query(&[("comp", "blocklist")])
- .set_idempotent(true)
+ .idempotent(true)
.send()
.await?;
diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs
index 3bb57c45aa6..25ae6dda68a 100644
--- a/object_store/src/azure/mod.rs
+++ b/object_store/src/azure/mod.rs
@@ -27,7 +27,7 @@ use crate::{
path::Path,
signer::Signer,
GetOptions, GetResult, ListResult, MultipartId, MultipartUpload,
ObjectMeta, ObjectStore,
- PutOptions, PutPayload, PutResult, Result, UploadPart,
+ PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, UploadPart,
};
use async_trait::async_trait;
use futures::stream::BoxStream;
@@ -95,9 +95,14 @@ impl ObjectStore for MicrosoftAzure {
self.client.put_blob(location, payload, opts).await
}
- async fn put_multipart(&self, location: &Path) -> Result<Box<dyn
MultipartUpload>> {
+ async fn put_multipart_opts(
+ &self,
+ location: &Path,
+ opts: PutMultipartOpts,
+ ) -> Result<Box<dyn MultipartUpload>> {
Ok(Box::new(AzureMultiPartUpload {
part_idx: 0,
+ opts,
state: Arc::new(UploadState {
client: Arc::clone(&self.client),
location: location.clone(),
@@ -196,6 +201,7 @@ impl Signer for MicrosoftAzure {
struct AzureMultiPartUpload {
part_idx: usize,
state: Arc<UploadState>,
+ opts: PutMultipartOpts,
}
#[derive(Debug)]
@@ -223,7 +229,7 @@ impl MultipartUpload for AzureMultiPartUpload {
self.state
.client
- .put_block_list(&self.state.location, parts)
+ .put_block_list(&self.state.location, parts, std::mem::take(&mut
self.opts))
.await
}
@@ -255,7 +261,9 @@ impl MultipartStore for MicrosoftAzure {
_: &MultipartId,
parts: Vec<PartId>,
) -> Result<PutResult> {
- self.client.put_block_list(path, parts).await
+ self.client
+ .put_block_list(path, parts, Default::default())
+ .await
}
async fn abort_multipart(&self, _: &Path, _: &MultipartId) -> Result<()> {
diff --git a/object_store/src/chunked.rs b/object_store/src/chunked.rs
index 9abe49dbfce..a3bd7626787 100644
--- a/object_store/src/chunked.rs
+++ b/object_store/src/chunked.rs
@@ -29,7 +29,7 @@ use futures::StreamExt;
use crate::path::Path;
use crate::{
GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload,
ObjectMeta, ObjectStore,
- PutOptions, PutResult,
+ PutMultipartOpts, PutOptions, PutResult,
};
use crate::{PutPayload, Result};
@@ -75,6 +75,14 @@ impl ObjectStore for ChunkedStore {
self.inner.put_multipart(location).await
}
+ async fn put_multipart_opts(
+ &self,
+ location: &Path,
+ opts: PutMultipartOpts,
+ ) -> Result<Box<dyn MultipartUpload>> {
+ self.inner.put_multipart_opts(location, opts).await
+ }
+
async fn get_opts(&self, location: &Path, options: GetOptions) ->
Result<GetResult> {
let r = self.inner.get_opts(location, options).await?;
let stream = match r.payload {
diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs
index 4ee03eaad62..9c39efe6b23 100644
--- a/object_store/src/gcp/client.rs
+++ b/object_store/src/gcp/client.rs
@@ -29,8 +29,8 @@ use crate::multipart::PartId;
use crate::path::{Path, DELIMITER};
use crate::util::hex_encode;
use crate::{
- Attribute, Attributes, ClientOptions, GetOptions, ListResult, MultipartId,
PutMode, PutOptions,
- PutPayload, PutResult, Result, RetryConfig,
+ Attribute, Attributes, ClientOptions, GetOptions, ListResult, MultipartId,
PutMode,
+ PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, RetryConfig,
};
use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
@@ -39,7 +39,7 @@ use bytes::Buf;
use hyper::header::{CACHE_CONTROL, CONTENT_LENGTH, CONTENT_TYPE};
use percent_encoding::{percent_encode, utf8_percent_encode, NON_ALPHANUMERIC};
use reqwest::header::HeaderName;
-use reqwest::{header, Client, Method, RequestBuilder, Response, StatusCode};
+use reqwest::{Client, Method, RequestBuilder, Response, StatusCode};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, Snafu};
use std::sync::Arc;
@@ -66,14 +66,8 @@ enum Error {
path: String,
},
- #[snafu(display("Error performing delete request {}: {}", path, source))]
- DeleteRequest {
- source: crate::client::retry::Error,
- path: String,
- },
-
- #[snafu(display("Error performing put request {}: {}", path, source))]
- PutRequest {
+ #[snafu(display("Error performing request {}: {}", path, source))]
+ Request {
source: crate::client::retry::Error,
path: String,
},
@@ -120,9 +114,9 @@ enum Error {
impl From<Error> for crate::Error {
fn from(err: Error) -> Self {
match err {
- Error::GetRequest { source, path }
- | Error::DeleteRequest { source, path }
- | Error::PutRequest { source, path } => source.error(STORE, path),
+ Error::GetRequest { source, path } | Error::Request { source, path
} => {
+ source.error(STORE, path)
+ }
_ => Self::Generic {
store: STORE,
source: Box::new(err),
@@ -171,15 +165,15 @@ impl GoogleCloudStorageConfig {
}
/// A builder for a put request allowing customisation of the headers and
query string
-pub struct PutRequest<'a> {
+pub struct Request<'a> {
path: &'a Path,
config: &'a GoogleCloudStorageConfig,
- payload: PutPayload,
+ payload: Option<PutPayload>,
builder: RequestBuilder,
idempotent: bool,
}
-impl<'a> PutRequest<'a> {
+impl<'a> Request<'a> {
fn header(self, k: &HeaderName, v: &str) -> Self {
let builder = self.builder.header(k, v);
Self { builder, ..self }
@@ -190,26 +184,58 @@ impl<'a> PutRequest<'a> {
Self { builder, ..self }
}
- fn set_idempotent(mut self, idempotent: bool) -> Self {
+ fn idempotent(mut self, idempotent: bool) -> Self {
self.idempotent = idempotent;
self
}
- async fn send(self) -> Result<PutResult> {
+ fn with_attributes(self, attributes: Attributes) -> Self {
+ let mut builder = self.builder;
+ let mut has_content_type = false;
+ for (k, v) in &attributes {
+ builder = match k {
+ Attribute::CacheControl => builder.header(CACHE_CONTROL,
v.as_ref()),
+ Attribute::ContentType => {
+ has_content_type = true;
+ builder.header(CONTENT_TYPE, v.as_ref())
+ }
+ };
+ }
+
+ if !has_content_type {
+ let value = self.config.client_options.get_content_type(self.path);
+ builder = builder.header(CONTENT_TYPE,
value.unwrap_or(DEFAULT_CONTENT_TYPE))
+ }
+ Self { builder, ..self }
+ }
+
+ fn with_payload(self, payload: PutPayload) -> Self {
+ let content_length = payload.content_length();
+ Self {
+ builder: self.builder.header(CONTENT_LENGTH, content_length),
+ payload: Some(payload),
+ ..self
+ }
+ }
+
+ async fn send(self) -> Result<Response> {
let credential = self.config.credentials.get_credential().await?;
- let response = self
+ let resp = self
.builder
.bearer_auth(&credential.bearer)
- .header(CONTENT_LENGTH, self.payload.content_length())
.retryable(&self.config.retry_config)
.idempotent(self.idempotent)
- .payload(Some(self.payload))
+ .payload(self.payload)
.send()
.await
- .context(PutRequestSnafu {
+ .context(RequestSnafu {
path: self.path.as_ref(),
})?;
+ Ok(resp)
+ }
+ async fn do_put(self) -> Result<PutResult> {
+ let response = self.send().await?;
Ok(get_put_result(response.headers(),
VERSION_HEADER).context(MetadataSnafu)?)
}
}
@@ -324,36 +350,13 @@ impl GoogleCloudStorageClient {
/// Perform a put request
<https://cloud.google.com/storage/docs/xml-api/put-object-upload>
///
/// Returns the new ETag
- pub fn put_request<'a>(
- &'a self,
- path: &'a Path,
- payload: PutPayload,
- attributes: Attributes,
- ) -> PutRequest<'a> {
- let url = self.object_url(path);
- let mut builder = self.client.request(Method::PUT, url);
-
- let mut has_content_type = false;
- for (k, v) in &attributes {
- builder = match k {
- Attribute::CacheControl => builder.header(CACHE_CONTROL,
v.as_ref()),
- Attribute::ContentType => {
- has_content_type = true;
- builder.header(CONTENT_TYPE, v.as_ref())
- }
- };
- }
+ pub fn request<'a>(&'a self, method: Method, path: &'a Path) ->
Request<'a> {
+ let builder = self.client.request(method, self.object_url(path));
- if !has_content_type {
- let opts = &self.config.client_options;
- let value =
opts.get_content_type(path).unwrap_or(DEFAULT_CONTENT_TYPE);
- builder = builder.header(CONTENT_TYPE, value)
- }
-
- PutRequest {
+ Request {
path,
builder,
- payload,
+ payload: None,
config: &self.config,
idempotent: false,
}
@@ -365,10 +368,13 @@ impl GoogleCloudStorageClient {
payload: PutPayload,
opts: PutOptions,
) -> Result<PutResult> {
- let builder = self.put_request(path, payload, opts.attributes);
+ let builder = self
+ .request(Method::PUT, path)
+ .with_payload(payload)
+ .with_attributes(opts.attributes);
let builder = match &opts.mode {
- PutMode::Overwrite => builder.set_idempotent(true),
+ PutMode::Overwrite => builder.idempotent(true),
PutMode::Create => builder.header(&VERSION_MATCH, "0"),
PutMode::Update(v) => {
let etag = v.version.as_ref().context(MissingVersionSnafu)?;
@@ -376,7 +382,7 @@ impl GoogleCloudStorageClient {
}
};
- match (opts.mode, builder.send().await) {
+ match (opts.mode, builder.do_put().await) {
(PutMode::Create, Err(crate::Error::Precondition { path, source
})) => {
Err(crate::Error::AlreadyExists { path, source })
}
@@ -399,10 +405,11 @@ impl GoogleCloudStorageClient {
("uploadId", upload_id),
];
let result = self
- .put_request(path, data, Attributes::new())
+ .request(Method::PUT, path)
+ .with_payload(data)
.query(query)
- .set_idempotent(true)
- .send()
+ .idempotent(true)
+ .do_put()
.await?;
Ok(PartId {
@@ -411,30 +418,18 @@ impl GoogleCloudStorageClient {
}
/// Initiate a multipart upload
<https://cloud.google.com/storage/docs/xml-api/post-object-multipart>
- pub async fn multipart_initiate(&self, path: &Path) -> Result<MultipartId>
{
- let credential = self.get_credential().await?;
- let url = self.object_url(path);
-
- let content_type = self
- .config
- .client_options
- .get_content_type(path)
- .unwrap_or("application/octet-stream");
-
+ pub async fn multipart_initiate(
+ &self,
+ path: &Path,
+ opts: PutMultipartOpts,
+ ) -> Result<MultipartId> {
let response = self
- .client
- .request(Method::POST, &url)
- .bearer_auth(&credential.bearer)
- .header(header::CONTENT_TYPE, content_type)
- .header(header::CONTENT_LENGTH, "0")
+ .request(Method::POST, path)
+ .with_attributes(opts.attributes)
+ .header(&CONTENT_LENGTH, "0")
.query(&[("uploads", "")])
- .retryable(&self.config.retry_config)
- .idempotent(true)
.send()
- .await
- .context(PutRequestSnafu {
- path: path.as_ref(),
- })?;
+ .await?;
let data = response.bytes().await.context(PutResponseBodySnafu)?;
let result: InitiateMultipartUploadResult =
@@ -451,12 +446,12 @@ impl GoogleCloudStorageClient {
self.client
.request(Method::DELETE, &url)
.bearer_auth(&credential.bearer)
- .header(header::CONTENT_TYPE, "application/octet-stream")
- .header(header::CONTENT_LENGTH, "0")
+ .header(CONTENT_TYPE, "application/octet-stream")
+ .header(CONTENT_LENGTH, "0")
.query(&[("uploadId", multipart_id)])
.send_retry(&self.config.retry_config)
.await
- .context(PutRequestSnafu {
+ .context(RequestSnafu {
path: path.as_ref(),
})?;
@@ -472,9 +467,9 @@ impl GoogleCloudStorageClient {
if completed_parts.is_empty() {
// GCS doesn't allow empty multipart uploads
let result = self
- .put_request(path, Default::default(), Attributes::new())
- .set_idempotent(true)
- .send()
+ .request(Method::PUT, path)
+ .idempotent(true)
+ .do_put()
.await?;
self.multipart_cleanup(path, multipart_id).await?;
return Ok(result);
@@ -523,18 +518,7 @@ impl GoogleCloudStorageClient {
/// Perform a delete request
<https://cloud.google.com/storage/docs/xml-api/delete-object>
pub async fn delete_request(&self, path: &Path) -> Result<()> {
- let credential = self.get_credential().await?;
- let url = self.object_url(path);
-
- let builder = self.client.request(Method::DELETE, url);
- builder
- .bearer_auth(&credential.bearer)
- .send_retry(&self.config.retry_config)
- .await
- .context(DeleteRequestSnafu {
- path: path.as_ref(),
- })?;
-
+ self.request(Method::DELETE, path).send().await?;
Ok(())
}
diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs
index af6e671cbc3..0ec6e7e8264 100644
--- a/object_store/src/gcp/mod.rs
+++ b/object_store/src/gcp/mod.rs
@@ -42,7 +42,8 @@ use crate::gcp::credential::GCSAuthorizer;
use crate::signer::Signer;
use crate::{
multipart::PartId, path::Path, GetOptions, GetResult, ListResult,
MultipartId, MultipartUpload,
- ObjectMeta, ObjectStore, PutOptions, PutPayload, PutResult, Result,
UploadPart,
+ ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload,
PutResult, Result,
+ UploadPart,
};
use async_trait::async_trait;
use client::GoogleCloudStorageClient;
@@ -156,8 +157,12 @@ impl ObjectStore for GoogleCloudStorage {
self.client.put(location, payload, opts).await
}
- async fn put_multipart(&self, location: &Path) -> Result<Box<dyn
MultipartUpload>> {
- let upload_id = self.client.multipart_initiate(location).await?;
+ async fn put_multipart_opts(
+ &self,
+ location: &Path,
+ opts: PutMultipartOpts,
+ ) -> Result<Box<dyn MultipartUpload>> {
+ let upload_id = self.client.multipart_initiate(location, opts).await?;
Ok(Box::new(GCSMultipartUpload {
part_idx: 0,
@@ -206,7 +211,9 @@ impl ObjectStore for GoogleCloudStorage {
#[async_trait]
impl MultipartStore for GoogleCloudStorage {
async fn create_multipart(&self, path: &Path) -> Result<MultipartId> {
- self.client.multipart_initiate(path).await
+ self.client
+ .multipart_initiate(path, PutMultipartOpts::default())
+ .await
}
async fn put_part(
diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs
index d6ba4f4d913..404211e578d 100644
--- a/object_store/src/http/mod.rs
+++ b/object_store/src/http/mod.rs
@@ -44,7 +44,7 @@ use crate::http::client::Client;
use crate::path::Path;
use crate::{
ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult,
MultipartUpload, ObjectMeta,
- ObjectStore, PutMode, PutOptions, PutPayload, PutResult, Result,
RetryConfig,
+ ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult,
Result, RetryConfig,
};
mod client;
@@ -118,7 +118,11 @@ impl ObjectStore for HttpStore {
})
}
- async fn put_multipart(&self, _location: &Path) -> Result<Box<dyn
MultipartUpload>> {
+ async fn put_multipart_opts(
+ &self,
+ _location: &Path,
+ _opts: PutMultipartOpts,
+ ) -> Result<Box<dyn MultipartUpload>> {
Err(crate::Error::NotImplemented)
}
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index b492d93894a..ad72bd29ef7 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -597,7 +597,20 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync +
Debug + 'static {
///
/// Client should prefer [`ObjectStore::put`] for small payloads, as
streaming uploads
/// typically require multiple separate requests. See [`MultipartUpload`]
for more information
- async fn put_multipart(&self, location: &Path) -> Result<Box<dyn
MultipartUpload>>;
+ async fn put_multipart(&self, location: &Path) -> Result<Box<dyn
MultipartUpload>> {
+ self.put_multipart_opts(location, PutMultipartOpts::default())
+ .await
+ }
+
+ /// Perform a multipart upload with options
+ ///
+ /// Client should prefer [`ObjectStore::put`] for small payloads, as
streaming uploads
+ /// typically require multiple separate requests. See [`MultipartUpload`]
for more information
+ async fn put_multipart_opts(
+ &self,
+ location: &Path,
+ opts: PutMultipartOpts,
+ ) -> Result<Box<dyn MultipartUpload>>;
/// Return the bytes that are stored at the specified location.
async fn get(&self, location: &Path) -> Result<GetResult> {
@@ -785,6 +798,14 @@ macro_rules! as_ref_impl {
self.as_ref().put_multipart(location).await
}
+ async fn put_multipart_opts(
+ &self,
+ location: &Path,
+ opts: PutMultipartOpts,
+ ) -> Result<Box<dyn MultipartUpload>> {
+ self.as_ref().put_multipart_opts(location, opts).await
+ }
+
async fn get(&self, location: &Path) -> Result<GetResult> {
self.as_ref().get(location).await
}
@@ -1144,6 +1165,46 @@ impl From<TagSet> for PutOptions {
}
}
+impl From<Attributes> for PutOptions {
+ fn from(attributes: Attributes) -> Self {
+ Self {
+ attributes,
+ ..Default::default()
+ }
+ }
+}
+
+/// Options for [`ObjectStore::put_multipart_opts`]
+#[derive(Debug, Clone, PartialEq, Eq, Default)]
+pub struct PutMultipartOpts {
+ /// Provide a [`TagSet`] for this object
+ ///
+ /// Implementations that don't support object tagging should ignore this
+ pub tags: TagSet,
+ /// Provide a set of [`Attributes`]
+ ///
+ /// Implementations that don't support an attribute should return an error
+ pub attributes: Attributes,
+}
+
+impl From<TagSet> for PutMultipartOpts {
+ fn from(tags: TagSet) -> Self {
+ Self {
+ tags,
+ ..Default::default()
+ }
+ }
+}
+
+impl From<Attributes> for PutMultipartOpts {
+ fn from(attributes: Attributes) -> Self {
+ Self {
+ attributes,
+ ..Default::default()
+ }
+ }
+}
+
/// Result for a put request
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PutResult {
@@ -1688,10 +1749,7 @@ mod tests {
]);
let path = Path::from("attributes");
- let opts = PutOptions {
- attributes: attributes.clone(),
- ..Default::default()
- };
+ let opts = attributes.clone().into();
match integration.put_opts(&path, "foo".into(), opts).await {
Ok(_) => {
let r = integration.get(&path).await.unwrap();
@@ -1700,6 +1758,19 @@ mod tests {
Err(Error::NotImplemented) => {}
Err(e) => panic!("{e}"),
}
+
+ let opts = attributes.clone().into();
+ match integration.put_multipart_opts(&path, opts).await {
+ Ok(mut w) => {
+ w.put_part("foo".into()).await.unwrap();
+ w.complete().await.unwrap();
+
+ let r = integration.get(&path).await.unwrap();
+ assert_eq!(r.attributes, attributes);
+ }
+ Err(Error::NotImplemented) => {}
+ Err(e) => panic!("{e}"),
+ }
}
pub(crate) async fn get_opts(storage: &dyn ObjectStore) {
@@ -2332,21 +2403,32 @@ mod tests {
let path = Path::from("tag_test");
storage
- .put_opts(&path, "test".into(), tag_set.into())
+ .put_opts(&path, "test".into(), tag_set.clone().into())
.await
.unwrap();
+ let multi_path = Path::from("tag_test_multi");
+ let mut write = storage
+ .put_multipart_opts(&multi_path, tag_set.into())
+ .await
+ .unwrap();
+
+ write.put_part("foo".into()).await.unwrap();
+ write.complete().await.unwrap();
+
// Write should always succeed, but certain configurations may simply
ignore tags
if !validate {
return;
}
- let resp = get_tags(path.clone()).await.unwrap();
- let body = resp.bytes().await.unwrap();
+ for path in [path, multi_path] {
+ let resp = get_tags(path.clone()).await.unwrap();
+ let body = resp.bytes().await.unwrap();
- let mut resp: Tagging =
quick_xml::de::from_reader(body.reader()).unwrap();
- resp.list.tags.sort_by(|a, b| a.key.cmp(&b.key));
- assert_eq!(resp.list.tags, tags);
+ let mut resp: Tagging =
quick_xml::de::from_reader(body.reader()).unwrap();
+ resp.list.tags.sort_by(|a, b| a.key.cmp(&b.key));
+ assert_eq!(resp.list.tags, tags);
+ }
}
async fn delete_fixtures(storage: &DynObjectStore) {
diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs
index b94aa05b8b6..f3e1d4296fe 100644
--- a/object_store/src/limit.rs
+++ b/object_store/src/limit.rs
@@ -19,7 +19,8 @@
use crate::{
BoxStream, GetOptions, GetResult, GetResultPayload, ListResult,
MultipartUpload, ObjectMeta,
- ObjectStore, Path, PutOptions, PutPayload, PutResult, Result, StreamExt,
UploadPart,
+ ObjectStore, Path, PutMultipartOpts, PutOptions, PutPayload, PutResult,
Result, StreamExt,
+ UploadPart,
};
use async_trait::async_trait;
use bytes::Bytes;
@@ -91,6 +92,19 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
upload,
}))
}
+
+ async fn put_multipart_opts(
+ &self,
+ location: &Path,
+ opts: PutMultipartOpts,
+ ) -> Result<Box<dyn MultipartUpload>> {
+ let upload = self.inner.put_multipart_opts(location, opts).await?;
+ Ok(Box::new(LimitUpload {
+ semaphore: Arc::clone(&self.semaphore),
+ upload,
+ }))
+ }
+
async fn get(&self, location: &Path) -> Result<GetResult> {
let permit =
Arc::clone(&self.semaphore).acquire_owned().await.unwrap();
let r = self.inner.get(location).await?;
diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index a3695ad9174..8dec5bee0a2 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -39,7 +39,7 @@ use crate::{
path::{absolute_path_to_url, Path},
util::InvalidGetRange,
Attributes, GetOptions, GetResult, GetResultPayload, ListResult,
MultipartUpload, ObjectMeta,
- ObjectStore, PutMode, PutOptions, PutPayload, PutResult, Result,
UploadPart,
+ ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult,
Result, UploadPart,
};
/// A specialized `Error` for filesystem object store-related errors
@@ -404,7 +404,15 @@ impl ObjectStore for LocalFileSystem {
.await
}
- async fn put_multipart(&self, location: &Path) -> Result<Box<dyn
MultipartUpload>> {
+ async fn put_multipart_opts(
+ &self,
+ location: &Path,
+ opts: PutMultipartOpts,
+ ) -> Result<Box<dyn MultipartUpload>> {
+ if !opts.attributes.is_empty() {
+ return Err(crate::Error::NotImplemented);
+ }
+
let dest = self.path_to_filesystem(location)?;
let (file, src) = new_staged_upload(&dest)?;
Ok(Box::new(LocalUpload::new(src, dest, file)))
diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs
index e34b28fd27c..daf14e17510 100644
--- a/object_store/src/memory.rs
+++ b/object_store/src/memory.rs
@@ -31,8 +31,8 @@ use crate::multipart::{MultipartStore, PartId};
use crate::util::InvalidGetRange;
use crate::{
path::Path, Attributes, GetRange, GetResult, GetResultPayload, ListResult,
MultipartId,
- MultipartUpload, ObjectMeta, ObjectStore, PutMode, PutOptions, PutResult,
Result,
- UpdateVersion, UploadPart,
+ MultipartUpload, ObjectMeta, ObjectStore, PutMode, PutMultipartOpts,
PutOptions, PutResult,
+ Result, UpdateVersion, UploadPart,
};
use crate::{GetOptions, PutPayload};
@@ -223,9 +223,14 @@ impl ObjectStore for InMemory {
})
}
- async fn put_multipart(&self, location: &Path) -> Result<Box<dyn
MultipartUpload>> {
+ async fn put_multipart_opts(
+ &self,
+ location: &Path,
+ opts: PutMultipartOpts,
+ ) -> Result<Box<dyn MultipartUpload>> {
Ok(Box::new(InMemoryUpload {
location: location.clone(),
+ attributes: opts.attributes,
parts: vec![],
storage: Arc::clone(&self.storage),
}))
@@ -487,6 +492,7 @@ impl InMemory {
#[derive(Debug)]
struct InMemoryUpload {
location: Path,
+ attributes: Attributes,
parts: Vec<PutPayload>,
storage: Arc<RwLock<Storage>>,
}
@@ -503,10 +509,11 @@ impl MultipartUpload for InMemoryUpload {
let mut buf = Vec::with_capacity(cap);
let parts = self.parts.iter().flatten();
parts.for_each(|x| buf.extend_from_slice(x));
- let etag = self
- .storage
- .write()
- .insert(&self.location, buf.into(), Attributes::new());
+ let etag = self.storage.write().insert(
+ &self.location,
+ buf.into(),
+ std::mem::take(&mut self.attributes),
+ );
Ok(PutResult {
e_tag: Some(etag.to_string()),
diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs
index 1d1ffeed8c6..7c9ea5804c3 100644
--- a/object_store/src/prefix.rs
+++ b/object_store/src/prefix.rs
@@ -22,8 +22,8 @@ use std::ops::Range;
use crate::path::Path;
use crate::{
- GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta,
ObjectStore, PutOptions,
- PutPayload, PutResult, Result,
+ GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta,
ObjectStore, PutMultipartOpts,
+ PutOptions, PutPayload, PutResult, Result,
};
#[doc(hidden)]
@@ -100,6 +100,15 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
self.inner.put_multipart(&full_path).await
}
+ async fn put_multipart_opts(
+ &self,
+ location: &Path,
+ opts: PutMultipartOpts,
+ ) -> Result<Box<dyn MultipartUpload>> {
+ let full_path = self.full_path(location);
+ self.inner.put_multipart_opts(&full_path, opts).await
+ }
+
async fn get(&self, location: &Path) -> Result<GetResult> {
let full_path = self.full_path(location);
self.inner.get(&full_path).await
diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs
index d089784668e..38b6d7c3bf4 100644
--- a/object_store/src/throttle.rs
+++ b/object_store/src/throttle.rs
@@ -23,7 +23,7 @@ use std::{convert::TryInto, sync::Arc};
use crate::multipart::{MultipartStore, PartId};
use crate::{
path::Path, GetResult, GetResultPayload, ListResult, MultipartId,
MultipartUpload, ObjectMeta,
- ObjectStore, PutOptions, PutPayload, PutResult, Result,
+ ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result,
};
use crate::{GetOptions, UploadPart};
use async_trait::async_trait;
@@ -171,6 +171,18 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
}))
}
+ async fn put_multipart_opts(
+ &self,
+ location: &Path,
+ opts: PutMultipartOpts,
+ ) -> Result<Box<dyn MultipartUpload>> {
+ let upload = self.inner.put_multipart_opts(location, opts).await?;
+ Ok(Box::new(ThrottledUpload {
+ upload,
+ sleep: self.config().wait_put_per_call,
+ }))
+ }
+
async fn get(&self, location: &Path) -> Result<GetResult> {
sleep(self.config().wait_get_per_call).await;
diff --git a/object_store/tests/get_range_file.rs
b/object_store/tests/get_range_file.rs
index 59c59340045..c5550ac2172 100644
--- a/object_store/tests/get_range_file.rs
+++ b/object_store/tests/get_range_file.rs
@@ -46,7 +46,11 @@ impl ObjectStore for MyStore {
self.0.put_opts(location, payload, opts).await
}
- async fn put_multipart(&self, _location: &Path) -> Result<Box<dyn
MultipartUpload>> {
+ async fn put_multipart_opts(
+ &self,
+ _location: &Path,
+ _opts: PutMultipartOpts,
+ ) -> Result<Box<dyn MultipartUpload>> {
todo!()
}