This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new b87fa5af1 fix: Fix behavior tests for blocking layer (#2809)
b87fa5af1 is described below

commit b87fa5af1e1367e1a8bf90c4cf561d4545cbe7b6
Author: Xuanwo <[email protected]>
AuthorDate: Tue Aug 8 14:10:55 2023 +0800

    fix: Fix behavior tests for blocking layer (#2809)
    
    * fix: Fix behavior tests for blocking layer
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Make nextcloud and owncloud happy
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Try to fix deadlock when copying a non-existent file
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix copy and rename not retried
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
---
 .github/workflows/service_test_webdav.yml | 18 +++++++--------
 core/src/layers/blocking.rs               |  7 +++---
 core/src/layers/retry.rs                  | 38 +++++++++++++++++++++++++++++++
 core/src/services/dropbox/error.rs        |  1 +
 core/src/services/webdav/backend.rs       | 12 ++++++++++
 core/tests/behavior/blocking_write.rs     | 34 ++++++++++++++++++++++++++-
 core/tests/behavior/utils.rs              |  3 ++-
 7 files changed, 98 insertions(+), 15 deletions(-)

diff --git a/.github/workflows/service_test_webdav.yml 
b/.github/workflows/service_test_webdav.yml
index 7f0467d24..9b130d27e 100644
--- a/.github/workflows/service_test_webdav.yml
+++ b/.github/workflows/service_test_webdav.yml
@@ -170,18 +170,18 @@ jobs:
           NEXTCLOUD_ADMIN_PASSWORD: admin
           REDIS_HOST: redis
         options: >-
-          --health-cmd="curl -f http://localhost"; 
-          --health-interval=10s 
-          --health-timeout=5s 
+          --health-cmd="curl -f http://localhost";
+          --health-interval=10s
+          --health-timeout=5s
           --health-retries=5
-      
+
       redis:
         image: redis
         options: >-
           --health-cmd "redis-cli ping"
           --health-interval 10s
           --health-timeout 5s
-          --health-retries 5   
+          --health-retries 5
 
     steps:
       - uses: actions/checkout@v3
@@ -194,7 +194,7 @@ jobs:
         shell: bash
         working-directory: core
         run: |
-          cargo test webdav
+          cargo test webdav -j=1
         env:
           OPENDAL_WEBDAV_TEST: on
           OPENDAL_WEBDAV_ENDPOINT: http://127.0.0.1:8080/remote.php/webdav/
@@ -229,7 +229,7 @@ jobs:
           --health-cmd "redis-cli ping"
           --health-interval 10s
           --health-timeout 5s
-          --health-retries 5   
+          --health-retries 5
 
     steps:
       - uses: actions/checkout@v3
@@ -242,9 +242,9 @@ jobs:
         shell: bash
         working-directory: core
         run: |
-          cargo test webdav
+          cargo test webdav -j=1
         env:
           OPENDAL_WEBDAV_TEST: on
           OPENDAL_WEBDAV_ENDPOINT: http://127.0.0.1:8080/remote.php/webdav/
           OPENDAL_WEBDAV_USERNAME: admin
-          OPENDAL_WEBDAV_PASSWORD: admin
\ No newline at end of file
+          OPENDAL_WEBDAV_PASSWORD: admin
diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs
index ef492d7eb..a9f8e0df4 100644
--- a/core/src/layers/blocking.rs
+++ b/core/src/layers/blocking.rs
@@ -189,8 +189,7 @@ impl<I> BlockingWrapper<I> {
 
 impl<I: oio::Read + 'static> oio::BlockingRead for BlockingWrapper<I> {
     fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
-        self.handle
-            .block_on(oio::ReadExt::read(&mut self.inner, buf))
+        self.handle.block_on(self.inner.read(buf))
     }
 
     fn seek(&mut self, pos: std::io::SeekFrom) -> Result<u64> {
@@ -204,11 +203,11 @@ impl<I: oio::Read + 'static> oio::BlockingRead for 
BlockingWrapper<I> {
 
 impl<I: oio::Write + 'static> oio::BlockingWrite for BlockingWrapper<I> {
     fn write(&mut self, bs: Bytes) -> Result<()> {
-        self.handle.block_on(oio::Write::write(&mut self.inner, bs))
+        self.handle.block_on(self.inner.write(bs))
     }
 
     fn close(&mut self) -> Result<()> {
-        self.handle.block_on(oio::Write::close(&mut self.inner))
+        self.handle.block_on(self.inner.close())
     }
 }
 
diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs
index be21adbd1..c2ba581af 100644
--- a/core/src/layers/retry.rs
+++ b/core/src/layers/retry.rs
@@ -612,6 +612,44 @@ impl<A: Accessor, I: RetryInterceptor> LayeredAccessor for 
RetryAccessor<A, I> {
             .map_err(|e| e.set_persistent())
     }
 
+    fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> 
Result<RpCopy> {
+        { || self.inner.blocking_copy(from, to, args.clone()) }
+            .retry(&self.builder)
+            .when(|e| e.is_temporary())
+            .notify(|err, dur| {
+                self.notify.intercept(
+                    err,
+                    dur,
+                    &[
+                        ("operation", Operation::BlockingCopy.into_static()),
+                        ("from", from),
+                        ("to", to),
+                    ],
+                )
+            })
+            .call()
+            .map_err(|e| e.set_persistent())
+    }
+
+    fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> 
Result<RpRename> {
+        { || self.inner.blocking_rename(from, to, args.clone()) }
+            .retry(&self.builder)
+            .when(|e| e.is_temporary())
+            .notify(|err, dur| {
+                self.notify.intercept(
+                    err,
+                    dur,
+                    &[
+                        ("operation", Operation::BlockingRename.into_static()),
+                        ("from", from),
+                        ("to", to),
+                    ],
+                )
+            })
+            .call()
+            .map_err(|e| e.set_persistent())
+    }
+
     fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, 
Self::BlockingPager)> {
         { || self.inner.blocking_list(path, args.clone()) }
             .retry(&self.builder)
diff --git a/core/src/services/dropbox/error.rs 
b/core/src/services/dropbox/error.rs
index f25f298af..ae731df78 100644
--- a/core/src/services/dropbox/error.rs
+++ b/core/src/services/dropbox/error.rs
@@ -38,6 +38,7 @@ pub async fn parse_error(resp: Response<IncomingAsyncBody>) 
-> Result<Error> {
     let (mut kind, mut retryable) = match parts.status {
         StatusCode::NOT_FOUND => (ErrorKind::NotFound, false),
         StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false),
+        StatusCode::TOO_MANY_REQUESTS => (ErrorKind::RateLimited, true),
         StatusCode::INTERNAL_SERVER_ERROR
         | StatusCode::BAD_GATEWAY
         | StatusCode::SERVICE_UNAVAILABLE
diff --git a/core/src/services/webdav/backend.rs 
b/core/src/services/webdav/backend.rs
index 64fa3c73b..49b1e981d 100644
--- a/core/src/services/webdav/backend.rs
+++ b/core/src/services/webdav/backend.rs
@@ -342,7 +342,19 @@ impl Accessor for WebdavBackend {
         Ok((RpWrite::default(), WebdavWriter::new(self.clone(), args, p)))
     }
 
+    /// # Notes
+    ///
+    /// There is a strange deadlock issue when copying a non-existent file, so 
we will check
+    /// if the source exists first.
+    ///
+    /// For example: <https://github.com/apache/incubator-opendal/pull/2809>
     async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> 
Result<RpCopy> {
+        if let Err(err) = self.stat(from, OpStat::default()).await {
+            if err.kind() == ErrorKind::NotFound {
+                return Err(err);
+            }
+        }
+
         self.ensure_parent_path(to).await?;
 
         let resp = self.webdav_copy(from, to).await?;
diff --git a/core/tests/behavior/blocking_write.rs 
b/core/tests/behavior/blocking_write.rs
index 3775509c2..62838d8e0 100644
--- a/core/tests/behavior/blocking_write.rs
+++ b/core/tests/behavior/blocking_write.rs
@@ -19,7 +19,7 @@ use std::io::Read;
 use std::io::Seek;
 
 use anyhow::Result;
-use log::debug;
+use log::{debug, warn};
 use sha2::Digest;
 use sha2::Sha256;
 
@@ -112,6 +112,12 @@ pub fn test_blocking_write_with_dir_path(op: 
BlockingOperator) -> Result<()> {
 
 /// Write a single file with special chars should succeed.
 pub fn test_blocking_write_with_special_chars(op: BlockingOperator) -> 
Result<()> {
+    // Ignore test for supabase until 
https://github.com/apache/incubator-opendal/issues/2194 is addressed.
+    if op.info().scheme() == opendal::Scheme::Supabase {
+        warn!("ignore test for supabase until 
https://github.com/apache/incubator-opendal/issues/2194 is resolved");
+        return Ok(());
+    }
+
     let path = format!("{} !@#$%^&()_+-=;',.txt", uuid::Uuid::new_v4());
     debug!("Generate a random file: {}", &path);
     let (content, size) = gen_bytes();
@@ -156,6 +162,12 @@ pub fn test_blocking_stat_dir(op: BlockingOperator) -> 
Result<()> {
 
 /// Stat existing file with special chars should return metadata
 pub fn test_blocking_stat_with_special_chars(op: BlockingOperator) -> 
Result<()> {
+    // Ignore test for supabase until 
https://github.com/apache/incubator-opendal/issues/2194 is addressed.
+    if op.info().scheme() == opendal::Scheme::Supabase {
+        warn!("ignore test for supabase until 
https://github.com/apache/incubator-opendal/issues/2194 is resolved");
+        return Ok(());
+    }
+
     let path = format!("{} !@#$%^&()_+-=;',.txt", uuid::Uuid::new_v4());
     debug!("Generate a random file: {}", &path);
     let (content, size) = gen_bytes();
@@ -204,6 +216,10 @@ pub fn test_blocking_read_full(op: BlockingOperator) -> 
Result<()> {
 
 /// Read range content should match.
 pub fn test_blocking_read_range(op: BlockingOperator) -> Result<()> {
+    if !op.info().capability().read_with_range {
+        return Ok(());
+    }
+
     let path = uuid::Uuid::new_v4().to_string();
     debug!("Generate a random file: {}", &path);
     let (content, size) = gen_bytes();
@@ -229,6 +245,10 @@ pub fn test_blocking_read_range(op: BlockingOperator) -> 
Result<()> {
 
 /// Read large range content should match.
 pub fn test_blocking_read_large_range(op: BlockingOperator) -> Result<()> {
+    if !op.info().capability().read_with_range {
+        return Ok(());
+    }
+
     let path = uuid::Uuid::new_v4().to_string();
     debug!("Generate a random file: {}", &path);
     let (content, size) = gen_bytes();
@@ -265,6 +285,10 @@ pub fn test_blocking_read_not_exist(op: BlockingOperator) 
-> Result<()> {
 }
 
 pub fn test_blocking_fuzz_range_reader(op: BlockingOperator) -> Result<()> {
+    if !op.info().capability().read_with_range {
+        return Ok(());
+    }
+
     let path = uuid::Uuid::new_v4().to_string();
     debug!("Generate a random file: {}", &path);
     let (content, _) = gen_bytes();
@@ -298,6 +322,10 @@ pub fn test_blocking_fuzz_range_reader(op: 
BlockingOperator) -> Result<()> {
 }
 
 pub fn test_blocking_fuzz_offset_reader(op: BlockingOperator) -> Result<()> {
+    if !op.info().capability().read_with_range {
+        return Ok(());
+    }
+
     let path = uuid::Uuid::new_v4().to_string();
     debug!("Generate a random file: {}", &path);
     let (content, _) = gen_bytes();
@@ -331,6 +359,10 @@ pub fn test_blocking_fuzz_offset_reader(op: 
BlockingOperator) -> Result<()> {
 }
 
 pub fn test_blocking_fuzz_part_reader(op: BlockingOperator) -> Result<()> {
+    if !op.info().capability().read_with_range {
+        return Ok(());
+    }
+
     let path = uuid::Uuid::new_v4().to_string();
     debug!("Generate a random file: {}", &path);
     let (content, size) = gen_bytes();
diff --git a/core/tests/behavior/utils.rs b/core/tests/behavior/utils.rs
index 62b6eccf2..5913a11ae 100644
--- a/core/tests/behavior/utils.rs
+++ b/core/tests/behavior/utils.rs
@@ -28,6 +28,7 @@ use futures::Future;
 use libtest_mimic::Failed;
 use libtest_mimic::Trial;
 use log::debug;
+use opendal::layers::BlockingLayer;
 use opendal::layers::LoggingLayer;
 use opendal::layers::RetryLayer;
 use opendal::layers::TimeoutLayer;
@@ -83,7 +84,7 @@ pub fn init_service<B: Builder>() -> Option<Operator> {
 
     let _guard = RUNTIME.enter();
     let op = op
-        // .layer(BlockingLayer::create().expect("blocking layer must be 
created"))
+        .layer(BlockingLayer::create().expect("blocking layer must be 
created"))
         .layer(LoggingLayer::default())
         .layer(TimeoutLayer::new())
         .layer(RetryLayer::new())

Reply via email to