This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch azdls_update_file in repository https://gitbox.apache.org/repos/asf/opendal.git
commit 21f7f1eff2ce7812dd927a62441694be50e55fa8 Author: Xuanwo <[email protected]> AuthorDate: Tue Nov 25 17:54:14 2025 +0800 feat: Implement concurrent write for azdls Signed-off-by: Xuanwo <[email protected]> --- core/src/raw/oio/write/position_write.rs | 12 +++-- core/src/services/azdls/backend.rs | 16 ++++--- core/src/services/azdls/core.rs | 48 +++++++++++++++++-- core/src/services/azdls/writer.rs | 79 ++++++++++++++++++++------------ core/src/services/fs/writer.rs | 2 +- 5 files changed, 110 insertions(+), 47 deletions(-) diff --git a/core/src/raw/oio/write/position_write.rs b/core/src/raw/oio/write/position_write.rs index 45897ce61..37c8ad686 100644 --- a/core/src/raw/oio/write/position_write.rs +++ b/core/src/raw/oio/write/position_write.rs @@ -51,8 +51,8 @@ pub trait PositionWrite: Send + Sync + Unpin + 'static { buf: Buffer, ) -> impl Future<Output = Result<()>> + MaybeSend; - /// close is used to close the underlying file. - fn close(&self) -> impl Future<Output = Result<Metadata>> + MaybeSend; + /// close is used to close the underlying file with known final size. + fn close(&self, size: u64) -> impl Future<Output = Result<Metadata>> + MaybeSend; /// abort is used to abort the underlying abort. fn abort(&self) -> impl Future<Output = Result<()>> + MaybeSend; @@ -155,10 +155,12 @@ impl<W: PositionWrite> oio::Write for PositionWriter<W> { if let Some(buffer) = self.cache.clone() { let offset = self.next_offset; - self.w.write_all_at(offset, buffer).await?; + self.w.write_all_at(offset, buffer.clone()).await?; self.cache = None; + self.next_offset += buffer.len() as u64; } - self.w.close().await + let final_size = self.next_offset; + self.w.close(final_size).await } async fn abort(&mut self) -> Result<()> { @@ -227,7 +229,7 @@ mod tests { Ok(()) } - async fn close(&self) -> Result<Metadata> { + async fn close(&self, _size: u64) -> Result<Metadata> { Ok(Metadata::default()) } diff --git a/core/src/services/azdls/backend.rs b/core/src/services/azdls/backend.rs index 989a16b36..08468c3e7 100644 --- a/core/src/services/azdls/backend.rs +++ b/core/src/services/azdls/backend.rs @@ -378,13 +378,17 @@ impl Access for AzdlsBackend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { + if args.append() { + let w = AzdlsWriter::new(self.core.clone(), args.clone(), path.to_string()); + return Ok(( + RpWrite::default(), + AzdlsWriters::Two(oio::AppendWriter::new(w)), + )); + } + let w = AzdlsWriter::new(self.core.clone(), args.clone(), path.to_string()); - let w = if args.append() { - AzdlsWriters::Two(oio::AppendWriter::new(w)) - } else { - AzdlsWriters::One(oio::OneShotWriter::new(w)) - }; - Ok((RpWrite::default(), w)) + let w = oio::PositionWriter::new(self.info().clone(), w, args.concurrent()); + Ok((RpWrite::default(), AzdlsWriters::One(w))) } async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> { diff --git a/core/src/services/azdls/core.rs b/core/src/services/azdls/core.rs index 5a194cc97..e88b757cb 100644 --- a/core/src/services/azdls/core.rs +++ b/core/src/services/azdls/core.rs @@ -207,25 +207,32 @@ impl AzdlsCore { } /// ref: https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update - pub async fn azdls_update( + pub async fn azdls_append( &self, path: &str, size: Option<u64>, position: u64, + flush: bool, + close: bool, body: Buffer, ) -> Result<Response<Buffer>> { let p = build_abs_path(&self.root, path); - // - close: Make this is the final action to this file. - // - flush: Flush the file directly. - let url = format!( - "{}/{}/{}?action=append&close=true&flush=true&position={}", + let mut url = format!( + "{}/{}/{}?action=append&position={}", self.endpoint, self.filesystem, percent_encode_path(&p), position ); + if flush { + url.push_str("&flush=true"); + } + if close { + url.push_str("&close=true"); + } + let mut req = Request::patch(&url); if let Some(size) = size { @@ -241,6 +248,37 @@ impl AzdlsCore { self.send(req).await } + /// Flush pending data appended by [`azdls_append`]. + pub async fn azdls_flush( + &self, + path: &str, + position: u64, + close: bool, + ) -> Result<Response<Buffer>> { + let p = build_abs_path(&self.root, path); + + let mut url = format!( + "{}/{}/{}?action=flush&position={}", + self.endpoint, + self.filesystem, + percent_encode_path(&p), + position + ); + + if close { + url.push_str("&close=true"); + } + + let mut req = Request::patch(&url) + .header(CONTENT_LENGTH, 0) + .extension(Operation::Write) + .body(Buffer::new()) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + self.send(req).await + } + pub async fn azdls_get_properties(&self, path: &str) -> Result<Response<Buffer>> { let p = build_abs_path(&self.root, path) .trim_end_matches('/') diff --git a/core/src/services/azdls/writer.rs b/core/src/services/azdls/writer.rs index e496bf14d..f15247dcb 100644 --- a/core/src/services/azdls/writer.rs +++ b/core/src/services/azdls/writer.rs @@ -26,18 +26,31 @@ use super::error::parse_error; use crate::raw::*; use crate::*; -pub type AzdlsWriters = TwoWays<oio::OneShotWriter<AzdlsWriter>, oio::AppendWriter<AzdlsWriter>>; +/// Writer type for azdls: non-append uses PositionWriter, append uses AppendWriter. +pub type AzdlsWriters = TwoWays<oio::PositionWriter<AzdlsWriter>, oio::AppendWriter<AzdlsWriter>>; +#[derive(Clone)] pub struct AzdlsWriter { core: Arc<AzdlsCore>, - op: OpWrite, path: String, } impl AzdlsWriter { pub fn new(core: Arc<AzdlsCore>, op: OpWrite, path: String) -> Self { - AzdlsWriter { core, op, path } + Self { core, op, path } + } + + async fn ensure_created(&self) -> Result<()> { + let resp = self.core.azdls_create(&self.path, FILE, &self.op).await?; + + match resp.status() { + StatusCode::CREATED | StatusCode::OK => Ok(()), + StatusCode::CONFLICT => { + Err(parse_error(resp).with_operation("Backend::azdls_create_request")) + } + _ => Err(parse_error(resp).with_operation("Backend::azdls_create_request")), + } } fn parse_metadata(headers: &http::HeaderMap) -> Result<Metadata> { @@ -59,29 +72,40 @@ impl AzdlsWriter { } } -impl oio::OneShotWrite for AzdlsWriter { - async fn write_once(&self, bs: Buffer) -> Result<Metadata> { - let resp = self.core.azdls_create(&self.path, FILE, &self.op).await?; - - let status = resp.status(); - match status { - StatusCode::CREATED | StatusCode::OK => {} - _ => { - return Err(parse_error(resp).with_operation("Backend::azdls_create_request")); - } +impl oio::PositionWrite for AzdlsWriter { + async fn write_all_at(&self, offset: u64, buf: Buffer) -> Result<()> { + if offset == 0 { + self.ensure_created().await?; } + let size = buf.len() as u64; let resp = self .core - .azdls_update(&self.path, Some(bs.len() as u64), 0, bs) + .azdls_append(&self.path, Some(size), offset, false, false, buf) .await?; - let status = resp.status(); - match status { - StatusCode::OK | StatusCode::ACCEPTED => Ok(Metadata::default()), - _ => Err(parse_error(resp).with_operation("Backend::azdls_update_request")), + match resp.status() { + StatusCode::OK | StatusCode::ACCEPTED => Ok(()), + _ => Err(parse_error(resp).with_operation("Backend::azdls_append_request")), + } + } + + async fn close(&self, size: u64) -> Result<Metadata> { + // Flush accumulated appends once. + let resp = self.core.azdls_flush(&self.path, size, true).await?; + + let mut meta = AzdlsWriter::parse_metadata(resp.headers())?; + meta.set_content_length(size); + + match resp.status() { + StatusCode::OK | StatusCode::ACCEPTED => Ok(meta), + _ => Err(parse_error(resp).with_operation("Backend::azdls_flush_request")), } } + + async fn abort(&self) -> Result<()> { + Ok(()) + } } impl oio::AppendWrite for AzdlsWriter { @@ -100,19 +124,13 @@ impl oio::AppendWrite for AzdlsWriter { async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<Metadata> { if offset == 0 { - let resp = self.core.azdls_create(&self.path, FILE, &self.op).await?; - let status = resp.status(); - match status { - StatusCode::CREATED | StatusCode::OK => {} - _ => { - return Err(parse_error(resp).with_operation("Backend::azdls_create_request")); - } - } + self.ensure_created().await?; } + // append + flush in a single request to minimize roundtrips for append mode. let resp = self .core - .azdls_update(&self.path, Some(size), offset, body) + .azdls_append(&self.path, Some(size), offset, true, false, body) .await?; let mut meta = AzdlsWriter::parse_metadata(resp.headers())?; @@ -120,10 +138,11 @@ impl oio::AppendWrite for AzdlsWriter { if let Some(md5) = md5 { meta.set_content_md5(md5); } - let status = resp.status(); - match status { + meta.set_content_length(offset + size); + + match resp.status() { StatusCode::OK | StatusCode::ACCEPTED => Ok(meta), - _ => Err(parse_error(resp).with_operation("Backend::azdls_update_request")), + _ => Err(parse_error(resp).with_operation("Backend::azdls_append_request")), } } } diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs index 87374e7b2..7e58e047f 100644 --- a/core/src/services/fs/writer.rs +++ b/core/src/services/fs/writer.rs @@ -155,7 +155,7 @@ impl oio::PositionWrite for FsWriter { .map_err(new_task_join_error)? } - async fn close(&self) -> Result<Metadata> { + async fn close(&self, _size: u64) -> Result<Metadata> { let mut f = self .f .try_clone()
