This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch polish-details in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit ab8c0e9e8d6232683f4ff910bbf7338806447f01 Author: Xuanwo <[email protected]> AuthorDate: Thu Sep 14 14:56:15 2023 +0800 Polish docs Signed-off-by: Xuanwo <[email protected]> --- core/src/layers/complete.rs | 213 ++++++++++++++++++++--------------------- core/src/layers/mod.rs | 24 +++-- core/src/layers/type_eraser.rs | 26 ++--- 3 files changed, 130 insertions(+), 133 deletions(-) diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index 84b587efa..33565c9c8 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -56,12 +56,12 @@ use crate::*; /// capabilities. CompleteLayer will add those capabilities in /// a zero cost way. /// -/// Underlying services will return [`AccessorHint`] to indicate the +/// Underlying services will return [`AccessorInfo`] to indicate the /// features that returning readers support. /// /// - If both `seekable` and `streamable`, return directly. -/// - If not `streamable`, with [`oio::into_streamable_reader`]. -/// - If not `seekable`, with [`oio::into_reader::by_range`] +/// - If not `streamable`, with [`oio::into_read_from_stream`]. +/// - If not `seekable`, with [`oio::into_seekable_read_by_range`] /// - If neither not supported, wrap both by_range and into_streamable. /// /// All implementations of Reader should be `zero cost`. In our cases, @@ -73,10 +73,9 @@ use crate::*; /// /// ### Read is Seekable /// -/// We use internal `AccessorHint::ReadSeekable` to decide the most -/// suitable implementations. +/// We use [`Capability`] to decide the most suitable implementations. /// -/// If there is a hint that `ReadSeekable`, we will open it with given args +/// If [`Capability`] `read_can_seek` is true, we will open it with given args /// directly. Otherwise, we will pick a seekable reader implementation based /// on input range for it. /// @@ -92,7 +91,7 @@ use crate::*; /// We use internal `AccessorHint::ReadStreamable` to decide the most /// suitable implementations. /// -/// If there is a hint that `ReadStreamable`, we will use existing reader +/// If [`Capability`] `read_can_next` is true, we will use existing reader /// directly. Otherwise, we will use transform this reader as a stream. /// /// ## List Completion @@ -100,21 +99,19 @@ use crate::*; /// There are two styles of list, but not all services support both of /// them. CompleteLayer will add those capabilities in a zero cost way. /// -/// Underlying services will return [`AccessorHint`] to indicate the +/// Underlying services will return [`Capability`] to indicate the /// features that returning pagers support. /// -/// - If both `flat` and `hierarchy`, return directly. -/// - If only `flat`, with [`oio::to_flat_pager`]. -/// - if only `hierarchy`, with [`oio::to_hierarchy_pager`]. -/// - If neither not supported, something must be wrong. +/// - If both `list_with_delimiter_slash` and `list_without_delimiter`, return directly. +/// - If only `list_without_delimiter`, with [`oio::to_flat_pager`]. +/// - if only `list_with_delimiter_slash`, with [`oio::to_hierarchy_pager`]. +/// - If neither not supported, something must be wrong for `list` is true. /// /// ## Capability Check /// /// Before performing any operations, `CompleteLayer` will first check /// the operation against capability of the underlying service. If the /// operation is not supported, an error will be returned directly. -/// -/// [`AccessorHint`]: crate::raw::AccessorHint pub struct CompleteLayer; impl<A: Accessor> Layer<A> for CompleteLayer { @@ -128,7 +125,7 @@ impl<A: Accessor> Layer<A> for CompleteLayer { } } -/// Provide reader wrapper for backend. +/// Provide complete wrapper for backend. pub struct CompleteReaderAccessor<A: Accessor> { meta: AccessorInfo, inner: Arc<A>, @@ -377,40 +374,17 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> { meta } - async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - self.complete_reader(path, args).await - } - - fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { - self.complete_blocking_reader(path, args) - } - - async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> { + async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> { let capability = self.meta.full_capability(); - if !capability.stat { - return new_capability_unsupported_error(Operation::Stat); + if !capability.create_dir { + return new_capability_unsupported_error(Operation::CreateDir); } - self.inner.stat(path, args).await.map(|v| { - v.map_metadata(|m| { - let bit = m.bit(); - m.with_bit(bit | Metakey::Complete) - }) - }) + self.inner().create_dir(path, args).await } - fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> { - let capability = self.meta.full_capability(); - if !capability.stat || !capability.blocking { - return new_capability_unsupported_error(Operation::BlockingStat); - } - - self.inner.blocking_stat(path, args).map(|v| { - v.map_metadata(|m| { - let bit = m.bit(); - m.with_bit(bit | Metakey::Complete) - }) - }) + async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { + self.complete_reader(path, args).await } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { @@ -453,39 +427,36 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> { Ok((rp, w)) } - fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { + async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> { let capability = self.meta.full_capability(); - if !capability.write || !capability.blocking { - return new_capability_unsupported_error(Operation::BlockingWrite); - } - if args.append() && !capability.write_can_append { - return Err(Error::new( - ErrorKind::Unsupported, - "write with append enabled is not supported", - )); + if !capability.copy { + return new_capability_unsupported_error(Operation::Copy); } - self.inner - .blocking_write(path, args) - .map(|(rp, w)| (rp, CompleteWriter::new(w))) + self.inner().copy(from, to, args).await } - async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> { + async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> { let capability = self.meta.full_capability(); - if !capability.create_dir { - return new_capability_unsupported_error(Operation::CreateDir); + if !capability.rename { + return new_capability_unsupported_error(Operation::Rename); } - self.inner().create_dir(path, args).await + self.inner().rename(from, to, args).await } - fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> { + async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> { let capability = self.meta.full_capability(); - if !capability.create_dir || !capability.blocking { - return new_capability_unsupported_error(Operation::BlockingCreateDir); + if !capability.stat { + return new_capability_unsupported_error(Operation::Stat); } - self.inner().blocking_create_dir(path, args) + self.inner.stat(path, args).await.map(|v| { + v.map_metadata(|m| { + let bit = m.bit(); + m.with_bit(bit | Metakey::Complete) + }) + }) } async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> { @@ -497,85 +468,111 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> { self.inner().delete(path, args).await } - fn blocking_delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> { + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { let capability = self.meta.full_capability(); - if !capability.delete || !capability.blocking { - return new_capability_unsupported_error(Operation::BlockingDelete); + if !capability.list { + return new_capability_unsupported_error(Operation::List); } - self.inner().blocking_delete(path, args) + self.complete_list(path, args).await } - async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> { + async fn batch(&self, args: OpBatch) -> Result<RpBatch> { let capability = self.meta.full_capability(); - if !capability.copy { - return new_capability_unsupported_error(Operation::Copy); + if !capability.batch { + return new_capability_unsupported_error(Operation::Batch); } - self.inner().copy(from, to, args).await + self.inner().batch(args).await } - fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> { + async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> { let capability = self.meta.full_capability(); - if !capability.copy || !capability.blocking { - return new_capability_unsupported_error(Operation::BlockingCopy); + if !capability.presign { + return new_capability_unsupported_error(Operation::Presign); } - self.inner().blocking_copy(from, to, args) + self.inner.presign(path, args).await } - async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> { + fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> { let capability = self.meta.full_capability(); - if !capability.rename { - return new_capability_unsupported_error(Operation::Rename); + if !capability.create_dir || !capability.blocking { + return new_capability_unsupported_error(Operation::BlockingCreateDir); } - self.inner().rename(from, to, args).await + self.inner().blocking_create_dir(path, args) } - fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> { + fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { + self.complete_blocking_reader(path, args) + } + + fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { let capability = self.meta.full_capability(); - if !capability.rename || !capability.blocking { - return new_capability_unsupported_error(Operation::BlockingRename); + if !capability.write || !capability.blocking { + return new_capability_unsupported_error(Operation::BlockingWrite); + } + if args.append() && !capability.write_can_append { + return Err(Error::new( + ErrorKind::Unsupported, + "write with append enabled is not supported", + )); } - self.inner().blocking_rename(from, to, args) + self.inner + .blocking_write(path, args) + .map(|(rp, w)| (rp, CompleteWriter::new(w))) } - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { + fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> { let capability = self.meta.full_capability(); - if !capability.list { - return new_capability_unsupported_error(Operation::List); + if !capability.copy || !capability.blocking { + return new_capability_unsupported_error(Operation::BlockingCopy); } - self.complete_list(path, args).await + self.inner().blocking_copy(from, to, args) } - fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> { + fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> { let capability = self.meta.full_capability(); - if !capability.list || !capability.blocking { - return new_capability_unsupported_error(Operation::BlockingList); + if !capability.rename || !capability.blocking { + return new_capability_unsupported_error(Operation::BlockingRename); } - self.complete_blocking_list(path, args) + self.inner().blocking_rename(from, to, args) } - async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> { + fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> { let capability = self.meta.full_capability(); - if !capability.presign { - return new_capability_unsupported_error(Operation::Presign); + if !capability.stat || !capability.blocking { + return new_capability_unsupported_error(Operation::BlockingStat); } - self.inner.presign(path, args).await + self.inner.blocking_stat(path, args).map(|v| { + v.map_metadata(|m| { + let bit = m.bit(); + m.with_bit(bit | Metakey::Complete) + }) + }) } - async fn batch(&self, args: OpBatch) -> Result<RpBatch> { + fn blocking_delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> { let capability = self.meta.full_capability(); - if !capability.batch { - return new_capability_unsupported_error(Operation::Batch); + if !capability.delete || !capability.blocking { + return new_capability_unsupported_error(Operation::BlockingDelete); } - self.inner().batch(args).await + self.inner().blocking_delete(path, args) + } + + fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> { + let capability = self.meta.full_capability(); + if !capability.list || !capability.blocking { + return new_capability_unsupported_error(Operation::BlockingList); + } + + self.complete_blocking_list(path, args) } } @@ -849,8 +846,8 @@ mod tests { info } - async fn stat(&self, _: &str, _: OpStat) -> Result<RpStat> { - Ok(RpStat::new(Metadata::new(EntryMode::Unknown))) + async fn create_dir(&self, _: &str, _: OpCreateDir) -> Result<RpCreateDir> { + Ok(RpCreateDir {}) } async fn read(&self, _: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> { @@ -865,8 +862,12 @@ mod tests { Ok(RpCopy {}) } - async fn create_dir(&self, _: &str, _: OpCreateDir) -> Result<RpCreateDir> { - Ok(RpCreateDir {}) + async fn rename(&self, _: &str, _: &str, _: OpRename) -> Result<RpRename> { + Ok(RpRename {}) + } + + async fn stat(&self, _: &str, _: OpStat) -> Result<RpStat> { + Ok(RpStat::new(Metadata::new(EntryMode::Unknown))) } async fn delete(&self, _: &str, _: OpDelete) -> Result<RpDelete> { @@ -877,10 +878,6 @@ mod tests { Ok((RpList {}, ())) } - async fn rename(&self, _: &str, _: &str, _: OpRename) -> Result<RpRename> { - Ok(RpRename {}) - } - async fn presign(&self, _: &str, _: OpPresign) -> Result<RpPresign> { Ok(RpPresign::new(PresignedRequest::new( HttpMethod::POST, diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs index 2c738fcdd..cd9414ba6 100644 --- a/core/src/layers/mod.rs +++ b/core/src/layers/mod.rs @@ -17,6 +17,15 @@ //! `Layer` is the mechanism to intercept operations. +mod type_eraser; +pub(crate) use type_eraser::TypeEraseLayer; + +mod error_context; +pub(crate) use error_context::ErrorContextLayer; + +mod complete; +pub(crate) use complete::CompleteLayer; + mod concurrent_limit; pub use concurrent_limit::ConcurrentLimitLayer; @@ -61,18 +70,8 @@ mod minitrace; #[cfg(feature = "layers-minitrace")] pub use self::minitrace::MinitraceLayer; -mod type_eraser; -pub(crate) use type_eraser::TypeEraseLayer; - -mod error_context; -pub(crate) use error_context::ErrorContextLayer; - -mod complete; -pub(crate) use complete::CompleteLayer; - #[cfg(feature = "layers-madsim")] mod madsim; - #[cfg(feature = "layers-madsim")] pub use self::madsim::MadsimLayer; #[cfg(feature = "layers-madsim")] @@ -80,11 +79,11 @@ pub use self::madsim::MadsimServer; #[cfg(feature = "layers-otel-trace")] mod oteltrace; +#[cfg(feature = "layers-otel-trace")] +pub use self::oteltrace::OtelTraceLayer; #[cfg(feature = "layers-throttle")] mod throttle; -#[cfg(feature = "layers-otel-trace")] -pub use self::oteltrace::OtelTraceLayer; #[cfg(feature = "layers-throttle")] pub use self::throttle::ThrottleLayer; @@ -95,6 +94,5 @@ pub use self::await_tree::AwaitTreeLayer; #[cfg(feature = "layers-async-backtrace")] mod async_backtrace; - #[cfg(feature = "layers-async-backtrace")] pub use self::async_backtrace::AsyncBacktraceLayer; diff --git a/core/src/layers/type_eraser.rs b/core/src/layers/type_eraser.rs index 8d58221ea..9561700c7 100644 --- a/core/src/layers/type_eraser.rs +++ b/core/src/layers/type_eraser.rs @@ -25,6 +25,8 @@ use crate::*; /// TypeEraseLayer will erase the types on internal accessor. /// +/// For example, we will erase `Self::Reader` to `oio::Reader` (`Box<dyn oio::Read>`). +/// /// # Notes /// /// TypeEraseLayer is not a public accessible layer that can be used by @@ -71,12 +73,6 @@ impl<A: Accessor> LayeredAccessor for TypeEraseAccessor<A> { .map(|(rp, r)| (rp, Box::new(r) as oio::Reader)) } - fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { - self.inner - .blocking_read(path, args) - .map(|(rp, r)| (rp, Box::new(r) as oio::BlockingReader)) - } - async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { self.inner .write(path, args) @@ -84,12 +80,6 @@ impl<A: Accessor> LayeredAccessor for TypeEraseAccessor<A> { .map(|(rp, w)| (rp, Box::new(w) as oio::Writer)) } - fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { - self.inner - .blocking_write(path, args) - .map(|(rp, w)| (rp, Box::new(w) as oio::BlockingWriter)) - } - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { self.inner .list(path, args) @@ -97,6 +87,18 @@ impl<A: Accessor> LayeredAccessor for TypeEraseAccessor<A> { .map(|(rp, p)| (rp, Box::new(p) as oio::Pager)) } + fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { + self.inner + .blocking_read(path, args) + .map(|(rp, r)| (rp, Box::new(r) as oio::BlockingReader)) + } + + fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { + self.inner + .blocking_write(path, args) + .map(|(rp, w)| (rp, Box::new(w) as oio::BlockingWriter)) + } + fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> { self.inner .blocking_list(path, args)
