This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new bb993f87f feat(services/obs): add append support (#2422)
bb993f87f is described below

commit bb993f87f1815341998745db0bc75f84bde899c0
Author: clundro <[email protected]>
AuthorDate: Tue Jun 6 11:45:52 2023 +0800

    feat(services/obs): add append support (#2422)
    
    * add init appender for obs
    
    Signed-off-by: clundro <[email protected]>
    
    * add more func
    
    Signed-off-by: clundro <[email protected]>
    
    * tune
    
    Signed-off-by: clundro <[email protected]>
    
    * fmt check
    
    Signed-off-by: clundro <[email protected]>
    
    ---------
    
    Signed-off-by: clundro <[email protected]>
---
 core/src/services/obs/appender.rs | 141 ++++++++++++++++++++++++++++++++++++++
 core/src/services/obs/backend.rs  |  16 ++++-
 core/src/services/obs/core.rs     |  38 +++++++++-
 core/src/services/obs/mod.rs      |   1 +
 4 files changed, 193 insertions(+), 3 deletions(-)

diff --git a/core/src/services/obs/appender.rs b/core/src/services/obs/appender.rs
new file mode 100644
index 000000000..1031910d5
--- /dev/null
+++ b/core/src/services/obs/appender.rs
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use http::StatusCode;
+
+use super::core::*;
+use super::error::parse_error;
+use crate::raw::*;
+use crate::*;
+
+pub const X_OBS_NEXT_APPEND_POSITION: &str = "x-obs-next-append-position";
+
+pub struct ObsAppender {
+    core: Arc<ObsCore>,
+
+    op: OpAppend,
+    path: String,
+
+    position: Option<u64>,
+}
+
+impl ObsAppender {
+    pub fn new(core: Arc<ObsCore>, path: &str, op: OpAppend) -> Self {
+        Self {
+            core,
+            op,
+            path: path.to_string(),
+            position: None,
+        }
+    }
+}
+
+#[async_trait]
+impl oio::Append for ObsAppender {
+    async fn append(&mut self, bs: Bytes) -> Result<()> {
+        // If the position is not set, we need to get the current position.
+        if self.position.is_none() {
+            let resp = self.core.obs_head_object(&self.path, None, None).await?;
+
+            let status = resp.status();
+            match status {
+                StatusCode::OK => {
+                    let position = resp
+                        .headers()
+                        .get( X_OBS_NEXT_APPEND_POSITION)
+                        .and_then(|v| v.to_str().ok())
+                        .and_then(|v|v.parse::<u64>().ok())
+                        .ok_or_else(|| Error::new(ErrorKind::Unexpected, "missing x-obs-next-append-position, the object may not be appendable"))?;
+                    self.position = Some(position);
+                }
+
+                StatusCode::NOT_FOUND => {
+                    self.position = Some(0);
+                }
+
+                _ => {
+                    return Err(parse_error(resp).await?);
+                }
+            }
+        }
+
+        let mut req = self.core.obs_append_object_request(
+            &self.path,
+            self.position.expect("position is not set"),
+            bs.len(),
+            &self.op,
+            AsyncBody::Bytes(bs),
+        )?;
+
+        self.core.sign(&mut req).await?;
+
+        let resp = self.core.send(req).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                let position = resp
+                    .headers()
+                    .get(X_OBS_NEXT_APPEND_POSITION)
+                    .and_then(|v| v.to_str().ok())
+                    .and_then(|v| v.parse::<u64>().ok())
+                    .ok_or_else(|| {
+                        Error::new(
+                            ErrorKind::Unexpected,
+                            "missing x-obs-next-append-position, the object may not be appendable",
+                        )
+                    })?;
+                self.position = Some(position);
+                Ok(())
+            }
+
+            StatusCode::CONFLICT => {
+                // The object is not appendable or the position is not match with the object's length.
+                // If the position is not match, we could get the current position and retry.
+                let position = resp
+                    .headers()
+                    .get(X_OBS_NEXT_APPEND_POSITION)
+                    .and_then(|v| v.to_str().ok())
+                    .and_then(|v| v.parse::<u64>().ok())
+                    .ok_or_else(|| {
+                        Error::new(
+                            ErrorKind::Unexpected,
+                            "missing x-obs-next-append-position, the object may not be appendable",
+                        )
+                    })?;
+                self.position = Some(position);
+
+                // Then return the error to the caller, so the caller could retry.
+                Err(Error::new(
+                ErrorKind::ConditionNotMatch,
+                "the position is not match with the object's length. position has been updated.",
+            ))
+            }
+
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn close(&mut self) -> Result<()> {
+        Ok(())
+    }
+}
diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs
index a1a1288fb..78d15d147 100644
--- a/core/src/services/obs/backend.rs
+++ b/core/src/services/obs/backend.rs
@@ -27,10 +27,10 @@ use reqsign::HuaweicloudObsConfig;
 use reqsign::HuaweicloudObsCredentialLoader;
 use reqsign::HuaweicloudObsSigner;
 
-use super::core::ObsCore;
 use super::error::parse_error;
 use super::pager::ObsPager;
 use super::writer::ObsWriter;
+use super::{appender::ObsAppender, core::ObsCore};
 use crate::raw::*;
 use crate::*;
 
@@ -300,7 +300,7 @@ impl Accessor for ObsBackend {
     type BlockingReader = ();
     type Writer = ObsWriter;
     type BlockingWriter = ();
-    type Appender = ();
+    type Appender = ObsAppender;
     type Pager = ObsPager;
     type BlockingPager = ();
 
@@ -324,6 +324,11 @@ impl Accessor for ObsBackend {
                 write_with_content_type: true,
                 write_with_cache_control: true,
 
+                append: true,
+                append_with_cache_control: true,
+                append_with_content_type: true,
+                append_with_content_disposition: true,
+
                 delete: true,
                 create_dir: true,
                 copy: true,
@@ -426,6 +431,13 @@ impl Accessor for ObsBackend {
         ))
     }
 
+    async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend, Self::Appender)> {
+        Ok((
+            RpAppend::default(),
+            ObsAppender::new(self.core.clone(), path, args),
+        ))
+    }
+
     async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
         let resp = self.core.obs_copy_object(from, to).await?;
 
diff --git a/core/src/services/obs/core.rs b/core/src/services/obs/core.rs
index 32491c597..29e2d783b 100644
--- a/core/src/services/obs/core.rs
+++ b/core/src/services/obs/core.rs
@@ -19,11 +19,11 @@ use std::fmt::Debug;
 use std::fmt::Formatter;
 use std::time::Duration;
 
-use http::header::CACHE_CONTROL;
 use http::header::CONTENT_LENGTH;
 use http::header::CONTENT_TYPE;
 use http::header::IF_MATCH;
 use http::header::IF_NONE_MATCH;
+use http::header::{CACHE_CONTROL, CONTENT_DISPOSITION};
 use http::Request;
 use http::Response;
 use reqsign::HuaweicloudObsCredential;
@@ -232,6 +232,42 @@ impl ObsCore {
         self.send(req).await
     }
 
+    pub fn obs_append_object_request(
+        &self,
+        path: &str,
+        position: u64,
+        size: usize,
+        args: &OpAppend,
+        body: AsyncBody,
+    ) -> Result<Request<AsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+        let url = format!(
+            "{}/{}?append&position={}",
+            self.endpoint,
+            percent_encode_path(&p),
+            position
+        );
+
+        let mut req = Request::post(&url);
+
+        req = req.header(CONTENT_LENGTH, size);
+
+        if let Some(mime) = args.content_type() {
+            req = req.header(CONTENT_TYPE, mime);
+        }
+
+        if let Some(pos) = args.content_disposition() {
+            req = req.header(CONTENT_DISPOSITION, pos);
+        }
+
+        if let Some(cache_control) = args.cache_control() {
+            req = req.header(CACHE_CONTROL, cache_control)
+        }
+
+        let req = req.body(body).map_err(new_request_build_error)?;
+        Ok(req)
+    }
+
     pub async fn obs_copy_object(
         &self,
         from: &str,
diff --git a/core/src/services/obs/mod.rs b/core/src/services/obs/mod.rs
index 9e44b65ef..f6fc0606f 100644
--- a/core/src/services/obs/mod.rs
+++ b/core/src/services/obs/mod.rs
@@ -18,6 +18,7 @@
 mod backend;
 pub use backend::ObsBuilder as Obs;
 
+mod appender;
 mod core;
 mod error;
 mod pager;

Reply via email to