This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch stream-based-write in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 3c98470abd8418d2b91fa85de2082ca58bed4120 Author: Xuanwo <[email protected]> AuthorDate: Wed Aug 30 18:47:28 2023 +0800 Migrate sftp Signed-off-by: Xuanwo <[email protected]> --- core/src/services/onedrive/writer.rs | 4 ---- core/src/services/sftp/writer.rs | 18 +++++++----------- 2 files changed, 7 insertions(+), 15 deletions(-) diff --git a/core/src/services/onedrive/writer.rs b/core/src/services/onedrive/writer.rs index b616c46e8..c1abb2230 100644 --- a/core/src/services/onedrive/writer.rs +++ b/core/src/services/onedrive/writer.rs @@ -16,14 +16,10 @@ // under the License. use async_trait::async_trait; -use bytes::Buf; -use bytes::Bytes; use http::StatusCode; use super::backend::OnedriveBackend; use super::error::parse_error; -use super::graph_model::OneDriveUploadSessionCreationRequestBody; -use super::graph_model::OneDriveUploadSessionCreationResponseBody; use crate::raw::oio::Stream; use crate::raw::*; use crate::*; diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs index ad1447d10..28a49856e 100644 --- a/core/src/services/sftp/writer.rs +++ b/core/src/services/sftp/writer.rs @@ -20,6 +20,7 @@ use bytes::Bytes; use openssh_sftp_client::file::File; use crate::raw::oio; +use crate::raw::oio::StreamExt; use crate::Error; use crate::ErrorKind; use crate::Result; @@ -32,21 +33,16 @@ impl SftpWriter { pub fn new(file: File) -> Self { SftpWriter { file } } - - async fn write(&mut self, bs: Bytes) -> Result<()> { - self.file.write_all(&bs).await?; - - Ok(()) - } } #[async_trait] impl oio::Write for SftpWriter { - async fn write(&mut self, _s: oio::Streamer) -> Result<()> { - Err(Error::new( - ErrorKind::Unsupported, - "Write::sink is not supported", - )) + async fn write(&mut self, mut s: oio::Streamer) -> Result<()> { + while let Some(bs) = s.next().await.transpose()? { + self.file.write_all(&bs).await?; + } + + Ok(()) } async fn abort(&mut self) -> Result<()> {
