This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch stream-based-write
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 42f6e98a73162405d8576412eb2e3c687f2dd340
Author: Xuanwo <[email protected]>
AuthorDate: Wed Aug 30 18:14:40 2023 +0800

    Migrate azdfs
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/services/azblob/backend.rs |  2 +-
 core/src/services/azdfs/backend.rs  | 31 ++++++++++++++++++-------
 core/src/services/azdfs/core.rs     |  2 +-
 core/src/services/azdfs/writer.rs   | 45 ++++++++++++++++---------------------
 4 files changed, 44 insertions(+), 36 deletions(-)

diff --git a/core/src/services/azblob/backend.rs b/core/src/services/azblob/backend.rs
index 3a437b6c8..76aa65480 100644
--- a/core/src/services/azblob/backend.rs
+++ b/core/src/services/azblob/backend.rs
@@ -613,7 +613,7 @@ impl Accessor for AzblobBackend {
         } else {
             return Err(Error::new(
                 ErrorKind::Unsupported,
-                "azblob write without neither content-length nor append is not supported yet",
+                "azblob write with block blobs is not supported yet, refer to https://github.com/apache/incubator-opendal/issues/2113 for more details",
             ));
         };
 
diff --git a/core/src/services/azdfs/backend.rs b/core/src/services/azdfs/backend.rs
index 3882f4fe2..9edfe3e6d 100644
--- a/core/src/services/azdfs/backend.rs
+++ b/core/src/services/azdfs/backend.rs
@@ -32,6 +32,7 @@ use super::error::parse_error;
 use super::pager::AzdfsPager;
 use super::writer::AzdfsWriter;
 use crate::raw::*;
+use crate::services::azdfs::writer::AzdfsWriters;
 use crate::*;
 
 /// Known endpoint suffix Azure Data Lake Storage Gen2 URI syntax.
@@ -230,7 +231,7 @@ pub struct AzdfsBackend {
 impl Accessor for AzdfsBackend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = AzdfsWriter;
+    type Writer = oio::TwoWaysWriter<AzdfsWriters, oio::AtLeastBufWriter<AzdfsWriters>>;
     type BlockingWriter = ();
     type Pager = AzdfsPager;
     type BlockingPager = ();
@@ -296,17 +297,31 @@ impl Accessor for AzdfsBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        if args.content_length().is_none() {
+        let writer = AzdfsWriter::new(self.core.clone(), path, args.clone());
+
+        let w = if args.content_length().is_some() {
+            oio::OneShotWriter::new(writer)
+        } else if args.append() {
             return Err(Error::new(
                 ErrorKind::Unsupported,
-                "write without content length is not supported",
+                "azdfs write with append is not supported yet, refer to https://github.com/apache/incubator-opendal/issues/2977 for more details",
             ));
-        }
+        } else {
+            return Err(Error::new(
+                ErrorKind::Unsupported,
+                "azdfs write without content-length is not supported yet, refer to https://github.com/apache/incubator-opendal/issues/2978 for more details",
+            ));
+        };
+
+        let w = if let Some(buffer_size) = args.buffer_size() {
+            oio::TwoWaysWriter::Two(
+                oio::AtLeastBufWriter::new(w, buffer_size).with_total_size(args.content_length()),
+            )
+        } else {
+            oio::TwoWaysWriter::One(w)
+        };
 
-        Ok((
-            RpWrite::default(),
-            AzdfsWriter::new(self.core.clone(), args, path.to_string()),
-        ))
+        Ok((RpWrite::default(), w))
     }
 
     async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
diff --git a/core/src/services/azdfs/core.rs b/core/src/services/azdfs/core.rs
index 409216d17..61bab62b3 100644
--- a/core/src/services/azdfs/core.rs
+++ b/core/src/services/azdfs/core.rs
@@ -204,7 +204,7 @@ impl AzdfsCore {
     pub fn azdfs_update_request(
         &self,
         path: &str,
-        size: Option<usize>,
+        size: Option<u64>,
         body: AsyncBody,
     ) -> Result<Request<AsyncBody>> {
         let p = build_abs_path(&self.root, path);
diff --git a/core/src/services/azdfs/writer.rs b/core/src/services/azdfs/writer.rs
index e6ae3f84a..9f5d014c0 100644
--- a/core/src/services/azdfs/writer.rs
+++ b/core/src/services/azdfs/writer.rs
@@ -18,27 +18,36 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use bytes::Bytes;
 use http::StatusCode;
 
 use super::core::AzdfsCore;
 use super::error::parse_error;
+use crate::raw::oio::{Stream, Streamer};
 use crate::raw::*;
 use crate::*;
 
+pub type AzdfsWriters = oio::OneShotWriter<AzdfsWriter>;
+
 pub struct AzdfsWriter {
     core: Arc<AzdfsCore>,
 
-    op: OpWrite,
     path: String,
+    op: OpWrite,
 }
 
 impl AzdfsWriter {
-    pub fn new(core: Arc<AzdfsCore>, op: OpWrite, path: String) -> Self {
-        AzdfsWriter { core, op, path }
+    pub fn new(core: Arc<AzdfsCore>, path: &str, op: OpWrite) -> Self {
+        AzdfsWriter {
+            core,
+            path: path.to_string(),
+            op,
+        }
     }
+}
 
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+#[async_trait]
+impl oio::OneShotWrite for AzdfsWriter {
+    async fn write_once(&self, stream: Streamer) -> Result<()> {
         let mut req = self.core.azdfs_create_request(
             &self.path,
             "file",
@@ -63,9 +72,11 @@ impl AzdfsWriter {
             }
         }
 
-        let mut req =
-            self.core
-                .azdfs_update_request(&self.path, Some(bs.len()), AsyncBody::Bytes(bs))?;
+        let mut req = self.core.azdfs_update_request(
+            &self.path,
+            Some(stream.size()),
+            AsyncBody::Stream(stream),
+        )?;
 
         self.core.sign(&mut req).await?;
 
@@ -83,21 +94,3 @@ impl AzdfsWriter {
         }
     }
 }
-
-#[async_trait]
-impl oio::Write for AzdfsWriter {
-    async fn write(&mut self, _s: oio::Streamer) -> Result<()> {
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "Write::sink is not supported",
-        ))
-    }
-
-    async fn abort(&mut self) -> Result<()> {
-        Ok(())
-    }
-
-    async fn close(&mut self) -> Result<()> {
-        Ok(())
-    }
-}

Reply via email to