This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch azdls_update_file
in repository https://gitbox.apache.org/repos/asf/opendal.git

commit 21f7f1eff2ce7812dd927a62441694be50e55fa8
Author: Xuanwo <[email protected]>
AuthorDate: Tue Nov 25 17:54:14 2025 +0800

    feat: Implement concurrent write for azdls
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/raw/oio/write/position_write.rs | 12 +++--
 core/src/services/azdls/backend.rs       | 16 ++++---
 core/src/services/azdls/core.rs          | 48 +++++++++++++++++--
 core/src/services/azdls/writer.rs        | 79 ++++++++++++++++++++------------
 core/src/services/fs/writer.rs           |  2 +-
 5 files changed, 110 insertions(+), 47 deletions(-)

diff --git a/core/src/raw/oio/write/position_write.rs b/core/src/raw/oio/write/position_write.rs
index 45897ce61..37c8ad686 100644
--- a/core/src/raw/oio/write/position_write.rs
+++ b/core/src/raw/oio/write/position_write.rs
@@ -51,8 +51,8 @@ pub trait PositionWrite: Send + Sync + Unpin + 'static {
         buf: Buffer,
     ) -> impl Future<Output = Result<()>> + MaybeSend;
 
-    /// close is used to close the underlying file.
-    fn close(&self) -> impl Future<Output = Result<Metadata>> + MaybeSend;
+    /// close is used to close the underlying file with known final size.
+    fn close(&self, size: u64) -> impl Future<Output = Result<Metadata>> + MaybeSend;
 
     /// abort is used to abort the underlying abort.
     fn abort(&self) -> impl Future<Output = Result<()>> + MaybeSend;
