This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new c0205fdd6 feat(services/fs): impl Sink for Fs (#2626)
c0205fdd6 is described below
commit c0205fdd612307612012377919f88e85cb379e7a
Author: xyJi <[email protected]>
AuthorDate: Thu Jul 13 16:19:55 2023 +0800
feat(services/fs): impl Sink for Fs (#2626)
* impl Sink for Fs
* rename
* upd
---
core/src/services/fs/backend.rs | 1 +
core/src/services/fs/writer.rs | 18 +++++++++++++-----
2 files changed, 14 insertions(+), 5 deletions(-)
diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs
index 6fcc9ffa6..505fda60d 100644
--- a/core/src/services/fs/backend.rs
+++ b/core/src/services/fs/backend.rs
@@ -264,6 +264,7 @@ impl Accessor for FsBackend {
read_with_range: true,
write: true,
+ write_can_sink: true,
write_without_content_length: true,
create_dir: true,
delete: true,
diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index afa02fa2e..4d31444af 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -22,6 +22,7 @@ use std::path::PathBuf;
use async_trait::async_trait;
use bytes::Bytes;
+use futures::StreamExt;
use tokio::io::AsyncSeekExt;
use tokio::io::AsyncWriteExt;
@@ -64,11 +65,18 @@ impl oio::Write for FsWriter<tokio::fs::File> {
Ok(())
}
- async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
- Err(Error::new(
- ErrorKind::Unsupported,
- "Write::sink is not supported",
- ))
+ async fn sink(&mut self, _size: u64, mut s: oio::Streamer) -> Result<()> {
+ while let Some(bs) = s.next().await {
+ let bs = bs?;
+ self.f
+ .seek(SeekFrom::Start(self.pos))
+ .await
+ .map_err(parse_io_error)?;
+ self.f.write_all(&bs).await.map_err(parse_io_error)?;
+ self.pos += bs.len() as u64;
+ }
+
+ Ok(())
}
async fn abort(&mut self) -> Result<()> {