This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 14ae1419 feat(service/fs): add append support for fs (#2296)
14ae1419 is described below
commit 14ae1419198fdcb0cb7f4441107f40f2f412a013
Author: Suyan <[email protected]>
AuthorDate: Tue May 23 17:29:30 2023 +0800
feat(service/fs): add append support for fs (#2296)
* feat(service/fs): add append support for fs
Signed-off-by: suyanhanx <[email protected]>
* fix use
Signed-off-by: suyanhanx <[email protected]>
---------
Signed-off-by: suyanhanx <[email protected]>
---
core/src/services/fs/{mod.rs => appender.rs} | 37 ++++++++++++++++++++++++----
core/src/services/fs/backend.rs | 20 ++++++++++++++-
core/src/services/fs/mod.rs | 1 +
3 files changed, 52 insertions(+), 6 deletions(-)
diff --git a/core/src/services/fs/mod.rs b/core/src/services/fs/appender.rs
similarity index 56%
copy from core/src/services/fs/mod.rs
copy to core/src/services/fs/appender.rs
index aa2a5fca..8b0eca96 100644
--- a/core/src/services/fs/mod.rs
+++ b/core/src/services/fs/appender.rs
@@ -15,9 +15,36 @@
// specific language governing permissions and limitations
// under the License.
-mod backend;
-pub use backend::FsBuilder as Fs;
+use async_trait::async_trait;
+use bytes::Bytes;
-mod error;
-mod pager;
-mod writer;
+use tokio::io::AsyncWriteExt;
+
+use super::error::parse_io_error;
+use crate::raw::*;
+use crate::*;
+
+pub struct FsAppender<F> {
+ f: F,
+}
+
+impl<F> FsAppender<F> {
+ pub fn new(f: F) -> Self {
+ Self { f }
+ }
+}
+
+#[async_trait]
+impl oio::Append for FsAppender<tokio::fs::File> {
+ async fn append(&mut self, bs: Bytes) -> Result<()> {
+ self.f.write_all(&bs).await.map_err(parse_io_error)?;
+
+ Ok(())
+ }
+
+ async fn close(&mut self) -> Result<()> {
+ self.f.sync_all().await.map_err(parse_io_error)?;
+
+ Ok(())
+ }
+}
diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs
index 3e914f55..51585aa5 100644
--- a/core/src/services/fs/backend.rs
+++ b/core/src/services/fs/backend.rs
@@ -27,6 +27,7 @@ use chrono::DateTime;
use log::debug;
use uuid::Uuid;
+use super::appender::FsAppender;
use super::error::parse_io_error;
use super::pager::FsPager;
use super::writer::FsWriter;
@@ -43,6 +44,7 @@ use crate::*;
/// - [x] stat
/// - [x] read
/// - [x] write
+/// - [x] append
/// - [x] create_dir
/// - [x] delete
/// - [x] copy
@@ -296,7 +298,7 @@ impl Accessor for FsBackend {
type BlockingReader = oio::into_blocking_reader::FdReader<std::fs::File>;
type Writer = FsWriter<tokio::fs::File>;
type BlockingWriter = FsWriter<std::fs::File>;
- type Appender = ();
+ type Appender = FsAppender<tokio::fs::File>;
type Pager = Option<FsPager<tokio::fs::ReadDir>>;
type BlockingPager = Option<FsPager<std::fs::ReadDir>>;
@@ -316,6 +318,8 @@ impl Accessor for FsBackend {
create_dir: true,
delete: true,
+ append: true,
+
list: true,
list_with_delimiter_slash: true,
@@ -434,6 +438,20 @@ impl Accessor for FsBackend {
Ok((RpWrite::new(), FsWriter::new(target_path, tmp_path, f)))
}
+ async fn append(&self, path: &str, _: OpAppend) -> Result<(RpAppend,
Self::Appender)> {
+ let path = Self::ensure_write_abs_path(&self.root, path).await?;
+
+ let f = tokio::fs::OpenOptions::new()
+ .create(true)
+ .write(true)
+ .append(true)
+ .open(&path)
+ .await
+ .map_err(parse_io_error)?;
+
+ Ok((RpAppend::new(), FsAppender::new(f)))
+ }
+
async fn copy(&self, from: &str, to: &str, _args: OpCopy) ->
Result<RpCopy> {
let from = self.root.join(from.trim_end_matches('/'));
diff --git a/core/src/services/fs/mod.rs b/core/src/services/fs/mod.rs
index aa2a5fca..81e92090 100644
--- a/core/src/services/fs/mod.rs
+++ b/core/src/services/fs/mod.rs
@@ -18,6 +18,7 @@
mod backend;
pub use backend::FsBuilder as Fs;
+mod appender;
mod error;
mod pager;
mod writer;