This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch stream-based-write
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 3c98470abd8418d2b91fa85de2082ca58bed4120
Author: Xuanwo <[email protected]>
AuthorDate: Wed Aug 30 18:47:28 2023 +0800

    Migrate sftp
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/services/onedrive/writer.rs |  4 ----
 core/src/services/sftp/writer.rs     | 18 +++++++-----------
 2 files changed, 7 insertions(+), 15 deletions(-)

diff --git a/core/src/services/onedrive/writer.rs 
b/core/src/services/onedrive/writer.rs
index b616c46e8..c1abb2230 100644
--- a/core/src/services/onedrive/writer.rs
+++ b/core/src/services/onedrive/writer.rs
@@ -16,14 +16,10 @@
 // under the License.
 
 use async_trait::async_trait;
-use bytes::Buf;
-use bytes::Bytes;
 use http::StatusCode;
 
 use super::backend::OnedriveBackend;
 use super::error::parse_error;
-use super::graph_model::OneDriveUploadSessionCreationRequestBody;
-use super::graph_model::OneDriveUploadSessionCreationResponseBody;
 use crate::raw::oio::Stream;
 use crate::raw::*;
 use crate::*;
diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs
index ad1447d10..28a49856e 100644
--- a/core/src/services/sftp/writer.rs
+++ b/core/src/services/sftp/writer.rs
@@ -20,6 +20,7 @@ use bytes::Bytes;
 use openssh_sftp_client::file::File;
 
 use crate::raw::oio;
+use crate::raw::oio::StreamExt;
 use crate::Error;
 use crate::ErrorKind;
 use crate::Result;
@@ -32,21 +33,16 @@ impl SftpWriter {
     pub fn new(file: File) -> Self {
         SftpWriter { file }
     }
-
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        self.file.write_all(&bs).await?;
-
-        Ok(())
-    }
 }
 
 #[async_trait]
 impl oio::Write for SftpWriter {
-    async fn write(&mut self, _s: oio::Streamer) -> Result<()> {
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "Write::sink is not supported",
-        ))
+    async fn write(&mut self, mut s: oio::Streamer) -> Result<()> {
+        while let Some(bs) = s.next().await.transpose()? {
+            self.file.write_all(&bs).await?;
+        }
+
+        Ok(())
     }
 
     async fn abort(&mut self) -> Result<()> {

Reply via email to