This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new d58d90eae feat: Implement concurrent write for azdls (#6819)
d58d90eae is described below
commit d58d90eaee0994df5239c9d6ec382d68d8f60103
Author: Xuanwo <[email protected]>
AuthorDate: Tue Nov 25 20:37:11 2025 +0800
feat: Implement concurrent write for azdls (#6819)
* feat: Implement concurrent write for azdls
Signed-off-by: Xuanwo <[email protected]>
* Allow multi (advertise write_can_multi for azdls)
Signed-off-by: Xuanwo <[email protected]>
* Fix build
Signed-off-by: Xuanwo <[email protected]>
* Fix build
Signed-off-by: Xuanwo <[email protected]>
* Don't support abort
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
core/src/raw/oio/write/position_write.rs | 12 +++--
core/src/services/azdls/backend.rs | 19 ++++---
core/src/services/azdls/core.rs | 48 +++++++++++++++--
core/src/services/azdls/writer.rs | 89 ++++++++++++++++++++------------
core/src/services/fs/writer.rs | 2 +-
5 files changed, 120 insertions(+), 50 deletions(-)
diff --git a/core/src/raw/oio/write/position_write.rs
b/core/src/raw/oio/write/position_write.rs
index 45897ce61..37c8ad686 100644
--- a/core/src/raw/oio/write/position_write.rs
+++ b/core/src/raw/oio/write/position_write.rs
@@ -51,8 +51,8 @@ pub trait PositionWrite: Send + Sync + Unpin + 'static {
buf: Buffer,
) -> impl Future<Output = Result<()>> + MaybeSend;
- /// close is used to close the underlying file.
- fn close(&self) -> impl Future<Output = Result<Metadata>> + MaybeSend;
+ /// close is used to close the underlying file with known final size.
+ fn close(&self, size: u64) -> impl Future<Output = Result<Metadata>> +
MaybeSend;
/// abort is used to abort the underlying abort.
fn abort(&self) -> impl Future<Output = Result<()>> + MaybeSend;
@@ -155,10 +155,12 @@ impl<W: PositionWrite> oio::Write for PositionWriter<W> {
if let Some(buffer) = self.cache.clone() {
let offset = self.next_offset;
- self.w.write_all_at(offset, buffer).await?;
+ self.w.write_all_at(offset, buffer.clone()).await?;
self.cache = None;
+ self.next_offset += buffer.len() as u64;
}
- self.w.close().await
+ let final_size = self.next_offset;
+ self.w.close(final_size).await
}
async fn abort(&mut self) -> Result<()> {
@@ -227,7 +229,7 @@ mod tests {
Ok(())
}
- async fn close(&self) -> Result<Metadata> {
+ async fn close(&self, _size: u64) -> Result<Metadata> {
Ok(Metadata::default())
}
diff --git a/core/src/services/azdls/backend.rs
b/core/src/services/azdls/backend.rs
index 989a16b36..8ea6c51b6 100644
--- a/core/src/services/azdls/backend.rs
+++ b/core/src/services/azdls/backend.rs
@@ -291,6 +291,7 @@ impl Builder for AzdlsBuilder {
write: true,
write_can_append: true,
+ write_can_multi: true,
write_with_if_none_match: true,
write_with_if_not_exists: true,
@@ -378,13 +379,17 @@ impl Access for AzdlsBackend {
}
+ /// Open a writer for `path`: AppendWriter in append mode, PositionWriter otherwise.
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
- let w = AzdlsWriter::new(self.core.clone(), args.clone(),
path.to_string());
- let w = if args.append() {
- AzdlsWriters::Two(oio::AppendWriter::new(w))
- } else {
- AzdlsWriters::One(oio::OneShotWriter::new(w))
- };
- Ok((RpWrite::default(), w))
+ // Append mode: AppendWriter drives AzdlsWriter's AppendWrite impl; no request is sent here.
+ if args.append() {
+ let w = AzdlsWriter::new(self.core.clone(), args.clone(),
path.to_string());
+ return Ok((
+ RpWrite::default(),
+ AzdlsWriters::Two(oio::AppendWriter::new(w)),
+ ));
+ }
+
+ // Non-append mode: the create request is sent now, when the writer is opened, so the
+ // path exists before any data is written. PositionWriter then uploads parts through
+ // AzdlsWriter::write_all_at (concurrency taken from `args.concurrent()`), and
+ // AzdlsWriter::close commits them with a single flush.
+ let w = AzdlsWriter::create(self.core.clone(), args.clone(),
path.to_string()).await?;
+ let w = oio::PositionWriter::new(self.info().clone(), w,
args.concurrent());
+ Ok((RpWrite::default(), AzdlsWriters::One(w)))
}
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
diff --git a/core/src/services/azdls/core.rs b/core/src/services/azdls/core.rs
index 5a194cc97..e88b757cb 100644
--- a/core/src/services/azdls/core.rs
+++ b/core/src/services/azdls/core.rs
@@ -207,25 +207,32 @@ impl AzdlsCore {
}
/// ref:
https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update
- pub async fn azdls_update(
+ pub async fn azdls_append(
&self,
path: &str,
size: Option<u64>,
position: u64,
+ flush: bool,
+ close: bool,
body: Buffer,
) -> Result<Response<Buffer>> {
let p = build_abs_path(&self.root, path);
- // - close: Make this is the final action to this file.
- // - flush: Flush the file directly.
- let url = format!(
- "{}/{}/{}?action=append&close=true&flush=true&position={}",
+ let mut url = format!(
+ "{}/{}/{}?action=append&position={}",
self.endpoint,
self.filesystem,
percent_encode_path(&p),
position
);
+ if flush {
+ url.push_str("&flush=true");
+ }
+ if close {
+ url.push_str("&close=true");
+ }
+
let mut req = Request::patch(&url);
if let Some(size) = size {
@@ -241,6 +248,37 @@ impl AzdlsCore {
self.send(req).await
}
+ /// Commit data previously uploaded by [`Self::azdls_append`].
+ ///
+ /// `position` must equal the length of the file once every appended byte is
+ /// committed. When `close` is true the request also carries `close=true`,
+ /// signalling that the file stream is closed.
+ ///
+ /// ref: https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update
+ pub async fn azdls_flush(
+ &self,
+ path: &str,
+ position: u64,
+ close: bool,
+ ) -> Result<Response<Buffer>> {
+ let p = build_abs_path(&self.root, path);
+
+ let mut url = format!(
+ "{}/{}/{}?action=flush&position={}",
+ self.endpoint,
+ self.filesystem,
+ percent_encode_path(&p),
+ position
+ );
+
+ if close {
+ url.push_str("&close=true");
+ }
+
+ // A flush carries no payload, so send an explicit zero Content-Length.
+ let mut req = Request::patch(&url)
+ .header(CONTENT_LENGTH, 0)
+ .extension(Operation::Write)
+ .body(Buffer::new())
+ .map_err(new_request_build_error)?;
+
+ self.sign(&mut req).await?;
+ self.send(req).await
+ }
+
pub async fn azdls_get_properties(&self, path: &str) ->
Result<Response<Buffer>> {
let p = build_abs_path(&self.root, path)
.trim_end_matches('/')
diff --git a/core/src/services/azdls/writer.rs
b/core/src/services/azdls/writer.rs
index e496bf14d..d46e8e792 100644
--- a/core/src/services/azdls/writer.rs
+++ b/core/src/services/azdls/writer.rs
@@ -26,18 +26,37 @@ use super::error::parse_error;
use crate::raw::*;
use crate::*;
-pub type AzdlsWriters = TwoWays<oio::OneShotWriter<AzdlsWriter>,
oio::AppendWriter<AzdlsWriter>>;
+/// Writer type for azdls: non-append uses PositionWriter, append uses
AppendWriter.
+pub type AzdlsWriters = TwoWays<oio::PositionWriter<AzdlsWriter>,
oio::AppendWriter<AzdlsWriter>>;
+/// Sends the requests behind both azdls writers. It implements `oio::PositionWrite`
+/// (wrapped by `PositionWriter` for non-append writes) and `oio::AppendWrite`
+/// (wrapped by `AppendWriter` for append writes).
+#[derive(Clone)]
pub struct AzdlsWriter {
+ /// Shared service core used to send the create/append/flush requests.
core: Arc<AzdlsCore>,
-
+ /// Options of this write; passed to the create request and checked for conditional-create flags.
op: OpWrite,
+ /// Target path, relative to the service root.
path: String,
}
impl AzdlsWriter {
+ /// Build a writer for `path`; no request is sent.
pub fn new(core: Arc<AzdlsCore>, op: OpWrite, path: String) -> Self {
- AzdlsWriter { core, op, path }
+ Self { core, op, path }
+ }
+
+ /// Build a writer and send the create request for `path` before returning it.
+ ///
+ /// # Errors
+ ///
+ /// Returns any error raised while creating the path (see `create_if_needed`).
+ pub async fn create(core: Arc<AzdlsCore>, op: OpWrite, path: String) -> Result<Self> {
+ let writer = Self::new(core, op, path);
+ writer.create_if_needed().await?;
+ Ok(writer)
+ }
+
+ /// Send the create request for `self.path`.
+ ///
+ /// `201 Created` and `200 OK` are success. `409 Conflict` is tolerated only for
+ /// unconditional writes. When the write carries a precondition (`if_not_exists`
+ /// or `if_none_match`), a conflict means the precondition failed. It is returned
+ /// as an error rather than letting the following appends touch the existing file.
+ async fn create_if_needed(&self) -> Result<()> {
+ let resp = self.core.azdls_create(&self.path, FILE, &self.op).await?;
+ let conditional = self.op.if_not_exists() || self.op.if_none_match().is_some();
+ match resp.status() {
+ StatusCode::CREATED | StatusCode::OK => Ok(()),
+ StatusCode::CONFLICT if !conditional => Ok(()),
+ _ => Err(parse_error(resp).with_operation("Backend::azdls_create_request")),
+ }
}
fn parse_metadata(headers: &http::HeaderMap) -> Result<Metadata> {
@@ -59,29 +78,39 @@ impl AzdlsWriter {
}
}
-impl oio::OneShotWrite for AzdlsWriter {
- async fn write_once(&self, bs: Buffer) -> Result<Metadata> {
- let resp = self.core.azdls_create(&self.path, FILE, &self.op).await?;
-
- let status = resp.status();
- match status {
- StatusCode::CREATED | StatusCode::OK => {}
- _ => {
- return
Err(parse_error(resp).with_operation("Backend::azdls_create_request"));
- }
- }
-
+// Non-append writes: each part is sent with `action=append` at its own offset and is
+// only committed when `close` flushes the final length.
+impl oio::PositionWrite for AzdlsWriter {
+ /// Upload `buf` at byte `offset` with `flush=false` and `close=false`; nothing is committed yet.
+ async fn write_all_at(&self, offset: u64, buf: Buffer) -> Result<()> {
+ let size = buf.len() as u64;
let resp = self
.core
- .azdls_update(&self.path, Some(bs.len() as u64), 0, bs)
+ .azdls_append(&self.path, Some(size), offset, false, false, buf)
.await?;
- let status = resp.status();
- match status {
- StatusCode::OK | StatusCode::ACCEPTED => Ok(Metadata::default()),
- _ =>
Err(parse_error(resp).with_operation("Backend::azdls_update_request")),
+ match resp.status() {
+ StatusCode::OK | StatusCode::ACCEPTED => Ok(()),
+ _ =>
Err(parse_error(resp).with_operation("Backend::azdls_append_request")),
}
}
+
+ /// Commit every appended byte by flushing at `size` (the final file length) and close the file.
+ ///
+ /// The status is checked before the headers are parsed, so a failed flush reports the
+ /// service error rather than a metadata-parsing error.
+ async fn close(&self, size: u64) -> Result<Metadata> {
+ let resp = self.core.azdls_flush(&self.path, size, true).await?;
+
+ if !matches!(resp.status(), StatusCode::OK | StatusCode::ACCEPTED) {
+ return Err(parse_error(resp).with_operation("Backend::azdls_flush_request"));
+ }
+
+ let mut meta = AzdlsWriter::parse_metadata(resp.headers())?;
+ meta.set_content_length(size);
+ Ok(meta)
+ }
+
+ /// Always fails with `ErrorKind::Unsupported`; no request is sent.
+ ///
+ /// NOTE(review): the path created when the writer was opened is left in place.
+ async fn abort(&self) -> Result<()> {
+ Err(Error::new(
+ ErrorKind::Unsupported,
+ "Abort is not supported for azdls writer",
+ ))
+ }
}
impl oio::AppendWrite for AzdlsWriter {
@@ -100,19 +129,14 @@ impl oio::AppendWrite for AzdlsWriter {
async fn append(&self, offset: u64, size: u64, body: Buffer) ->
Result<Metadata> {
if offset == 0 {
- let resp = self.core.azdls_create(&self.path, FILE,
&self.op).await?;
- let status = resp.status();
- match status {
- StatusCode::CREATED | StatusCode::OK => {}
- _ => {
- return
Err(parse_error(resp).with_operation("Backend::azdls_create_request"));
- }
- }
+ // Only create when starting a new file; avoid 404 when appending
to a non-existent path.
+ self.create_if_needed().await?;
}
+ // append + flush in a single request to minimize roundtrips for
append mode.
let resp = self
.core
- .azdls_update(&self.path, Some(size), offset, body)
+ .azdls_append(&self.path, Some(size), offset, true, false, body)
.await?;
let mut meta = AzdlsWriter::parse_metadata(resp.headers())?;
@@ -120,10 +144,11 @@ impl oio::AppendWrite for AzdlsWriter {
if let Some(md5) = md5 {
meta.set_content_md5(md5);
}
- let status = resp.status();
- match status {
+ meta.set_content_length(offset + size);
+
+ match resp.status() {
StatusCode::OK | StatusCode::ACCEPTED => Ok(meta),
- _ =>
Err(parse_error(resp).with_operation("Backend::azdls_update_request")),
+ _ =>
Err(parse_error(resp).with_operation("Backend::azdls_append_request")),
}
}
}
diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index 87374e7b2..7e58e047f 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -155,7 +155,7 @@ impl oio::PositionWrite for FsWriter {
.map_err(new_task_join_error)?
}
- async fn close(&self) -> Result<Metadata> {
+ async fn close(&self, _size: u64) -> Result<Metadata> {
let mut f = self
.f
.try_clone()