This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 56d711907 fix: Don't apply blocking layer when service support
blocking (#3050)
56d711907 is described below
commit 56d711907a664fa896c06279b10bebc96c00a19b
Author: Xuanwo <[email protected]>
AuthorDate: Wed Sep 13 16:43:35 2023 +0800
fix: Don't apply blocking layer when service support blocking (#3050)
* fix: Don't apply blocking layer when service support blocking
Signed-off-by: Xuanwo <[email protected]>
* Update core/src/layers/blocking.rs
Co-authored-by: Suyan <[email protected]>
* Fix build
Signed-off-by: Xuanwo <[email protected]>
* Fix typo
Signed-off-by: Xuanwo <[email protected]>
* Fix multipart upload
Signed-off-by: Xuanwo <[email protected]>
* Fix clippy
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
Co-authored-by: Suyan <[email protected]>
---
core/src/layers/blocking.rs | 8 +++-----
core/src/layers/retry.rs | 2 +-
core/src/raw/oio/write/multipart_upload_write.rs | 6 +++++-
core/src/raw/oio/write/range_write.rs | 5 ++++-
core/tests/behavior/utils.rs | 9 ++++++---
5 files changed, 19 insertions(+), 11 deletions(-)
diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs
index 7dd2d189c..701191490 100644
--- a/core/src/layers/blocking.rs
+++ b/core/src/layers/blocking.rs
@@ -25,13 +25,11 @@ use crate::raw::oio::ReadExt;
use crate::raw::*;
use crate::*;
-/// Add blocking API support for every operations.
+/// Add blocking API support for non-blocking services.
///
-/// # Blocking API
+/// # Notes
///
-/// - This layer is auto-added to the operator if it's accessor doesn't
support blocking APIs.
-///
-/// Tracking issue: #2678
+/// Please only enable this layer when the underlying service does not support
blocking.
#[derive(Debug, Clone)]
pub struct BlockingLayer {
handle: Handle,
diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs
index 7b2549dc0..e99737dd0 100644
--- a/core/src/layers/retry.rs
+++ b/core/src/layers/retry.rs
@@ -998,7 +998,7 @@ impl<R: oio::Write, I: RetryInterceptor> oio::Write for
RetryWrapper<R, I> {
&err,
dur,
&[
- ("operation",
WriteOperation::Abort.into_static()),
+ ("operation",
WriteOperation::Close.into_static()),
("path", &self.path),
],
);
diff --git a/core/src/raw/oio/write/multipart_upload_write.rs
b/core/src/raw/oio/write/multipart_upload_write.rs
index 6edeb96a8..4cf597e59 100644
--- a/core/src/raw/oio/write/multipart_upload_write.rs
+++ b/core/src/raw/oio/write/multipart_upload_write.rs
@@ -195,6 +195,7 @@ where
let (w, part) = ready!(fut.as_mut().poll(cx));
self.state = State::Idle(Some(w));
self.parts.push(part?);
+
// Replace the cache when last write succeeded
let size = bs.remaining();
let cb =
oio::ChunkedBytes::from_vec(bs.vectored_bytes(size));
@@ -264,8 +265,11 @@ where
State::Close(fut) => {
let (w, res) = futures::ready!(fut.as_mut().poll(cx));
self.state = State::Idle(Some(w));
+ // We should check res first before clean up cache.
+ res?;
+
self.cache = None;
- return Poll::Ready(res);
+ return Poll::Ready(Ok(()));
}
State::Init(_) => unreachable!(
"MultipartUploadWriter must not go into State::Init during
poll_close"
diff --git a/core/src/raw/oio/write/range_write.rs
b/core/src/raw/oio/write/range_write.rs
index 9ae017a5e..6dd9c23b2 100644
--- a/core/src/raw/oio/write/range_write.rs
+++ b/core/src/raw/oio/write/range_write.rs
@@ -274,8 +274,11 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> {
State::Abort(fut) => {
let (w, res) = ready!(fut.poll_unpin(cx));
self.state = State::Idle(Some(w));
+ // We should check res first before clean up cache.
+ res?;
+
self.buffer = None;
- return Poll::Ready(res);
+ return Poll::Ready(Ok(()));
}
}
}
diff --git a/core/tests/behavior/utils.rs b/core/tests/behavior/utils.rs
index 3f197869e..b5bd0970d 100644
--- a/core/tests/behavior/utils.rs
+++ b/core/tests/behavior/utils.rs
@@ -82,14 +82,17 @@ pub fn init_service<B: Builder>() -> Option<Operator> {
op.layer(ChaosLayer::new(0.1))
};
- let _guard = RUNTIME.enter();
- let op = op
- .layer(BlockingLayer::create().expect("blocking layer must be
created"))
+ let mut op = op
.layer(LoggingLayer::default().with_backtrace_output(true))
.layer(TimeoutLayer::new())
.layer(RetryLayer::new())
.finish();
+ if !op.info().full_capability().blocking {
+ let _guard = RUNTIME.enter();
+ op = op.layer(BlockingLayer::create().expect("blocking layer must be
created"))
+ }
+
Some(op)
}