This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new bb993f87f feat(services/obs): add append support (#2422)
bb993f87f is described below
commit bb993f87f1815341998745db0bc75f84bde899c0
Author: clundro <[email protected]>
AuthorDate: Tue Jun 6 11:45:52 2023 +0800
feat(services/obs): add append support (#2422)
* add init appender for obs
Signed-off-by: clundro <[email protected]>
* add more func
Signed-off-by: clundro <[email protected]>
* tune
Signed-off-by: clundro <[email protected]>
* fmt check
Signed-off-by: clundro <[email protected]>
---------
Signed-off-by: clundro <[email protected]>
---
core/src/services/obs/appender.rs | 141 ++++++++++++++++++++++++++++++++++++++
core/src/services/obs/backend.rs | 16 ++++-
core/src/services/obs/core.rs | 38 +++++++++-
core/src/services/obs/mod.rs | 1 +
4 files changed, 193 insertions(+), 3 deletions(-)
diff --git a/core/src/services/obs/appender.rs
b/core/src/services/obs/appender.rs
new file mode 100644
index 000000000..1031910d5
--- /dev/null
+++ b/core/src/services/obs/appender.rs
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use http::StatusCode;
+
+use super::core::*;
+use super::error::parse_error;
+use crate::raw::*;
+use crate::*;
+
+pub const X_OBS_NEXT_APPEND_POSITION: &str = "x-obs-next-append-position";
+
+pub struct ObsAppender {
+ core: Arc<ObsCore>,
+
+ op: OpAppend,
+ path: String,
+
+ position: Option<u64>,
+}
+
+impl ObsAppender {
+ pub fn new(core: Arc<ObsCore>, path: &str, op: OpAppend) -> Self {
+ Self {
+ core,
+ op,
+ path: path.to_string(),
+ position: None,
+ }
+ }
+}
+
+#[async_trait]
+impl oio::Append for ObsAppender {
+ async fn append(&mut self, bs: Bytes) -> Result<()> {
+ // If the position is not set, we need to get the current position.
+ if self.position.is_none() {
+ let resp = self.core.obs_head_object(&self.path, None,
None).await?;
+
+ let status = resp.status();
+ match status {
+ StatusCode::OK => {
+ let position = resp
+ .headers()
+ .get( X_OBS_NEXT_APPEND_POSITION)
+ .and_then(|v| v.to_str().ok())
+ .and_then(|v|v.parse::<u64>().ok())
+ .ok_or_else(|| Error::new(ErrorKind::Unexpected,
"missing x-obs-next-append-position, the object may not be appendable"))?;
+ self.position = Some(position);
+ }
+
+ StatusCode::NOT_FOUND => {
+ self.position = Some(0);
+ }
+
+ _ => {
+ return Err(parse_error(resp).await?);
+ }
+ }
+ }
+
+ let mut req = self.core.obs_append_object_request(
+ &self.path,
+ self.position.expect("position is not set"),
+ bs.len(),
+ &self.op,
+ AsyncBody::Bytes(bs),
+ )?;
+
+ self.core.sign(&mut req).await?;
+
+ let resp = self.core.send(req).await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::OK => {
+ let position = resp
+ .headers()
+ .get(X_OBS_NEXT_APPEND_POSITION)
+ .and_then(|v| v.to_str().ok())
+ .and_then(|v| v.parse::<u64>().ok())
+ .ok_or_else(|| {
+ Error::new(
+ ErrorKind::Unexpected,
+ "missing x-obs-next-append-position, the object
may not be appendable",
+ )
+ })?;
+ self.position = Some(position);
+ Ok(())
+ }
+
+ StatusCode::CONFLICT => {
+ // The object is not appendable or the position is not match
with the object's length.
+ // If the position is not match, we could get the current
position and retry.
+ let position = resp
+ .headers()
+ .get(X_OBS_NEXT_APPEND_POSITION)
+ .and_then(|v| v.to_str().ok())
+ .and_then(|v| v.parse::<u64>().ok())
+ .ok_or_else(|| {
+ Error::new(
+ ErrorKind::Unexpected,
+ "missing x-obs-next-append-position, the object
may not be appendable",
+ )
+ })?;
+ self.position = Some(position);
+
+ // Then return the error to the caller, so the caller could
retry.
+ Err(Error::new(
+ ErrorKind::ConditionNotMatch,
+ "the position is not match with the object's length. position
has been updated.",
+ ))
+ }
+
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ async fn close(&mut self) -> Result<()> {
+ Ok(())
+ }
+}
diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs
index a1a1288fb..78d15d147 100644
--- a/core/src/services/obs/backend.rs
+++ b/core/src/services/obs/backend.rs
@@ -27,10 +27,10 @@ use reqsign::HuaweicloudObsConfig;
use reqsign::HuaweicloudObsCredentialLoader;
use reqsign::HuaweicloudObsSigner;
-use super::core::ObsCore;
use super::error::parse_error;
use super::pager::ObsPager;
use super::writer::ObsWriter;
+use super::{appender::ObsAppender, core::ObsCore};
use crate::raw::*;
use crate::*;
@@ -300,7 +300,7 @@ impl Accessor for ObsBackend {
type BlockingReader = ();
type Writer = ObsWriter;
type BlockingWriter = ();
- type Appender = ();
+ type Appender = ObsAppender;
type Pager = ObsPager;
type BlockingPager = ();
@@ -324,6 +324,11 @@ impl Accessor for ObsBackend {
write_with_content_type: true,
write_with_cache_control: true,
+ append: true,
+ append_with_cache_control: true,
+ append_with_content_type: true,
+ append_with_content_disposition: true,
+
delete: true,
create_dir: true,
copy: true,
@@ -426,6 +431,13 @@ impl Accessor for ObsBackend {
))
}
+ async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend,
Self::Appender)> {
+ Ok((
+ RpAppend::default(),
+ ObsAppender::new(self.core.clone(), path, args),
+ ))
+ }
+
async fn copy(&self, from: &str, to: &str, _args: OpCopy) ->
Result<RpCopy> {
let resp = self.core.obs_copy_object(from, to).await?;
diff --git a/core/src/services/obs/core.rs b/core/src/services/obs/core.rs
index 32491c597..29e2d783b 100644
--- a/core/src/services/obs/core.rs
+++ b/core/src/services/obs/core.rs
@@ -19,11 +19,11 @@ use std::fmt::Debug;
use std::fmt::Formatter;
use std::time::Duration;
-use http::header::CACHE_CONTROL;
use http::header::CONTENT_LENGTH;
use http::header::CONTENT_TYPE;
use http::header::IF_MATCH;
use http::header::IF_NONE_MATCH;
+use http::header::{CACHE_CONTROL, CONTENT_DISPOSITION};
use http::Request;
use http::Response;
use reqsign::HuaweicloudObsCredential;
@@ -232,6 +232,42 @@ impl ObsCore {
self.send(req).await
}
+ pub fn obs_append_object_request(
+ &self,
+ path: &str,
+ position: u64,
+ size: usize,
+ args: &OpAppend,
+ body: AsyncBody,
+ ) -> Result<Request<AsyncBody>> {
+ let p = build_abs_path(&self.root, path);
+ let url = format!(
+ "{}/{}?append&position={}",
+ self.endpoint,
+ percent_encode_path(&p),
+ position
+ );
+
+ let mut req = Request::post(&url);
+
+ req = req.header(CONTENT_LENGTH, size);
+
+ if let Some(mime) = args.content_type() {
+ req = req.header(CONTENT_TYPE, mime);
+ }
+
+ if let Some(pos) = args.content_disposition() {
+ req = req.header(CONTENT_DISPOSITION, pos);
+ }
+
+ if let Some(cache_control) = args.cache_control() {
+ req = req.header(CACHE_CONTROL, cache_control)
+ }
+
+ let req = req.body(body).map_err(new_request_build_error)?;
+ Ok(req)
+ }
+
pub async fn obs_copy_object(
&self,
from: &str,
diff --git a/core/src/services/obs/mod.rs b/core/src/services/obs/mod.rs
index 9e44b65ef..f6fc0606f 100644
--- a/core/src/services/obs/mod.rs
+++ b/core/src/services/obs/mod.rs
@@ -18,6 +18,7 @@
mod backend;
pub use backend::ObsBuilder as Obs;
+mod appender;
mod core;
mod error;
mod pager;