@@ -155,10 +155,12 @@ impl<W: PositionWrite> oio::Write for PositionWriter<W> {
 
         if let Some(buffer) = self.cache.clone() {
             let offset = self.next_offset;
-            self.w.write_all_at(offset, buffer).await?;
+            self.w.write_all_at(offset, buffer.clone()).await?;
             self.cache = None;
+            self.next_offset += buffer.len() as u64;
         }
-        self.w.close().await
+        let final_size = self.next_offset;
+        self.w.close(final_size).await
     }
 
     async fn abort(&mut self) -> Result<()> {
@@ -227,7 +229,7 @@ mod tests {
             Ok(())
         }
 
-        async fn close(&self) -> Result<Metadata> {
+        async fn close(&self, _size: u64) -> Result<Metadata> {
             Ok(Metadata::default())
         }
 
diff --git a/core/src/services/azdls/backend.rs b/core/src/services/azdls/backend.rs
index 989a16b36..08468c3e7 100644
--- a/core/src/services/azdls/backend.rs
+++ b/core/src/services/azdls/backend.rs
@@ -378,13 +378,17 @@ impl Access for AzdlsBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
+        if args.append() {
+            let w = AzdlsWriter::new(self.core.clone(), args.clone(), path.to_string());
+            return Ok((
+                RpWrite::default(),
+                AzdlsWriters::Two(oio::AppendWriter::new(w)),
+            ));
+        }
+
         let w = AzdlsWriter::new(self.core.clone(), args.clone(), path.to_string());
-        let w = if args.append() {
-            AzdlsWriters::Two(oio::AppendWriter::new(w))
-        } else {
-            AzdlsWriters::One(oio::OneShotWriter::new(w))
-        };
-        Ok((RpWrite::default(), w))
+        let w = oio::PositionWriter::new(self.info().clone(), w, args.concurrent());
+        Ok((RpWrite::default(), AzdlsWriters::One(w)))
     }
 
     async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
diff --git a/core/src/services/azdls/core.rs b/core/src/services/azdls/core.rs
index 5a194cc97..e88b757cb 100644
--- a/core/src/services/azdls/core.rs
+++ b/core/src/services/azdls/core.rs
@@ -207,25 +207,32 @@ impl AzdlsCore {
     }
 
     /// ref: https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update
-    pub async fn azdls_update(
+    pub async fn azdls_append(
         &self,
         path: &str,
         size: Option<u64>,
         position: u64,
+        flush: bool,
+        close: bool,
         body: Buffer,
     ) -> Result<Response<Buffer>> {
         let p = build_abs_path(&self.root, path);
 
-        // - close: Make this is the final action to this file.
-        // - flush: Flush the file directly.
-        let url = format!(
-            "{}/{}/{}?action=append&close=true&flush=true&position={}",
+        let mut url = format!(
+            "{}/{}/{}?action=append&position={}",
             self.endpoint,
             self.filesystem,
             percent_encode_path(&p),
             position
         );
 
+        if flush {
+            url.push_str("&flush=true");
+        }
+        if close {
+            url.push_str("&close=true");
+        }
+
         let mut req = Request::patch(&url);
 
         if let Some(size) = size {
@@ -241,6 +248,37 @@ impl AzdlsCore {
         self.send(req).await
     }
 
+    /// Flush pending data appended by [`azdls_append`].
+    pub async fn azdls_flush(
+        &self,
+        path: &str,
+        position: u64,
+        close: bool,
+    ) -> Result<Response<Buffer>> {
+        let p = build_abs_path(&self.root, path);
+
+        let mut url = format!(
+            "{}/{}/{}?action=flush&position={}",
+            self.endpoint,
+            self.filesystem,
+            percent_encode_path(&p),
+            position
+        );
+
+        if close {
+            url.push_str("&close=true");
+        }
+
+        let mut req = Request::patch(&url)
+            .header(CONTENT_LENGTH, 0)
+            .extension(Operation::Write)
+            .body(Buffer::new())
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+        self.send(req).await
+    }
+
     pub async fn azdls_get_properties(&self, path: &str) -> Result<Response<Buffer>> {
         let p = build_abs_path(&self.root, path)
             .trim_end_matches('/')
diff --git a/core/src/services/azdls/writer.rs b/core/src/services/azdls/writer.rs
index e496bf14d..f15247dcb 100644
--- a/core/src/services/azdls/writer.rs
+++ b/core/src/services/azdls/writer.rs
@@ -26,18 +26,31 @@ use super::error::parse_error;
 use crate::raw::*;
 use crate::*;
 
-pub type AzdlsWriters = TwoWays<oio::OneShotWriter<AzdlsWriter>, oio::AppendWriter<AzdlsWriter>>;
+/// Writer type for azdls: non-append uses PositionWriter, append uses AppendWriter.
+pub type AzdlsWriters = TwoWays<oio::PositionWriter<AzdlsWriter>, oio::AppendWriter<AzdlsWriter>>;
 
+#[derive(Clone)]
 pub struct AzdlsWriter {
     core: Arc<AzdlsCore>,
-
     op: OpWrite,
     path: String,
 }
 
 impl AzdlsWriter {
     pub fn new(core: Arc<AzdlsCore>, op: OpWrite, path: String) -> Self {
-        AzdlsWriter { core, op, path }
+        Self { core, op, path }
+    }
+
+    async fn ensure_created(&self) -> Result<()> {
+        let resp = self.core.azdls_create(&self.path, FILE, &self.op).await?;
+
+        match resp.status() {
+            StatusCode::CREATED | StatusCode::OK => Ok(()),
+            StatusCode::CONFLICT => {
+                Err(parse_error(resp).with_operation("Backend::azdls_create_request"))
+            }
+            _ => Err(parse_error(resp).with_operation("Backend::azdls_create_request")),
+        }
     }
 
     fn parse_metadata(headers: &http::HeaderMap) -> Result<Metadata> {
@@ -59,29 +72,40 @@ impl AzdlsWriter {
     }
 }
 
-impl oio::OneShotWrite for AzdlsWriter {
-    async fn write_once(&self, bs: Buffer) -> Result<Metadata> {
-        let resp = self.core.azdls_create(&self.path, FILE, &self.op).await?;
-
-        let status = resp.status();
-        match status {
-            StatusCode::CREATED | StatusCode::OK => {}
-            _ => {
-                return Err(parse_error(resp).with_operation("Backend::azdls_create_request"));
-            }
+impl oio::PositionWrite for AzdlsWriter {
+    async fn write_all_at(&self, offset: u64, buf: Buffer) -> Result<()> {
+        if offset == 0 {
+            self.ensure_created().await?;
         }
 
+        let size = buf.len() as u64;
         let resp = self
             .core
-            .azdls_update(&self.path, Some(bs.len() as u64), 0, bs)
+            .azdls_append(&self.path, Some(size), offset, false, false, buf)
             .await?;
 
-        let status = resp.status();
-        match status {
-            StatusCode::OK | StatusCode::ACCEPTED => Ok(Metadata::default()),
-            _ => Err(parse_error(resp).with_operation("Backend::azdls_update_request")),
+        match resp.status() {
+            StatusCode::OK | StatusCode::ACCEPTED => Ok(()),
+            _ => Err(parse_error(resp).with_operation("Backend::azdls_append_request")),
+        }
+    }
+
+    async fn close(&self, size: u64) -> Result<Metadata> {
+        // Flush accumulated appends once.
+        let resp = self.core.azdls_flush(&self.path, size, true).await?;
+
+        let mut meta = AzdlsWriter::parse_metadata(resp.headers())?;
+        meta.set_content_length(size);
+
+        match resp.status() {
+            StatusCode::OK | StatusCode::ACCEPTED => Ok(meta),
+            _ => Err(parse_error(resp).with_operation("Backend::azdls_flush_request")),
         }
     }
+
+    async fn abort(&self) -> Result<()> {
+        Ok(())
+    }
 }
 
 impl oio::AppendWrite for AzdlsWriter {
@@ -100,19 +124,13 @@ impl oio::AppendWrite for AzdlsWriter {
 
     async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result<Metadata> {
         if offset == 0 {
-            let resp = self.core.azdls_create(&self.path, FILE, &self.op).await?;
-            let status = resp.status();
-            match status {
-                StatusCode::CREATED | StatusCode::OK => {}
-                _ => {
-                    return Err(parse_error(resp).with_operation("Backend::azdls_create_request"));
-                }
-            }
+            self.ensure_created().await?;
         }
 
+        // append + flush in a single request to minimize roundtrips for append mode.
         let resp = self
             .core
-            .azdls_update(&self.path, Some(size), offset, body)
+            .azdls_append(&self.path, Some(size), offset, true, false, body)
             .await?;
 
         let mut meta = AzdlsWriter::parse_metadata(resp.headers())?;
@@ -120,10 +138,11 @@ impl oio::AppendWrite for AzdlsWriter {
         if let Some(md5) = md5 {
             meta.set_content_md5(md5);
         }
-        let status = resp.status();
-        match status {
+        meta.set_content_length(offset + size);
+
+        match resp.status() {
             StatusCode::OK | StatusCode::ACCEPTED => Ok(meta),
-            _ => Err(parse_error(resp).with_operation("Backend::azdls_update_request")),
+            _ => Err(parse_error(resp).with_operation("Backend::azdls_append_request")),
         }
     }
 }
diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index 87374e7b2..7e58e047f 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -155,7 +155,7 @@ impl oio::PositionWrite for FsWriter {
         .map_err(new_task_join_error)?
     }
 
-    async fn close(&self) -> Result<Metadata> {
+    async fn close(&self, _size: u64) -> Result<Metadata> {
         let mut f = self
             .f
             .try_clone()

Reply via email to