This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 45345e4f fix(catalog/rest): Using async lock in token to avoid
blocking runtime (#1223)
45345e4f is described below
commit 45345e4f722884de01a17dbd90b4d9e57870b973
Author: Xuanwo <[email protected]>
AuthorDate: Wed Apr 16 20:51:58 2025 +0800
fix(catalog/rest): Using async lock in token to avoid blocking runtime
(#1223)
## Which issue does this PR close?
We used to think it was acceptable to use a blocking lock in the token
since we were not crossing an await boundary. However, our users
reported that this can cause the runtime to hang completely if multiple
catalog instances try to acquire the token concurrently.
## What changes are included in this PR?
This PR fixed it by using an async lock instead.
## Are these changes tested?
Unit tests.
Signed-off-by: Xuanwo <[email protected]>
Co-authored-by: Renjie Liu <[email protected]>
---
crates/catalog/rest/src/client.rs | 13 +++++--------
1 file changed, 5 insertions(+), 8 deletions(-)
diff --git a/crates/catalog/rest/src/client.rs
b/crates/catalog/rest/src/client.rs
index 31b52187..36069c3e 100644
--- a/crates/catalog/rest/src/client.rs
+++ b/crates/catalog/rest/src/client.rs
@@ -17,13 +17,13 @@
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
-use std::sync::Mutex;
use http::StatusCode;
use iceberg::{Error, ErrorKind, Result};
use reqwest::header::HeaderMap;
use reqwest::{Client, IntoUrl, Method, Request, RequestBuilder, Response};
use serde::de::DeserializeOwned;
+use tokio::sync::Mutex;
use crate::types::{ErrorResponse, TokenResponse};
use crate::RestCatalogConfig;
@@ -79,10 +79,7 @@ impl HttpClient {
.unwrap_or(self.extra_headers);
Ok(HttpClient {
client: cfg.client().unwrap_or(self.client),
- token: Mutex::new(
- cfg.token()
- .or_else(|| self.token.into_inner().ok().flatten()),
- ),
+ token: Mutex::new(cfg.token().or_else(|| self.token.into_inner())),
token_endpoint: (!cfg.get_token_endpoint().is_empty())
.then(|| cfg.get_token_endpoint())
.unwrap_or(self.token_endpoint),
@@ -102,7 +99,7 @@ impl HttpClient {
.build()
.unwrap();
self.authenticate(&mut req).await.ok();
- self.token.lock().unwrap().clone()
+ self.token.lock().await.clone()
}
/// Authenticate the request by filling token.
@@ -116,7 +113,7 @@ impl HttpClient {
/// Support refreshing token while needed.
async fn authenticate(&self, req: &mut Request) -> Result<()> {
// Clone the token from lock without holding the lock for entire
function.
- let token = { self.token.lock().expect("lock poison").clone() };
+ let token = self.token.lock().await.clone();
if self.credential.is_none() && token.is_none() {
return Ok(());
@@ -197,7 +194,7 @@ impl HttpClient {
}?;
let token = auth_res.access_token;
// Update token.
- *self.token.lock().expect("lock poison") = Some(token.clone());
+ *self.token.lock().await = Some(token.clone());
// Insert token in request.
req.headers_mut().insert(
http::header::AUTHORIZATION,