This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch stream-based-write
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 4238621dbba8bb9bfe11f797907e77485db38868
Author: Xuanwo <[email protected]>
AuthorDate: Wed Aug 30 18:18:48 2023 +0800

    Migrate dropbox
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/services/dropbox/backend.rs | 32 +++++++++++++++++++++-------
 core/src/services/dropbox/core.rs    |  2 +-
 core/src/services/dropbox/writer.rs  | 41 +++++++++++++++---------------------
 3 files changed, 42 insertions(+), 33 deletions(-)

diff --git a/core/src/services/dropbox/backend.rs b/core/src/services/dropbox/backend.rs
index ebb468728..a0315bed5 100644
--- a/core/src/services/dropbox/backend.rs
+++ b/core/src/services/dropbox/backend.rs
@@ -31,6 +31,7 @@ use super::error::parse_error;
 use super::writer::DropboxWriter;
 use crate::raw::*;
 use crate::services::dropbox::error::DropboxErrorResponse;
+use crate::services::dropbox::writer::DropboxWriters;
 use crate::*;
 
 static BACKOFF: Lazy<ExponentialBuilder> = Lazy::new(|| {
@@ -49,7 +50,7 @@ pub struct DropboxBackend {
 impl Accessor for DropboxBackend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = DropboxWriter;
+    type Writer = oio::TwoWaysWriter<DropboxWriters, oio::AtLeastBufWriter<DropboxWriters>>;
     type BlockingWriter = ();
     type Pager = ();
     type BlockingPager = ();
@@ -106,16 +107,31 @@ impl Accessor for DropboxBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        if args.content_length().is_none() {
+        let writer = DropboxWriter::new(self.core.clone(), path, args.clone());
+
+        let w = if args.content_length().is_some() {
+            oio::OneShotWriter::new(writer)
+        } else if args.append() {
             return Err(Error::new(
                 ErrorKind::Unsupported,
-                "write without content length is not supported",
+                "dropbox write with append is not supported yet.",
             ));
-        }
-        Ok((
-            RpWrite::default(),
-            DropboxWriter::new(self.core.clone(), args, String::from(path)),
-        ))
+        } else {
+            return Err(Error::new(
+                ErrorKind::Unsupported,
+                "dropbox write without content-length is not supported yet",
+            ));
+        };
+
+        let w = if let Some(buffer_size) = args.buffer_size() {
+            oio::TwoWaysWriter::Two(
+                oio::AtLeastBufWriter::new(w, buffer_size).with_total_size(args.content_length()),
+            )
+        } else {
+            oio::TwoWaysWriter::One(w)
+        };
+
+        Ok((RpWrite::default(), w))
     }
 
     async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
diff --git a/core/src/services/dropbox/core.rs b/core/src/services/dropbox/core.rs
index 4b3fba758..81d68563f 100644
--- a/core/src/services/dropbox/core.rs
+++ b/core/src/services/dropbox/core.rs
@@ -97,7 +97,7 @@ impl DropboxCore {
     pub async fn dropbox_update(
         &self,
         path: &str,
-        size: Option<usize>,
+        size: Option<u64>,
         content_type: Option<&str>,
         body: AsyncBody,
     ) -> Result<Response<IncomingAsyncBody>> {
diff --git a/core/src/services/dropbox/writer.rs b/core/src/services/dropbox/writer.rs
index 72309ff0d..a43c89f5a 100644
--- a/core/src/services/dropbox/writer.rs
+++ b/core/src/services/dropbox/writer.rs
@@ -23,31 +23,42 @@ use http::StatusCode;
 
 use super::core::DropboxCore;
 use super::error::parse_error;
+use crate::raw::oio::Streamer;
 use crate::raw::*;
 use crate::*;
 
+pub type DropboxWriters = oio::OneShotWriter<DropboxWriter>;
+
 pub struct DropboxWriter {
     core: Arc<DropboxCore>,
-    op: OpWrite,
     path: String,
+    op: OpWrite,
 }
 
 impl DropboxWriter {
-    pub fn new(core: Arc<DropboxCore>, op: OpWrite, path: String) -> Self {
-        DropboxWriter { core, op, path }
+    pub fn new(core: Arc<DropboxCore>, path: &str, op: OpWrite) -> Self {
+        DropboxWriter {
+            core,
+            path: path.to_string(),
+            op,
+        }
     }
+}
 
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+#[async_trait]
+impl oio::OneShotWrite for DropboxWriter {
+    async fn write_once(&self, stream: Streamer) -> Result<()> {
         let resp = self
             .core
             .dropbox_update(
                 &self.path,
-                Some(bs.len()),
+                Some(stream.size()),
                 self.op.content_type(),
-                AsyncBody::Bytes(bs),
+                AsyncBody::Stream(stream),
             )
             .await?;
         let status = resp.status();
+
         match status {
             StatusCode::OK => {
                 resp.into_body().consume().await?;
@@ -57,21 +68,3 @@ impl DropboxWriter {
         }
     }
 }
-
-#[async_trait]
-impl oio::Write for DropboxWriter {
-    async fn write(&mut self, _s: oio::Streamer) -> Result<()> {
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "Write::sink is not supported",
-        ))
-    }
-
-    async fn abort(&mut self) -> Result<()> {
-        Ok(())
-    }
-
-    async fn close(&mut self) -> Result<()> {
-        Ok(())
-    }
-}

Reply via email to