This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 56d711907 fix: Don't apply blocking layer when service support 
blocking (#3050)
56d711907 is described below

commit 56d711907a664fa896c06279b10bebc96c00a19b
Author: Xuanwo <[email protected]>
AuthorDate: Wed Sep 13 16:43:35 2023 +0800

    fix: Don't apply blocking layer when service support blocking (#3050)
    
    * fix: Don't apply blocking layer when service support blocking
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Update core/src/layers/blocking.rs
    
    Co-authored-by: Suyan <[email protected]>
    
    * Fix build
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix typo
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix multipart upload
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix clippy
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
    Co-authored-by: Suyan <[email protected]>
---
 core/src/layers/blocking.rs                      | 8 +++-----
 core/src/layers/retry.rs                         | 2 +-
 core/src/raw/oio/write/multipart_upload_write.rs | 6 +++++-
 core/src/raw/oio/write/range_write.rs            | 5 ++++-
 core/tests/behavior/utils.rs                     | 9 ++++++---
 5 files changed, 19 insertions(+), 11 deletions(-)

diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs
index 7dd2d189c..701191490 100644
--- a/core/src/layers/blocking.rs
+++ b/core/src/layers/blocking.rs
@@ -25,13 +25,11 @@ use crate::raw::oio::ReadExt;
 use crate::raw::*;
 use crate::*;
 
-/// Add blocking API support for every operations.
+/// Add blocking API support for non-blocking services.
 ///
-/// # Blocking API
+/// # Notes
 ///
-/// - This layer is auto-added to the operator if it's accessor doesn't 
support blocking APIs.
-///
-/// Tracking issue: #2678
+/// Please only enable this layer when the underlying service does not support 
blocking.
 #[derive(Debug, Clone)]
 pub struct BlockingLayer {
     handle: Handle,
diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs
index 7b2549dc0..e99737dd0 100644
--- a/core/src/layers/retry.rs
+++ b/core/src/layers/retry.rs
@@ -998,7 +998,7 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for 
RetryWrapper<R, I> {
                             &err,
                             dur,
                             &[
-                                ("operation", 
WriteOperation::Abort.into_static()),
+                                ("operation", 
WriteOperation::Close.into_static()),
                                 ("path", &self.path),
                             ],
                         );
diff --git a/core/src/raw/oio/write/multipart_upload_write.rs 
b/core/src/raw/oio/write/multipart_upload_write.rs
index 6edeb96a8..4cf597e59 100644
--- a/core/src/raw/oio/write/multipart_upload_write.rs
+++ b/core/src/raw/oio/write/multipart_upload_write.rs
@@ -195,6 +195,7 @@ where
                     let (w, part) = ready!(fut.as_mut().poll(cx));
                     self.state = State::Idle(Some(w));
                     self.parts.push(part?);
+
                     // Replace the cache when last write succeeded
                     let size = bs.remaining();
                     let cb = 
oio::ChunkedBytes::from_vec(bs.vectored_bytes(size));
@@ -264,8 +265,11 @@ where
                 State::Close(fut) => {
                     let (w, res) = futures::ready!(fut.as_mut().poll(cx));
                     self.state = State::Idle(Some(w));
+                    // We should check res first before clean up cache.
+                    res?;
+
                     self.cache = None;
-                    return Poll::Ready(res);
+                    return Poll::Ready(Ok(()));
                 }
                 State::Init(_) => unreachable!(
                     "MultipartUploadWriter must not go into State::Init during 
poll_close"
diff --git a/core/src/raw/oio/write/range_write.rs 
b/core/src/raw/oio/write/range_write.rs
index 9ae017a5e..6dd9c23b2 100644
--- a/core/src/raw/oio/write/range_write.rs
+++ b/core/src/raw/oio/write/range_write.rs
@@ -274,8 +274,11 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> {
                 State::Abort(fut) => {
                     let (w, res) = ready!(fut.poll_unpin(cx));
                     self.state = State::Idle(Some(w));
+                    // We should check res first before clean up cache.
+                    res?;
+
                     self.buffer = None;
-                    return Poll::Ready(res);
+                    return Poll::Ready(Ok(()));
                 }
             }
         }
diff --git a/core/tests/behavior/utils.rs b/core/tests/behavior/utils.rs
index 3f197869e..b5bd0970d 100644
--- a/core/tests/behavior/utils.rs
+++ b/core/tests/behavior/utils.rs
@@ -82,14 +82,17 @@ pub fn init_service<B: Builder>() -> Option<Operator> {
         op.layer(ChaosLayer::new(0.1))
     };
 
-    let _guard = RUNTIME.enter();
-    let op = op
-        .layer(BlockingLayer::create().expect("blocking layer must be 
created"))
+    let mut op = op
         .layer(LoggingLayer::default().with_backtrace_output(true))
         .layer(TimeoutLayer::new())
         .layer(RetryLayer::new())
         .finish();
 
+    if !op.info().full_capability().blocking {
+        let _guard = RUNTIME.enter();
+        op = op.layer(BlockingLayer::create().expect("blocking layer must be 
created"))
+    }
+
     Some(op)
 }
 

Reply via email to