This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch stream-based-write in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit b23359c756405526a3cc335403093b2d4c308c9b Author: Xuanwo <[email protected]> AuthorDate: Wed Aug 30 18:21:43 2023 +0800 Mirgrate fs Signed-off-by: Xuanwo <[email protected]> --- core/src/services/fs/backend.rs | 17 +++++++++++++++-- core/src/services/fs/writer.rs | 5 +---- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs index f94f59d96..d1159b736 100644 --- a/core/src/services/fs/backend.rs +++ b/core/src/services/fs/backend.rs @@ -245,7 +245,10 @@ impl FsBackend { impl Accessor for FsBackend { type Reader = oio::FromFileReader<Compat<tokio::fs::File>>; type BlockingReader = oio::FromFileReader<std::fs::File>; - type Writer = FsWriter<tokio::fs::File>; + type Writer = oio::TwoWaysWriter< + FsWriter<tokio::fs::File>, + oio::AtLeastBufWriter<FsWriter<tokio::fs::File>>, + >; type BlockingWriter = FsWriter<std::fs::File>; type Pager = Option<FsPager<tokio::fs::ReadDir>>; type BlockingPager = Option<FsPager<std::fs::ReadDir>>; @@ -389,7 +392,17 @@ impl Accessor for FsBackend { .await .map_err(parse_io_error)?; - Ok((RpWrite::new(), FsWriter::new(target_path, tmp_path, f))) + let w = FsWriter::new(target_path, tmp_path, f); + + let w = if let Some(buffer_size) = op.buffer_size() { + oio::TwoWaysWriter::Two( + oio::AtLeastBufWriter::new(w, buffer_size).with_total_size(op.content_length()), + ) + } else { + oio::TwoWaysWriter::One(w) + }; + + Ok((RpWrite::new(), w)) } async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> { diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs index 395261c63..a897c3f00 100644 --- a/core/src/services/fs/writer.rs +++ b/core/src/services/fs/writer.rs @@ -65,10 +65,7 @@ impl oio::Write for FsWriter<tokio::fs::File> { } async fn abort(&mut self) -> Result<()> { - Err(Error::new( - ErrorKind::Unsupported, - "output writer doesn't support abort", - )) + Ok(()) } async fn close(&mut self) -> Result<()> {
