This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new c0205fdd6 feat(services/fs): impl Sink for Fs (#2626)
c0205fdd6 is described below

commit c0205fdd612307612012377919f88e85cb379e7a
Author: xyJi <[email protected]>
AuthorDate: Thu Jul 13 16:19:55 2023 +0800

    feat(services/fs): impl Sink for Fs (#2626)
    
    * impl Sink for Fs
    
    * rename
    
    * upd
---
 core/src/services/fs/backend.rs |  1 +
 core/src/services/fs/writer.rs  | 18 +++++++++++++-----
 2 files changed, 14 insertions(+), 5 deletions(-)

diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs
index 6fcc9ffa6..505fda60d 100644
--- a/core/src/services/fs/backend.rs
+++ b/core/src/services/fs/backend.rs
@@ -264,6 +264,7 @@ impl Accessor for FsBackend {
                 read_with_range: true,
 
                 write: true,
+                write_can_sink: true,
                 write_without_content_length: true,
                 create_dir: true,
                 delete: true,
diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index afa02fa2e..4d31444af 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -22,6 +22,7 @@ use std::path::PathBuf;
 
 use async_trait::async_trait;
 use bytes::Bytes;
+use futures::StreamExt;
 use tokio::io::AsyncSeekExt;
 use tokio::io::AsyncWriteExt;
 
@@ -64,11 +65,18 @@ impl oio::Write for FsWriter<tokio::fs::File> {
         Ok(())
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "Write::sink is not supported",
-        ))
+    async fn sink(&mut self, _size: u64, mut s: oio::Streamer) -> Result<()> {
+        while let Some(bs) = s.next().await {
+            let bs = bs?;
+            self.f
+                .seek(SeekFrom::Start(self.pos))
+                .await
+                .map_err(parse_io_error)?;
+            self.f.write_all(&bs).await.map_err(parse_io_error)?;
+            self.pos += bs.len() as u64;
+        }
+
+        Ok(())
     }
 
     async fn abort(&mut self) -> Result<()> {

Reply via email to