This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch fix-timeout-layer in repository https://gitbox.apache.org/repos/asf/opendal.git
commit 06aee752657a3056135b3ef90d65dc1af7c17375 Author: Xuanwo <[email protected]> AuthorDate: Tue Jan 23 23:04:58 2024 +0800 Add test for timeout Signed-off-by: Xuanwo <[email protected]> --- core/src/layers/timeout.rs | 109 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 109 insertions(+) diff --git a/core/src/layers/timeout.rs b/core/src/layers/timeout.rs index a8957bca77..cd227283d5 100644 --- a/core/src/layers/timeout.rs +++ b/core/src/layers/timeout.rs @@ -351,3 +351,112 @@ impl<R: oio::List> oio::List for TimeoutWrapper<R> { Poll::Ready(v) } } + +#[cfg(test)] +mod tests { + use crate::layers::{TimeoutLayer, TypeEraseLayer}; + use crate::raw::oio::ReadExt; + use crate::raw::*; + use crate::*; + use async_trait::async_trait; + use bytes::Bytes; + use std::io::SeekFrom; + use std::sync::Arc; + use std::task::{Context, Poll}; + use std::time::Duration; + use tokio::time::{sleep, timeout}; + + #[derive(Debug, Clone, Default)] + struct MockService; + + #[cfg_attr(not(target_arch = "wasm32"), async_trait)] + #[cfg_attr(target_arch = "wasm32", async_trait(?Send))] + impl Accessor for MockService { + type Reader = MockReader; + type Writer = (); + type Lister = (); + type BlockingReader = (); + type BlockingWriter = (); + type BlockingLister = (); + + fn info(&self) -> AccessorInfo { + let mut am = AccessorInfo::default(); + am.set_native_capability(Capability { + read: true, + delete: true, + ..Default::default() + }); + + am + } + + /// This function will build a reader that always returns pending. + async fn read(&self, _: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> { + Ok((RpRead::new(), MockReader)) + } + + /// This function will never return. 
+ async fn delete(&self, _: &str, _: OpDelete) -> Result<RpDelete> { + sleep(Duration::from_secs(u64::MAX)).await; + + Ok(RpDelete::default()) + } + } + + #[derive(Debug, Clone, Default)] + struct MockReader; + + impl oio::Read for MockReader { + fn poll_read(&mut self, _: &mut Context<'_>, _: &mut [u8]) -> Poll<Result<usize>> { + Poll::Pending + } + + fn poll_seek(&mut self, _: &mut Context<'_>, _: SeekFrom) -> Poll<Result<u64>> { + Poll::Pending + } + + fn poll_next(&mut self, _: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> { + Poll::Pending + } + } + + #[tokio::test] + async fn test_operation_timeout() { + let acc = Arc::new(TypeEraseLayer.layer(MockService)) as FusedAccessor; + let op = Operator::from_inner(acc) + .layer(TimeoutLayer::new().with_timeout(Duration::from_secs(1))); + + let fut = async { + let res = op.delete("test").await; + assert!(res.is_err()); + let err = res.unwrap_err(); + assert_eq!(err.kind(), ErrorKind::Unexpected); + assert!(err.to_string().contains("timeout")) + }; + + timeout(Duration::from_secs(2), fut) + .await + .expect("this test should not exceed 2 seconds") + } + + #[tokio::test] + async fn test_io_timeout() { + let acc = Arc::new(TypeEraseLayer.layer(MockService)) as FusedAccessor; + let op = Operator::from_inner(acc) + .layer(TimeoutLayer::new().with_io_timeout(Duration::from_secs(1))); + + let fut = async { + let mut reader = op.reader("test").await.unwrap(); + + let res = reader.read(&mut [0; 4]).await; + assert!(res.is_err()); + let err = res.unwrap_err(); + assert_eq!(err.kind(), ErrorKind::Unexpected); + assert!(err.to_string().contains("timeout")) + }; + + timeout(Duration::from_secs(2), fut) + .await + .expect("this test should not exceed 2 seconds") + } +}
