This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 8027430dba feat(services/compfs): implement auxiliary functions (#4778)
8027430dba is described below
commit 8027430dba32cc57311e30bd925853a1c94f8ae2
Author: Pop <[email protected]>
AuthorDate: Thu Jun 20 19:24:54 2024 +0900
feat(services/compfs): implement auxiliary functions (#4778)
* fix reader
* builder
* implement aux fn's
---
core/src/services/compfs/backend.rs | 106 ++++++++++++++++++++++++++++++++++--
core/src/services/compfs/core.rs | 4 ++
core/src/services/compfs/lister.rs | 2 +-
core/src/services/compfs/reader.rs | 5 +-
core/src/services/compfs/writer.rs | 2 +-
5 files changed, 109 insertions(+), 10 deletions(-)
diff --git a/core/src/services/compfs/backend.rs b/core/src/services/compfs/backend.rs
index 65bd189b68..9926e04f97 100644
--- a/core/src/services/compfs/backend.rs
+++ b/core/src/services/compfs/backend.rs
@@ -15,7 +15,10 @@
// specific language governing permissions and limitations
// under the License.
+use compio::{dispatcher::Dispatcher, fs::OpenOptions};
+
use super::{core::CompfsCore, lister::CompfsLister, reader::CompfsReader, writer::CompfsWriter};
+
use crate::raw::*;
use crate::*;
@@ -42,7 +45,7 @@ impl CompfsBuilder {
impl Builder for CompfsBuilder {
const SCHEME: Scheme = Scheme::Compfs;
- type Accessor = ();
+ type Accessor = CompfsBackend;
fn from_map(map: HashMap<String, String>) -> Self {
let mut builder = CompfsBuilder::default();
@@ -53,7 +56,27 @@ impl Builder for CompfsBuilder {
}
fn build(&mut self) -> Result<Self::Accessor> {
- todo!()
+ let root = match self.root.take() {
+ Some(root) => Ok(root),
+ None => Err(Error::new(
+ ErrorKind::ConfigInvalid,
+ "root is not specified",
+ )),
+ }?;
+ let dispatcher = Dispatcher::new().map_err(|_| {
+ Error::new(
+ ErrorKind::Unexpected,
+ "failed to initiate compio dispatcher",
+ )
+ })?;
+ let core = CompfsCore {
+ root,
+ dispatcher,
+ buf_pool: oio::PooledBuf::new(16),
+ };
+ Ok(CompfsBackend {
+ core: Arc::new(core),
+ })
}
}
@@ -89,7 +112,6 @@ impl Access for CompfsBackend {
copy: true,
rename: true,
- blocking: true,
..Default::default()
});
@@ -97,8 +119,79 @@ impl Access for CompfsBackend {
am
}
+ async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
+ let path = self.core.prepare_path(path);
+
+ self.core
+ .exec(move || async move { compio::fs::create_dir_all(path).await })
+ .await?;
+
+ Ok(RpCreateDir::default())
+ }
+
+ async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
+ let path = self.core.prepare_path(path);
+
+ let meta = self
+ .core
+ .exec(move || async move { compio::fs::metadata(path).await })
+ .await?;
+ let ty = meta.file_type();
+ let mode = if ty.is_dir() {
+ EntryMode::DIR
+ } else if ty.is_file() {
+ EntryMode::FILE
+ } else {
+ EntryMode::Unknown
+ };
+ let last_mod = meta.modified().map_err(new_std_io_error)?.into();
+ let ret = Metadata::new(mode).with_last_modified(last_mod);
+
+ Ok(RpStat::new(ret))
+ }
+
+ async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
+ let path = self.core.prepare_path(path);
+
+ self.core
+ .exec(move || async move { compio::fs::remove_file(path).await })
+ .await?;
+
+ Ok(RpDelete::default())
+ }
+
+ async fn copy(&self, from: &str, to: &str, _: OpCopy) -> Result<RpCopy> {
+ let from = self.core.prepare_path(from);
+ let to = self.core.prepare_path(to);
+
+ self.core
+ .exec(move || async move {
+ let from = OpenOptions::new().read(true).open(from).await?;
+ let to = OpenOptions::new().write(true).create(true).open(to).await?;
+
+ let (mut from, mut to) = (Cursor::new(from), Cursor::new(to));
+ compio::io::copy(&mut from, &mut to).await?;
+
+ Ok(())
+ })
+ .await?;
+
+ Ok(RpCopy::default())
+ }
+
+ async fn rename(&self, from: &str, to: &str, _: OpRename) -> Result<RpRename> {
+ let from = self.core.prepare_path(from);
+ let to = self.core.prepare_path(to);
+
+ self.core
+ .exec(move || async move { compio::fs::rename(from, to).await })
+ .await?;
+
+ Ok(RpRename::default())
+ }
+
async fn read(&self, path: &str, op: OpRead) -> Result<(RpRead, Self::Reader)> {
- let path = self.core.root.join(path.trim_end_matches('/'));
+ let path = self.core.prepare_path(path);
let file = self
.core
@@ -110,7 +203,7 @@ impl Access for CompfsBackend {
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
- let path = self.core.root.join(path.trim_end_matches('/'));
+ let path = self.core.prepare_path(path);
let append = args.append();
let file = self
.core
@@ -130,7 +223,8 @@ impl Access for CompfsBackend {
}
async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
- let path = self.core.root.join(path.trim_end_matches('/'));
+ let path = self.core.prepare_path(path);
+
let read_dir = match self
.core
.exec_blocking(move || std::fs::read_dir(path))
diff --git a/core/src/services/compfs/core.rs b/core/src/services/compfs/core.rs
index 60c68106f1..e600edbe7d 100644
--- a/core/src/services/compfs/core.rs
+++ b/core/src/services/compfs/core.rs
@@ -44,6 +44,10 @@ pub(super) struct CompfsCore {
}
impl CompfsCore {
+ pub fn prepare_path(&self, path: &str) -> PathBuf {
+ self.root.join(path.trim_end_matches('/'))
+ }
+
pub async fn exec<Fn, Fut, R>(&self, f: Fn) -> crate::Result<R>
where
Fn: FnOnce() -> Fut + Send + 'static,
diff --git a/core/src/services/compfs/lister.rs b/core/src/services/compfs/lister.rs
index 7a380d48b7..ac12fc7820 100644
--- a/core/src/services/compfs/lister.rs
+++ b/core/src/services/compfs/lister.rs
@@ -28,7 +28,7 @@ pub struct CompfsLister {
}
impl CompfsLister {
- pub fn new(core: Arc<CompfsCore>, read_dir: ReadDir) -> Self {
+ pub(super) fn new(core: Arc<CompfsCore>, read_dir: ReadDir) -> Self {
Self {
core,
read_dir: Some(read_dir),
diff --git a/core/src/services/compfs/reader.rs b/core/src/services/compfs/reader.rs
index 758c110894..421efd8af1 100644
--- a/core/src/services/compfs/reader.rs
+++ b/core/src/services/compfs/reader.rs
@@ -31,7 +31,7 @@ pub struct CompfsReader {
}
impl CompfsReader {
- pub fn new(core: Arc<CompfsCore>, file: compio::fs::File, range: BytesRange) -> Self {
+ pub(super) fn new(core: Arc<CompfsCore>, file: compio::fs::File, range: BytesRange) -> Self {
Self { core, file, range }
}
}
@@ -40,13 +40,14 @@ impl oio::Read for CompfsReader {
async fn read(&mut self) -> Result<Buffer> {
let mut bs = self.core.buf_pool.get();
+ let pos = self.range.offset();
let len = self.range.size().expect("range size is always Some");
bs.reserve(len as _);
let f = self.file.clone();
let mut bs = self
.core
.exec(move || async move {
- let (_, bs) = buf_try!(@try f.read_at(bs, len).await);
+ let (_, bs) = buf_try!(@try f.read_at(bs, pos).await);
Ok(bs)
})
.await?;
diff --git a/core/src/services/compfs/writer.rs b/core/src/services/compfs/writer.rs
index 95a9751910..d46c1b69b7 100644
--- a/core/src/services/compfs/writer.rs
+++ b/core/src/services/compfs/writer.rs
@@ -30,7 +30,7 @@ pub struct CompfsWriter {
}
impl CompfsWriter {
- pub fn new(core: Arc<CompfsCore>, file: Cursor<File>) -> Self {
+ pub(super) fn new(core: Arc<CompfsCore>, file: Cursor<File>) -> Self {
Self { core, file }
}
}