This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch migrate-context-http-client in repository https://gitbox.apache.org/repos/asf/opendal.git
commit 7db86e14c8aca2e2809389c06e6180cf539ce08c Author: Xuanwo <[email protected]> AuthorDate: Sat Mar 1 15:01:46 2025 +0800 feat: Polish context related APIs Signed-off-by: Xuanwo <[email protected]> --- core/src/layers/blocking.rs | 5 +- core/src/layers/complete.rs | 3 +- core/src/layers/immutable_index.rs | 3 +- core/src/raw/accessor.rs | 147 +++++++++++++++++++++++++++++-------- core/src/raw/http_util/client.rs | 7 +- core/src/raw/oio/list/flat_list.rs | 5 +- 6 files changed, 132 insertions(+), 38 deletions(-) diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs index 767900aeb..a3ddeffd9 100644 --- a/core/src/layers/blocking.rs +++ b/core/src/layers/blocking.rs @@ -145,7 +145,10 @@ impl<A: Access> Layer<A> for BlockingLayer { fn layer(&self, inner: A) -> Self::LayeredAccess { let info = inner.info(); - info.update_full_capability(|cap| cap.blocking = true); + info.update_full_capability(|mut cap| { + cap.blocking = true; + cap + }); BlockingAccessor { inner, diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index ac5b3db69..6ab67e207 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -104,12 +104,13 @@ impl<A: Access> Layer<A> for CompleteLayer { fn layer(&self, inner: A) -> Self::LayeredAccess { let info = inner.info(); - info.update_full_capability(|cap| { + info.update_full_capability(|mut cap| { if cap.list && cap.write_can_empty { cap.create_dir = true; } // write operations should always return content length cap.write_has_content_length = true; + cap }); CompleteAccessor { diff --git a/core/src/layers/immutable_index.rs b/core/src/layers/immutable_index.rs index fe3c410b7..44b6c96da 100644 --- a/core/src/layers/immutable_index.rs +++ b/core/src/layers/immutable_index.rs @@ -74,9 +74,10 @@ impl<A: Access> Layer<A> for ImmutableIndexLayer { fn layer(&self, inner: A) -> Self::LayeredAccess { let info = inner.info(); - info.update_full_capability(|cap| { + info.update_full_capability(|mut 
cap| { cap.list = true; cap.list_with_recursive = true; + cap }); ImmutableIndexAccessor { diff --git a/core/src/raw/accessor.rs b/core/src/raw/accessor.rs index aad6de4e3..fa7e0f24d 100644 --- a/core/src/raw/accessor.rs +++ b/core/src/raw/accessor.rs @@ -17,6 +17,7 @@ use std::fmt::Debug; use std::future::ready; +use std::mem; use std::sync::Arc; use futures::Future; @@ -818,6 +819,15 @@ struct AccessorInfoInner { /// within the same operator, access layers, and services use the same instance of `AccessorInfo`. /// This is especially important for `HttpClient` and `Executor`. /// +/// ## Panic Safety +/// +/// All methods provided by `AccessorInfo` will safely handle lock poisoning scenarios. +/// If the inner `RwLock` is poisoned (which happens when another thread panicked while holding +/// the write lock), this method will gracefully continue execution. +/// +/// - For read operations, the method will return the current state. +/// - For write operations, the method will do nothing. +/// /// ## Maintain Notes /// /// We are using `std::sync::RwLock` to provide thread-safe access to the inner data. @@ -851,6 +861,11 @@ pub struct AccessorInfo { impl AccessorInfo { /// [`Scheme`] of backend. + /// + /// # Panic Safety + /// + /// This method safely handles lock poisoning scenarios. If the inner `RwLock` is poisoned, + /// this method will gracefully continue execution by simply returning the current scheme. pub fn scheme(&self) -> Scheme { match self.inner.read() { Ok(v) => v.scheme, @@ -859,16 +874,26 @@ impl AccessorInfo { } /// Set [`Scheme`] for backend. + /// + /// # Panic Safety + /// + /// This method safely handles lock poisoning scenarios. If the inner `RwLock` is poisoned, + /// this method will gracefully continue execution by simply skipping the update operation + /// rather than propagating the panic. 
pub fn set_scheme(&self, scheme: Scheme) -> &Self { - match self.inner.write() { - Ok(mut v) => v.scheme = scheme, - Err(mut err) => err.get_mut().scheme = scheme, + if let Ok(mut v) = self.inner.write() { + v.scheme = scheme; } self } /// Root of backend, will be in format like `/path/to/dir/` + /// + /// # Panic Safety + /// + /// This method safely handles lock poisoning scenarios. If the inner `RwLock` is poisoned, + /// this method will gracefully continue execution by simply returning the current root. pub fn root(&self) -> String { match self.inner.read() { Ok(v) => v.root.clone(), @@ -879,10 +904,15 @@ impl AccessorInfo { /// Set root for backend. /// /// Note: input root must be normalized. + /// + /// # Panic Safety + /// + /// This method safely handles lock poisoning scenarios. If the inner `RwLock` is poisoned, + /// this method will gracefully continue execution by simply skipping the update operation + /// rather than propagating the panic. pub fn set_root(&self, root: &str) -> &Self { - match self.inner.write() { - Ok(mut v) => v.root = root.to_string(), - Err(mut err) => err.get_mut().root = root.to_string(), + if let Ok(mut v) = self.inner.write() { + v.root = root.to_string(); } self @@ -894,6 +924,11 @@ impl AccessorInfo { /// /// - name for `s3` => bucket name /// - name for `azblob` => container name + /// + /// # Panic Safety + /// + /// This method safely handles lock poisoning scenarios. If the inner `RwLock` is poisoned, + /// this method will gracefully continue execution by simply returning the current name. pub fn name(&self) -> String { match self.inner.read() { Ok(v) => v.name.to_string(), @@ -902,16 +937,26 @@ impl AccessorInfo { } /// Set name of this backend. + /// + /// # Panic Safety + /// + /// This method safely handles lock poisoning scenarios. If the inner `RwLock` is poisoned, + /// this method will gracefully continue execution by simply skipping the update operation + /// rather than propagating the panic. 
pub fn set_name(&self, name: &str) -> &Self { - match self.inner.write() { - Ok(mut v) => v.name = name.to_string(), - Err(mut err) => err.get_mut().name = name.to_string(), + if let Ok(mut v) = self.inner.write() { + v.name = name.to_string() } self } /// Get backend's native capabilities. + /// + /// # Panic Safety + /// + /// This method safely handles lock poisoning scenarios. If the inner `RwLock` is poisoned, + /// this method will gracefully continue execution by simply returning the current native capability. pub fn native_capability(&self) -> Capability { match self.inner.read() { Ok(v) => v.native_capability, @@ -925,22 +970,27 @@ impl AccessorInfo { /// /// Set native capability will also flush the full capability. The only way to change /// full_capability is via `update_full_capability`. + /// + /// # Panic Safety + /// + /// This method safely handles lock poisoning scenarios. If the inner `RwLock` is poisoned, + /// this method will gracefully continue execution by simply skipping the update operation + /// rather than propagating the panic. pub fn set_native_capability(&self, capability: Capability) -> &Self { - match self.inner.write() { - Ok(mut v) => { - v.native_capability = capability; - v.full_capability = capability; - } - Err(mut err) => { - err.get_mut().native_capability = capability; - err.get_mut().full_capability = capability; - } + if let Ok(mut v) = self.inner.write() { + v.native_capability = capability; + v.full_capability = capability; } self } /// Get service's full capabilities. + /// + /// # Panic Safety + /// + /// This method safely handles lock poisoning scenarios. If the inner `RwLock` is poisoned, + /// this method will gracefully continue execution by simply returning the current full capability. pub fn full_capability(&self) -> Capability { match self.inner.read() { Ok(v) => v.full_capability, @@ -949,14 +999,26 @@ impl AccessorInfo { } /// Get service's full capabilities. 
- pub fn update_full_capability(&self, mut f: impl FnMut(&mut Capability)) { - match self.inner.write() { - Ok(mut v) => f(&mut v.full_capability), - Err(mut err) => f(&mut err.get_mut().full_capability), + /// + /// # Panic Safety + /// + /// This method safely handles lock poisoning scenarios. If the inner `RwLock` is poisoned, + /// this method will gracefully continue execution by simply skipping the update operation + /// rather than propagating the panic. + pub fn update_full_capability(&self, f: impl FnOnce(Capability) -> Capability) -> &Self { + if let Ok(mut v) = self.inner.write() { + v.full_capability = f(v.full_capability); } + + self } /// Get http client from the context. + /// + /// # Panic Safety + /// + /// This method safely handles lock poisoning scenarios. If the inner `RwLock` is poisoned, + /// this method will gracefully continue execution by simply returning the current http client. pub fn http_client(&self) -> HttpClient { match self.inner.read() { Ok(v) => v.http_client.clone(), @@ -964,17 +1026,31 @@ impl AccessorInfo { } } - /// Set http client for the context. - pub fn set_http_client(&self, client: HttpClient) -> &Self { - match self.inner.write() { - Ok(mut v) => v.http_client = client, - Err(mut err) => err.get_mut().http_client = client, + /// Update http client for the context. + /// + /// # Note + /// + /// Requests must be forwarded to the old HTTP client after the update. Otherwise, features such as retry, tracing, and metrics may not function properly. + /// + /// # Panic Safety + /// + /// This method safely handles lock poisoning scenarios. If the inner `RwLock` is poisoned, + /// this method will gracefully continue execution by simply skipping the update operation. + pub fn update_http_client(&self, f: impl FnOnce(HttpClient) -> HttpClient) -> &Self { + if let Ok(mut v) = self.inner.write() { + let client = mem::take(&mut v.http_client); + v.http_client = f(client); } self } /// Get executor from the context. 
+ /// + /// # Panic Safety + /// + /// This method safely handles lock poisoning scenarios. If the inner `RwLock` is poisoned, + /// this method will gracefully continue execution by simply returning the current executor. pub fn executor(&self) -> Executor { match self.inner.read() { Ok(v) => v.executor.clone(), @@ -983,10 +1059,19 @@ impl AccessorInfo { } /// Set executor for the context. - pub fn set_executor(&self, executor: Executor) -> &Self { - match self.inner.write() { - Ok(mut v) => v.executor = executor, - Err(mut err) => err.get_mut().executor = executor, + /// + /// # Note + /// + /// Tasks must be forwarded to the old executor after the update. Otherwise, features such as retry, timeout, and metrics may not function properly. + /// + /// # Panic Safety + /// + /// This method safely handles lock poisoning scenarios. If the inner `RwLock` is poisoned, + /// this method will gracefully continue execution by simply skipping the update operation. + pub fn update_executor(&self, f: impl FnOnce(Executor) -> Executor) -> &Self { + if let Ok(mut v) = self.inner.write() { + let executor = mem::take(&mut v.executor); + v.executor = f(executor); } self diff --git a/core/src/raw/http_util/client.rs b/core/src/raw/http_util/client.rs index 6094353f1..f0943b295 100644 --- a/core/src/raw/http_util/client.rs +++ b/core/src/raw/http_util/client.rs @@ -60,15 +60,16 @@ impl Debug for HttpClient { impl Default for HttpClient { fn default() -> Self { - Self::new().expect("create http client must succeed") + Self { + fetcher: Arc::new(GLOBAL_REQWEST_CLIENT.clone()), + } } } impl HttpClient { /// Create a new http client in async context. 
pub fn new() -> Result<Self> { - let fetcher = Arc::new(GLOBAL_REQWEST_CLIENT.clone()); - Ok(Self { fetcher }) + Ok(Self::default()) } /// Construct `Self` with given [`reqwest::Client`] diff --git a/core/src/raw/oio/list/flat_list.rs b/core/src/raw/oio/list/flat_list.rs index 90dc7736a..b368d2aea 100644 --- a/core/src/raw/oio/list/flat_list.rs +++ b/core/src/raw/oio/list/flat_list.rs @@ -212,7 +212,10 @@ mod tests { fn info(&self) -> Arc<AccessorInfo> { let am = AccessorInfo::default(); - am.update_full_capability(|cap| cap.list = true); + am.update_full_capability(|mut cap| { + cap.list = true; + cap + }); am.into() }
