This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 084892f0f feat(core): Implement write if not exists for 
azblob,azdls,gcs,oss,cos (#5321)
084892f0f is described below

commit 084892f0f36ef3af7c3178e9df795ee9c6f038a2
Author: Xuanwo <[email protected]>
AuthorDate: Thu Nov 14 22:37:58 2024 +0800

    feat(core): Implement write if not exists for azblob,azdls,gcs,oss,cos 
(#5321)
---
 core/src/services/azblob/backend.rs |  2 ++
 core/src/services/azblob/core.rs    | 42 +++++++++++++++++++++++--------------
 core/src/services/azblob/error.rs   |  2 +-
 core/src/services/azdls/backend.rs  |  3 +++
 core/src/services/azdls/core.rs     | 10 ++++++++-
 core/src/services/azdls/error.rs    |  4 +++-
 core/src/services/cos/backend.rs    |  2 ++
 core/src/services/cos/core.rs       | 12 +++++++++++
 core/src/services/cos/error.rs      |  2 +-
 core/src/services/gcs/backend.rs    |  2 ++
 core/src/services/gcs/core.rs       |  7 +++++++
 core/src/services/oss/backend.rs    |  3 +++
 core/src/services/oss/core.rs       | 17 +++++++++++++++
 core/src/services/oss/error.rs      |  2 +-
 core/src/services/s3/core.rs        |  8 +++----
 15 files changed, 93 insertions(+), 25 deletions(-)

diff --git a/core/src/services/azblob/backend.rs 
b/core/src/services/azblob/backend.rs
index 638513249..820cfcec0 100644
--- a/core/src/services/azblob/backend.rs
+++ b/core/src/services/azblob/backend.rs
@@ -518,6 +518,8 @@ impl Access for AzblobBackend {
                 write_can_multi: true,
                 write_with_cache_control: true,
                 write_with_content_type: true,
+                write_with_if_not_exists: true,
+                write_with_if_none_match: true,
                 write_with_user_metadata: true,
 
                 delete: true,
diff --git a/core/src/services/azblob/core.rs b/core/src/services/azblob/core.rs
index 867f07651..409342242 100644
--- a/core/src/services/azblob/core.rs
+++ b/core/src/services/azblob/core.rs
@@ -245,18 +245,10 @@ impl AzblobCore {
 
         let mut req = Request::put(&url);
 
-        if let Some(user_metadata) = args.user_metadata() {
-            for (key, value) in user_metadata {
-                req = req.header(format!("{X_MS_META_PREFIX}{key}"), value)
-            }
-        }
-
-        // Set SSE headers.
-        req = self.insert_sse_headers(req);
-
-        if let Some(cache_control) = args.cache_control() {
-            req = req.header(constants::X_MS_BLOB_CACHE_CONTROL, 
cache_control);
-        }
+        req = req.header(
+            HeaderName::from_static(constants::X_MS_BLOB_TYPE),
+            "BlockBlob",
+        );
 
         if let Some(size) = size {
             req = req.header(CONTENT_LENGTH, size)
@@ -266,10 +258,28 @@ impl AzblobCore {
             req = req.header(CONTENT_TYPE, ty)
         }
 
-        req = req.header(
-            HeaderName::from_static(constants::X_MS_BLOB_TYPE),
-            "BlockBlob",
-        );
+        // Specify the wildcard character (*) to perform the operation only if
+        // the resource does not exist, and fail the operation if it does 
exist.
+        if args.if_not_exists() {
+            req = req.header(IF_NONE_MATCH, "*");
+        }
+
+        if let Some(v) = args.if_none_match() {
+            req = req.header(IF_NONE_MATCH, v);
+        }
+
+        if let Some(cache_control) = args.cache_control() {
+            req = req.header(constants::X_MS_BLOB_CACHE_CONTROL, 
cache_control);
+        }
+
+        // Set SSE headers.
+        req = self.insert_sse_headers(req);
+
+        if let Some(user_metadata) = args.user_metadata() {
+            for (key, value) in user_metadata {
+                req = req.header(format!("{X_MS_META_PREFIX}{key}"), value)
+            }
+        }
 
         // Set body
         let req = req.body(body).map_err(new_request_build_error)?;
diff --git a/core/src/services/azblob/error.rs 
b/core/src/services/azblob/error.rs
index a8d596952..1ea38ad87 100644
--- a/core/src/services/azblob/error.rs
+++ b/core/src/services/azblob/error.rs
@@ -66,7 +66,7 @@ pub(super) fn parse_error(resp: Response<Buffer>) -> Error {
     let (kind, retryable) = match parts.status {
         StatusCode::NOT_FOUND => (ErrorKind::NotFound, false),
         StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false),
-        StatusCode::PRECONDITION_FAILED | StatusCode::NOT_MODIFIED => {
+        StatusCode::PRECONDITION_FAILED | StatusCode::NOT_MODIFIED | 
StatusCode::CONFLICT => {
             (ErrorKind::ConditionNotMatch, false)
         }
         StatusCode::INTERNAL_SERVER_ERROR
diff --git a/core/src/services/azdls/backend.rs 
b/core/src/services/azdls/backend.rs
index 34d7125d6..534c7c190 100644
--- a/core/src/services/azdls/backend.rs
+++ b/core/src/services/azdls/backend.rs
@@ -233,6 +233,9 @@ impl Access for AzdlsBackend {
 
                 write: true,
                 write_can_append: true,
+                write_with_if_none_match: true,
+                write_with_if_not_exists: true,
+
                 create_dir: true,
                 delete: true,
                 rename: true,
diff --git a/core/src/services/azdls/core.rs b/core/src/services/azdls/core.rs
index 714a5adc7..90d9f5533 100644
--- a/core/src/services/azdls/core.rs
+++ b/core/src/services/azdls/core.rs
@@ -20,9 +20,9 @@ use std::fmt::Debug;
 use std::fmt::Formatter;
 use std::fmt::Write;
 
-use http::header::CONTENT_DISPOSITION;
 use http::header::CONTENT_LENGTH;
 use http::header::CONTENT_TYPE;
+use http::header::{CONTENT_DISPOSITION, IF_NONE_MATCH};
 use http::HeaderName;
 use http::HeaderValue;
 use http::Request;
@@ -153,6 +153,14 @@ impl AzdlsCore {
             req = req.header(CONTENT_DISPOSITION, pos)
         }
 
+        if args.if_not_exists() {
+            req = req.header(IF_NONE_MATCH, "*")
+        }
+
+        if let Some(v) = args.if_none_match() {
+            req = req.header(IF_NONE_MATCH, v)
+        }
+
         // Set body
         let req = req.body(body).map_err(new_request_build_error)?;
 
diff --git a/core/src/services/azdls/error.rs b/core/src/services/azdls/error.rs
index de50c5638..95e09bf88 100644
--- a/core/src/services/azdls/error.rs
+++ b/core/src/services/azdls/error.rs
@@ -66,7 +66,9 @@ pub(super) fn parse_error(resp: Response<Buffer>) -> Error {
     let (kind, retryable) = match parts.status {
         StatusCode::NOT_FOUND => (ErrorKind::NotFound, false),
         StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false),
-        StatusCode::PRECONDITION_FAILED => (ErrorKind::ConditionNotMatch, 
false),
+        StatusCode::PRECONDITION_FAILED | StatusCode::CONFLICT => {
+            (ErrorKind::ConditionNotMatch, false)
+        }
         StatusCode::INTERNAL_SERVER_ERROR
         | StatusCode::BAD_GATEWAY
         | StatusCode::SERVICE_UNAVAILABLE
diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs
index 6065dca95..df297d196 100644
--- a/core/src/services/cos/backend.rs
+++ b/core/src/services/cos/backend.rs
@@ -258,6 +258,8 @@ impl Access for CosBackend {
                 write_with_content_type: true,
                 write_with_cache_control: true,
                 write_with_content_disposition: true,
+                // TODO: set this to false while version has been enabled.
+                write_with_if_not_exists: true,
                 // The min multipart size of COS is 1 MiB.
                 //
                 // ref: 
<https://www.tencentcloud.com/document/product/436/14112>
diff --git a/core/src/services/cos/core.rs b/core/src/services/cos/core.rs
index f7dbdef60..4d28b2688 100644
--- a/core/src/services/cos/core.rs
+++ b/core/src/services/cos/core.rs
@@ -172,6 +172,18 @@ impl CosCore {
             req = req.header(CONTENT_TYPE, mime)
         }
 
+        // For a bucket which has never enabled versioning, you may use it to
+        // specify whether to prohibit overwriting the object with the same 
name
+        // when uploading the object:
+        //
+        // When the x-cos-forbid-overwrite is specified as true, overwriting 
the object
+        // with the same name will be prohibited.
+        //
+        // ref: https://www.tencentcloud.com/document/product/436/7749
+        if args.if_not_exists() {
+            req = req.header("x-cos-forbid-overwrite", "true")
+        }
+
         let req = req.body(body).map_err(new_request_build_error)?;
 
         Ok(req)
diff --git a/core/src/services/cos/error.rs b/core/src/services/cos/error.rs
index 65639e720..df5f95df5 100644
--- a/core/src/services/cos/error.rs
+++ b/core/src/services/cos/error.rs
@@ -43,7 +43,7 @@ pub(super) fn parse_error(resp: Response<Buffer>) -> Error {
     let (kind, retryable) = match parts.status {
         StatusCode::NOT_FOUND => (ErrorKind::NotFound, false),
         StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false),
-        StatusCode::PRECONDITION_FAILED | StatusCode::NOT_MODIFIED => {
+        StatusCode::PRECONDITION_FAILED | StatusCode::NOT_MODIFIED | 
StatusCode::CONFLICT => {
             (ErrorKind::ConditionNotMatch, false)
         }
         StatusCode::INTERNAL_SERVER_ERROR
diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index 7ad6cfc17..5c35abf66 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -365,6 +365,8 @@ impl Access for GcsBackend {
                 write_can_multi: true,
                 write_with_content_type: true,
                 write_with_user_metadata: true,
+                write_with_if_not_exists: true,
+
                 // The min multipart size of Gcs is 5 MiB.
                 //
                 // ref: 
<https://cloud.google.com/storage/docs/xml-api/put-object-multipart>
diff --git a/core/src/services/gcs/core.rs b/core/src/services/gcs/core.rs
index 86d73bc2e..fe6a242f2 100644
--- a/core/src/services/gcs/core.rs
+++ b/core/src/services/gcs/core.rs
@@ -270,6 +270,13 @@ impl GcsCore {
             write!(&mut url, "&predefinedAcl={}", acl).unwrap();
         }
 
+        // Makes the operation conditional on whether the object's current 
generation
+        // matches the given value. Setting to 0 makes the operation succeed 
only if
+        // there are no live versions of the object.
+        if op.if_not_exists() {
+            write!(&mut url, "&ifGenerationMatch=0").unwrap();
+        }
+
         let mut req = Request::post(&url);
 
         req = req.header(CONTENT_LENGTH, size.unwrap_or_default());
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index a627e1d4a..1ed309e4e 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -445,6 +445,9 @@ impl Access for OssBackend {
                 write_with_cache_control: true,
                 write_with_content_type: true,
                 write_with_content_disposition: true,
+                // TODO: set this to false while version has been enabled.
+                write_with_if_not_exists: true,
+
                 // The min multipart size of OSS is 100 KiB.
                 //
                 // ref: 
<https://www.alibabacloud.com/help/en/oss/user-guide/multipart-upload-12>
diff --git a/core/src/services/oss/core.rs b/core/src/services/oss/core.rs
index 2a471ef8c..480ab1398 100644
--- a/core/src/services/oss/core.rs
+++ b/core/src/services/oss/core.rs
@@ -41,6 +41,7 @@ use serde::Deserialize;
 use serde::Serialize;
 
 use crate::raw::*;
+use crate::services::oss::core::constants::X_OSS_FORBID_OVERWRITE;
 use crate::*;
 
 pub mod constants {
@@ -48,6 +49,8 @@ pub mod constants {
 
     pub const X_OSS_SERVER_SIDE_ENCRYPTION_KEY_ID: &str = 
"x-oss-server-side-encryption-key-id";
 
+    pub const X_OSS_FORBID_OVERWRITE: &str = "x-oss-forbid-overwrite";
+
     pub const RESPONSE_CONTENT_DISPOSITION: &str = 
"response-content-disposition";
 
     pub const OSS_QUERY_VERSION_ID: &str = "versionId";
@@ -181,6 +184,20 @@ impl OssCore {
             req = req.header(CACHE_CONTROL, cache_control);
         }
 
+        // TODO: disable if not exists while version has been enabled.
+        //
+        // Specifies whether the object that is uploaded by calling the 
PutObject operation
+        // overwrites the existing object that has the same name. When 
versioning is enabled
+        // or suspended for the bucket to which you want to upload the object, 
the
+        // x-oss-forbid-overwrite header does not take effect. In this case, 
the object that
+        // is uploaded by calling the PutObject operation overwrites the 
existing object that
+        // has the same name.
+        //
+        // ref: 
https://www.alibabacloud.com/help/en/oss/developer-reference/putobject?spm=a2c63.p38356.0.0.39ef75e93o0Xtz
+        if args.if_not_exists() {
+            req = req.header(X_OSS_FORBID_OVERWRITE, "true");
+        }
+
         if let Some(user_metadata) = args.user_metadata() {
             for (key, value) in user_metadata {
                 // before insert user defined metadata header, add prefix to 
the header name
diff --git a/core/src/services/oss/error.rs b/core/src/services/oss/error.rs
index 15eb26d43..3cfeb66b0 100644
--- a/core/src/services/oss/error.rs
+++ b/core/src/services/oss/error.rs
@@ -42,7 +42,7 @@ pub(super) fn parse_error(resp: Response<Buffer>) -> Error {
     let (kind, retryable) = match parts.status {
         StatusCode::NOT_FOUND => (ErrorKind::NotFound, false),
         StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false),
-        StatusCode::PRECONDITION_FAILED | StatusCode::NOT_MODIFIED => {
+        StatusCode::PRECONDITION_FAILED | StatusCode::NOT_MODIFIED | 
StatusCode::CONFLICT => {
             (ErrorKind::ConditionNotMatch, false)
         }
         StatusCode::INTERNAL_SERVER_ERROR
diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs
index bc93b46e3..745f198f6 100644
--- a/core/src/services/s3/core.rs
+++ b/core/src/services/s3/core.rs
@@ -455,6 +455,10 @@ impl S3Core {
             req = req.header(CACHE_CONTROL, cache_control)
         }
 
+        if args.if_not_exists() {
+            req = req.header(IF_NONE_MATCH, "*");
+        }
+
         // Set storage class header
         if let Some(v) = &self.default_storage_class {
             req = 
req.header(HeaderName::from_static(constants::X_AMZ_STORAGE_CLASS), v);
@@ -476,10 +480,6 @@ impl S3Core {
             req = self.insert_checksum_header(req, &checksum);
         }
 
-        if args.if_not_exists() {
-            req = req.header(IF_NONE_MATCH, "*");
-        }
-
         // Set body
         let req = req.body(body).map_err(new_request_build_error)?;
 

Reply via email to