This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 8175aa35c chore: Polish some details of layers implementation (#3061)
8175aa35c is described below

commit 8175aa35c056960da0b377ca63375a9ca9862e42
Author: Xuanwo <[email protected]>
AuthorDate: Thu Sep 14 16:02:59 2023 +0800

    chore: Polish some details of layers implementation (#3061)
    
    * Polish docs
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Save work
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * polish
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * save work
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Polish error context
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Format code
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
---
 bindings/java/src/blocking_operator.rs |   4 +-
 bindings/java/src/metadata.rs          |   3 +-
 bindings/java/src/operator.rs          |   1 -
 core/src/layers/complete.rs            | 487 +++++++++++++++++++--------------
 core/src/layers/concurrent_limit.rs    |   8 +-
 core/src/layers/error_context.rs       |  45 +--
 core/src/layers/mod.rs                 |  24 +-
 core/src/layers/type_eraser.rs         |  26 +-
 core/src/raw/adapters/kv/backend.rs    |   3 +-
 core/src/raw/oio/page/api.rs           |   9 +-
 core/src/types/writer.rs               |   4 +-
 11 files changed, 343 insertions(+), 271 deletions(-)

diff --git a/bindings/java/src/blocking_operator.rs b/bindings/java/src/blocking_operator.rs
index 6b936e349..5b6c28aba 100644
--- a/bindings/java/src/blocking_operator.rs
+++ b/bindings/java/src/blocking_operator.rs
@@ -24,14 +24,14 @@ use jni::objects::JString;
 use jni::sys::jlong;
 use jni::sys::jstring;
 use jni::JNIEnv;
-
 use opendal::layers::BlockingLayer;
 use opendal::BlockingOperator;
 use opendal::Operator;
 use opendal::Scheme;
 
+use crate::get_global_runtime;
+use crate::jmap_to_hashmap;
 use crate::Result;
-use crate::{get_global_runtime, jmap_to_hashmap};
 
 #[no_mangle]
 pub extern "system" fn Java_org_apache_opendal_BlockingOperator_constructor(
diff --git a/bindings/java/src/metadata.rs b/bindings/java/src/metadata.rs
index 5f2e14ea4..c4f8510d7 100644
--- a/bindings/java/src/metadata.rs
+++ b/bindings/java/src/metadata.rs
@@ -15,7 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use jni::objects::{JClass, JObject};
+use jni::objects::JClass;
+use jni::objects::JObject;
 use jni::sys::jboolean;
 use jni::sys::jlong;
 use jni::JNIEnv;
diff --git a/bindings/java/src/operator.rs b/bindings/java/src/operator.rs
index 43ae74cc4..b06b09067 100644
--- a/bindings/java/src/operator.rs
+++ b/bindings/java/src/operator.rs
@@ -25,7 +25,6 @@ use jni::objects::JValue;
 use jni::objects::JValueOwned;
 use jni::sys::jlong;
 use jni::JNIEnv;
-
 use opendal::layers::BlockingLayer;
 use opendal::Operator;
 use opendal::Scheme;
diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index 84b587efa..82e8066e4 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -15,13 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::cmp;
 use std::fmt::Debug;
 use std::fmt::Formatter;
+use std::io;
 use std::sync::Arc;
 use std::task::ready;
 use std::task::Context;
 use std::task::Poll;
-use std::{cmp, io};
 
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -56,12 +57,12 @@ use crate::*;
 /// capabilities. CompleteLayer will add those capabilities in
 /// a zero cost way.
 ///
-/// Underlying services will return [`AccessorHint`] to indicate the
+/// Underlying services will return [`AccessorInfo`] to indicate the
 /// features that returning readers support.
 ///
 /// - If both `seekable` and `streamable`, return directly.
-/// - If not `streamable`, with [`oio::into_streamable_reader`].
-/// - If not `seekable`, with [`oio::into_reader::by_range`]
+/// - If not `streamable`, with [`oio::into_read_from_stream`].
+/// - If not `seekable`, with [`oio::into_seekable_read_by_range`]
 /// - If neither not supported, wrap both by_range and into_streamable.
 ///
 /// All implementations of Reader should be `zero cost`. In our cases,
@@ -73,10 +74,9 @@ use crate::*;
 ///
 /// ### Read is Seekable
 ///
-/// We use internal `AccessorHint::ReadSeekable` to decide the most
-/// suitable implementations.
+/// We use [`Capability`] to decide the most suitable implementations.
 ///
-/// If there is a hint that `ReadSeekable`, we will open it with given args
+/// If [`Capability`] `read_can_seek` is true, we will open it with given args
 /// directly. Otherwise, we will pick a seekable reader implementation based
 /// on input range for it.
 ///
@@ -92,7 +92,7 @@ use crate::*;
 /// We use internal `AccessorHint::ReadStreamable` to decide the most
 /// suitable implementations.
 ///
-/// If there is a hint that `ReadStreamable`, we will use existing reader
+/// If [`Capability`] `read_can_next` is true, we will use existing reader
 /// directly. Otherwise, we will use transform this reader as a stream.
 ///
 /// ## List Completion
@@ -100,21 +100,19 @@ use crate::*;
 /// There are two styles of list, but not all services support both of
 /// them. CompleteLayer will add those capabilities in a zero cost way.
 ///
-/// Underlying services will return [`AccessorHint`] to indicate the
+/// Underlying services will return [`Capability`] to indicate the
 /// features that returning pagers support.
 ///
-/// - If both `flat` and `hierarchy`, return directly.
-/// - If only `flat`, with [`oio::to_flat_pager`].
-/// - if only `hierarchy`, with [`oio::to_hierarchy_pager`].
-/// - If neither not supported, something must be wrong.
+/// - If both `list_with_delimiter_slash` and `list_without_delimiter`, return directly.
+/// - If only `list_without_delimiter`, with [`oio::to_flat_pager`].
+/// - if only `list_with_delimiter_slash`, with [`oio::to_hierarchy_pager`].
+/// - If neither not supported, something must be wrong for `list` is true.
 ///
 /// ## Capability Check
 ///
 /// Before performing any operations, `CompleteLayer` will first check
 /// the operation against capability of the underlying service. If the
 /// operation is not supported, an error will be returned directly.
-///
-/// [`AccessorHint`]: crate::raw::AccessorHint
 pub struct CompleteLayer;
 
 impl<A: Accessor> Layer<A> for CompleteLayer {
@@ -128,7 +126,7 @@ impl<A: Accessor> Layer<A> for CompleteLayer {
     }
 }
 
-/// Provide reader wrapper for backend.
+/// Provide complete wrapper for backend.
 pub struct CompleteReaderAccessor<A: Accessor> {
     meta: AccessorInfo,
     inner: Arc<A>,
@@ -141,6 +139,16 @@ impl<A: Accessor> Debug for CompleteReaderAccessor<A> {
 }
 
 impl<A: Accessor> CompleteReaderAccessor<A> {
+    fn new_unsupported_error(&self, op: impl Into<&'static str>) -> Error {
+        let scheme = self.meta.scheme();
+        let op = op.into();
+        Error::new(
+            ErrorKind::Unsupported,
+            &format!("service {scheme} doesn't support operation {op}"),
+        )
+        .with_operation(op)
+    }
+
     async fn complete_reader(
         &self,
         path: &str,
@@ -148,7 +156,7 @@ impl<A: Accessor> CompleteReaderAccessor<A> {
     ) -> Result<(RpRead, CompleteReader<A, A::Reader>)> {
         let capability = self.meta.native_capability();
         if !capability.read {
-            return new_capability_unsupported_error(Operation::Read);
+            return Err(self.new_unsupported_error(Operation::Read));
         }
 
         let seekable = capability.read_can_seek;
@@ -201,7 +209,7 @@ impl<A: Accessor> CompleteReaderAccessor<A> {
     ) -> Result<(RpRead, CompleteReader<A, A::BlockingReader>)> {
         let capability = self.meta.full_capability();
         if !capability.read || !capability.blocking {
-            return new_capability_unsupported_error(Operation::BlockingRead);
+            return Err(self.new_unsupported_error(Operation::BlockingRead));
         }
 
         let seekable = capability.read_can_seek;
@@ -257,11 +265,7 @@ impl<A: Accessor> CompleteReaderAccessor<A> {
     ) -> Result<(RpList, CompletePager<A, A::Pager>)> {
         let cap = self.meta.full_capability();
         if !cap.list {
-            return Err(
-                Error::new(ErrorKind::Unsupported, "operation is not supported")
-                    .with_context("service", self.meta.scheme())
-                    .with_operation("list"),
-            );
+            return Err(self.new_unsupported_error(Operation::List));
         }
 
         let delimiter = args.delimiter();
@@ -306,11 +310,7 @@ impl<A: Accessor> CompleteReaderAccessor<A> {
     ) -> Result<(RpList, CompletePager<A, A::BlockingPager>)> {
         let cap = self.meta.full_capability();
         if !cap.list {
-            return Err(
-                Error::new(ErrorKind::Unsupported, "operation is not supported")
-                    .with_context("service", self.meta.scheme())
-                    .with_operation("list"),
-            );
+            return Err(self.new_unsupported_error(Operation::BlockingList));
         }
 
         let delimiter = args.delimiter();
@@ -377,51 +377,31 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
         meta
     }
 
-    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
-        self.complete_reader(path, args).await
-    }
-
-    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::BlockingReader)> {
-        self.complete_blocking_reader(path, args)
-    }
-
-    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
+    async fn create_dir(&self, path: &str, args: OpCreateDir) -> 
Result<RpCreateDir> {
         let capability = self.meta.full_capability();
-        if !capability.stat {
-            return new_capability_unsupported_error(Operation::Stat);
+        if !capability.create_dir {
+            return Err(self.new_unsupported_error(Operation::CreateDir));
         }
 
-        self.inner.stat(path, args).await.map(|v| {
-            v.map_metadata(|m| {
-                let bit = m.bit();
-                m.with_bit(bit | Metakey::Complete)
-            })
-        })
+        self.inner().create_dir(path, args).await
     }
 
-    fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
-        let capability = self.meta.full_capability();
-        if !capability.stat || !capability.blocking {
-            return new_capability_unsupported_error(Operation::BlockingStat);
-        }
-
-        self.inner.blocking_stat(path, args).map(|v| {
-            v.map_metadata(|m| {
-                let bit = m.bit();
-                m.with_bit(bit | Metakey::Complete)
-            })
-        })
+    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
+        self.complete_reader(path, args).await
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
         let capability = self.meta.full_capability();
         if !capability.write {
-            return new_capability_unsupported_error(Operation::Write);
+            return Err(self.new_unsupported_error(Operation::Write));
         }
         if args.append() && !capability.write_can_append {
             return Err(Error::new(
                 ErrorKind::Unsupported,
-                "write with append enabled is not supported",
+                &format!(
+                    "service {} doesn't support operation write with append",
+                    self.info().scheme()
+                ),
             ));
         }
 
@@ -453,129 +433,156 @@ impl<A: Accessor> LayeredAccessor for CompleteReaderAccessor<A> {
         Ok((rp, w))
     }
 
-    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::BlockingWriter)> {
+    async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> 
{
         let capability = self.meta.full_capability();
-        if !capability.write || !capability.blocking {
-            return new_capability_unsupported_error(Operation::BlockingWrite);
-        }
-        if args.append() && !capability.write_can_append {
-            return Err(Error::new(
-                ErrorKind::Unsupported,
-                "write with append enabled is not supported",
-            ));
+        if !capability.copy {
+            return Err(self.new_unsupported_error(Operation::Copy));
         }
 
-        self.inner
-            .blocking_write(path, args)
-            .map(|(rp, w)| (rp, CompleteWriter::new(w)))
+        self.inner().copy(from, to, args).await
     }
 
-    async fn create_dir(&self, path: &str, args: OpCreateDir) -> 
Result<RpCreateDir> {
+    async fn rename(&self, from: &str, to: &str, args: OpRename) -> 
Result<RpRename> {
         let capability = self.meta.full_capability();
-        if !capability.create_dir {
-            return new_capability_unsupported_error(Operation::CreateDir);
+        if !capability.rename {
+            return Err(self.new_unsupported_error(Operation::Rename));
         }
 
-        self.inner().create_dir(path, args).await
+        self.inner().rename(from, to, args).await
     }
 
-    fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> 
Result<RpCreateDir> {
+    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
         let capability = self.meta.full_capability();
-        if !capability.create_dir || !capability.blocking {
-            return 
new_capability_unsupported_error(Operation::BlockingCreateDir);
+        if !capability.stat {
+            return Err(self.new_unsupported_error(Operation::Stat));
         }
 
-        self.inner().blocking_create_dir(path, args)
+        self.inner.stat(path, args).await.map(|v| {
+            v.map_metadata(|m| {
+                let bit = m.bit();
+                m.with_bit(bit | Metakey::Complete)
+            })
+        })
     }
 
     async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
         let capability = self.meta.full_capability();
         if !capability.delete {
-            return new_capability_unsupported_error(Operation::Delete);
+            return Err(self.new_unsupported_error(Operation::Delete));
         }
 
         self.inner().delete(path, args).await
     }
 
-    fn blocking_delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
+    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, 
Self::Pager)> {
         let capability = self.meta.full_capability();
-        if !capability.delete || !capability.blocking {
-            return new_capability_unsupported_error(Operation::BlockingDelete);
+        if !capability.list {
+            return Err(self.new_unsupported_error(Operation::List));
         }
 
-        self.inner().blocking_delete(path, args)
+        self.complete_list(path, args).await
     }
 
-    async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> 
{
+    async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
         let capability = self.meta.full_capability();
-        if !capability.copy {
-            return new_capability_unsupported_error(Operation::Copy);
+        if !capability.batch {
+            return Err(self.new_unsupported_error(Operation::Batch));
         }
 
-        self.inner().copy(from, to, args).await
+        self.inner().batch(args).await
     }
 
-    fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> 
Result<RpCopy> {
+    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
         let capability = self.meta.full_capability();
-        if !capability.copy || !capability.blocking {
-            return new_capability_unsupported_error(Operation::BlockingCopy);
+        if !capability.presign {
+            return Err(self.new_unsupported_error(Operation::Presign));
         }
 
-        self.inner().blocking_copy(from, to, args)
+        self.inner.presign(path, args).await
     }
 
-    async fn rename(&self, from: &str, to: &str, args: OpRename) -> 
Result<RpRename> {
+    fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> 
Result<RpCreateDir> {
         let capability = self.meta.full_capability();
-        if !capability.rename {
-            return new_capability_unsupported_error(Operation::Rename);
+        if !capability.create_dir || !capability.blocking {
+            return 
Err(self.new_unsupported_error(Operation::BlockingCreateDir));
         }
 
-        self.inner().rename(from, to, args).await
+        self.inner().blocking_create_dir(path, args)
     }
 
-    fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> 
Result<RpRename> {
+    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::BlockingReader)> {
+        self.complete_blocking_reader(path, args)
+    }
+
+    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::BlockingWriter)> {
         let capability = self.meta.full_capability();
-        if !capability.rename || !capability.blocking {
-            return new_capability_unsupported_error(Operation::BlockingRename);
+        if !capability.write || !capability.blocking {
+            return Err(self.new_unsupported_error(Operation::BlockingWrite));
         }
 
-        self.inner().blocking_rename(from, to, args)
+        if args.append() && !capability.write_can_append {
+            return Err(Error::new(
+                ErrorKind::Unsupported,
+                &format!(
+                    "service {} doesn't support operation write with append",
+                    self.info().scheme()
+                ),
+            ));
+        }
+
+        self.inner
+            .blocking_write(path, args)
+            .map(|(rp, w)| (rp, CompleteWriter::new(w)))
     }
 
-    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, 
Self::Pager)> {
+    fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> 
Result<RpCopy> {
         let capability = self.meta.full_capability();
-        if !capability.list {
-            return new_capability_unsupported_error(Operation::List);
+        if !capability.copy || !capability.blocking {
+            return Err(self.new_unsupported_error(Operation::BlockingCopy));
         }
 
-        self.complete_list(path, args).await
+        self.inner().blocking_copy(from, to, args)
     }
 
-    fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, 
Self::BlockingPager)> {
+    fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> 
Result<RpRename> {
         let capability = self.meta.full_capability();
-        if !capability.list || !capability.blocking {
-            return new_capability_unsupported_error(Operation::BlockingList);
+        if !capability.rename || !capability.blocking {
+            return Err(self.new_unsupported_error(Operation::BlockingRename));
         }
 
-        self.complete_blocking_list(path, args)
+        self.inner().blocking_rename(from, to, args)
     }
 
-    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
+    fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
         let capability = self.meta.full_capability();
-        if !capability.presign {
-            return new_capability_unsupported_error(Operation::Presign);
+        if !capability.stat || !capability.blocking {
+            return Err(self.new_unsupported_error(Operation::BlockingStat));
         }
 
-        self.inner.presign(path, args).await
+        self.inner.blocking_stat(path, args).map(|v| {
+            v.map_metadata(|m| {
+                let bit = m.bit();
+                m.with_bit(bit | Metakey::Complete)
+            })
+        })
     }
 
-    async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
+    fn blocking_delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
         let capability = self.meta.full_capability();
-        if !capability.batch {
-            return new_capability_unsupported_error(Operation::Batch);
+        if !capability.delete || !capability.blocking {
+            return Err(self.new_unsupported_error(Operation::BlockingDelete));
         }
 
-        self.inner().batch(args).await
+        self.inner().blocking_delete(path, args)
+    }
+
+    fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, 
Self::BlockingPager)> {
+        let capability = self.meta.full_capability();
+        if !capability.list || !capability.blocking {
+            return Err(self.new_unsupported_error(Operation::BlockingList));
+        }
+
+        self.complete_blocking_list(path, args)
     }
 }
 
@@ -786,13 +793,8 @@ where
     }
 }
 
-fn new_capability_unsupported_error<R>(operation: Operation) -> Result<R> {
-    Err(Error::new(ErrorKind::Unsupported, "operation is not supported").with_operation(operation))
-}
-
 #[cfg(test)]
 mod tests {
-    use std::collections::HashMap;
     use std::time::Duration;
 
     use async_trait::async_trait;
@@ -801,33 +803,6 @@ mod tests {
 
     use super::*;
 
-    #[derive(Default)]
-    struct MockBuilder {
-        capability: Capability,
-    }
-
-    impl MockBuilder {
-        fn with_capacity(mut self, capability: Capability) -> Self {
-            self.capability = capability;
-            self
-        }
-    }
-
-    impl Builder for MockBuilder {
-        const SCHEME: Scheme = Scheme::Custom("mock");
-        type Accessor = MockService;
-
-        fn from_map(_: HashMap<String, String>) -> Self {
-            Self::default()
-        }
-
-        fn build(&mut self) -> Result<Self::Accessor> {
-            Ok(MockService {
-                capability: self.capability,
-            })
-        }
-    }
-
     #[derive(Debug)]
     struct MockService {
         capability: Capability,
@@ -835,12 +810,12 @@ mod tests {
 
     #[async_trait]
     impl Accessor for MockService {
-        type Reader = ();
-        type BlockingReader = ();
-        type Writer = ();
-        type BlockingWriter = ();
-        type Pager = ();
-        type BlockingPager = ();
+        type Reader = oio::Reader;
+        type BlockingReader = oio::BlockingReader;
+        type Writer = oio::Writer;
+        type BlockingWriter = oio::BlockingWriter;
+        type Pager = oio::Pager;
+        type BlockingPager = oio::BlockingPager;
 
         fn info(&self) -> AccessorInfo {
             let mut info = AccessorInfo::default();
@@ -849,24 +824,28 @@ mod tests {
             info
         }
 
-        async fn stat(&self, _: &str, _: OpStat) -> Result<RpStat> {
-            Ok(RpStat::new(Metadata::new(EntryMode::Unknown)))
+        async fn create_dir(&self, _: &str, _: OpCreateDir) -> 
Result<RpCreateDir> {
+            Ok(RpCreateDir {})
         }
 
         async fn read(&self, _: &str, _: OpRead) -> Result<(RpRead, 
Self::Reader)> {
-            Ok((RpRead::new(0), ()))
+            Ok((RpRead::new(0), Box::new(())))
         }
 
         async fn write(&self, _: &str, _: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
-            Ok((RpWrite::new(), ()))
+            Ok((RpWrite::new(), Box::new(())))
         }
 
         async fn copy(&self, _: &str, _: &str, _: OpCopy) -> Result<RpCopy> {
             Ok(RpCopy {})
         }
 
-        async fn create_dir(&self, _: &str, _: OpCreateDir) -> 
Result<RpCreateDir> {
-            Ok(RpCreateDir {})
+        async fn rename(&self, _: &str, _: &str, _: OpRename) -> 
Result<RpRename> {
+            Ok(RpRename {})
+        }
+
+        async fn stat(&self, _: &str, _: OpStat) -> Result<RpStat> {
+            Ok(RpStat::new(Metadata::new(EntryMode::Unknown)))
         }
 
         async fn delete(&self, _: &str, _: OpDelete) -> Result<RpDelete> {
@@ -874,11 +853,7 @@ mod tests {
         }
 
         async fn list(&self, _: &str, _: OpList) -> Result<(RpList, 
Self::Pager)> {
-            Ok((RpList {}, ()))
-        }
-
-        async fn rename(&self, _: &str, _: &str, _: OpRename) -> 
Result<RpRename> {
-            Ok(RpRename {})
+            Ok((RpList {}, Box::new(())))
         }
 
         async fn presign(&self, _: &str, _: OpPresign) -> Result<RpPresign> {
@@ -890,50 +865,144 @@ mod tests {
         }
     }
 
-    /// Perform the test against different capability preconditions.
-    macro_rules! capability_test {
-        ($cap:ident, |$arg:ident| { $($body:tt)* }) => {
-            paste::item! {
-                #[tokio::test]
-                async fn [<test_capability_ $cap>]() {
-                    let res_builder = |$arg: Operator| async move {
-                        let res = { $($body)* };
-                        res.await.err()
-                    };
-
-                    let builder = 
MockBuilder::default().with_capacity(Capability {
-                        $cap: false,
-                        ..Default::default()
-                    });
-                    let op = Operator::new(builder).expect("should 
build").finish();
-                    let res = res_builder(op.clone()).await;
-                    assert_eq!(res.expect("should not be None").kind(), 
ErrorKind::Unsupported);
-
-                    let builder = 
MockBuilder::default().with_capacity(Capability {
-                        $cap: true,
-                        ..Default::default()
-                    });
-                    let op = Operator::new(builder).expect("should 
build").finish();
-                    let res = res_builder(op.clone()).await;
-                    assert!(res.is_none());
-                }
-            }
-        };
+    fn new_test_operator(capability: Capability) -> Operator {
+        let srv = MockService { capability };
+
+        Operator::from_inner(Arc::new(srv)).layer(CompleteLayer)
+    }
+
+    #[tokio::test]
+    async fn test_read() {
+        let op = new_test_operator(Capability::default());
+        let res = op.read("path").await;
+        assert!(res.is_err());
+        assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported);
+
+        let op = new_test_operator(Capability {
+            read: true,
+            ..Default::default()
+        });
+        let res = op.read("path").await;
+        assert!(res.is_ok())
+    }
+
+    #[tokio::test]
+    async fn test_stat() {
+        let op = new_test_operator(Capability::default());
+        let res = op.stat("path").await;
+        assert!(res.is_err());
+        assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported);
+
+        let op = new_test_operator(Capability {
+            stat: true,
+            ..Default::default()
+        });
+        let res = op.stat("path").await;
+        assert!(res.is_ok())
     }
 
-    capability_test!(stat, |op| { op.stat("/path/to/mock_file") });
-    capability_test!(read, |op| { op.read("/path/to/mock_file") });
-    capability_test!(write, |op| { op.writer("/path/to/mock_file") });
-    capability_test!(create_dir, |op| { op.create_dir("/path/to/mock_dir/") });
-    capability_test!(delete, |op| { op.delete("/path/to/mock_file") });
-    capability_test!(copy, |op| {
-        op.copy("/path/to/mock_file", "/path/to/mock_file_2")
-    });
-    capability_test!(rename, |op| {
-        op.rename("/path/to/mock_file", "/path/to/mock_file_2")
-    });
-    capability_test!(list, |op| { op.lister("/path/to/mock_dir/") });
-    capability_test!(presign, |op| {
-        op.presign_read("/path/to/mock_file", Duration::from_secs(1))
-    });
+    #[tokio::test]
+    async fn test_writer() {
+        let op = new_test_operator(Capability::default());
+        let res = op.write("path", vec![]).await;
+        assert!(res.is_err());
+        assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported);
+
+        let op = new_test_operator(Capability {
+            write: true,
+            ..Default::default()
+        });
+        let res = op.writer("path").await;
+        assert!(res.is_ok())
+    }
+
+    #[tokio::test]
+    async fn test_create_dir() {
+        let op = new_test_operator(Capability::default());
+        let res = op.create_dir("path/").await;
+        assert!(res.is_err());
+        assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported);
+
+        let op = new_test_operator(Capability {
+            create_dir: true,
+            ..Default::default()
+        });
+        let res = op.create_dir("path/").await;
+        assert!(res.is_ok())
+    }
+
+    #[tokio::test]
+    async fn test_delete() {
+        let op = new_test_operator(Capability::default());
+        let res = op.delete("path").await;
+        assert!(res.is_err());
+        assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported);
+
+        let op = new_test_operator(Capability {
+            delete: true,
+            ..Default::default()
+        });
+        let res = op.delete("path").await;
+        assert!(res.is_ok())
+    }
+
+    #[tokio::test]
+    async fn test_copy() {
+        let op = new_test_operator(Capability::default());
+        let res = op.copy("path_a", "path_b").await;
+        assert!(res.is_err());
+        assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported);
+
+        let op = new_test_operator(Capability {
+            copy: true,
+            ..Default::default()
+        });
+        let res = op.copy("path_a", "path_b").await;
+        assert!(res.is_ok())
+    }
+
+    #[tokio::test]
+    async fn test_rename() {
+        let op = new_test_operator(Capability::default());
+        let res = op.rename("path_a", "path_b").await;
+        assert!(res.is_err());
+        assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported);
+
+        let op = new_test_operator(Capability {
+            rename: true,
+            ..Default::default()
+        });
+        let res = op.rename("path_a", "path_b").await;
+        assert!(res.is_ok())
+    }
+
+    #[tokio::test]
+    async fn test_list() {
+        let op = new_test_operator(Capability::default());
+        let res = op.list("path/").await;
+        assert!(res.is_err());
+        assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported);
+
+        let op = new_test_operator(Capability {
+            list: true,
+            ..Default::default()
+        });
+        let res = op.list("path/").await;
+        assert!(res.is_ok())
+    }
+
+    #[tokio::test]
+    async fn test_presign() {
+        let op = new_test_operator(Capability::default());
+        let res = op.presign_read("path", Duration::from_secs(1)).await;
+        assert!(res.is_err());
+        assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported);
+
+        let op = new_test_operator(Capability {
+            presign: true,
+            ..Default::default()
+        });
+        let res = op.presign_read("path", Duration::from_secs(1)).await;
+        assert!(res.is_ok())
+    }
 }
diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs
index b09408d45..c20ef4108 100644
--- a/core/src/layers/concurrent_limit.rs
+++ b/core/src/layers/concurrent_limit.rs
@@ -289,13 +289,13 @@ impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
         self.inner.poll_write(cx, bs)
     }
 
-    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        self.inner.poll_abort(cx)
-    }
-
     fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
         self.inner.poll_close(cx)
     }
+
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+        self.inner.poll_abort(cx)
+    }
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for ConcurrentLimitWrapper<R> {
diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs
index 1fd5e608c..9dae975f9 100644
--- a/core/src/layers/error_context.rs
+++ b/core/src/layers/error_context.rs
@@ -97,13 +97,13 @@ impl<A: Accessor> LayeredAccessor for ErrorContextAccessor<A> {
 
         self.inner
             .read(path, args)
-            .map_ok(|(rp, os)| {
+            .map_ok(|(rp, r)| {
                 (
                     rp,
                     ErrorContextWrapper {
                         scheme: self.meta.scheme(),
                         path: path.to_string(),
-                        inner: os,
+                        inner: r,
                     },
                 )
             })
@@ -119,13 +119,13 @@ impl<A: Accessor> LayeredAccessor for ErrorContextAccessor<A> {
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
         self.inner
             .write(path, args)
-            .map_ok(|(rp, os)| {
+            .map_ok(|(rp, w)| {
                 (
                     rp,
                     ErrorContextWrapper {
                         scheme: self.meta.scheme(),
                         path: path.to_string(),
-                        inner: os,
+                        inner: w,
                     },
                 )
             })
@@ -186,13 +186,13 @@ impl<A: Accessor> LayeredAccessor for ErrorContextAccessor<A> {
     async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> {
         self.inner
             .list(path, args)
-            .map_ok(|(rp, os)| {
+            .map_ok(|(rp, p)| {
                 (
                     rp,
                     ErrorContextWrapper {
                         scheme: self.meta.scheme(),
                         path: path.to_string(),
-                        inner: os,
+                        inner: p,
                     },
                 )
             })
@@ -204,14 +204,6 @@ impl<A: Accessor> LayeredAccessor for ErrorContextAccessor<A> {
             .await
     }
 
-    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
-        self.inner.presign(path, args).await.map_err(|err| {
-            err.with_operation(Operation::Presign)
-                .with_context("service", self.meta.scheme())
-                .with_context("path", path)
-        })
-    }
-
     async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
         self.inner
             .batch(args)
@@ -238,6 +230,14 @@ impl<A: Accessor> LayeredAccessor for ErrorContextAccessor<A> {
             .await
     }
 
+    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
+        self.inner.presign(path, args).await.map_err(|err| {
+            err.with_operation(Operation::Presign)
+                .with_context("service", self.meta.scheme())
+                .with_context("path", path)
+        })
+    }
+
     fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
         self.inner.blocking_create_dir(path, args).map_err(|err| {
             err.with_operation(Operation::BlockingCreateDir)
@@ -353,6 +353,7 @@ impl<T: oio::Read> oio::Read for ErrorContextWrapper<T> {
             err.with_operation(ReadOperation::Read)
                 .with_context("service", self.scheme)
                 .with_context("path", &self.path)
+                .with_context("read_buf", buf.len().to_string())
         })
     }
 
@@ -379,6 +380,7 @@ impl<T: oio::BlockingRead> oio::BlockingRead for ErrorContextWrapper<T> {
             err.with_operation(ReadOperation::BlockingRead)
                 .with_context("service", self.scheme)
                 .with_context("path", &self.path)
+                .with_context("read_buf", buf.len().to_string())
         })
     }
 
@@ -387,6 +389,7 @@ impl<T: oio::BlockingRead> oio::BlockingRead for ErrorContextWrapper<T> {
             err.with_operation(ReadOperation::BlockingSeek)
                 .with_context("service", self.scheme)
                 .with_context("path", &self.path)
+                .with_context("seek", format!("{pos:?}"))
         })
     }
 
@@ -408,20 +411,21 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T> {
             err.with_operation(WriteOperation::Write)
                 .with_context("service", self.scheme)
                 .with_context("path", &self.path)
+                .with_context("write_buf", bs.remaining().to_string())
         })
     }
 
-    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        self.inner.poll_abort(cx).map_err(|err| {
-            err.with_operation(WriteOperation::Abort)
+    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+        self.inner.poll_close(cx).map_err(|err| {
+            err.with_operation(WriteOperation::Close)
                 .with_context("service", self.scheme)
                 .with_context("path", &self.path)
         })
     }
 
-    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        self.inner.poll_close(cx).map_err(|err| {
-            err.with_operation(WriteOperation::Close)
+    fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+        self.inner.poll_abort(cx).map_err(|err| {
+            err.with_operation(WriteOperation::Abort)
                 .with_context("service", self.scheme)
                 .with_context("path", &self.path)
         })
@@ -434,6 +438,7 @@ impl<T: oio::BlockingWrite> oio::BlockingWrite for ErrorContextWrapper<T> {
             err.with_operation(WriteOperation::BlockingWrite)
                 .with_context("service", self.scheme)
                 .with_context("path", &self.path)
+                .with_context("write_buf", bs.remaining().to_string())
         })
     }
 
diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs
index 2c738fcdd..cd9414ba6 100644
--- a/core/src/layers/mod.rs
+++ b/core/src/layers/mod.rs
@@ -17,6 +17,15 @@
 
 //! `Layer` is the mechanism to intercept operations.
 
+mod type_eraser;
+pub(crate) use type_eraser::TypeEraseLayer;
+
+mod error_context;
+pub(crate) use error_context::ErrorContextLayer;
+
+mod complete;
+pub(crate) use complete::CompleteLayer;
+
 mod concurrent_limit;
 pub use concurrent_limit::ConcurrentLimitLayer;
 
@@ -61,18 +70,8 @@ mod minitrace;
 #[cfg(feature = "layers-minitrace")]
 pub use self::minitrace::MinitraceLayer;
 
-mod type_eraser;
-pub(crate) use type_eraser::TypeEraseLayer;
-
-mod error_context;
-pub(crate) use error_context::ErrorContextLayer;
-
-mod complete;
-pub(crate) use complete::CompleteLayer;
-
 #[cfg(feature = "layers-madsim")]
 mod madsim;
-
 #[cfg(feature = "layers-madsim")]
 pub use self::madsim::MadsimLayer;
 #[cfg(feature = "layers-madsim")]
@@ -80,11 +79,11 @@ pub use self::madsim::MadsimServer;
 
 #[cfg(feature = "layers-otel-trace")]
 mod oteltrace;
+#[cfg(feature = "layers-otel-trace")]
+pub use self::oteltrace::OtelTraceLayer;
 
 #[cfg(feature = "layers-throttle")]
 mod throttle;
-#[cfg(feature = "layers-otel-trace")]
-pub use self::oteltrace::OtelTraceLayer;
 #[cfg(feature = "layers-throttle")]
 pub use self::throttle::ThrottleLayer;
 
@@ -95,6 +94,5 @@ pub use self::await_tree::AwaitTreeLayer;
 
 #[cfg(feature = "layers-async-backtrace")]
 mod async_backtrace;
-
 #[cfg(feature = "layers-async-backtrace")]
 pub use self::async_backtrace::AsyncBacktraceLayer;
diff --git a/core/src/layers/type_eraser.rs b/core/src/layers/type_eraser.rs
index 8d58221ea..9561700c7 100644
--- a/core/src/layers/type_eraser.rs
+++ b/core/src/layers/type_eraser.rs
@@ -25,6 +25,8 @@ use crate::*;
 
 /// TypeEraseLayer will erase the types on internal accessor.
 ///
+/// For example, we will erase `Self::Reader` to `oio::Reader` (`Box<dyn oio::Read>`).
+///
 /// # Notes
 ///
 /// TypeEraseLayer is not a public accessible layer that can be used by
@@ -71,12 +73,6 @@ impl<A: Accessor> LayeredAccessor for TypeEraseAccessor<A> {
             .map(|(rp, r)| (rp, Box::new(r) as oio::Reader))
     }
 
-    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
-        self.inner
-            .blocking_read(path, args)
-            .map(|(rp, r)| (rp, Box::new(r) as oio::BlockingReader))
-    }
-
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
         self.inner
             .write(path, args)
@@ -84,12 +80,6 @@ impl<A: Accessor> LayeredAccessor for TypeEraseAccessor<A> {
             .map(|(rp, w)| (rp, Box::new(w) as oio::Writer))
     }
 
-    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
-        self.inner
-            .blocking_write(path, args)
-            .map(|(rp, w)| (rp, Box::new(w) as oio::BlockingWriter))
-    }
-
     async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> {
         self.inner
             .list(path, args)
@@ -97,6 +87,18 @@ impl<A: Accessor> LayeredAccessor for TypeEraseAccessor<A> {
             .map(|(rp, p)| (rp, Box::new(p) as oio::Pager))
     }
 
+    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
+        self.inner
+            .blocking_read(path, args)
+            .map(|(rp, r)| (rp, Box::new(r) as oio::BlockingReader))
+    }
+
+    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
+        self.inner
+            .blocking_write(path, args)
+            .map(|(rp, w)| (rp, Box::new(w) as oio::BlockingWriter))
+    }
+
     fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> {
         self.inner
             .blocking_list(path, args)
diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs
index f7a40e698..211e9c9ef 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -21,7 +21,8 @@ use std::task::Context;
 use std::task::Poll;
 
 use async_trait::async_trait;
-use bytes::{Bytes, BytesMut};
+use bytes::Bytes;
+use bytes::BytesMut;
 use futures::future::BoxFuture;
 use futures::FutureExt;
 
diff --git a/core/src/raw/oio/page/api.rs b/core/src/raw/oio/page/api.rs
index 18bbe4c5d..62a265404 100644
--- a/core/src/raw/oio/page/api.rs
+++ b/core/src/raw/oio/page/api.rs
@@ -72,9 +72,9 @@ pub trait Page: Send + Sync + 'static {
 pub type Pager = Box<dyn Page>;
 
 #[async_trait]
-impl Page for Pager {
+impl<P: Page + ?Sized> Page for Box<P> {
     async fn next(&mut self) -> Result<Option<Vec<Entry>>> {
-        self.as_mut().next().await
+        (**self).next().await
     }
 }
 
@@ -107,9 +107,9 @@ pub trait BlockingPage: Send + 'static {
 /// BlockingPager is a boxed [`BlockingPage`]
 pub type BlockingPager = Box<dyn BlockingPage>;
 
-impl BlockingPage for BlockingPager {
+impl<P: BlockingPage + ?Sized> BlockingPage for Box<P> {
     fn next(&mut self) -> Result<Option<Vec<Entry>>> {
-        self.as_mut().next()
+        (**self).next()
     }
 }
 
@@ -119,7 +119,6 @@ impl BlockingPage for () {
     }
 }
 
-#[async_trait]
 impl<P: BlockingPage> BlockingPage for Option<P> {
     fn next(&mut self) -> Result<Option<Vec<Entry>>> {
         match self {
diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs
index b350f9625..92a6de396 100644
--- a/core/src/types/writer.rs
+++ b/core/src/types/writer.rs
@@ -122,9 +122,7 @@ impl Writer {
     ///
     /// #[tokio::main]
     /// async fn sink_example(op: Operator) -> Result<()> {
-    ///     let mut w = op
-    ///         .writer_with("path/to/file")
-    ///         .await?;
+    ///     let mut w = op.writer_with("path/to/file").await?;
     ///     let stream = stream::iter(vec![vec![0; 4096], vec![1; 4096]]).map(Ok);
     ///     w.sink(stream).await?;
     ///     w.close().await?;


Reply via email to