This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 084892f0f feat(core): Implement write if not exists for
azblob,azdls,gcs,oss,cos (#5321)
084892f0f is described below
commit 084892f0f36ef3af7c3178e9df795ee9c6f038a2
Author: Xuanwo <[email protected]>
AuthorDate: Thu Nov 14 22:37:58 2024 +0800
feat(core): Implement write if not exists for azblob,azdls,gcs,oss,cos
(#5321)
---
core/src/services/azblob/backend.rs | 2 ++
core/src/services/azblob/core.rs | 42 +++++++++++++++++++++++--------------
core/src/services/azblob/error.rs | 2 +-
core/src/services/azdls/backend.rs | 3 +++
core/src/services/azdls/core.rs | 10 ++++++++-
core/src/services/azdls/error.rs | 4 +++-
core/src/services/cos/backend.rs | 2 ++
core/src/services/cos/core.rs | 12 +++++++++++
core/src/services/cos/error.rs | 2 +-
core/src/services/gcs/backend.rs | 2 ++
core/src/services/gcs/core.rs | 7 +++++++
core/src/services/oss/backend.rs | 3 +++
core/src/services/oss/core.rs | 17 +++++++++++++++
core/src/services/oss/error.rs | 2 +-
core/src/services/s3/core.rs | 8 +++----
15 files changed, 93 insertions(+), 25 deletions(-)
diff --git a/core/src/services/azblob/backend.rs
b/core/src/services/azblob/backend.rs
index 638513249..820cfcec0 100644
--- a/core/src/services/azblob/backend.rs
+++ b/core/src/services/azblob/backend.rs
@@ -518,6 +518,8 @@ impl Access for AzblobBackend {
write_can_multi: true,
write_with_cache_control: true,
write_with_content_type: true,
+ write_with_if_not_exists: true,
+ write_with_if_none_match: true,
write_with_user_metadata: true,
delete: true,
diff --git a/core/src/services/azblob/core.rs b/core/src/services/azblob/core.rs
index 867f07651..409342242 100644
--- a/core/src/services/azblob/core.rs
+++ b/core/src/services/azblob/core.rs
@@ -245,18 +245,10 @@ impl AzblobCore {
let mut req = Request::put(&url);
- if let Some(user_metadata) = args.user_metadata() {
- for (key, value) in user_metadata {
- req = req.header(format!("{X_MS_META_PREFIX}{key}"), value)
- }
- }
-
- // Set SSE headers.
- req = self.insert_sse_headers(req);
-
- if let Some(cache_control) = args.cache_control() {
- req = req.header(constants::X_MS_BLOB_CACHE_CONTROL,
cache_control);
- }
+ req = req.header(
+ HeaderName::from_static(constants::X_MS_BLOB_TYPE),
+ "BlockBlob",
+ );
if let Some(size) = size {
req = req.header(CONTENT_LENGTH, size)
@@ -266,10 +258,28 @@ impl AzblobCore {
req = req.header(CONTENT_TYPE, ty)
}
- req = req.header(
- HeaderName::from_static(constants::X_MS_BLOB_TYPE),
- "BlockBlob",
- );
+ // Specify the wildcard character (*) to perform the operation only if
+ // the resource does not exist, and fail the operation if it does
exist.
+ if args.if_not_exists() {
+ req = req.header(IF_NONE_MATCH, "*");
+ }
+
+ if let Some(v) = args.if_none_match() {
+ req = req.header(IF_NONE_MATCH, v);
+ }
+
+ if let Some(cache_control) = args.cache_control() {
+ req = req.header(constants::X_MS_BLOB_CACHE_CONTROL,
cache_control);
+ }
+
+ // Set SSE headers.
+ req = self.insert_sse_headers(req);
+
+ if let Some(user_metadata) = args.user_metadata() {
+ for (key, value) in user_metadata {
+ req = req.header(format!("{X_MS_META_PREFIX}{key}"), value)
+ }
+ }
// Set body
let req = req.body(body).map_err(new_request_build_error)?;
diff --git a/core/src/services/azblob/error.rs
b/core/src/services/azblob/error.rs
index a8d596952..1ea38ad87 100644
--- a/core/src/services/azblob/error.rs
+++ b/core/src/services/azblob/error.rs
@@ -66,7 +66,7 @@ pub(super) fn parse_error(resp: Response<Buffer>) -> Error {
let (kind, retryable) = match parts.status {
StatusCode::NOT_FOUND => (ErrorKind::NotFound, false),
StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false),
- StatusCode::PRECONDITION_FAILED | StatusCode::NOT_MODIFIED => {
+ StatusCode::PRECONDITION_FAILED | StatusCode::NOT_MODIFIED |
StatusCode::CONFLICT => {
(ErrorKind::ConditionNotMatch, false)
}
StatusCode::INTERNAL_SERVER_ERROR
diff --git a/core/src/services/azdls/backend.rs
b/core/src/services/azdls/backend.rs
index 34d7125d6..534c7c190 100644
--- a/core/src/services/azdls/backend.rs
+++ b/core/src/services/azdls/backend.rs
@@ -233,6 +233,9 @@ impl Access for AzdlsBackend {
write: true,
write_can_append: true,
+ write_with_if_none_match: true,
+ write_with_if_not_exists: true,
+
create_dir: true,
delete: true,
rename: true,
diff --git a/core/src/services/azdls/core.rs b/core/src/services/azdls/core.rs
index 714a5adc7..90d9f5533 100644
--- a/core/src/services/azdls/core.rs
+++ b/core/src/services/azdls/core.rs
@@ -20,9 +20,9 @@ use std::fmt::Debug;
use std::fmt::Formatter;
use std::fmt::Write;
-use http::header::CONTENT_DISPOSITION;
use http::header::CONTENT_LENGTH;
use http::header::CONTENT_TYPE;
+use http::header::{CONTENT_DISPOSITION, IF_NONE_MATCH};
use http::HeaderName;
use http::HeaderValue;
use http::Request;
@@ -153,6 +153,14 @@ impl AzdlsCore {
req = req.header(CONTENT_DISPOSITION, pos)
}
+ if args.if_not_exists() {
+ req = req.header(IF_NONE_MATCH, "*")
+ }
+
+ if let Some(v) = args.if_none_match() {
+ req = req.header(IF_NONE_MATCH, v)
+ }
+
// Set body
let req = req.body(body).map_err(new_request_build_error)?;
diff --git a/core/src/services/azdls/error.rs b/core/src/services/azdls/error.rs
index de50c5638..95e09bf88 100644
--- a/core/src/services/azdls/error.rs
+++ b/core/src/services/azdls/error.rs
@@ -66,7 +66,9 @@ pub(super) fn parse_error(resp: Response<Buffer>) -> Error {
let (kind, retryable) = match parts.status {
StatusCode::NOT_FOUND => (ErrorKind::NotFound, false),
StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false),
- StatusCode::PRECONDITION_FAILED => (ErrorKind::ConditionNotMatch,
false),
+ StatusCode::PRECONDITION_FAILED | StatusCode::CONFLICT => {
+ (ErrorKind::ConditionNotMatch, false)
+ }
StatusCode::INTERNAL_SERVER_ERROR
| StatusCode::BAD_GATEWAY
| StatusCode::SERVICE_UNAVAILABLE
diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs
index 6065dca95..df297d196 100644
--- a/core/src/services/cos/backend.rs
+++ b/core/src/services/cos/backend.rs
@@ -258,6 +258,8 @@ impl Access for CosBackend {
write_with_content_type: true,
write_with_cache_control: true,
write_with_content_disposition: true,
+ // TODO: set this to false while version has been enabled.
+ write_with_if_not_exists: true,
// The min multipart size of COS is 1 MiB.
//
// ref:
<https://www.tencentcloud.com/document/product/436/14112>
diff --git a/core/src/services/cos/core.rs b/core/src/services/cos/core.rs
index f7dbdef60..4d28b2688 100644
--- a/core/src/services/cos/core.rs
+++ b/core/src/services/cos/core.rs
@@ -172,6 +172,18 @@ impl CosCore {
req = req.header(CONTENT_TYPE, mime)
}
+ // For a bucket which has never enabled versioning, you may use it to
+ // specify whether to prohibit overwriting the object with the same
name
+ // when uploading the object:
+ //
+ // When the x-cos-forbid-overwrite is specified as true, overwriting
the object
+ // with the same name will be prohibited.
+ //
+ // ref: https://www.tencentcloud.com/document/product/436/7749
+ if args.if_not_exists() {
+ req = req.header("x-cos-forbid-overwrite", "true")
+ }
+
let req = req.body(body).map_err(new_request_build_error)?;
Ok(req)
diff --git a/core/src/services/cos/error.rs b/core/src/services/cos/error.rs
index 65639e720..df5f95df5 100644
--- a/core/src/services/cos/error.rs
+++ b/core/src/services/cos/error.rs
@@ -43,7 +43,7 @@ pub(super) fn parse_error(resp: Response<Buffer>) -> Error {
let (kind, retryable) = match parts.status {
StatusCode::NOT_FOUND => (ErrorKind::NotFound, false),
StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false),
- StatusCode::PRECONDITION_FAILED | StatusCode::NOT_MODIFIED => {
+ StatusCode::PRECONDITION_FAILED | StatusCode::NOT_MODIFIED |
StatusCode::CONFLICT => {
(ErrorKind::ConditionNotMatch, false)
}
StatusCode::INTERNAL_SERVER_ERROR
diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index 7ad6cfc17..5c35abf66 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -365,6 +365,8 @@ impl Access for GcsBackend {
write_can_multi: true,
write_with_content_type: true,
write_with_user_metadata: true,
+ write_with_if_not_exists: true,
+
// The min multipart size of Gcs is 5 MiB.
//
// ref:
<https://cloud.google.com/storage/docs/xml-api/put-object-multipart>
diff --git a/core/src/services/gcs/core.rs b/core/src/services/gcs/core.rs
index 86d73bc2e..fe6a242f2 100644
--- a/core/src/services/gcs/core.rs
+++ b/core/src/services/gcs/core.rs
@@ -270,6 +270,13 @@ impl GcsCore {
write!(&mut url, "&predefinedAcl={}", acl).unwrap();
}
+ // Makes the operation conditional on whether the object's current
generation
+ // matches the given value. Setting to 0 makes the operation succeed
only if
+ // there are no live versions of the object.
+ if op.if_not_exists() {
+ write!(&mut url, "&ifGenerationMatch=0").unwrap();
+ }
+
let mut req = Request::post(&url);
req = req.header(CONTENT_LENGTH, size.unwrap_or_default());
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index a627e1d4a..1ed309e4e 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -445,6 +445,9 @@ impl Access for OssBackend {
write_with_cache_control: true,
write_with_content_type: true,
write_with_content_disposition: true,
+ // TODO: set this to false while version has been enabled.
+ write_with_if_not_exists: true,
+
// The min multipart size of OSS is 100 KiB.
//
// ref:
<https://www.alibabacloud.com/help/en/oss/user-guide/multipart-upload-12>
diff --git a/core/src/services/oss/core.rs b/core/src/services/oss/core.rs
index 2a471ef8c..480ab1398 100644
--- a/core/src/services/oss/core.rs
+++ b/core/src/services/oss/core.rs
@@ -41,6 +41,7 @@ use serde::Deserialize;
use serde::Serialize;
use crate::raw::*;
+use crate::services::oss::core::constants::X_OSS_FORBID_OVERWRITE;
use crate::*;
pub mod constants {
@@ -48,6 +49,8 @@ pub mod constants {
pub const X_OSS_SERVER_SIDE_ENCRYPTION_KEY_ID: &str =
"x-oss-server-side-encryption-key-id";
+ pub const X_OSS_FORBID_OVERWRITE: &str = "x-oss-forbid-overwrite";
+
pub const RESPONSE_CONTENT_DISPOSITION: &str =
"response-content-disposition";
pub const OSS_QUERY_VERSION_ID: &str = "versionId";
@@ -181,6 +184,20 @@ impl OssCore {
req = req.header(CACHE_CONTROL, cache_control);
}
+ // TODO: disable if not exists while version has been enabled.
+ //
+ // Specifies whether the object that is uploaded by calling the
PutObject operation
+ // overwrites the existing object that has the same name. When
versioning is enabled
+ // or suspended for the bucket to which you want to upload the object,
the
+ // x-oss-forbid-overwrite header does not take effect. In this case,
the object that
+ // is uploaded by calling the PutObject operation overwrites the
existing object that
+ // has the same name.
+ //
+ // ref:
https://www.alibabacloud.com/help/en/oss/developer-reference/putobject?spm=a2c63.p38356.0.0.39ef75e93o0Xtz
+ if args.if_not_exists() {
+ req = req.header(X_OSS_FORBID_OVERWRITE, "true");
+ }
+
if let Some(user_metadata) = args.user_metadata() {
for (key, value) in user_metadata {
// before insert user defined metadata header, add prefix to
the header name
diff --git a/core/src/services/oss/error.rs b/core/src/services/oss/error.rs
index 15eb26d43..3cfeb66b0 100644
--- a/core/src/services/oss/error.rs
+++ b/core/src/services/oss/error.rs
@@ -42,7 +42,7 @@ pub(super) fn parse_error(resp: Response<Buffer>) -> Error {
let (kind, retryable) = match parts.status {
StatusCode::NOT_FOUND => (ErrorKind::NotFound, false),
StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false),
- StatusCode::PRECONDITION_FAILED | StatusCode::NOT_MODIFIED => {
+ StatusCode::PRECONDITION_FAILED | StatusCode::NOT_MODIFIED |
StatusCode::CONFLICT => {
(ErrorKind::ConditionNotMatch, false)
}
StatusCode::INTERNAL_SERVER_ERROR
diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs
index bc93b46e3..745f198f6 100644
--- a/core/src/services/s3/core.rs
+++ b/core/src/services/s3/core.rs
@@ -455,6 +455,10 @@ impl S3Core {
req = req.header(CACHE_CONTROL, cache_control)
}
+ if args.if_not_exists() {
+ req = req.header(IF_NONE_MATCH, "*");
+ }
+
// Set storage class header
if let Some(v) = &self.default_storage_class {
req =
req.header(HeaderName::from_static(constants::X_AMZ_STORAGE_CLASS), v);
@@ -476,10 +480,6 @@ impl S3Core {
req = self.insert_checksum_header(req, &checksum);
}
- if args.if_not_exists() {
- req = req.header(IF_NONE_MATCH, "*");
- }
-
// Set body
let req = req.body(body).map_err(new_request_build_error)?;