This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 45345e4f fix(catalog/rest): Using async lock in token to avoid blocking runtime (#1223)
45345e4f is described below

commit 45345e4f722884de01a17dbd90b4d9e57870b973
Author: Xuanwo <[email protected]>
AuthorDate: Wed Apr 16 20:51:58 2025 +0800

    fix(catalog/rest): Using async lock in token to avoid blocking runtime (#1223)
    
    ## Which issue does this PR close?
    
    We used to think it was acceptable to use a blocking lock in the token
    since we were not crossing an await boundary. However, our users
    reported that this can cause the runtime to hang completely if multiple
    catalog instances try to acquire the token concurrently.
    
    ## What changes are included in this PR?
    
    This PR fixes it by guarding the token with an async lock
    (`tokio::sync::Mutex`) instead of the blocking `std::sync::Mutex`.
    
    ## Are these changes tested?
    
    Unit tests.
    
    Signed-off-by: Xuanwo <[email protected]>
    Co-authored-by: Renjie Liu <[email protected]>
---
 crates/catalog/rest/src/client.rs | 13 +++++--------
 1 file changed, 5 insertions(+), 8 deletions(-)

diff --git a/crates/catalog/rest/src/client.rs b/crates/catalog/rest/src/client.rs
index 31b52187..36069c3e 100644
--- a/crates/catalog/rest/src/client.rs
+++ b/crates/catalog/rest/src/client.rs
@@ -17,13 +17,13 @@
 
 use std::collections::HashMap;
 use std::fmt::{Debug, Formatter};
-use std::sync::Mutex;
 
 use http::StatusCode;
 use iceberg::{Error, ErrorKind, Result};
 use reqwest::header::HeaderMap;
 use reqwest::{Client, IntoUrl, Method, Request, RequestBuilder, Response};
 use serde::de::DeserializeOwned;
+use tokio::sync::Mutex;
 
 use crate::types::{ErrorResponse, TokenResponse};
 use crate::RestCatalogConfig;
@@ -79,10 +79,7 @@ impl HttpClient {
             .unwrap_or(self.extra_headers);
         Ok(HttpClient {
             client: cfg.client().unwrap_or(self.client),
-            token: Mutex::new(
-                cfg.token()
-                    .or_else(|| self.token.into_inner().ok().flatten()),
-            ),
+            token: Mutex::new(cfg.token().or_else(|| self.token.into_inner())),
             token_endpoint: (!cfg.get_token_endpoint().is_empty())
                 .then(|| cfg.get_token_endpoint())
                 .unwrap_or(self.token_endpoint),
@@ -102,7 +99,7 @@ impl HttpClient {
             .build()
             .unwrap();
         self.authenticate(&mut req).await.ok();
-        self.token.lock().unwrap().clone()
+        self.token.lock().await.clone()
     }
 
     /// Authenticate the request by filling token.
@@ -116,7 +113,7 @@ impl HttpClient {
     /// Support refreshing token while needed.
     async fn authenticate(&self, req: &mut Request) -> Result<()> {
         // Clone the token from lock without holding the lock for entire function.
-        let token = { self.token.lock().expect("lock poison").clone() };
+        let token = self.token.lock().await.clone();
 
         if self.credential.is_none() && token.is_none() {
             return Ok(());
@@ -197,7 +194,7 @@ impl HttpClient {
         }?;
         let token = auth_res.access_token;
         // Update token.
-        *self.token.lock().expect("lock poison") = Some(token.clone());
+        *self.token.lock().await = Some(token.clone());
         // Insert token in request.
         req.headers_mut().insert(
             http::header::AUTHORIZATION,

Reply via email to