This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 01e70a6e feat(services/oss): add append support (#2279)
01e70a6e is described below

commit 01e70a6e6ef94bc80161b19ae44c4a1041b3bf73
Author: Suyan <[email protected]>
AuthorDate: Tue May 23 11:19:39 2023 +0800

    feat(services/oss): add append support (#2279)
    
    * feat(services/oss): add append support
    
    Signed-off-by: suyanhanx <[email protected]>
    
    * set capability
    
    Signed-off-by: suyanhanx <[email protected]>
    
    * fix `you`
    
    Signed-off-by: suyanhanx <[email protected]>
    
    * add append for oss
    
    Signed-off-by: suyanhanx <[email protected]>
    
    * fix append
    
    Signed-off-by: suyanhanx <[email protected]>
    
    * try copy from appender
    
    Signed-off-by: suyanhanx <[email protected]>
    
    ---------
    
    Signed-off-by: suyanhanx <[email protected]>
---
 core/src/services/oss/appender.rs   | 143 ++++++++++++++++++++++++++++++++++++
 core/src/services/oss/backend.rs    |  15 +++-
 core/src/services/oss/core.rs       |  49 ++++++++++++
 core/src/services/oss/mod.rs        |   1 +
 core/src/types/capability.rs        |   3 +
 core/src/types/operator/operator.rs |   6 +-
 core/tests/behavior/append.rs       |  39 +++++++++-
 7 files changed, 251 insertions(+), 5 deletions(-)

diff --git a/core/src/services/oss/appender.rs 
b/core/src/services/oss/appender.rs
new file mode 100644
index 00000000..b327d824
--- /dev/null
+++ b/core/src/services/oss/appender.rs
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use http::StatusCode;
+
+use super::core::*;
+use super::error::parse_error;
+use crate::ops::OpAppend;
+use crate::raw::*;
+use crate::*;
+
+pub const X_OSS_NEXT_APPEND_POSITION: &str = "x-oss-next-append-position";
+
+pub struct OssAppender {
+    core: Arc<OssCore>,
+
+    op: OpAppend,
+    path: String,
+
+    position: Option<u64>,
+}
+
+impl OssAppender {
+    pub fn new(core: Arc<OssCore>, path: &str, op: OpAppend) -> Self {
+        Self {
+            core,
+            op,
+            path: path.to_string(),
+            position: None,
+        }
+    }
+}
+
+#[async_trait]
+impl oio::Append for OssAppender {
+    async fn append(&mut self, bs: Bytes) -> Result<()> {
+        // If the position is not set, we need to get the current position.
+        if self.position.is_none() {
+            let resp = self.core.oss_head_object(&self.path, None, 
None).await?;
+
+            let status = resp.status();
+            match status {
+                StatusCode::OK => {
+                    let position = resp
+                        .headers()
+                        .get(X_OSS_NEXT_APPEND_POSITION)
+                        .and_then(|v| v.to_str().ok())
+                        .and_then(|v| v.parse::<u64>().ok())
+                        .ok_or_else(|| {
+                            Error::new(
+                                ErrorKind::Unexpected,
+                                "missing x-oss-next-append-position, the 
object may not be appendable",
+                            )
+                        })?;
+                    self.position = Some(position);
+                }
+                StatusCode::NOT_FOUND => {
+                    self.position = Some(0);
+                }
+                _ => {
+                    return Err(parse_error(resp).await?);
+                }
+            }
+        }
+
+        let mut req = self.core.oss_append_object_request(
+            &self.path,
+            self.position.expect("position is not set"),
+            bs.len(),
+            &self.op,
+            AsyncBody::Bytes(bs),
+        )?;
+
+        self.core.sign(&mut req).await?;
+
+        let resp = self.core.send(req).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                let position = resp
+                    .headers()
+                    .get(X_OSS_NEXT_APPEND_POSITION)
+                    .and_then(|v| v.to_str().ok())
+                    .and_then(|v| v.parse::<u64>().ok())
+                    .ok_or_else(|| {
+                        Error::new(
+                            ErrorKind::Unexpected,
+                            "missing x-oss-next-append-position, the object 
may not be appendable",
+                        )
+                    })?;
+                self.position = Some(position);
+                Ok(())
+            }
+            StatusCode::CONFLICT => {
+                // The object is not appendable or the position is not match 
with the object's length.
+                // If the position is not match, we could get the current 
position and retry.
+                let position = resp
+                    .headers()
+                    .get(X_OSS_NEXT_APPEND_POSITION)
+                    .and_then(|v| v.to_str().ok())
+                    .and_then(|v| v.parse::<u64>().ok())
+                    .ok_or_else(|| {
+                        Error::new(
+                            ErrorKind::Unexpected,
+                            "missing x-oss-next-append-position, the object 
may not be appendable",
+                        )
+                    })?;
+                self.position = Some(position);
+
+                // Then return the error to the caller, so the caller could 
retry.
+                Err(Error::new(
+                    ErrorKind::ConditionNotMatch,
+                    "the position is not match with the object's length. 
position has been updated.",
+                ))
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn close(&mut self) -> Result<()> {
+        Ok(())
+    }
+}
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index 713236f1..c436fc98 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -30,6 +30,7 @@ use reqsign::AliyunConfig;
 use reqsign::AliyunLoader;
 use reqsign::AliyunOssSigner;
 
+use super::appender::OssAppender;
 use super::core::*;
 use super::error::parse_error;
 use super::pager::OssPager;
@@ -441,7 +442,7 @@ impl Accessor for OssBackend {
     type BlockingReader = ();
     type Writer = OssWriter;
     type BlockingWriter = ();
-    type Appender = ();
+    type Appender = OssAppender;
     type Pager = OssPager;
     type BlockingPager = ();
 
@@ -469,6 +470,11 @@ impl Accessor for OssBackend {
                 create_dir: true,
                 copy: true,
 
+                append: true,
+                append_with_cache_control: true,
+                append_with_content_type: true,
+                append_with_content_disposition: true,
+
                 list: true,
                 list_with_delimiter_slash: true,
                 list_without_delimiter: true,
@@ -533,6 +539,13 @@ impl Accessor for OssBackend {
         ))
     }
 
+    async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, 
Self::Appender)> {
+        Ok((
+            RpAppend::default(),
+            OssAppender::new(self.core.clone(), path, args),
+        ))
+    }
+
     async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> 
Result<RpCopy> {
         let resp = self.core.oss_copy_object(from, to).await?;
         let status = resp.status();
diff --git a/core/src/services/oss/core.rs b/core/src/services/oss/core.rs
index 68297dfc..6fd36ce2 100644
--- a/core/src/services/oss/core.rs
+++ b/core/src/services/oss/core.rs
@@ -37,6 +37,7 @@ use reqsign::AliyunOssSigner;
 use serde::Deserialize;
 use serde::Serialize;
 
+use crate::ops::OpAppend;
 use crate::ops::OpWrite;
 use crate::raw::*;
 use crate::*;
@@ -186,6 +187,54 @@ impl OssCore {
         Ok(req)
     }
 
+    /// Oss append object request
+    ///
+    /// # Note
+    ///
+    /// This request is used to append data to an existing object or create an 
appendable object.
+    /// So we must set the `append` and `position` header.
+    ///
+    /// 
https://www.alibabacloud.com/help/object-storage-service/latest/appendobject
+    pub fn oss_append_object_request(
+        &self,
+        path: &str,
+        position: u64,
+        size: usize,
+        args: &OpAppend,
+        body: AsyncBody,
+    ) -> Result<Request<AsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+        let endpoint = self.get_endpoint(false);
+        let url = format!(
+            "{}/{}?append&position={}",
+            endpoint,
+            percent_encode_path(&p),
+            position
+        );
+
+        let mut req = Request::post(&url);
+
+        req = req.header(CONTENT_LENGTH, size);
+
+        if let Some(mime) = args.content_type() {
+            req = req.header(CONTENT_TYPE, mime);
+        }
+
+        if let Some(pos) = args.content_disposition() {
+            req = req.header(CONTENT_DISPOSITION, pos);
+        }
+
+        if let Some(cache_control) = args.cache_control() {
+            req = req.header(CACHE_CONTROL, cache_control)
+        }
+
+        // set sse headers
+        req = self.insert_sse_headers(req);
+
+        let req = req.body(body).map_err(new_request_build_error)?;
+        Ok(req)
+    }
+
     pub fn oss_get_object_request(
         &self,
         path: &str,
diff --git a/core/src/services/oss/mod.rs b/core/src/services/oss/mod.rs
index 9bfd1692..21829ea5 100644
--- a/core/src/services/oss/mod.rs
+++ b/core/src/services/oss/mod.rs
@@ -18,6 +18,7 @@
 mod backend;
 pub use backend::OssBuilder as Oss;
 
+mod appender;
 mod core;
 mod error;
 mod pager;
diff --git a/core/src/types/capability.rs b/core/src/types/capability.rs
index c6b8e981..1153a93f 100644
--- a/core/src/types/capability.rs
+++ b/core/src/types/capability.rs
@@ -151,6 +151,9 @@ impl Debug for Capability {
         if self.write {
             s.push("Write");
         }
+        if self.append {
+            s.push("Append");
+        }
         if self.create_dir {
             s.push("CreateDir");
         }
diff --git a/core/src/types/operator/operator.rs 
b/core/src/types/operator/operator.rs
index e97ed5dc..da0cf1c7 100644
--- a/core/src/types/operator/operator.rs
+++ b/core/src/types/operator/operator.rs
@@ -1006,9 +1006,9 @@ impl Operator {
         }
 
         let bs = bs.into();
-        let (_, mut w) = self.inner().append(&path, args).await?;
-        w.append(bs).await?;
-        w.close().await?;
+        let (_, mut a) = self.inner().append(&path, args).await?;
+        a.append(bs).await?;
+        a.close().await?;
 
         Ok(())
     }
diff --git a/core/tests/behavior/append.rs b/core/tests/behavior/append.rs
index 1c03a3e6..e9e55d2a 100644
--- a/core/tests/behavior/append.rs
+++ b/core/tests/behavior/append.rs
@@ -16,10 +16,13 @@
 // under the License.
 
 use anyhow::Result;
+use log::warn;
 use opendal::ops::OpAppend;
 use opendal::EntryMode;
 use opendal::ErrorKind;
 use opendal::Operator;
+use sha2::Digest;
+use sha2::Sha256;
 
 use super::utils::*;
 
@@ -68,6 +71,7 @@ macro_rules! behavior_append_tests {
                 test_append_with_content_type,
                 test_append_with_content_disposition,
 
+                test_appender_futures_copy,
                 test_fuzz_appender,
             );
         )*
@@ -101,7 +105,7 @@ pub async fn test_append(op: Operator) -> Result<()> {
 
 /// Test append to a directory path must fail.
 pub async fn test_append_with_dir_path(op: Operator) -> Result<()> {
-    let path = uuid::Uuid::new_v4().to_string();
+    let path = format!("{}/", uuid::Uuid::new_v4());
     let (content, _) = gen_bytes();
 
     let res = op.append(&path, content).await;
@@ -197,6 +201,39 @@ pub async fn test_append_with_content_disposition(op: 
Operator) -> Result<()> {
     Ok(())
 }
 
+/// Copy data from reader to writer
+pub async fn test_appender_futures_copy(op: Operator) -> Result<()> {
+    let path = uuid::Uuid::new_v4().to_string();
+    let (content, size): (Vec<u8>, usize) =
+        gen_bytes_with_range(10 * 1024 * 1024..20 * 1024 * 1024);
+
+    let mut a = match op.appender(&path).await {
+        Ok(a) => a,
+        Err(err) if err.kind() == ErrorKind::Unsupported => {
+            warn!("service doesn't support write with append");
+            return Ok(());
+        }
+        Err(err) => return Err(err.into()),
+    };
+
+    futures::io::copy(&mut content.as_slice(), &mut a).await?;
+    a.close().await?;
+
+    let meta = op.stat(&path).await.expect("stat must succeed");
+    assert_eq!(meta.content_length(), size as u64);
+
+    let bs = op.read(&path).await?;
+    assert_eq!(bs.len(), size, "read size");
+    assert_eq!(
+        format!("{:x}", Sha256::digest(&bs[..size])),
+        format!("{:x}", Sha256::digest(content)),
+        "read content"
+    );
+
+    op.delete(&path).await.expect("delete must succeed");
+    Ok(())
+}
+
 /// Test for fuzzing appender.
 pub async fn test_fuzz_appender(op: Operator) -> Result<()> {
     let path = uuid::Uuid::new_v4().to_string();

Reply via email to