This is an automated email from the ASF dual-hosted git repository. suyanhanx pushed a commit to branch dropbox-batch-delete in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit ae1e28078da8c834e26e0653cb75e4ce572bfd73 Author: suyanhanx <[email protected]> AuthorDate: Fri Jul 7 17:16:13 2023 +0800 impl batch_delete Signed-off-by: suyanhanx <[email protected]> --- .env.example | 3 + core/src/services/dropbox/backend.rs | 137 +++++++++++++++++++++++++++++++++++ core/src/services/dropbox/builder.rs | 2 +- core/src/services/dropbox/core.rs | 60 +++++++++++++++ core/src/services/dropbox/error.rs | 2 +- 5 files changed, 202 insertions(+), 2 deletions(-) diff --git a/.env.example b/.env.example index 2e71f7b88..e3c54bc69 100644 --- a/.env.example +++ b/.env.example @@ -127,3 +127,6 @@ OPENDAL_CACACHE_DATADIR=/tmp/opendal/cacache/ OPENDAL_DROPBOX_TEST=false OPENDAL_DROPBOX_ROOT=/tmp/opendal/ OPENDAL_DROPBOX_ACCESS_TOKEN=<access_token> +OPENDAL_DROPBOX_REFRESH_TOKEN=<refresh_token> +OPENDAL_DROPBOX_CLIENT_ID=<client_id> +OPENDAL_DROPBOX_CLIENT_SECRET=<client_secret> diff --git a/core/src/services/dropbox/backend.rs b/core/src/services/dropbox/backend.rs index 4af8f01b6..b0513e21e 100644 --- a/core/src/services/dropbox/backend.rs +++ b/core/src/services/dropbox/backend.rs @@ -26,6 +26,7 @@ use super::core::DropboxCore; use super::error::parse_error; use super::writer::DropboxWriter; use crate::raw::*; +use crate::services::dropbox::error::DropboxErrorResponse; use crate::*; #[derive(Clone, Debug)] @@ -58,6 +59,9 @@ impl Accessor for DropboxBackend { delete: true, + batch: true, + batch_delete: true, + ..Default::default() }); ma @@ -162,6 +166,121 @@ impl Accessor for DropboxBackend { _ => Err(parse_error(resp).await?), } } + + async fn batch(&self, args: OpBatch) -> Result<RpBatch> { + let ops = args.into_operation(); + if ops.len() > 1000 { + return Err(Error::new( + ErrorKind::Unsupported, + "dropbox services only allow delete up to 1000 keys at once", + ) + .with_context("length", ops.len().to_string())); + } + + let paths = ops.into_iter().map(|(p, _)| p).collect::<Vec<_>>(); + + let resp = self.core.dropbox_delete_batch(paths).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + let (_parts, body) = resp.into_parts(); + let bs = body.bytes().await?; + let decoded_response = serde_json::from_slice::<DropboxDeleteBatchResponse>(&bs) + .map_err(new_json_deserialize_error)?; + + match decoded_response.tag.as_str() { + "complete" => { + let entries = decoded_response.entries.unwrap_or_default(); + let results = handle_batch_delete_complete_result(entries); + Ok(RpBatch::new(results)) + } + "async_job_id" => { + let job_id = decoded_response + .async_job_id + .expect("async_job_id should be present"); + loop { + let resp = self.core.dropbox_delete_batch_check(job_id.clone()).await?; + let status = resp.status(); + match status { + StatusCode::OK => { + let bs = resp.into_body().bytes().await?; + + let decoded_response = + serde_json::from_slice::<DropboxDeleteBatchResponse>(&bs) + .map_err(new_json_deserialize_error)?; + match decoded_response.tag.as_str() { + "in_progress" => { + continue; + } + "complete" => { + let entries = + decoded_response.entries.unwrap_or_default(); + let results = + handle_batch_delete_complete_result(entries); + return Ok(RpBatch::new(results)); + } + _ => { + return Err(Error::new( + ErrorKind::Unexpected, + &format!("delete batch check failed with unexpected tag {}", decoded_response.tag), + )); + } + } + } + _ => break Err(parse_error(resp).await?), + } + } + } + _ => Err(Error::new( + ErrorKind::Unexpected, + &format!( + "delete batch failed with unexpected tag {}", + decoded_response.tag + ), + )), + } + } + _ => Err(parse_error(resp).await?), + } + } +} + +pub fn handle_batch_delete_complete_result( + entries: Vec<DropboxDeleteBatchResponseEntry>, +) -> Vec<(String, Result<BatchedReply>)> { + let mut results = Vec::with_capacity(entries.len()); + for entry in entries { + let result = match entry.tag.as_str() { + // Only success response has metadata and then path, + // so we cannot tell which path failed. + "success" => { + let path = entry + .metadata + .expect("metadata should be present") + .path_display; + (path, Ok(RpDelete::default().into())) + } + "failure" => { + let error = entry.error.expect("error should be present"); + let err = Error::new( + ErrorKind::Unexpected, + &format!("delete failed with error {}", error.error_summary), + ); + ("".to_string(), Err(err)) + } + _ => ( + "".to_string(), + Err(Error::new( + ErrorKind::Unexpected, + &format!("delete failed with unexpected tag {}", entry.tag), + )), + ), + }; + results.push(result); + } + results } #[derive(Default, Debug, Deserialize)] @@ -217,3 +336,21 @@ pub struct DropboxMetadataSharingInfo { pub traverse_only: Option<bool>, pub no_access: Option<bool>, } + +#[derive(Default, Debug, Deserialize)] +#[serde(default)] +pub struct DropboxDeleteBatchResponse { + #[serde(rename(deserialize = ".tag"))] + pub tag: String, + pub async_job_id: Option<String>, + pub entries: Option<Vec<DropboxDeleteBatchResponseEntry>>, +} + +#[derive(Default, Debug, Deserialize)] +#[serde(default)] +pub struct DropboxDeleteBatchResponseEntry { + #[serde(rename(deserialize = ".tag"))] + pub tag: String, + pub metadata: Option<DropboxMetadataResponse>, + pub error: Option<DropboxErrorResponse>, +} diff --git a/core/src/services/dropbox/builder.rs b/core/src/services/dropbox/builder.rs index bbe1470f8..d31b26a75 100644 --- a/core/src/services/dropbox/builder.rs +++ b/core/src/services/dropbox/builder.rs @@ -80,7 +80,7 @@ use crate::*; /// /// ## Via Builder /// -/// ``` +/// ```rust /// use anyhow::Result; /// use opendal::raw::OpWrite; /// use opendal::services::Dropbox; diff --git a/core/src/services/dropbox/core.rs b/core/src/services/dropbox/core.rs index 07d67ecaa..41f33225b 100644 --- a/core/src/services/dropbox/core.rs +++ b/core/src/services/dropbox/core.rs @@ -130,6 +130,51 @@ impl DropboxCore { self.client.send(request).await } + pub async fn dropbox_delete_batch( + &self, + paths: Vec<String>, + ) -> Result<Response<IncomingAsyncBody>> { + let url = "https://api.dropboxapi.com/2/files/delete_batch".to_string(); + let args = DropboxDeleteBatchArgs { + entries: paths + .into_iter() + .map(|path| DropboxDeleteBatchEntry { + path: self.build_path(&path), + }) + .collect(), + }; + + let bs = Bytes::from(serde_json::to_string(&args).map_err(new_json_serialize_error)?); + + let mut request = Request::post(&url) + .header(CONTENT_TYPE, "application/json") + .header(CONTENT_LENGTH, bs.len()) + .body(AsyncBody::Bytes(bs)) + .map_err(new_request_build_error)?; + + self.sign(&mut request).await?; + self.client.send(request).await + } + + pub async fn dropbox_delete_batch_check( + &self, + async_job_id: String, + ) -> Result<Response<IncomingAsyncBody>> { + let url = "https://api.dropboxapi.com/2/files/delete_batch/check".to_string(); + let args = DropboxDeleteBatchCheckArgs { async_job_id }; + + let bs = Bytes::from(serde_json::to_string(&args).map_err(new_json_serialize_error)?); + + let mut request = Request::post(&url) + .header(CONTENT_TYPE, "application/json") + .header(CONTENT_LENGTH, bs.len()) + .body(AsyncBody::Bytes(bs)) + .map_err(new_request_build_error)?; + + self.sign(&mut request).await?; + self.client.send(request).await + } + pub async fn dropbox_create_folder(&self, path: &str) -> Result<Response<IncomingAsyncBody>> { let url = "https://api.dropboxapi.com/2/files/create_folder_v2".to_string(); let args = DropboxCreateFolderArgs { @@ -271,6 +316,21 @@ struct DropboxDeleteArgs { path: String, } +#[derive(Clone, Debug, Deserialize, Serialize)] +struct DropboxDeleteBatchEntry { + path: String, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +struct DropboxDeleteBatchArgs { + entries: Vec<DropboxDeleteBatchEntry>, +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +struct DropboxDeleteBatchCheckArgs { + async_job_id: String, +} + #[derive(Clone, Debug, Deserialize, Serialize)] struct DropboxCreateFolderArgs { path: String, diff --git a/core/src/services/dropbox/error.rs b/core/src/services/dropbox/error.rs index 57e3c177b..8a00176bc 100644 --- a/core/src/services/dropbox/error.rs +++ b/core/src/services/dropbox/error.rs @@ -28,7 +28,7 @@ use crate::Result; #[derive(Default, Debug, Deserialize)] #[serde(default)] pub struct DropboxErrorResponse { - error_summary: String, + pub error_summary: String, } /// Parse error response into Error.
