This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch poll-write in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 90292ce9d95cd3d78853ef0d86a4dfd6e75024ae Author: Xuanwo <[email protected]> AuthorDate: Sat Sep 9 03:12:52 2023 +0800 Fix azblob Signed-off-by: Xuanwo <[email protected]> --- core/src/services/azblob/backend.rs | 15 ++++--- core/src/services/azblob/core.rs | 15 +------ core/src/services/azblob/writer.rs | 84 +++++++++++-------------------------- 3 files changed, 37 insertions(+), 77 deletions(-) diff --git a/core/src/services/azblob/backend.rs b/core/src/services/azblob/backend.rs index 4b04d5c7e..7e5fe259e 100644 --- a/core/src/services/azblob/backend.rs +++ b/core/src/services/azblob/backend.rs @@ -38,6 +38,7 @@ use super::pager::AzblobPager; use super::writer::AzblobWriter; use crate::raw::*; use crate::services::azblob::core::AzblobCore; +use crate::services::azblob::writer::AzblobWriters; use crate::types::Metadata; use crate::*; @@ -506,7 +507,7 @@ pub struct AzblobBackend { impl Accessor for AzblobBackend { type Reader = IncomingAsyncBody; type BlockingReader = (); - type Writer = AzblobWriter; + type Writer = AzblobWriters; type BlockingWriter = (); type Pager = AzblobPager; type BlockingPager = (); @@ -601,10 +602,14 @@ impl Accessor for AzblobBackend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - Ok(( - RpWrite::default(), - AzblobWriter::new(self.core.clone(), args, path.to_string()), - )) + let w = AzblobWriter::new(self.core.clone(), args.clone(), path.to_string()); + let w = if args.append() { + AzblobWriters::Two(oio::AppendObjectWriter::new(w)) + } else { + AzblobWriters::One(oio::OneShotWriter::new(w)) + }; + + Ok((RpWrite::default(), w)) } async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> { diff --git a/core/src/services/azblob/core.rs b/core/src/services/azblob/core.rs index 6382817bb..5eb46f464 100644 --- a/core/src/services/azblob/core.rs +++ b/core/src/services/azblob/core.rs @@ -352,12 +352,6 @@ impl AzblobCore { /// /// - The maximum size of the content could be appended is 4MB. /// - `Append Block` succeeds only if the blob already exists. - /// - It does not need to provide append position. - /// - But it could use append position to verify the content is appended to the right position. - /// - /// Since the `appendpos` only returned by the append operation response, - /// we could not use it when we want to append content to the blob first time. - /// (The first time of the appender, not the blob) /// /// # Reference /// @@ -365,8 +359,8 @@ impl AzblobCore { pub fn azblob_append_blob_request( &self, path: &str, + position: u64, size: u64, - position: Option<u64>, body: AsyncBody, ) -> Result<Request<AsyncBody>> { let p = build_abs_path(&self.root, path); @@ -385,12 +379,7 @@ impl AzblobCore { req = req.header(CONTENT_LENGTH, size); - if let Some(pos) = position { - req = req.header( - HeaderName::from_static(constants::X_MS_BLOB_CONDITION_APPENDPOS), - pos.to_string(), - ); - } + req = req.header(constants::X_MS_BLOB_CONDITION_APPENDPOS, position); let req = req.body(body).map_err(new_request_build_error)?; diff --git a/core/src/services/azblob/writer.rs b/core/src/services/azblob/writer.rs index 15a63caca..029278ad5 100644 --- a/core/src/services/azblob/writer.rs +++ b/core/src/services/azblob/writer.rs @@ -16,9 +16,9 @@ // under the License. use std::sync::Arc; -use std::task::{Context, Poll}; use async_trait::async_trait; +use bytes::Bytes; use http::StatusCode; use super::core::AzblobCore; @@ -27,34 +27,32 @@ use crate::raw::*; use crate::*; const X_MS_BLOB_TYPE: &str = "x-ms-blob-type"; -const X_MS_BLOB_APPEND_OFFSET: &str = "x-ms-blob-append-offset"; + +pub type AzblobWriters = + oio::TwoWaysWriter<oio::OneShotWriter<AzblobWriter>, oio::AppendObjectWriter<AzblobWriter>>; pub struct AzblobWriter { core: Arc<AzblobCore>, op: OpWrite, path: String, - - position: Option<u64>, } impl AzblobWriter { pub fn new(core: Arc<AzblobCore>, op: OpWrite, path: String) -> Self { - AzblobWriter { - core, - op, - path, - position: None, - } + AzblobWriter { core, op, path } } +} - async fn write_oneshot(&self, size: u64, body: AsyncBody) -> Result<()> { +#[async_trait] +impl oio::OneShotWrite for AzblobWriter { + async fn write_once(&self, bs: Bytes) -> Result<()> { let mut req = self.core.azblob_put_blob_request( &self.path, - Some(size), + Some(bs.len() as u64), self.op.content_type(), self.op.cache_control(), - body, + AsyncBody::Bytes(bs), )?; self.core.sign(&mut req).await?; @@ -71,13 +69,11 @@ impl AzblobWriter { _ => Err(parse_error(resp).await?), } } +} - async fn current_position(&mut self) -> Result<Option<u64>> { - if let Some(v) = self.position { - return Ok(Some(v)); - } - - // TODO: we should check with current etag to make sure file not changed. +#[async_trait] +impl oio::AppendObjectWrite for AzblobWriter { + async fn offset(&self) -> Result<u64> { let resp = self .core .azblob_get_blob_properties(&self.path, None, None) @@ -86,9 +82,6 @@ impl AzblobWriter { let status = resp.status(); match status { - // Just check the blob type. - // If it is not an appendable blob, return an error. - // We can not get the append position of the blob here. StatusCode::OK => { let headers = resp.headers(); let blob_type = headers.get(X_MS_BLOB_TYPE).and_then(|v| v.to_str().ok()); @@ -98,9 +91,9 @@ impl AzblobWriter { "the blob is not an appendable blob.", )); } - Ok(None) + + Ok(parse_content_length(headers)?.unwrap_or_default()) } - // If the blob is not existing, we need to create one. StatusCode::NOT_FOUND => { let mut req = self.core.azblob_init_appendable_blob_request( &self.path, @@ -121,20 +114,16 @@ impl AzblobWriter { return Err(parse_error(resp).await?); } } - - self.position = Some(0); - Ok(Some(0)) + Ok(0) } _ => Err(parse_error(resp).await?), } } - async fn append_oneshot(&mut self, size: u64, body: AsyncBody) -> Result<()> { - let _ = self.current_position().await?; - - let mut req = - self.core - .azblob_append_blob_request(&self.path, size, self.position, body)?; + async fn append(&self, offset: u64, size: u64, body: AsyncBody) -> Result<()> { + let mut req = self + .core + .azblob_append_blob_request(&self.path, offset, size, body)?; self.core.sign(&mut req).await?; @@ -143,33 +132,10 @@ impl AzblobWriter { let status = resp.status(); match status { StatusCode::CREATED => { - let headers = resp.headers(); - let position = headers - .get(X_MS_BLOB_APPEND_OFFSET) - .and_then(|v| v.to_str().ok()) - .and_then(|v| v.parse::<u64>().ok()); - self.position = position.map(|v| v + size); - } - _ => { - return Err(parse_error(resp).await?); + resp.into_body().consume().await?; + Ok(()) } + _ => Err(parse_error(resp).await?), } - - Ok(()) - } -} - -#[async_trait] -impl oio::Write for AzblobWriter { - fn poll_write(&mut self, _: &mut Context<'_>, _: &dyn oio::WriteBuf) -> Poll<Result<usize>> { - todo!() - } - - fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> { - Poll::Ready(Ok(())) - } - - fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> { - Poll::Ready(Ok(())) } }
