This is an automated email from the ASF dual-hosted git repository. suyanhanx pushed a commit to branch gdrive-auth in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 257deee83ae80338e9618c1fd0a687f78098fa03 Author: suyanhanx <[email protected]> AuthorDate: Wed Aug 23 22:34:16 2023 +0800 feat(services/gdrive): credential manage Signed-off-by: suyanhanx <[email protected]> --- core/src/services/gdrive/backend.rs | 15 +----- core/src/services/gdrive/builder.rs | 98 +++++++++++++++++++++++++++++++++++-- core/src/services/gdrive/core.rs | 85 +++++++++++++++++++++++++++----- 3 files changed, 167 insertions(+), 31 deletions(-) diff --git a/core/src/services/gdrive/backend.rs b/core/src/services/gdrive/backend.rs index 49ce23314..246e32f33 100644 --- a/core/src/services/gdrive/backend.rs +++ b/core/src/services/gdrive/backend.rs @@ -33,20 +33,7 @@ use crate::*; #[derive(Clone, Debug)] pub struct GdriveBackend { - core: Arc<GdriveCore>, -} - -impl GdriveBackend { - pub(crate) fn new(root: String, access_token: String, http_client: HttpClient) -> Self { - GdriveBackend { - core: Arc::new(GdriveCore { - root, - access_token, - client: http_client, - path_cache: Arc::default(), - }), - } - } + pub core: Arc<GdriveCore>, } #[async_trait] diff --git a/core/src/services/gdrive/builder.rs b/core/src/services/gdrive/builder.rs index aa1a4b8d6..eddeb95d6 100644 --- a/core/src/services/gdrive/builder.rs +++ b/core/src/services/gdrive/builder.rs @@ -18,12 +18,17 @@ use std::collections::HashMap; use std::fmt::Debug; use std::fmt::Formatter; +use std::sync::Arc; +use chrono::DateTime; +use chrono::Utc; use log::debug; +use tokio::sync::Mutex; use super::backend::GdriveBackend; use crate::raw::normalize_root; use crate::raw::HttpClient; +use crate::services::gdrive::core::{GdriveCore, GdriveSigner}; use crate::Scheme; use crate::*; @@ -106,11 +111,43 @@ impl GdriveBuilder { /// Access token is used for temporary access to the GoogleDrive API. /// /// You can get the access token from [GoogleDrive App Console](https://console.cloud.google.com/apis/credentials) + /// or [GoogleDrive OAuth2 Playground](https://developers.google.com/oauthplayground/) + /// + /// # Note + /// + /// - An access token is valid for 1 hour. + /// - If you want to use the access token for a long time, + /// you can use the refresh token to get a new access token. pub fn access_token(&mut self, access_token: &str) -> &mut Self { self.access_token = Some(access_token.to_string()); self } + /// Refresh token is used for long term access to the GoogleDrive API. + /// + /// You can get the refresh token via OAuth 2.0 Flow of GoogleDrive API. + /// + /// OpenDAL will use this refresh token to get a new access token when the old one is expired. + pub fn refresh_token(&mut self, refresh_token: &str) -> &mut Self { + self.refresh_token = Some(refresh_token.to_string()); + self + } + + /// Set the client id for GoogleDrive. + /// + /// This is required for OAuth 2.0 Flow to refresh the access token. + pub fn client_id(&mut self, client_id: &str) -> &mut Self { + self.client_id = Some(client_id.to_string()); + self + } + + /// Set the client secret for GoogleDrive. + /// + /// This is required for OAuth 2.0 Flow with refresh the access token. + pub fn client_secret(&mut self, client_secret: &str) -> &mut Self { + self.client_secret = Some(client_secret.to_string()); + self + } /// Specify the http client that used by this service. /// @@ -134,6 +171,9 @@ impl Builder for GdriveBuilder { map.get("root").map(|v| builder.root(v)); map.get("access_token").map(|v| builder.access_token(v)); + map.get("refresh_token").map(|v| builder.refresh_token(v)); + map.get("client_id").map(|v| builder.client_id(v)); + map.get("client_secret").map(|v| builder.client_secret(v)); builder } @@ -151,9 +191,59 @@ impl Builder for GdriveBuilder { })? }; - match self.access_token.clone() { - Some(access_token) => Ok(GdriveBackend::new(root, access_token, client)), - None => Err(Error::new(ErrorKind::ConfigInvalid, "access_token not set")), - } + let signer = match (self.access_token.take(), self.refresh_token.take()) { + (Some(access_token), None) => GdriveSigner { + access_token, + // We will never expire user specified access token. + expires_in: DateTime::<Utc>::MAX_UTC, + ..Default::default() + }, + (None, Some(refresh_token)) => { + let client_id = self.client_id.take().ok_or_else(|| { + Error::new( + ErrorKind::ConfigInvalid, + "client_id must be set when refresh_token is set", + ) + .with_context("service", Scheme::Gdrive) + })?; + let client_secret = self.client_secret.take().ok_or_else(|| { + Error::new( + ErrorKind::ConfigInvalid, + "client_secret must be set when refresh_token is set", + ) + .with_context("service", Scheme::Gdrive) + })?; + + GdriveSigner { + refresh_token, + client_id, + client_secret, + ..Default::default() + } + } + (Some(_), Some(_)) => { + return Err(Error::new( + ErrorKind::ConfigInvalid, + "access_token and refresh_token cannot be set at the same time", + ) + .with_context("service", Scheme::Gdrive)) + } + (None, None) => { + return Err(Error::new( + ErrorKind::ConfigInvalid, + "access_token or refresh_token must be set", + ) + .with_context("service", Scheme::Gdrive)) + } + }; + + Ok(GdriveBackend { + core: Arc::new(GdriveCore { + root, + signer: Arc::new(Mutex::new(signer)), + client, + path_cache: Arc::default(), + }), + }) } } diff --git a/core/src/services/gdrive/core.rs b/core/src/services/gdrive/core.rs index a4dcb9fc3..389d46569 100644 --- a/core/src/services/gdrive/core.rs +++ b/core/src/services/gdrive/core.rs @@ -39,10 +39,11 @@ use crate::ErrorKind; pub struct GdriveCore { pub root: String, - pub access_token: String, pub client: HttpClient, + pub signer: Arc<Mutex<GdriveSigner>>, + /// Cache the mapping from path to file id /// /// Google Drive uses file id to identify a file. @@ -91,7 +92,7 @@ impl GdriveCore { .body(AsyncBody::default()) .map_err(new_request_build_error)?; - let _ = self.sign(&mut req); + self.sign(&mut req).await?; let resp = self.client.send(req).await?; let status = resp.status(); @@ -151,7 +152,7 @@ impl GdriveCore { )) .body(AsyncBody::Empty) .map_err(new_request_build_error)?; - let _ = self.sign(&mut req); + self.sign(&mut req).await?; let resp = self.client.send(req).await?; let status = resp.status(); @@ -224,7 +225,7 @@ impl GdriveCore { .body(AsyncBody::Empty) .map_err(new_request_build_error)?; - self.sign(&mut req)?; + self.sign(&mut req).await?; self.client.send(req).await } @@ -257,7 +258,7 @@ impl GdriveCore { ))) .map_err(new_request_build_error)?; - let _ = self.sign(&mut req); + self.sign(&mut req).await?; self.client.send(req).await } @@ -273,7 +274,7 @@ impl GdriveCore { )) .body(AsyncBody::Empty) .map_err(new_request_build_error)?; - let _ = self.sign(&mut req); + self.sign(&mut req).await?; self.client.send(req).await } @@ -287,7 +288,7 @@ impl GdriveCore { let mut req = Request::get(&url) .body(AsyncBody::Empty) .map_err(new_request_build_error)?; - let _ = self.sign(&mut req); + self.sign(&mut req).await?; self.client.send(req).await } @@ -315,7 +316,7 @@ impl GdriveCore { } let mut req = req.body(body).map_err(new_request_build_error)?; - let _ = self.sign(&mut req); + self.sign(&mut req).await?; self.client.send(req).await } @@ -330,7 +331,7 @@ impl GdriveCore { .body(AsyncBody::Empty) .map_err(new_request_build_error)?; - let _ = self.sign(&mut req); + self.sign(&mut req).await?; self.client.send(req).await } @@ -374,7 +375,7 @@ impl GdriveCore { let mut req = multipart.apply(req)?; - let _ = self.sign(&mut req); + self.sign(&mut req).await?; self.client.send(req).await } @@ -397,13 +398,52 @@ impl GdriveCore { .body(AsyncBody::Bytes(body)) .map_err(new_request_build_error)?; - let _ = self.sign(&mut req); + self.sign(&mut req).await?; self.client.send(req).await } - fn sign<T>(&self, req: &mut Request<T>) -> Result<()> { - let auth_header_content = format!("Bearer {}", self.access_token); + pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> { + let mut signer = self.signer.lock().await; + + if !signer.access_token.is_empty() && signer.expires_in > Utc::now() { + let value = format!("Bearer {}", signer.access_token) + .parse() + .expect("access token must be valid header value"); + + req.headers_mut().insert(header::AUTHORIZATION, value); + return Ok(()); + } + + let url = format!( + "https://oauth2.googleapis.com/token?refresh_token={}&client_id={}&client_secret={}&grant_type=refresh_token", + signer.refresh_token, signer.client_id, signer.client_secret + ); + + { + let req = Request::post(url) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + let resp = self.client.send(req).await?; + let status = resp.status(); + + match status { + StatusCode::OK => { + let resp_body = &resp.into_body().bytes().await?; + let token = serde_json::from_slice::<GdriveTokenResponse>(resp_body) + .map_err(new_json_deserialize_error)?; + signer.access_token = token.access_token.clone(); + signer.expires_in = Utc::now() + chrono::Duration::seconds(token.expires_in) + - chrono::Duration::seconds(120); + } + _ => { + return Err(parse_error(resp).await?); + } + } + } + + let auth_header_content = format!("Bearer {}", signer.access_token); req.headers_mut() .insert(header::AUTHORIZATION, auth_header_content.parse().unwrap()); @@ -421,6 +461,25 @@ pub struct GdriveSigner { pub expires_in: DateTime<Utc>, } +impl Default for GdriveSigner { + fn default() -> Self { + GdriveSigner { + access_token: String::new(), + expires_in: DateTime::<Utc>::MIN_UTC, + + refresh_token: String::new(), + client_id: String::new(), + client_secret: String::new(), + } + } +} + +#[derive(Deserialize)] +pub struct GdriveTokenResponse { + access_token: String, + expires_in: i64, +} + // This is the file struct returned by the Google Drive API. // This is a complex struct, but we only add the fields we need. // refer to https://developers.google.com/drive/api/reference/rest/v3/files#File
