This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch stream-based-write in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit ab919a94442be0e5e87e3fc5a4fbf7782669e1a3 Author: Xuanwo <[email protected]> AuthorDate: Wed Aug 30 18:01:44 2023 +0800 Migrate azblob Signed-off-by: Xuanwo <[email protected]> --- core/src/services/azblob/backend.rs | 30 ++++++-- core/src/services/azblob/writer.rs | 138 +++++++++++++----------------------- 2 files changed, 75 insertions(+), 93 deletions(-) diff --git a/core/src/services/azblob/backend.rs b/core/src/services/azblob/backend.rs index fbd480de6..3a437b6c8 100644 --- a/core/src/services/azblob/backend.rs +++ b/core/src/services/azblob/backend.rs @@ -38,6 +38,7 @@ use super::pager::AzblobPager; use super::writer::AzblobWriter; use crate::raw::*; use crate::services::azblob::core::AzblobCore; +use crate::services::azblob::writer::AzblobWriters; use crate::types::Metadata; use crate::*; @@ -52,6 +53,7 @@ const KNOWN_AZBLOB_ENDPOINT_SUFFIX: &[&str] = &[ ]; const AZBLOB_BATCH_LIMIT: usize = 256; + /// Azure Storage Blob services support. #[doc = include_str!("docs.md")] #[derive(Default, Clone)] @@ -506,7 +508,7 @@ pub struct AzblobBackend { impl Accessor for AzblobBackend { type Reader = IncomingAsyncBody; type BlockingReader = (); - type Writer = AzblobWriter; + type Writer = oio::TwoWaysWriter<AzblobWriters, oio::AtLeastBufWriter<AzblobWriters>>; type BlockingWriter = (); type Pager = AzblobPager; type BlockingPager = (); @@ -602,10 +604,28 @@ impl Accessor for AzblobBackend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - Ok(( - RpWrite::default(), - AzblobWriter::new(self.core.clone(), args, path.to_string()), - )) + let writer = AzblobWriter::new(self.core.clone(), path, args.clone()); + + let w = if args.content_length().is_some() { + AzblobWriters::One(oio::OneShotWriter::new(writer)) + } else if args.append() { + AzblobWriters::Two(oio::AppendObjectWriter::new(writer)) + } else { + return Err(Error::new( + ErrorKind::Unsupported, + "azblob write without neither content-length nor append is not supported yet", + )); + }; + + let w = if let Some(buffer_size) = args.buffer_size() { + oio::TwoWaysWriter::Two( + oio::AtLeastBufWriter::new(w, buffer_size).with_total_size(args.content_length()), + ) + } else { + oio::TwoWaysWriter::One(w) + }; + + Ok((RpWrite::default(), w)) } async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> { diff --git a/core/src/services/azblob/writer.rs b/core/src/services/azblob/writer.rs index c2519294a..7e121cd05 100644 --- a/core/src/services/azblob/writer.rs +++ b/core/src/services/azblob/writer.rs @@ -22,39 +22,42 @@ use http::StatusCode; use super::core::AzblobCore; use super::error::parse_error; -use crate::raw::oio::Stream; +use crate::raw::oio::{Stream, Streamer}; use crate::raw::*; use crate::*; const X_MS_BLOB_TYPE: &str = "x-ms-blob-type"; const X_MS_BLOB_APPEND_OFFSET: &str = "x-ms-blob-append-offset"; +pub type AzblobWriters = + oio::TwoWaysWriter<oio::OneShotWriter<AzblobWriter>, oio::AppendObjectWriter<AzblobWriter>>; + pub struct AzblobWriter { core: Arc<AzblobCore>, - op: OpWrite, path: String, - - position: Option<u64>, + op: OpWrite, } impl AzblobWriter { - pub fn new(core: Arc<AzblobCore>, op: OpWrite, path: String) -> Self { + pub fn new(core: Arc<AzblobCore>, path: &str, op: OpWrite) -> Self { AzblobWriter { core, + path: path.to_string(), op, - path, - position: None, } } +} - async fn write_oneshot(&self, size: u64, body: AsyncBody) -> Result<()> { +#[async_trait] +impl oio::OneShotWrite for AzblobWriter { + async fn write_once(&self, stream: Streamer) -> Result<()> { let mut req = self.core.azblob_put_blob_request( &self.path, - Some(size), + Some(stream.size()), self.op.content_type(), self.op.cache_control(), - body, + AsyncBody::Stream(stream), )?; self.core.sign(&mut req).await?; @@ -71,13 +74,11 @@ impl AzblobWriter { _ => Err(parse_error(resp).await?), } } +} - async fn current_position(&mut self) -> Result<Option<u64>> { - if let Some(v) = self.position { - return Ok(Some(v)); - } - - // TODO: we should check with current etag to make sure file not changed. +#[async_trait] +impl oio::AppendObjectWrite for AzblobWriter { + async fn offset(&self) -> Result<u64> { let resp = self .core .azblob_get_blob_properties(&self.path, None, None) @@ -86,9 +87,6 @@ impl AzblobWriter { let status = resp.status(); match status { - // Just check the blob type. - // If it is not an appendable blob, return an error. - // We can not get the append position of the blob here. StatusCode::OK => { let headers = resp.headers(); let blob_type = headers.get(X_MS_BLOB_TYPE).and_then(|v| v.to_str().ok()); @@ -98,43 +96,44 @@ impl AzblobWriter { "the blob is not an appendable blob.", )); } - Ok(None) - } - // If the blob is not existing, we need to create one. - StatusCode::NOT_FOUND => { - let mut req = self.core.azblob_init_appendable_blob_request( - &self.path, - self.op.content_type(), - self.op.cache_control(), - )?; - - self.core.sign(&mut req).await?; - - let resp = self.core.client.send(req).await?; - - let status = resp.status(); - match status { - StatusCode::CREATED => { - // do nothing - } - _ => { - return Err(parse_error(resp).await?); - } - } - self.position = Some(0); - Ok(Some(0)) + parse_content_length(headers).map(|v| v.unwrap_or_default()) } + // Return 0 if the blob is not existing. + StatusCode::NOT_FOUND => Ok(0), _ => Err(parse_error(resp).await?), } } - async fn append_oneshot(&mut self, size: u64, body: AsyncBody) -> Result<()> { - let _ = self.current_position().await?; + async fn append(&self, offset: u64, stream: Streamer) -> Result<()> { + // Init appendable blob if we are writing to a file with offset 0. + if offset == 0 { + let mut req = self.core.azblob_init_appendable_blob_request( + &self.path, + self.op.content_type(), + self.op.cache_control(), + )?; + + self.core.sign(&mut req).await?; + + let resp = self.core.client.send(req).await?; + let status = resp.status(); + match status { + StatusCode::CREATED => { + // do nothing + } + _ => { + return Err(parse_error(resp).await?); + } + } + } - let mut req = - self.core - .azblob_append_blob_request(&self.path, size, self.position, body)?; + let mut req = self.core.azblob_append_blob_request( + &self.path, + stream.size(), + Some(offset), + AsyncBody::Stream(stream), + )?; self.core.sign(&mut req).await?; @@ -142,45 +141,8 @@ impl AzblobWriter { let status = resp.status(); match status { - StatusCode::CREATED => { - let headers = resp.headers(); - let position = headers - .get(X_MS_BLOB_APPEND_OFFSET) - .and_then(|v| v.to_str().ok()) - .and_then(|v| v.parse::<u64>().ok()); - self.position = position.map(|v| v + size); - } - _ => { - return Err(parse_error(resp).await?); - } - } - - Ok(()) - } -} - -#[async_trait] -impl oio::Write for AzblobWriter { - async fn write(&mut self, s: oio::Streamer) -> Result<()> { - if self.op.append() { - self.append_oneshot(s.size(), AsyncBody::Stream(s)).await - } else { - if self.op.content_length().is_none() { - return Err(Error::new( - ErrorKind::Unsupported, - "write without content length is not supported", - )); - } - - self.write_oneshot(s.size(), AsyncBody::Stream(s)).await + StatusCode::CREATED => Ok(()), + _ => Err(parse_error(resp).await?), } } - - async fn abort(&mut self) -> Result<()> { - Ok(()) - } - - async fn close(&mut self) -> Result<()> { - Ok(()) - } }
