This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 8175aa35c chore: Polish some details of layers implementation (#3061)
8175aa35c is described below
commit 8175aa35c056960da0b377ca63375a9ca9862e42
Author: Xuanwo <[email protected]>
AuthorDate: Thu Sep 14 16:02:59 2023 +0800
chore: Polish some details of layers implementation (#3061)
* Polish docs
Signed-off-by: Xuanwo <[email protected]>
* Save work
Signed-off-by: Xuanwo <[email protected]>
* polish
Signed-off-by: Xuanwo <[email protected]>
* save work
Signed-off-by: Xuanwo <[email protected]>
* Polish error context
Signed-off-by: Xuanwo <[email protected]>
* Format code
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
bindings/java/src/blocking_operator.rs | 4 +-
bindings/java/src/metadata.rs | 3 +-
bindings/java/src/operator.rs | 1 -
core/src/layers/complete.rs | 487 +++++++++++++++++++--------------
core/src/layers/concurrent_limit.rs | 8 +-
core/src/layers/error_context.rs | 45 +--
core/src/layers/mod.rs | 24 +-
core/src/layers/type_eraser.rs | 26 +-
core/src/raw/adapters/kv/backend.rs | 3 +-
core/src/raw/oio/page/api.rs | 9 +-
core/src/types/writer.rs | 4 +-
11 files changed, 343 insertions(+), 271 deletions(-)
diff --git a/bindings/java/src/blocking_operator.rs
b/bindings/java/src/blocking_operator.rs
index 6b936e349..5b6c28aba 100644
--- a/bindings/java/src/blocking_operator.rs
+++ b/bindings/java/src/blocking_operator.rs
@@ -24,14 +24,14 @@ use jni::objects::JString;
use jni::sys::jlong;
use jni::sys::jstring;
use jni::JNIEnv;
-
use opendal::layers::BlockingLayer;
use opendal::BlockingOperator;
use opendal::Operator;
use opendal::Scheme;
+use crate::get_global_runtime;
+use crate::jmap_to_hashmap;
use crate::Result;
-use crate::{get_global_runtime, jmap_to_hashmap};
#[no_mangle]
pub extern "system" fn Java_org_apache_opendal_BlockingOperator_constructor(
diff --git a/bindings/java/src/metadata.rs b/bindings/java/src/metadata.rs
index 5f2e14ea4..c4f8510d7 100644
--- a/bindings/java/src/metadata.rs
+++ b/bindings/java/src/metadata.rs
@@ -15,7 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-use jni::objects::{JClass, JObject};
+use jni::objects::JClass;
+use jni::objects::JObject;
use jni::sys::jboolean;
use jni::sys::jlong;
use jni::JNIEnv;
diff --git a/bindings/java/src/operator.rs b/bindings/java/src/operator.rs
index 43ae74cc4..b06b09067 100644
--- a/bindings/java/src/operator.rs
+++ b/bindings/java/src/operator.rs
@@ -25,7 +25,6 @@ use jni::objects::JValue;
use jni::objects::JValueOwned;
use jni::sys::jlong;
use jni::JNIEnv;
-
use opendal::layers::BlockingLayer;
use opendal::Operator;
use opendal::Scheme;
diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs
index 84b587efa..82e8066e4 100644
--- a/core/src/layers/complete.rs
+++ b/core/src/layers/complete.rs
@@ -15,13 +15,14 @@
// specific language governing permissions and limitations
// under the License.
+use std::cmp;
use std::fmt::Debug;
use std::fmt::Formatter;
+use std::io;
use std::sync::Arc;
use std::task::ready;
use std::task::Context;
use std::task::Poll;
-use std::{cmp, io};
use async_trait::async_trait;
use bytes::Bytes;
@@ -56,12 +57,12 @@ use crate::*;
/// capabilities. CompleteLayer will add those capabilities in
/// a zero cost way.
///
-/// Underlying services will return [`AccessorHint`] to indicate the
+/// Underlying services will return [`AccessorInfo`] to indicate the
/// features that returning readers support.
///
/// - If both `seekable` and `streamable`, return directly.
-/// - If not `streamable`, with [`oio::into_streamable_reader`].
-/// - If not `seekable`, with [`oio::into_reader::by_range`]
+/// - If not `streamable`, with [`oio::into_read_from_stream`].
+/// - If not `seekable`, with [`oio::into_seekable_read_by_range`]
/// - If neither not supported, wrap both by_range and into_streamable.
///
/// All implementations of Reader should be `zero cost`. In our cases,
@@ -73,10 +74,9 @@ use crate::*;
///
/// ### Read is Seekable
///
-/// We use internal `AccessorHint::ReadSeekable` to decide the most
-/// suitable implementations.
+/// We use [`Capability`] to decide the most suitable implementations.
///
-/// If there is a hint that `ReadSeekable`, we will open it with given args
+/// If [`Capability`] `read_can_seek` is true, we will open it with given args
/// directly. Otherwise, we will pick a seekable reader implementation based
/// on input range for it.
///
@@ -92,7 +92,7 @@ use crate::*;
/// We use internal `AccessorHint::ReadStreamable` to decide the most
/// suitable implementations.
///
-/// If there is a hint that `ReadStreamable`, we will use existing reader
+/// If [`Capability`] `read_can_next` is true, we will use existing reader
/// directly. Otherwise, we will transform this reader into a stream.
///
/// ## List Completion
@@ -100,21 +100,19 @@ use crate::*;
/// There are two styles of list, but not all services support both of
/// them. CompleteLayer will add those capabilities in a zero cost way.
///
-/// Underlying services will return [`AccessorHint`] to indicate the
+/// Underlying services will return [`Capability`] to indicate the
/// features that returning pagers support.
///
-/// - If both `flat` and `hierarchy`, return directly.
-/// - If only `flat`, with [`oio::to_flat_pager`].
-/// - if only `hierarchy`, with [`oio::to_hierarchy_pager`].
-/// - If neither not supported, something must be wrong.
+/// - If both `list_with_delimiter_slash` and `list_without_delimiter`, return
directly.
+/// - If only `list_without_delimiter`, with [`oio::to_flat_pager`].
+/// - If only `list_with_delimiter_slash`, with [`oio::to_hierarchy_pager`].
+/// - If neither is supported, something must be wrong, since `list` is true.
///
/// ## Capability Check
///
/// Before performing any operations, `CompleteLayer` will first check
/// the operation against capability of the underlying service. If the
/// operation is not supported, an error will be returned directly.
-///
-/// [`AccessorHint`]: crate::raw::AccessorHint
pub struct CompleteLayer;
impl<A: Accessor> Layer<A> for CompleteLayer {
@@ -128,7 +126,7 @@ impl<A: Accessor> Layer<A> for CompleteLayer {
}
}
-/// Provide reader wrapper for backend.
+/// Provide complete wrapper for backend.
pub struct CompleteReaderAccessor<A: Accessor> {
meta: AccessorInfo,
inner: Arc<A>,
@@ -141,6 +139,16 @@ impl<A: Accessor> Debug for CompleteReaderAccessor<A> {
}
impl<A: Accessor> CompleteReaderAccessor<A> {
+ fn new_unsupported_error(&self, op: impl Into<&'static str>) -> Error {
+ let scheme = self.meta.scheme();
+ let op = op.into();
+ Error::new(
+ ErrorKind::Unsupported,
+ &format!("service {scheme} doesn't support operation {op}"),
+ )
+ .with_operation(op)
+ }
+
async fn complete_reader(
&self,
path: &str,
@@ -148,7 +156,7 @@ impl<A: Accessor> CompleteReaderAccessor<A> {
) -> Result<(RpRead, CompleteReader<A, A::Reader>)> {
let capability = self.meta.native_capability();
if !capability.read {
- return new_capability_unsupported_error(Operation::Read);
+ return Err(self.new_unsupported_error(Operation::Read));
}
let seekable = capability.read_can_seek;
@@ -201,7 +209,7 @@ impl<A: Accessor> CompleteReaderAccessor<A> {
) -> Result<(RpRead, CompleteReader<A, A::BlockingReader>)> {
let capability = self.meta.full_capability();
if !capability.read || !capability.blocking {
- return new_capability_unsupported_error(Operation::BlockingRead);
+ return Err(self.new_unsupported_error(Operation::BlockingRead));
}
let seekable = capability.read_can_seek;
@@ -257,11 +265,7 @@ impl<A: Accessor> CompleteReaderAccessor<A> {
) -> Result<(RpList, CompletePager<A, A::Pager>)> {
let cap = self.meta.full_capability();
if !cap.list {
- return Err(
- Error::new(ErrorKind::Unsupported, "operation is not
supported")
- .with_context("service", self.meta.scheme())
- .with_operation("list"),
- );
+ return Err(self.new_unsupported_error(Operation::List));
}
let delimiter = args.delimiter();
@@ -306,11 +310,7 @@ impl<A: Accessor> CompleteReaderAccessor<A> {
) -> Result<(RpList, CompletePager<A, A::BlockingPager>)> {
let cap = self.meta.full_capability();
if !cap.list {
- return Err(
- Error::new(ErrorKind::Unsupported, "operation is not
supported")
- .with_context("service", self.meta.scheme())
- .with_operation("list"),
- );
+ return Err(self.new_unsupported_error(Operation::BlockingList));
}
let delimiter = args.delimiter();
@@ -377,51 +377,31 @@ impl<A: Accessor> LayeredAccessor for
CompleteReaderAccessor<A> {
meta
}
- async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead,
Self::Reader)> {
- self.complete_reader(path, args).await
- }
-
- fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead,
Self::BlockingReader)> {
- self.complete_blocking_reader(path, args)
- }
-
- async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
+ async fn create_dir(&self, path: &str, args: OpCreateDir) ->
Result<RpCreateDir> {
let capability = self.meta.full_capability();
- if !capability.stat {
- return new_capability_unsupported_error(Operation::Stat);
+ if !capability.create_dir {
+ return Err(self.new_unsupported_error(Operation::CreateDir));
}
- self.inner.stat(path, args).await.map(|v| {
- v.map_metadata(|m| {
- let bit = m.bit();
- m.with_bit(bit | Metakey::Complete)
- })
- })
+ self.inner().create_dir(path, args).await
}
- fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
- let capability = self.meta.full_capability();
- if !capability.stat || !capability.blocking {
- return new_capability_unsupported_error(Operation::BlockingStat);
- }
-
- self.inner.blocking_stat(path, args).map(|v| {
- v.map_metadata(|m| {
- let bit = m.bit();
- m.with_bit(bit | Metakey::Complete)
- })
- })
+ async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead,
Self::Reader)> {
+ self.complete_reader(path, args).await
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
let capability = self.meta.full_capability();
if !capability.write {
- return new_capability_unsupported_error(Operation::Write);
+ return Err(self.new_unsupported_error(Operation::Write));
}
if args.append() && !capability.write_can_append {
return Err(Error::new(
ErrorKind::Unsupported,
- "write with append enabled is not supported",
+ &format!(
+ "service {} doesn't support operation write with append",
+ self.info().scheme()
+ ),
));
}
@@ -453,129 +433,156 @@ impl<A: Accessor> LayeredAccessor for
CompleteReaderAccessor<A> {
Ok((rp, w))
}
- fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::BlockingWriter)> {
+ async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy>
{
let capability = self.meta.full_capability();
- if !capability.write || !capability.blocking {
- return new_capability_unsupported_error(Operation::BlockingWrite);
- }
- if args.append() && !capability.write_can_append {
- return Err(Error::new(
- ErrorKind::Unsupported,
- "write with append enabled is not supported",
- ));
+ if !capability.copy {
+ return Err(self.new_unsupported_error(Operation::Copy));
}
- self.inner
- .blocking_write(path, args)
- .map(|(rp, w)| (rp, CompleteWriter::new(w)))
+ self.inner().copy(from, to, args).await
}
- async fn create_dir(&self, path: &str, args: OpCreateDir) ->
Result<RpCreateDir> {
+ async fn rename(&self, from: &str, to: &str, args: OpRename) ->
Result<RpRename> {
let capability = self.meta.full_capability();
- if !capability.create_dir {
- return new_capability_unsupported_error(Operation::CreateDir);
+ if !capability.rename {
+ return Err(self.new_unsupported_error(Operation::Rename));
}
- self.inner().create_dir(path, args).await
+ self.inner().rename(from, to, args).await
}
- fn blocking_create_dir(&self, path: &str, args: OpCreateDir) ->
Result<RpCreateDir> {
+ async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
let capability = self.meta.full_capability();
- if !capability.create_dir || !capability.blocking {
- return
new_capability_unsupported_error(Operation::BlockingCreateDir);
+ if !capability.stat {
+ return Err(self.new_unsupported_error(Operation::Stat));
}
- self.inner().blocking_create_dir(path, args)
+ self.inner.stat(path, args).await.map(|v| {
+ v.map_metadata(|m| {
+ let bit = m.bit();
+ m.with_bit(bit | Metakey::Complete)
+ })
+ })
}
async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
let capability = self.meta.full_capability();
if !capability.delete {
- return new_capability_unsupported_error(Operation::Delete);
+ return Err(self.new_unsupported_error(Operation::Delete));
}
self.inner().delete(path, args).await
}
- fn blocking_delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
+ async fn list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::Pager)> {
let capability = self.meta.full_capability();
- if !capability.delete || !capability.blocking {
- return new_capability_unsupported_error(Operation::BlockingDelete);
+ if !capability.list {
+ return Err(self.new_unsupported_error(Operation::List));
}
- self.inner().blocking_delete(path, args)
+ self.complete_list(path, args).await
}
- async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy>
{
+ async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
let capability = self.meta.full_capability();
- if !capability.copy {
- return new_capability_unsupported_error(Operation::Copy);
+ if !capability.batch {
+ return Err(self.new_unsupported_error(Operation::Batch));
}
- self.inner().copy(from, to, args).await
+ self.inner().batch(args).await
}
- fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) ->
Result<RpCopy> {
+ async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
let capability = self.meta.full_capability();
- if !capability.copy || !capability.blocking {
- return new_capability_unsupported_error(Operation::BlockingCopy);
+ if !capability.presign {
+ return Err(self.new_unsupported_error(Operation::Presign));
}
- self.inner().blocking_copy(from, to, args)
+ self.inner.presign(path, args).await
}
- async fn rename(&self, from: &str, to: &str, args: OpRename) ->
Result<RpRename> {
+ fn blocking_create_dir(&self, path: &str, args: OpCreateDir) ->
Result<RpCreateDir> {
let capability = self.meta.full_capability();
- if !capability.rename {
- return new_capability_unsupported_error(Operation::Rename);
+ if !capability.create_dir || !capability.blocking {
+ return
Err(self.new_unsupported_error(Operation::BlockingCreateDir));
}
- self.inner().rename(from, to, args).await
+ self.inner().blocking_create_dir(path, args)
}
- fn blocking_rename(&self, from: &str, to: &str, args: OpRename) ->
Result<RpRename> {
+ fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead,
Self::BlockingReader)> {
+ self.complete_blocking_reader(path, args)
+ }
+
+ fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::BlockingWriter)> {
let capability = self.meta.full_capability();
- if !capability.rename || !capability.blocking {
- return new_capability_unsupported_error(Operation::BlockingRename);
+ if !capability.write || !capability.blocking {
+ return Err(self.new_unsupported_error(Operation::BlockingWrite));
}
- self.inner().blocking_rename(from, to, args)
+ if args.append() && !capability.write_can_append {
+ return Err(Error::new(
+ ErrorKind::Unsupported,
+ &format!(
+ "service {} doesn't support operation write with append",
+ self.info().scheme()
+ ),
+ ));
+ }
+
+ self.inner
+ .blocking_write(path, args)
+ .map(|(rp, w)| (rp, CompleteWriter::new(w)))
}
- async fn list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::Pager)> {
+ fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) ->
Result<RpCopy> {
let capability = self.meta.full_capability();
- if !capability.list {
- return new_capability_unsupported_error(Operation::List);
+ if !capability.copy || !capability.blocking {
+ return Err(self.new_unsupported_error(Operation::BlockingCopy));
}
- self.complete_list(path, args).await
+ self.inner().blocking_copy(from, to, args)
}
- fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::BlockingPager)> {
+ fn blocking_rename(&self, from: &str, to: &str, args: OpRename) ->
Result<RpRename> {
let capability = self.meta.full_capability();
- if !capability.list || !capability.blocking {
- return new_capability_unsupported_error(Operation::BlockingList);
+ if !capability.rename || !capability.blocking {
+ return Err(self.new_unsupported_error(Operation::BlockingRename));
}
- self.complete_blocking_list(path, args)
+ self.inner().blocking_rename(from, to, args)
}
- async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
+ fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
let capability = self.meta.full_capability();
- if !capability.presign {
- return new_capability_unsupported_error(Operation::Presign);
+ if !capability.stat || !capability.blocking {
+ return Err(self.new_unsupported_error(Operation::BlockingStat));
}
- self.inner.presign(path, args).await
+ self.inner.blocking_stat(path, args).map(|v| {
+ v.map_metadata(|m| {
+ let bit = m.bit();
+ m.with_bit(bit | Metakey::Complete)
+ })
+ })
}
- async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
+ fn blocking_delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
let capability = self.meta.full_capability();
- if !capability.batch {
- return new_capability_unsupported_error(Operation::Batch);
+ if !capability.delete || !capability.blocking {
+ return Err(self.new_unsupported_error(Operation::BlockingDelete));
}
- self.inner().batch(args).await
+ self.inner().blocking_delete(path, args)
+ }
+
+ fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::BlockingPager)> {
+ let capability = self.meta.full_capability();
+ if !capability.list || !capability.blocking {
+ return Err(self.new_unsupported_error(Operation::BlockingList));
+ }
+
+ self.complete_blocking_list(path, args)
}
}
@@ -786,13 +793,8 @@ where
}
}
-fn new_capability_unsupported_error<R>(operation: Operation) -> Result<R> {
- Err(Error::new(ErrorKind::Unsupported, "operation is not
supported").with_operation(operation))
-}
-
#[cfg(test)]
mod tests {
- use std::collections::HashMap;
use std::time::Duration;
use async_trait::async_trait;
@@ -801,33 +803,6 @@ mod tests {
use super::*;
- #[derive(Default)]
- struct MockBuilder {
- capability: Capability,
- }
-
- impl MockBuilder {
- fn with_capacity(mut self, capability: Capability) -> Self {
- self.capability = capability;
- self
- }
- }
-
- impl Builder for MockBuilder {
- const SCHEME: Scheme = Scheme::Custom("mock");
- type Accessor = MockService;
-
- fn from_map(_: HashMap<String, String>) -> Self {
- Self::default()
- }
-
- fn build(&mut self) -> Result<Self::Accessor> {
- Ok(MockService {
- capability: self.capability,
- })
- }
- }
-
#[derive(Debug)]
struct MockService {
capability: Capability,
@@ -835,12 +810,12 @@ mod tests {
#[async_trait]
impl Accessor for MockService {
- type Reader = ();
- type BlockingReader = ();
- type Writer = ();
- type BlockingWriter = ();
- type Pager = ();
- type BlockingPager = ();
+ type Reader = oio::Reader;
+ type BlockingReader = oio::BlockingReader;
+ type Writer = oio::Writer;
+ type BlockingWriter = oio::BlockingWriter;
+ type Pager = oio::Pager;
+ type BlockingPager = oio::BlockingPager;
fn info(&self) -> AccessorInfo {
let mut info = AccessorInfo::default();
@@ -849,24 +824,28 @@ mod tests {
info
}
- async fn stat(&self, _: &str, _: OpStat) -> Result<RpStat> {
- Ok(RpStat::new(Metadata::new(EntryMode::Unknown)))
+ async fn create_dir(&self, _: &str, _: OpCreateDir) ->
Result<RpCreateDir> {
+ Ok(RpCreateDir {})
}
async fn read(&self, _: &str, _: OpRead) -> Result<(RpRead,
Self::Reader)> {
- Ok((RpRead::new(0), ()))
+ Ok((RpRead::new(0), Box::new(())))
}
async fn write(&self, _: &str, _: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
- Ok((RpWrite::new(), ()))
+ Ok((RpWrite::new(), Box::new(())))
}
async fn copy(&self, _: &str, _: &str, _: OpCopy) -> Result<RpCopy> {
Ok(RpCopy {})
}
- async fn create_dir(&self, _: &str, _: OpCreateDir) ->
Result<RpCreateDir> {
- Ok(RpCreateDir {})
+ async fn rename(&self, _: &str, _: &str, _: OpRename) ->
Result<RpRename> {
+ Ok(RpRename {})
+ }
+
+ async fn stat(&self, _: &str, _: OpStat) -> Result<RpStat> {
+ Ok(RpStat::new(Metadata::new(EntryMode::Unknown)))
}
async fn delete(&self, _: &str, _: OpDelete) -> Result<RpDelete> {
@@ -874,11 +853,7 @@ mod tests {
}
async fn list(&self, _: &str, _: OpList) -> Result<(RpList,
Self::Pager)> {
- Ok((RpList {}, ()))
- }
-
- async fn rename(&self, _: &str, _: &str, _: OpRename) ->
Result<RpRename> {
- Ok(RpRename {})
+ Ok((RpList {}, Box::new(())))
}
async fn presign(&self, _: &str, _: OpPresign) -> Result<RpPresign> {
@@ -890,50 +865,144 @@ mod tests {
}
}
- /// Perform the test against different capability preconditions.
- macro_rules! capability_test {
- ($cap:ident, |$arg:ident| { $($body:tt)* }) => {
- paste::item! {
- #[tokio::test]
- async fn [<test_capability_ $cap>]() {
- let res_builder = |$arg: Operator| async move {
- let res = { $($body)* };
- res.await.err()
- };
-
- let builder =
MockBuilder::default().with_capacity(Capability {
- $cap: false,
- ..Default::default()
- });
- let op = Operator::new(builder).expect("should
build").finish();
- let res = res_builder(op.clone()).await;
- assert_eq!(res.expect("should not be None").kind(),
ErrorKind::Unsupported);
-
- let builder =
MockBuilder::default().with_capacity(Capability {
- $cap: true,
- ..Default::default()
- });
- let op = Operator::new(builder).expect("should
build").finish();
- let res = res_builder(op.clone()).await;
- assert!(res.is_none());
- }
- }
- };
+ fn new_test_operator(capability: Capability) -> Operator {
+ let srv = MockService { capability };
+
+ Operator::from_inner(Arc::new(srv)).layer(CompleteLayer)
+ }
+
+ #[tokio::test]
+ async fn test_read() {
+ let op = new_test_operator(Capability::default());
+ let res = op.read("path").await;
+ assert!(res.is_err());
+ assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported);
+
+ let op = new_test_operator(Capability {
+ read: true,
+ ..Default::default()
+ });
+ let res = op.read("path").await;
+ assert!(res.is_ok())
+ }
+
+ #[tokio::test]
+ async fn test_stat() {
+ let op = new_test_operator(Capability::default());
+ let res = op.stat("path").await;
+ assert!(res.is_err());
+ assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported);
+
+ let op = new_test_operator(Capability {
+ stat: true,
+ ..Default::default()
+ });
+ let res = op.stat("path").await;
+ assert!(res.is_ok())
}
- capability_test!(stat, |op| { op.stat("/path/to/mock_file") });
- capability_test!(read, |op| { op.read("/path/to/mock_file") });
- capability_test!(write, |op| { op.writer("/path/to/mock_file") });
- capability_test!(create_dir, |op| { op.create_dir("/path/to/mock_dir/") });
- capability_test!(delete, |op| { op.delete("/path/to/mock_file") });
- capability_test!(copy, |op| {
- op.copy("/path/to/mock_file", "/path/to/mock_file_2")
- });
- capability_test!(rename, |op| {
- op.rename("/path/to/mock_file", "/path/to/mock_file_2")
- });
- capability_test!(list, |op| { op.lister("/path/to/mock_dir/") });
- capability_test!(presign, |op| {
- op.presign_read("/path/to/mock_file", Duration::from_secs(1))
- });
+ #[tokio::test]
+ async fn test_writer() {
+ let op = new_test_operator(Capability::default());
+ let res = op.write("path", vec![]).await;
+ assert!(res.is_err());
+ assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported);
+
+ let op = new_test_operator(Capability {
+ write: true,
+ ..Default::default()
+ });
+ let res = op.writer("path").await;
+ assert!(res.is_ok())
+ }
+
+ #[tokio::test]
+ async fn test_create_dir() {
+ let op = new_test_operator(Capability::default());
+ let res = op.create_dir("path/").await;
+ assert!(res.is_err());
+ assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported);
+
+ let op = new_test_operator(Capability {
+ create_dir: true,
+ ..Default::default()
+ });
+ let res = op.create_dir("path/").await;
+ assert!(res.is_ok())
+ }
+
+ #[tokio::test]
+ async fn test_delete() {
+ let op = new_test_operator(Capability::default());
+ let res = op.delete("path").await;
+ assert!(res.is_err());
+ assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported);
+
+ let op = new_test_operator(Capability {
+ delete: true,
+ ..Default::default()
+ });
+ let res = op.delete("path").await;
+ assert!(res.is_ok())
+ }
+
+ #[tokio::test]
+ async fn test_copy() {
+ let op = new_test_operator(Capability::default());
+ let res = op.copy("path_a", "path_b").await;
+ assert!(res.is_err());
+ assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported);
+
+ let op = new_test_operator(Capability {
+ copy: true,
+ ..Default::default()
+ });
+ let res = op.copy("path_a", "path_b").await;
+ assert!(res.is_ok())
+ }
+
+ #[tokio::test]
+ async fn test_rename() {
+ let op = new_test_operator(Capability::default());
+ let res = op.rename("path_a", "path_b").await;
+ assert!(res.is_err());
+ assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported);
+
+ let op = new_test_operator(Capability {
+ rename: true,
+ ..Default::default()
+ });
+ let res = op.rename("path_a", "path_b").await;
+ assert!(res.is_ok())
+ }
+
+ #[tokio::test]
+ async fn test_list() {
+ let op = new_test_operator(Capability::default());
+ let res = op.list("path/").await;
+ assert!(res.is_err());
+ assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported);
+
+ let op = new_test_operator(Capability {
+ list: true,
+ ..Default::default()
+ });
+ let res = op.list("path/").await;
+ assert!(res.is_ok())
+ }
+
+ #[tokio::test]
+ async fn test_presign() {
+ let op = new_test_operator(Capability::default());
+ let res = op.presign_read("path", Duration::from_secs(1)).await;
+ assert!(res.is_err());
+ assert_eq!(res.unwrap_err().kind(), ErrorKind::Unsupported);
+
+ let op = new_test_operator(Capability {
+ presign: true,
+ ..Default::default()
+ });
+ let res = op.presign_read("path", Duration::from_secs(1)).await;
+ assert!(res.is_ok())
+ }
}
diff --git a/core/src/layers/concurrent_limit.rs
b/core/src/layers/concurrent_limit.rs
index b09408d45..c20ef4108 100644
--- a/core/src/layers/concurrent_limit.rs
+++ b/core/src/layers/concurrent_limit.rs
@@ -289,13 +289,13 @@ impl<R: oio::Write> oio::Write for
ConcurrentLimitWrapper<R> {
self.inner.poll_write(cx, bs)
}
- fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
- self.inner.poll_abort(cx)
- }
-
fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
self.inner.poll_close(cx)
}
+
+ fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+ self.inner.poll_abort(cx)
+ }
}
impl<R: oio::BlockingWrite> oio::BlockingWrite for ConcurrentLimitWrapper<R> {
diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs
index 1fd5e608c..9dae975f9 100644
--- a/core/src/layers/error_context.rs
+++ b/core/src/layers/error_context.rs
@@ -97,13 +97,13 @@ impl<A: Accessor> LayeredAccessor for
ErrorContextAccessor<A> {
self.inner
.read(path, args)
- .map_ok(|(rp, os)| {
+ .map_ok(|(rp, r)| {
(
rp,
ErrorContextWrapper {
scheme: self.meta.scheme(),
path: path.to_string(),
- inner: os,
+ inner: r,
},
)
})
@@ -119,13 +119,13 @@ impl<A: Accessor> LayeredAccessor for
ErrorContextAccessor<A> {
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
self.inner
.write(path, args)
- .map_ok(|(rp, os)| {
+ .map_ok(|(rp, w)| {
(
rp,
ErrorContextWrapper {
scheme: self.meta.scheme(),
path: path.to_string(),
- inner: os,
+ inner: w,
},
)
})
@@ -186,13 +186,13 @@ impl<A: Accessor> LayeredAccessor for
ErrorContextAccessor<A> {
async fn list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::Pager)> {
self.inner
.list(path, args)
- .map_ok(|(rp, os)| {
+ .map_ok(|(rp, p)| {
(
rp,
ErrorContextWrapper {
scheme: self.meta.scheme(),
path: path.to_string(),
- inner: os,
+ inner: p,
},
)
})
@@ -204,14 +204,6 @@ impl<A: Accessor> LayeredAccessor for
ErrorContextAccessor<A> {
.await
}
- async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
- self.inner.presign(path, args).await.map_err(|err| {
- err.with_operation(Operation::Presign)
- .with_context("service", self.meta.scheme())
- .with_context("path", path)
- })
- }
-
async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
self.inner
.batch(args)
@@ -238,6 +230,14 @@ impl<A: Accessor> LayeredAccessor for
ErrorContextAccessor<A> {
.await
}
+ async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
+ self.inner.presign(path, args).await.map_err(|err| {
+ err.with_operation(Operation::Presign)
+ .with_context("service", self.meta.scheme())
+ .with_context("path", path)
+ })
+ }
+
fn blocking_create_dir(&self, path: &str, args: OpCreateDir) ->
Result<RpCreateDir> {
self.inner.blocking_create_dir(path, args).map_err(|err| {
err.with_operation(Operation::BlockingCreateDir)
@@ -353,6 +353,7 @@ impl<T: oio::Read> oio::Read for ErrorContextWrapper<T> {
err.with_operation(ReadOperation::Read)
.with_context("service", self.scheme)
.with_context("path", &self.path)
+ .with_context("read_buf", buf.len().to_string())
})
}
@@ -379,6 +380,7 @@ impl<T: oio::BlockingRead> oio::BlockingRead for
ErrorContextWrapper<T> {
err.with_operation(ReadOperation::BlockingRead)
.with_context("service", self.scheme)
.with_context("path", &self.path)
+ .with_context("read_buf", buf.len().to_string())
})
}
@@ -387,6 +389,7 @@ impl<T: oio::BlockingRead> oio::BlockingRead for
ErrorContextWrapper<T> {
err.with_operation(ReadOperation::BlockingSeek)
.with_context("service", self.scheme)
.with_context("path", &self.path)
+ .with_context("seek", format!("{pos:?}"))
})
}
@@ -408,20 +411,21 @@ impl<T: oio::Write> oio::Write for ErrorContextWrapper<T>
{
err.with_operation(WriteOperation::Write)
.with_context("service", self.scheme)
.with_context("path", &self.path)
+ .with_context("write_buf", bs.remaining().to_string())
})
}
- fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
- self.inner.poll_abort(cx).map_err(|err| {
- err.with_operation(WriteOperation::Abort)
+ fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+ self.inner.poll_close(cx).map_err(|err| {
+ err.with_operation(WriteOperation::Close)
.with_context("service", self.scheme)
.with_context("path", &self.path)
})
}
- fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
- self.inner.poll_close(cx).map_err(|err| {
- err.with_operation(WriteOperation::Close)
+ fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
+ self.inner.poll_abort(cx).map_err(|err| {
+ err.with_operation(WriteOperation::Abort)
.with_context("service", self.scheme)
.with_context("path", &self.path)
})
@@ -434,6 +438,7 @@ impl<T: oio::BlockingWrite> oio::BlockingWrite for
ErrorContextWrapper<T> {
err.with_operation(WriteOperation::BlockingWrite)
.with_context("service", self.scheme)
.with_context("path", &self.path)
+ .with_context("write_buf", bs.remaining().to_string())
})
}
diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs
index 2c738fcdd..cd9414ba6 100644
--- a/core/src/layers/mod.rs
+++ b/core/src/layers/mod.rs
@@ -17,6 +17,15 @@
//! `Layer` is the mechanism to intercept operations.
+mod type_eraser;
+pub(crate) use type_eraser::TypeEraseLayer;
+
+mod error_context;
+pub(crate) use error_context::ErrorContextLayer;
+
+mod complete;
+pub(crate) use complete::CompleteLayer;
+
mod concurrent_limit;
pub use concurrent_limit::ConcurrentLimitLayer;
@@ -61,18 +70,8 @@ mod minitrace;
#[cfg(feature = "layers-minitrace")]
pub use self::minitrace::MinitraceLayer;
-mod type_eraser;
-pub(crate) use type_eraser::TypeEraseLayer;
-
-mod error_context;
-pub(crate) use error_context::ErrorContextLayer;
-
-mod complete;
-pub(crate) use complete::CompleteLayer;
-
#[cfg(feature = "layers-madsim")]
mod madsim;
-
#[cfg(feature = "layers-madsim")]
pub use self::madsim::MadsimLayer;
#[cfg(feature = "layers-madsim")]
@@ -80,11 +79,11 @@ pub use self::madsim::MadsimServer;
#[cfg(feature = "layers-otel-trace")]
mod oteltrace;
+#[cfg(feature = "layers-otel-trace")]
+pub use self::oteltrace::OtelTraceLayer;
#[cfg(feature = "layers-throttle")]
mod throttle;
-#[cfg(feature = "layers-otel-trace")]
-pub use self::oteltrace::OtelTraceLayer;
#[cfg(feature = "layers-throttle")]
pub use self::throttle::ThrottleLayer;
@@ -95,6 +94,5 @@ pub use self::await_tree::AwaitTreeLayer;
#[cfg(feature = "layers-async-backtrace")]
mod async_backtrace;
-
#[cfg(feature = "layers-async-backtrace")]
pub use self::async_backtrace::AsyncBacktraceLayer;
diff --git a/core/src/layers/type_eraser.rs b/core/src/layers/type_eraser.rs
index 8d58221ea..9561700c7 100644
--- a/core/src/layers/type_eraser.rs
+++ b/core/src/layers/type_eraser.rs
@@ -25,6 +25,8 @@ use crate::*;
/// TypeEraseLayer will erase the types on internal accessor.
///
+/// For example, we will erase `Self::Reader` to `oio::Reader` (`Box<dyn
oio::Read>`).
+///
/// # Notes
///
/// TypeEraseLayer is not a public accessible layer that can be used by
@@ -71,12 +73,6 @@ impl<A: Accessor> LayeredAccessor for TypeEraseAccessor<A> {
.map(|(rp, r)| (rp, Box::new(r) as oio::Reader))
}
- fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead,
Self::BlockingReader)> {
- self.inner
- .blocking_read(path, args)
- .map(|(rp, r)| (rp, Box::new(r) as oio::BlockingReader))
- }
-
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
self.inner
.write(path, args)
@@ -84,12 +80,6 @@ impl<A: Accessor> LayeredAccessor for TypeEraseAccessor<A> {
.map(|(rp, w)| (rp, Box::new(w) as oio::Writer))
}
- fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::BlockingWriter)> {
- self.inner
- .blocking_write(path, args)
- .map(|(rp, w)| (rp, Box::new(w) as oio::BlockingWriter))
- }
-
async fn list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::Pager)> {
self.inner
.list(path, args)
@@ -97,6 +87,18 @@ impl<A: Accessor> LayeredAccessor for TypeEraseAccessor<A> {
.map(|(rp, p)| (rp, Box::new(p) as oio::Pager))
}
+ fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead,
Self::BlockingReader)> {
+ self.inner
+ .blocking_read(path, args)
+ .map(|(rp, r)| (rp, Box::new(r) as oio::BlockingReader))
+ }
+
+ fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::BlockingWriter)> {
+ self.inner
+ .blocking_write(path, args)
+ .map(|(rp, w)| (rp, Box::new(w) as oio::BlockingWriter))
+ }
+
fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::BlockingPager)> {
self.inner
.blocking_list(path, args)
diff --git a/core/src/raw/adapters/kv/backend.rs
b/core/src/raw/adapters/kv/backend.rs
index f7a40e698..211e9c9ef 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -21,7 +21,8 @@ use std::task::Context;
use std::task::Poll;
use async_trait::async_trait;
-use bytes::{Bytes, BytesMut};
+use bytes::Bytes;
+use bytes::BytesMut;
use futures::future::BoxFuture;
use futures::FutureExt;
diff --git a/core/src/raw/oio/page/api.rs b/core/src/raw/oio/page/api.rs
index 18bbe4c5d..62a265404 100644
--- a/core/src/raw/oio/page/api.rs
+++ b/core/src/raw/oio/page/api.rs
@@ -72,9 +72,9 @@ pub trait Page: Send + Sync + 'static {
pub type Pager = Box<dyn Page>;
#[async_trait]
-impl Page for Pager {
+impl<P: Page + ?Sized> Page for Box<P> {
async fn next(&mut self) -> Result<Option<Vec<Entry>>> {
- self.as_mut().next().await
+ (**self).next().await
}
}
@@ -107,9 +107,9 @@ pub trait BlockingPage: Send + 'static {
/// BlockingPager is a boxed [`BlockingPage`]
pub type BlockingPager = Box<dyn BlockingPage>;
-impl BlockingPage for BlockingPager {
+impl<P: BlockingPage + ?Sized> BlockingPage for Box<P> {
fn next(&mut self) -> Result<Option<Vec<Entry>>> {
- self.as_mut().next()
+ (**self).next()
}
}
@@ -119,7 +119,6 @@ impl BlockingPage for () {
}
}
-#[async_trait]
impl<P: BlockingPage> BlockingPage for Option<P> {
fn next(&mut self) -> Result<Option<Vec<Entry>>> {
match self {
diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs
index b350f9625..92a6de396 100644
--- a/core/src/types/writer.rs
+++ b/core/src/types/writer.rs
@@ -122,9 +122,7 @@ impl Writer {
///
/// #[tokio::main]
/// async fn sink_example(op: Operator) -> Result<()> {
- /// let mut w = op
- /// .writer_with("path/to/file")
- /// .await?;
+ /// let mut w = op.writer_with("path/to/file").await?;
/// let stream = stream::iter(vec![vec![0; 4096], vec![1;
4096]]).map(Ok);
/// w.sink(stream).await?;
/// w.close().await?;