This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push: new 60be5ebbf refactor(services/obs): Implement Write Returns Metadata for obs (#5912) 60be5ebbf is described below commit 60be5ebbf7a4e38cc7135c751e6bc7fd7bf59220 Author: Ziyi Tan <ajb459684...@gmail.com> AuthorDate: Mon Mar 31 16:18:03 2025 +0800 refactor(services/obs): Implement Write Returns Metadata for obs (#5912) Signed-off-by: Ziy1-Tan <ajb459684...@gmail.com> --- core/src/services/obs/backend.rs | 11 ++++++++++ core/src/services/obs/config.rs | 2 ++ core/src/services/obs/core.rs | 12 +++++++++++ core/src/services/obs/writer.rs | 43 +++++++++++++++++++++++++++++++++++----- 4 files changed, 63 insertions(+), 5 deletions(-) diff --git a/core/src/services/obs/backend.rs b/core/src/services/obs/backend.rs index 01e01ae4b..748e37d41 100644 --- a/core/src/services/obs/backend.rs +++ b/core/src/services/obs/backend.rs @@ -131,6 +131,13 @@ impl ObsBuilder { self } + /// Set bucket versioning status for this backend + pub fn enable_versioning(mut self, enabled: bool) -> Self { + self.config.enable_versioning = enabled; + + self + } + /// Specify the http client that used by this service. /// /// # Notes @@ -350,6 +357,10 @@ impl Access for ObsBackend { meta.with_user_metadata(user_meta); } + if let Some(v) = parse_header_to_str(headers, constants::X_OBS_VERSION_ID)? { + meta.set_version(v); + } + Ok(RpStat::new(meta)) } StatusCode::NOT_FOUND if path.ends_with('/') => { diff --git a/core/src/services/obs/config.rs b/core/src/services/obs/config.rs index 261cffe38..c92369cbc 100644 --- a/core/src/services/obs/config.rs +++ b/core/src/services/obs/config.rs @@ -36,6 +36,8 @@ pub struct ObsConfig { pub secret_access_key: Option<String>, /// Bucket for obs. 
pub bucket: Option<String>, + /// Is bucket versioning enabled for this bucket + pub enable_versioning: bool, } impl Debug for ObsConfig { diff --git a/core/src/services/obs/core.rs b/core/src/services/obs/core.rs index 30ba6cab6..f12c0a80c 100644 --- a/core/src/services/obs/core.rs +++ b/core/src/services/obs/core.rs @@ -40,6 +40,7 @@ use crate::*; pub mod constants { pub const X_OBS_META_PREFIX: &str = "x-obs-meta-"; + pub const X_OBS_VERSION_ID: &str = "x-obs-version-id"; } pub struct ObsCore { @@ -478,6 +479,17 @@ pub struct CompleteMultipartUploadRequestPart { pub etag: String, } +/// Output of `CompleteMultipartUpload` operation +#[derive(Debug, Default, Deserialize)] +#[serde[default, rename_all = "PascalCase"]] +pub struct CompleteMultipartUploadResult { + pub location: String, + pub bucket: String, + pub key: String, + #[serde(rename = "ETag")] + pub etag: String, +} + #[derive(Default, Debug, Deserialize)] #[serde(default, rename_all = "PascalCase")] pub struct ListObjectsOutput { diff --git a/core/src/services/obs/writer.rs b/core/src/services/obs/writer.rs index cbb523864..a93113a3e 100644 --- a/core/src/services/obs/writer.rs +++ b/core/src/services/obs/writer.rs @@ -17,7 +17,8 @@ use std::sync::Arc; -use http::StatusCode; +use bytes::Buf; +use http::{HeaderMap, HeaderValue, StatusCode}; use super::core::*; use super::error::parse_error; @@ -42,6 +43,21 @@ impl ObsWriter { op, } } + + fn parse_metadata(headers: &HeaderMap<HeaderValue>) -> Result<Metadata> { + let mut meta = Metadata::default(); + if let Some(etag) = parse_etag(headers)? { + meta.set_etag(etag); + } + if let Some(md5) = parse_content_md5(headers)? { + meta.set_content_md5(md5); + } + if let Some(version) = parse_header_to_str(headers, constants::X_OBS_VERSION_ID)? 
{ + meta.set_version(version); + } + + Ok(meta) + } } impl oio::MultipartWrite for ObsWriter { @@ -54,10 +70,12 @@ impl oio::MultipartWrite for ObsWriter { let resp = self.core.send(req).await?; + let meta = Self::parse_metadata(resp.headers())?; + let status = resp.status(); match status { - StatusCode::CREATED | StatusCode::OK => Ok(Metadata::default()), + StatusCode::CREATED | StatusCode::OK => Ok(meta), _ => Err(parse_error(resp)), } } @@ -131,15 +149,22 @@ impl oio::MultipartWrite for ObsWriter { }) .collect(); - let resp = self + let mut resp = self .core .obs_complete_multipart_upload(&self.path, upload_id, parts) .await?; + let mut meta = Self::parse_metadata(resp.headers())?; + + let result: CompleteMultipartUploadResult = + quick_xml::de::from_reader(resp.body_mut().reader()) + .map_err(new_xml_deserialize_error)?; + meta.set_etag(&result.etag); + let status = resp.status(); match status { - StatusCode::OK => Ok(Metadata::default()), + StatusCode::OK => Ok(meta), _ => Err(parse_error(resp)), } } @@ -190,10 +215,18 @@ impl oio::AppendWrite for ObsWriter { let resp = self.core.send(req).await?; + let mut meta = Metadata::default(); + if let Some(md5) = parse_content_md5(resp.headers())? { + meta.set_content_md5(md5); + } + if let Some(version) = parse_header_to_str(resp.headers(), constants::X_OBS_VERSION_ID)? { + meta.set_version(version); + } + let status = resp.status(); match status { - StatusCode::OK => Ok(Metadata::default()), + StatusCode::OK => Ok(meta), _ => Err(parse_error(resp)), } }