This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch dropbox-refresh-token in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit f1d1d44eb239c66e58db2705b596af2b67eab0d7 Author: Xuanwo <[email protected]> AuthorDate: Fri Jul 7 12:46:40 2023 +0800 Save work Signed-off-by: Xuanwo <[email protected]> --- core/src/services/dropbox/builder.rs | 103 +++++++++++++++++++++++++---- core/src/services/dropbox/core.rs | 121 ++++++++++++++++++++++++++++------- 2 files changed, 190 insertions(+), 34 deletions(-) diff --git a/core/src/services/dropbox/builder.rs b/core/src/services/dropbox/builder.rs index 373c83a9c..47621f7ae 100644 --- a/core/src/services/dropbox/builder.rs +++ b/core/src/services/dropbox/builder.rs @@ -20,8 +20,13 @@ use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; +use chrono::DateTime; +use chrono::Utc; +use tokio::sync::Mutex; + use super::backend::DropboxBackend; use super::core::DropboxCore; +use super::core::DropboxSigner; use crate::raw::*; use crate::*; @@ -72,8 +77,12 @@ use crate::*; #[derive(Default)] pub struct DropboxBuilder { - access_token: Option<String>, root: Option<String>, + access_token: Option<String>, + refresh_token: Option<String>, + client_id: Option<String>, + client_secret: Option<String>, + http_client: Option<HttpClient>, } @@ -84,15 +93,47 @@ impl Debug for DropboxBuilder { } impl DropboxBuilder { - /// default: no access token, which leads to failure + /// Set the root directory for dropbox. + /// + /// Defautl to `/` if not set. + pub fn root(&mut self, root: &str) -> &mut Self { + self.root = Some(root.to_string()); + self + } + + /// Access token is used for temporary access to the Dropbox API. + /// + /// You can get the access token from [Dropbox App Console](https://www.dropbox.com/developers/apps) + /// + /// NOTE: this token will be expired in 4 hours. If you are trying to use dropbox services in a long time, please set a refresh_token instead. pub fn access_token(&mut self, access_token: &str) -> &mut Self { self.access_token = Some(access_token.to_string()); self } - /// default: no root path, which leads to failure - pub fn root(&mut self, root: &str) -> &mut Self { - self.root = Some(root.to_string()); + /// Refersh token is used for long term access to the Dropbox API. + /// + /// You can get the refresh token via OAuth2.0 Flow of dropbox. + /// + /// OpenDAL will use this refresh token to get a new access token when the old one is expired. + pub fn refresh_token(&mut self, refresh_token: &str) -> &mut Self { + self.refresh_token = Some(refresh_token.to_string()); + self + } + + /// Set the client id for dropbox. + /// + /// This is required for OAuth2.0 Flow with refresh token. + pub fn client_id(&mut self, client_id: &str) -> &mut Self { + self.client_id = Some(client_id.to_string()); + self + } + + /// Set the client secret for dropbox. + /// + /// This is required for OAuth2.0 Flow with refresh token. + pub fn client_secret(&mut self, client_secret: &str) -> &mut Self { + self.client_secret = Some(client_secret.to_string()); self } @@ -116,6 +157,9 @@ impl Builder for DropboxBuilder { let mut builder = Self::default(); map.get("root").map(|v| builder.root(v)); map.get("access_token").map(|v| builder.access_token(v)); + map.get("refresh_token").map(|v| builder.refresh_token(v)); + map.get("client_id").map(|v| builder.client_id(v)); + map.get("client_secret").map(|v| builder.client_secret(v)); builder } @@ -129,20 +173,57 @@ impl Builder for DropboxBuilder { .with_context("service", Scheme::Dropbox) })? }; - let token = match self.access_token.clone() { - Some(access_token) => access_token, - None => { + + let signer = match (self.access_token.take(), self.refresh_token.take()) { + (Some(access_token), None) => DropboxSigner { + access_token, + // We will never expire user specifed token. + expires_in: DateTime::<Utc>::MAX_UTC, + ..Default::default() + }, + (None, Some(refresh_token)) => { + let client_id = self.client_id.take().ok_or_else(|| { + Error::new( + ErrorKind::ConfigInvalid, + "client_id must be set when refresh_token is set", + ) + .with_context("service", Scheme::Dropbox) + })?; + let client_secret = self.client_secret.take().ok_or_else(|| { + Error::new( + ErrorKind::ConfigInvalid, + "client_secret must be set when refresh_token is set", + ) + .with_context("service", Scheme::Dropbox) + })?; + + DropboxSigner { + refresh_token, + client_id, + client_secret, + ..Default::default() + } + } + (Some(_), Some(_)) => { + return Err(Error::new( + ErrorKind::ConfigInvalid, + "access_token and refresh_token can not be set at the same time", + ) + .with_context("service", Scheme::Dropbox)) + } + (None, None) => { return Err(Error::new( ErrorKind::ConfigInvalid, - "access_token is required", - )) + "access_token or refresh_token must be set", + ) + .with_context("service", Scheme::Dropbox)) } }; Ok(DropboxBackend { core: Arc::new(DropboxCore { root, - token, + signer: Arc::new(Mutex::new(signer)), client, }), }) diff --git a/core/src/services/dropbox/core.rs b/core/src/services/dropbox/core.rs index 91efc14af..3795be44b 100644 --- a/core/src/services/dropbox/core.rs +++ b/core/src/services/dropbox/core.rs @@ -18,15 +18,22 @@ use std::default::Default; use std::fmt::Debug; use std::fmt::Formatter; +use std::sync::Arc; use bytes::Bytes; +use chrono::DateTime; +use chrono::Utc; use http::header; +use http::header::CONTENT_LENGTH; +use http::header::CONTENT_TYPE; use http::Request; use http::Response; use serde::Deserialize; use serde::Serialize; +use tokio::sync::Mutex; use crate::raw::build_rooted_abs_path; +use crate::raw::new_json_deserialize_error; use crate::raw::new_json_serialize_error; use crate::raw::new_request_build_error; use crate::raw::AsyncBody; @@ -35,7 +42,7 @@ use crate::raw::IncomingAsyncBody; use crate::types::Result; pub struct DropboxCore { - pub token: String, + pub signer: Arc<Mutex<DropboxSigner>>, pub client: HttpClient, pub root: String, } @@ -64,7 +71,7 @@ impl DropboxCore { serde_json::to_string(&download_args).map_err(new_json_serialize_error)?; let mut request = Request::post(&url) .header("Dropbox-API-Arg", request_payload) - .header(header::CONTENT_LENGTH, 0) + .header(CONTENT_LENGTH, 0) .body(AsyncBody::Empty) .map_err(new_request_build_error)?; @@ -86,10 +93,10 @@ impl DropboxCore { }; let mut request_builder = Request::post(&url); if let Some(size) = size { - request_builder = request_builder.header(header::CONTENT_LENGTH, size); + request_builder = request_builder.header(CONTENT_LENGTH, size); } request_builder = request_builder.header( - header::CONTENT_TYPE, + CONTENT_TYPE, content_type.unwrap_or("application/octet-stream"), ); @@ -114,8 +121,8 @@ impl DropboxCore { let bs = Bytes::from(serde_json::to_string(&args).map_err(new_json_serialize_error)?); let mut request = Request::post(&url) - .header(header::CONTENT_TYPE, "application/json") - .header(header::CONTENT_LENGTH, bs.len()) + .header(CONTENT_TYPE, "application/json") + .header(CONTENT_LENGTH, bs.len()) .body(AsyncBody::Bytes(bs)) .map_err(new_request_build_error)?; @@ -132,8 +139,8 @@ impl DropboxCore { let bs = Bytes::from(serde_json::to_string(&args).map_err(new_json_serialize_error)?); let mut request = Request::post(&url) - .header(header::CONTENT_TYPE, "application/json") - .header(header::CONTENT_LENGTH, bs.len()) + .header(CONTENT_TYPE, "application/json") + .header(CONTENT_LENGTH, bs.len()) .body(AsyncBody::Bytes(bs)) .map_err(new_request_build_error)?; @@ -151,8 +158,8 @@ impl DropboxCore { let bs = Bytes::from(serde_json::to_string(&args).map_err(new_json_serialize_error)?); let mut request = Request::post(&url) - .header(header::CONTENT_TYPE, "application/json") - .header(header::CONTENT_LENGTH, bs.len()) + .header(CONTENT_TYPE, "application/json") + .header(CONTENT_LENGTH, bs.len()) .body(AsyncBody::Bytes(bs)) .map_err(new_request_build_error)?; @@ -162,7 +169,46 @@ impl DropboxCore { } pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> { - let value = format!("Bearer {}", self.token) + let mut signer = self.signer.lock().await; + + // Access token is valid, use it directly. + if !signer.access_token.is_empty() && signer.expires_in > Utc::now() { + let value = format!("Bearer {}", signer.access_token) + .parse() + .expect("token must be valid header"); + req.headers_mut().insert(header::AUTHORIZATION, value); + } + + // Refresh invalid token. + let url = "https://api.dropboxapi.com/oauth2/token".to_string(); + + let content = format!( + "grant_type=refresh_token&refresh_token={}&client_id={}&client_secret={}", + signer.refresh_token, signer.client_id, signer.client_secret + ); + let bs = Bytes::from(content); + + let request = Request::post(&url) + .header(CONTENT_TYPE, "application/x-www-form-urlencoded") + .header(CONTENT_LENGTH, bs.len()) + .body(AsyncBody::Bytes(bs)) + .map_err(new_request_build_error)?; + + let resp = self.client.send(request).await?; + let body = resp.into_body().bytes().await?; + + let token: DropboxTokenResponse = + serde_json::from_slice(&body).map_err(new_json_deserialize_error)?; + + // Update signer after token refreshed. + + signer.access_token = token.access_token.clone(); + + // Refresh it 2 minutes earlier. + signer.expires_in = Utc::now() + chrono::Duration::seconds(token.expires_in as i64) + - chrono::Duration::seconds(120); + + let value = format!("Bearer {}", token.access_token) .parse() .expect("token must be valid header"); req.headers_mut().insert(header::AUTHORIZATION, value); @@ -171,6 +217,29 @@ impl DropboxCore { } } +#[derive(Clone)] +pub struct DropboxSigner { + pub client_id: String, + pub client_secret: String, + pub refresh_token: String, + + pub access_token: String, + pub expires_in: DateTime<Utc>, +} + +impl Default for DropboxSigner { + fn default() -> Self { + DropboxSigner { + refresh_token: "".to_string(), + client_id: String::new(), + client_secret: String::new(), + + access_token: "".to_string(), + expires_in: DateTime::<Utc>::MIN_UTC, + } + } +} + #[derive(Clone, Debug, Deserialize, Serialize)] struct DropboxDownloadArgs { path: String, @@ -185,6 +254,18 @@ struct DropboxUploadArgs { strict_conflict: bool, } +impl Default for DropboxUploadArgs { + fn default() -> Self { + DropboxUploadArgs { + mode: "overwrite".to_string(), + path: "".to_string(), + mute: true, + autorename: false, + strict_conflict: false, + } + } +} + #[derive(Clone, Debug, Deserialize, Serialize)] struct DropboxDeleteArgs { path: String, @@ -203,18 +284,6 @@ struct DropboxMetadataArgs { path: String, } -impl Default for DropboxUploadArgs { - fn default() -> Self { - DropboxUploadArgs { - mode: "overwrite".to_string(), - path: "".to_string(), - mute: true, - autorename: false, - strict_conflict: false, - } - } -} - impl Default for DropboxMetadataArgs { fn default() -> Self { DropboxMetadataArgs { @@ -225,3 +294,9 @@ impl Default for DropboxMetadataArgs { } } } + +#[derive(Clone, Deserialize)] +struct DropboxTokenResponse { + access_token: String, + expires_in: usize, +}
