This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 1f466dc62c Support copy_if_not_exists for Cloudflare R2 (#4190) (#4239)
1f466dc62c is described below
commit 1f466dc62c9ad2fbea206b2bfdec40ca783a9c33
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon Aug 7 15:44:40 2023 +0100
Support copy_if_not_exists for Cloudflare R2 (#4190) (#4239)
* Support copy_if_not_exists for Cloudflare R2 (#4190)
* Add tests
---
object_store/src/aws/client.rs | 48 +++++++++++++++++++++++++-----
object_store/src/aws/copy.rs | 66 ++++++++++++++++++++++++++++++++++++++++++
object_store/src/aws/mod.rs | 44 +++++++++++++++++++++++-----
3 files changed, 144 insertions(+), 14 deletions(-)
diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index 188897620b..1c35586f8b 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -17,7 +17,9 @@
use crate::aws::checksum::Checksum;
use crate::aws::credential::{AwsCredential, CredentialExt};
-use crate::aws::{AwsCredentialProvider, STORE, STRICT_PATH_ENCODE_SET};
+use crate::aws::{
+ AwsCredentialProvider, S3CopyIfNotExists, STORE, STRICT_PATH_ENCODE_SET,
+};
use crate::client::get::GetClient;
use crate::client::list::ListClient;
use crate::client::list_response::ListResponse;
@@ -37,7 +39,7 @@ use percent_encoding::{utf8_percent_encode, PercentEncode};
use quick_xml::events::{self as xml_events};
use reqwest::{
header::{CONTENT_LENGTH, CONTENT_TYPE},
- Client as ReqwestClient, Method, Response,
+ Client as ReqwestClient, Method, Response, StatusCode,
};
use serde::{Deserialize, Serialize};
use snafu::{ResultExt, Snafu};
@@ -206,6 +208,7 @@ pub struct S3Config {
pub client_options: ClientOptions,
pub sign_payload: bool,
pub checksum: Option<Checksum>,
+ pub copy_if_not_exists: Option<S3CopyIfNotExists>,
}
impl S3Config {
@@ -424,14 +427,37 @@ impl S3Client {
}
/// Make an S3 Copy request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html>
- pub async fn copy_request(&self, from: &Path, to: &Path) -> Result<()> {
+ pub async fn copy_request(
+ &self,
+ from: &Path,
+ to: &Path,
+ overwrite: bool,
+ ) -> Result<()> {
let credential = self.get_credential().await?;
let url = self.config.path_url(to);
let source = format!("{}/{}", self.config.bucket, encode_path(from));
- self.client
+ let mut builder = self
+ .client
.request(Method::PUT, url)
- .header("x-amz-copy-source", source)
+ .header("x-amz-copy-source", source);
+
+ if !overwrite {
+ match &self.config.copy_if_not_exists {
+ Some(S3CopyIfNotExists::Header(k, v)) => {
+ builder = builder.header(k, v);
+ }
+ None => {
+ return Err(crate::Error::NotSupported {
+ source: "S3 does not support copy-if-not-exists"
+ .to_string()
+ .into(),
+ })
+ }
+ }
+ }
+
+ builder
.with_aws_sigv4(
credential.as_ref(),
&self.config.region,
@@ -441,8 +467,16 @@ impl S3Client {
)
.send_retry(&self.config.retry_config)
.await
- .context(CopyRequestSnafu {
- path: from.as_ref(),
+ .map_err(|source| match source.status() {
+ Some(StatusCode::PRECONDITION_FAILED) => crate::Error::AlreadyExists {
+ source: Box::new(source),
+ path: to.to_string(),
+ },
+ _ => Error::CopyRequest {
+ source,
+ path: from.to_string(),
+ }
+ .into(),
})?;
Ok(())
diff --git a/object_store/src/aws/copy.rs b/object_store/src/aws/copy.rs
new file mode 100644
index 0000000000..6b96f992ce
--- /dev/null
+++ b/object_store/src/aws/copy.rs
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::config::Parse;
+
+/// Configure how to provide [`ObjectStore::copy_if_not_exists`] for [`AmazonS3`]
+#[derive(Debug, Clone)]
+#[non_exhaustive]
+pub enum S3CopyIfNotExists {
+ /// Some S3-compatible stores, such as Cloudflare R2, support copy if not exists
+ /// semantics through custom headers.
+ ///
+ /// If set, [`ObjectStore::copy_if_not_exists`] will perform a normal copy operation
+ /// with the provided header pair, and expect the store to fail with `412 Precondition Failed`
+ /// if the destination file already exists
+ ///
+ /// Encoded as `header:<HEADER_NAME>:<HEADER_VALUE>` ignoring whitespace
+ ///
+ /// For example `header: cf-copy-destination-if-none-match: *`, would set
+ /// the header `cf-copy-destination-if-none-match` to `*`
+ Header(String, String),
+}
+
+impl std::fmt::Display for S3CopyIfNotExists {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ Self::Header(k, v) => write!(f, "header: {}: {}", k, v),
+ }
+ }
+}
+
+impl S3CopyIfNotExists {
+ fn from_str(s: &str) -> Option<Self> {
+ let (variant, value) = s.split_once(':')?;
+ match variant.trim() {
+ "header" => {
+ let (k, v) = value.split_once(':')?;
+ Some(Self::Header(k.trim().to_string(), v.trim().to_string()))
+ }
+ _ => None,
+ }
+ }
+}
+
+impl Parse for S3CopyIfNotExists {
+ fn parse(v: &str) -> crate::Result<Self> {
+ Self::from_str(v).ok_or_else(|| crate::Error::Generic {
+ store: "Config",
+ source: format!("Failed to parse \"{v}\" as S3CopyIfNotExists").into(),
+ })
+ }
+}
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index f6066d45a7..7e16b5a1ba 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -44,7 +44,6 @@ use tokio::io::AsyncWrite;
use tracing::info;
use url::Url;
-pub use crate::aws::checksum::Checksum;
use crate::aws::client::{S3Client, S3Config};
use crate::aws::credential::{
InstanceCredentialProvider, TaskCredentialProvider, WebIdentityProvider,
@@ -64,8 +63,12 @@ use crate::{
mod checksum;
mod client;
+mod copy;
mod credential;
+pub use checksum::Checksum;
+pub use copy::S3CopyIfNotExists;
+
// http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html
//
// Do not URI-encode any of the unreserved characters that RFC 3986 defines:
@@ -292,12 +295,11 @@ impl ObjectStore for AmazonS3 {
}
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
- self.client.copy_request(from, to).await
+ self.client.copy_request(from, to, true).await
}
- async fn copy_if_not_exists(&self, _source: &Path, _dest: &Path) -> Result<()> {
- // Will need dynamodb_lock
- Err(crate::Error::NotImplemented)
+ async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
+ self.client.copy_request(from, to, false).await
}
}
@@ -390,6 +392,8 @@ pub struct AmazonS3Builder {
client_options: ClientOptions,
/// Credentials
credentials: Option<AwsCredentialProvider>,
+ /// Copy if not exists
+ copy_if_not_exists: Option<ConfigValue<S3CopyIfNotExists>>,
}
/// Configuration keys for [`AmazonS3Builder`]
@@ -521,6 +525,11 @@ pub enum AmazonS3ConfigKey {
/// <https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html>
ContainerCredentialsRelativeUri,
+ /// Configure how to provide [`ObjectStore::copy_if_not_exists`]
+ ///
+ /// See [`S3CopyIfNotExists`]
+ CopyIfNotExists,
+
/// Client options
Client(ClientConfigKey),
}
@@ -543,6 +552,7 @@ impl AsRef<str> for AmazonS3ConfigKey {
Self::ContainerCredentialsRelativeUri => {
"aws_container_credentials_relative_uri"
}
+ Self::CopyIfNotExists => "copy_if_not_exists",
Self::Client(opt) => opt.as_ref(),
}
}
@@ -576,6 +586,7 @@ impl FromStr for AmazonS3ConfigKey {
"aws_container_credentials_relative_uri" => {
Ok(Self::ContainerCredentialsRelativeUri)
}
+ "copy_if_not_exists" => Ok(Self::CopyIfNotExists),
// Backwards compatibility
"aws_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)),
_ => match s.parse() {
@@ -686,6 +697,9 @@ impl AmazonS3Builder {
AmazonS3ConfigKey::Client(key) => {
self.client_options = self.client_options.with_config(key, value)
}
+ AmazonS3ConfigKey::CopyIfNotExists => {
+ self.copy_if_not_exists = Some(ConfigValue::Deferred(value.into()))
+ }
};
self
}
@@ -753,6 +767,9 @@ impl AmazonS3Builder {
AmazonS3ConfigKey::ContainerCredentialsRelativeUri => {
self.container_credentials_relative_uri.clone()
}
+ AmazonS3ConfigKey::CopyIfNotExists => {
+ self.copy_if_not_exists.as_ref().map(ToString::to_string)
+ }
}
}
@@ -935,6 +952,12 @@ impl AmazonS3Builder {
self
}
+ /// Configure how to provide [`ObjectStore::copy_if_not_exists`]
+ pub fn with_copy_if_not_exists(mut self, config: S3CopyIfNotExists) -> Self {
+ self.copy_if_not_exists = Some(config.into());
+ self
+ }
+
/// Create a [`AmazonS3`] instance from the provided values,
/// consuming `self`.
pub fn build(mut self) -> Result<AmazonS3> {
@@ -945,6 +968,7 @@ impl AmazonS3Builder {
let bucket = self.bucket_name.context(MissingBucketNameSnafu)?;
let region = self.region.context(MissingRegionSnafu)?;
let checksum = self.checksum_algorithm.map(|x| x.get()).transpose()?;
+ let copy_if_not_exists = self.copy_if_not_exists.map(|x| x.get()).transpose()?;
let credentials = if let Some(credentials) = self.credentials {
credentials
@@ -1050,6 +1074,7 @@ impl AmazonS3Builder {
client_options: self.client_options,
sign_payload: !self.unsigned_payload.get()?,
checksum,
+ copy_if_not_exists,
};
let client = Arc::new(S3Client::new(config)?);
@@ -1062,8 +1087,9 @@ impl AmazonS3Builder {
mod tests {
use super::*;
use crate::tests::{
- get_nonexistent_object, get_opts, list_uses_directories_correctly,
- list_with_delimiter, put_get_delete_list_opts, rename_and_copy, stream_get,
+ copy_if_not_exists, get_nonexistent_object, get_opts,
+ list_uses_directories_correctly, list_with_delimiter, put_get_delete_list_opts,
+ rename_and_copy, stream_get,
};
use bytes::Bytes;
use std::collections::HashMap;
@@ -1164,6 +1190,7 @@ mod tests {
let config = AmazonS3Builder::from_env();
let is_local = matches!(&config.endpoint, Some(e) if e.starts_with("http://"));
+ let test_not_exists = config.copy_if_not_exists.is_some();
let integration = config.build().unwrap();
// Localstack doesn't support listing with spaces https://github.com/localstack/localstack/issues/6328
@@ -1173,6 +1200,9 @@ mod tests {
list_with_delimiter(&integration).await;
rename_and_copy(&integration).await;
stream_get(&integration).await;
+ if test_not_exists {
+ copy_if_not_exists(&integration).await;
+ }
// run integration test with unsigned payload enabled
let config = AmazonS3Builder::from_env().with_unsigned_payload(true);