This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 01e70a6e feat(services/oss): add append support (#2279)
01e70a6e is described below
commit 01e70a6e6ef94bc80161b19ae44c4a1041b3bf73
Author: Suyan <[email protected]>
AuthorDate: Tue May 23 11:19:39 2023 +0800
feat(services/oss): add append support (#2279)
* feat(services/oss): add append support
Signed-off-by: suyanhanx <[email protected]>
* set capability
Signed-off-by: suyanhanx <[email protected]>
* fix `you`
Signed-off-by: suyanhanx <[email protected]>
* add append for oss
Signed-off-by: suyanhanx <[email protected]>
* fix append
Signed-off-by: suyanhanx <[email protected]>
* try copy from appender
Signed-off-by: suyanhanx <[email protected]>
---------
Signed-off-by: suyanhanx <[email protected]>
---
core/src/services/oss/appender.rs | 143 ++++++++++++++++++++++++++++++++++++
core/src/services/oss/backend.rs | 15 +++-
core/src/services/oss/core.rs | 49 ++++++++++++
core/src/services/oss/mod.rs | 1 +
core/src/types/capability.rs | 3 +
core/src/types/operator/operator.rs | 6 +-
core/tests/behavior/append.rs | 39 +++++++++-
7 files changed, 251 insertions(+), 5 deletions(-)
diff --git a/core/src/services/oss/appender.rs
b/core/src/services/oss/appender.rs
new file mode 100644
index 00000000..b327d824
--- /dev/null
+++ b/core/src/services/oss/appender.rs
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use http::StatusCode;
+
+use super::core::*;
+use super::error::parse_error;
+use crate::ops::OpAppend;
+use crate::raw::*;
+use crate::*;
+
+pub const X_OSS_NEXT_APPEND_POSITION: &str = "x-oss-next-append-position";
+
+pub struct OssAppender {
+ core: Arc<OssCore>,
+
+ op: OpAppend,
+ path: String,
+
+ position: Option<u64>,
+}
+
+impl OssAppender {
+ pub fn new(core: Arc<OssCore>, path: &str, op: OpAppend) -> Self {
+ Self {
+ core,
+ op,
+ path: path.to_string(),
+ position: None,
+ }
+ }
+}
+
+#[async_trait]
+impl oio::Append for OssAppender {
+ async fn append(&mut self, bs: Bytes) -> Result<()> {
+ // If the position is not set, we need to get the current position.
+ if self.position.is_none() {
+ let resp = self.core.oss_head_object(&self.path, None,
None).await?;
+
+ let status = resp.status();
+ match status {
+ StatusCode::OK => {
+ let position = resp
+ .headers()
+ .get(X_OSS_NEXT_APPEND_POSITION)
+ .and_then(|v| v.to_str().ok())
+ .and_then(|v| v.parse::<u64>().ok())
+ .ok_or_else(|| {
+ Error::new(
+ ErrorKind::Unexpected,
+ "missing x-oss-next-append-position, the
object may not be appendable",
+ )
+ })?;
+ self.position = Some(position);
+ }
+ StatusCode::NOT_FOUND => {
+ self.position = Some(0);
+ }
+ _ => {
+ return Err(parse_error(resp).await?);
+ }
+ }
+ }
+
+ let mut req = self.core.oss_append_object_request(
+ &self.path,
+ self.position.expect("position is not set"),
+ bs.len(),
+ &self.op,
+ AsyncBody::Bytes(bs),
+ )?;
+
+ self.core.sign(&mut req).await?;
+
+ let resp = self.core.send(req).await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::OK => {
+ let position = resp
+ .headers()
+ .get(X_OSS_NEXT_APPEND_POSITION)
+ .and_then(|v| v.to_str().ok())
+ .and_then(|v| v.parse::<u64>().ok())
+ .ok_or_else(|| {
+ Error::new(
+ ErrorKind::Unexpected,
+ "missing x-oss-next-append-position, the object
may not be appendable",
+ )
+ })?;
+ self.position = Some(position);
+ Ok(())
+ }
+ StatusCode::CONFLICT => {
+ // The object is not appendable or the position is not match
with the object's length.
+ // If the position is not match, we could get the current
position and retry.
+ let position = resp
+ .headers()
+ .get(X_OSS_NEXT_APPEND_POSITION)
+ .and_then(|v| v.to_str().ok())
+ .and_then(|v| v.parse::<u64>().ok())
+ .ok_or_else(|| {
+ Error::new(
+ ErrorKind::Unexpected,
+ "missing x-oss-next-append-position, the object
may not be appendable",
+ )
+ })?;
+ self.position = Some(position);
+
+ // Then return the error to the caller, so the caller could
retry.
+ Err(Error::new(
+ ErrorKind::ConditionNotMatch,
+ "the position is not match with the object's length.
position has been updated.",
+ ))
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ async fn close(&mut self) -> Result<()> {
+ Ok(())
+ }
+}
diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index 713236f1..c436fc98 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -30,6 +30,7 @@ use reqsign::AliyunConfig;
use reqsign::AliyunLoader;
use reqsign::AliyunOssSigner;
+use super::appender::OssAppender;
use super::core::*;
use super::error::parse_error;
use super::pager::OssPager;
@@ -441,7 +442,7 @@ impl Accessor for OssBackend {
type BlockingReader = ();
type Writer = OssWriter;
type BlockingWriter = ();
- type Appender = ();
+ type Appender = OssAppender;
type Pager = OssPager;
type BlockingPager = ();
@@ -469,6 +470,11 @@ impl Accessor for OssBackend {
create_dir: true,
copy: true,
+ append: true,
+ append_with_cache_control: true,
+ append_with_content_type: true,
+ append_with_content_disposition: true,
+
list: true,
list_with_delimiter_slash: true,
list_without_delimiter: true,
@@ -533,6 +539,13 @@ impl Accessor for OssBackend {
))
}
+ async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend,
Self::Appender)> {
+ Ok((
+ RpAppend::default(),
+ OssAppender::new(self.core.clone(), path, args),
+ ))
+ }
+
async fn copy(&self, from: &str, to: &str, _args: OpCopy) ->
Result<RpCopy> {
let resp = self.core.oss_copy_object(from, to).await?;
let status = resp.status();
diff --git a/core/src/services/oss/core.rs b/core/src/services/oss/core.rs
index 68297dfc..6fd36ce2 100644
--- a/core/src/services/oss/core.rs
+++ b/core/src/services/oss/core.rs
@@ -37,6 +37,7 @@ use reqsign::AliyunOssSigner;
use serde::Deserialize;
use serde::Serialize;
+use crate::ops::OpAppend;
use crate::ops::OpWrite;
use crate::raw::*;
use crate::*;
@@ -186,6 +187,54 @@ impl OssCore {
Ok(req)
}
+ /// Oss append object request
+ ///
+ /// # Note
+ ///
+ /// This request is used to append data to an existing object or create an
appendable object.
+ /// So we must set the `append` and `position` header.
+ ///
+ ///
https://www.alibabacloud.com/help/object-storage-service/latest/appendobject
+ pub fn oss_append_object_request(
+ &self,
+ path: &str,
+ position: u64,
+ size: usize,
+ args: &OpAppend,
+ body: AsyncBody,
+ ) -> Result<Request<AsyncBody>> {
+ let p = build_abs_path(&self.root, path);
+ let endpoint = self.get_endpoint(false);
+ let url = format!(
+ "{}/{}?append&position={}",
+ endpoint,
+ percent_encode_path(&p),
+ position
+ );
+
+ let mut req = Request::post(&url);
+
+ req = req.header(CONTENT_LENGTH, size);
+
+ if let Some(mime) = args.content_type() {
+ req = req.header(CONTENT_TYPE, mime);
+ }
+
+ if let Some(pos) = args.content_disposition() {
+ req = req.header(CONTENT_DISPOSITION, pos);
+ }
+
+ if let Some(cache_control) = args.cache_control() {
+ req = req.header(CACHE_CONTROL, cache_control)
+ }
+
+ // set sse headers
+ req = self.insert_sse_headers(req);
+
+ let req = req.body(body).map_err(new_request_build_error)?;
+ Ok(req)
+ }
+
pub fn oss_get_object_request(
&self,
path: &str,
diff --git a/core/src/services/oss/mod.rs b/core/src/services/oss/mod.rs
index 9bfd1692..21829ea5 100644
--- a/core/src/services/oss/mod.rs
+++ b/core/src/services/oss/mod.rs
@@ -18,6 +18,7 @@
mod backend;
pub use backend::OssBuilder as Oss;
+mod appender;
mod core;
mod error;
mod pager;
diff --git a/core/src/types/capability.rs b/core/src/types/capability.rs
index c6b8e981..1153a93f 100644
--- a/core/src/types/capability.rs
+++ b/core/src/types/capability.rs
@@ -151,6 +151,9 @@ impl Debug for Capability {
if self.write {
s.push("Write");
}
+ if self.append {
+ s.push("Append");
+ }
if self.create_dir {
s.push("CreateDir");
}
diff --git a/core/src/types/operator/operator.rs
b/core/src/types/operator/operator.rs
index e97ed5dc..da0cf1c7 100644
--- a/core/src/types/operator/operator.rs
+++ b/core/src/types/operator/operator.rs
@@ -1006,9 +1006,9 @@ impl Operator {
}
let bs = bs.into();
- let (_, mut w) = self.inner().append(&path, args).await?;
- w.append(bs).await?;
- w.close().await?;
+ let (_, mut a) = self.inner().append(&path, args).await?;
+ a.append(bs).await?;
+ a.close().await?;
Ok(())
}
diff --git a/core/tests/behavior/append.rs b/core/tests/behavior/append.rs
index 1c03a3e6..e9e55d2a 100644
--- a/core/tests/behavior/append.rs
+++ b/core/tests/behavior/append.rs
@@ -16,10 +16,13 @@
// under the License.
use anyhow::Result;
+use log::warn;
use opendal::ops::OpAppend;
use opendal::EntryMode;
use opendal::ErrorKind;
use opendal::Operator;
+use sha2::Digest;
+use sha2::Sha256;
use super::utils::*;
@@ -68,6 +71,7 @@ macro_rules! behavior_append_tests {
test_append_with_content_type,
test_append_with_content_disposition,
+ test_appender_futures_copy,
test_fuzz_appender,
);
)*
@@ -101,7 +105,7 @@ pub async fn test_append(op: Operator) -> Result<()> {
/// Test append to a directory path must fail.
pub async fn test_append_with_dir_path(op: Operator) -> Result<()> {
- let path = uuid::Uuid::new_v4().to_string();
+ let path = format!("{}/", uuid::Uuid::new_v4());
let (content, _) = gen_bytes();
let res = op.append(&path, content).await;
@@ -197,6 +201,39 @@ pub async fn test_append_with_content_disposition(op:
Operator) -> Result<()> {
Ok(())
}
+/// Copy data from reader to writer
+pub async fn test_appender_futures_copy(op: Operator) -> Result<()> {
+ let path = uuid::Uuid::new_v4().to_string();
+ let (content, size): (Vec<u8>, usize) =
+ gen_bytes_with_range(10 * 1024 * 1024..20 * 1024 * 1024);
+
+ let mut a = match op.appender(&path).await {
+ Ok(a) => a,
+ Err(err) if err.kind() == ErrorKind::Unsupported => {
+ warn!("service doesn't support write with append");
+ return Ok(());
+ }
+ Err(err) => return Err(err.into()),
+ };
+
+ futures::io::copy(&mut content.as_slice(), &mut a).await?;
+ a.close().await?;
+
+ let meta = op.stat(&path).await.expect("stat must succeed");
+ assert_eq!(meta.content_length(), size as u64);
+
+ let bs = op.read(&path).await?;
+ assert_eq!(bs.len(), size, "read size");
+ assert_eq!(
+ format!("{:x}", Sha256::digest(&bs[..size])),
+ format!("{:x}", Sha256::digest(content)),
+ "read content"
+ );
+
+ op.delete(&path).await.expect("delete must succeed");
+ Ok(())
+}
+
/// Test for fuzzing appender.
pub async fn test_fuzz_appender(op: Operator) -> Result<()> {
let path = uuid::Uuid::new_v4().to_string();