This is an automated email from the ASF dual-hosted git repository.

suyanhanx pushed a commit to branch dropbox-batch-delete
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit ae1e28078da8c834e26e0653cb75e4ce572bfd73
Author: suyanhanx <[email protected]>
AuthorDate: Fri Jul 7 17:16:13 2023 +0800

    impl batch_delete
    
    Signed-off-by: suyanhanx <[email protected]>
---
 .env.example                         |   3 +
 core/src/services/dropbox/backend.rs | 137 +++++++++++++++++++++++++++++++++++
 core/src/services/dropbox/builder.rs |   2 +-
 core/src/services/dropbox/core.rs    |  60 +++++++++++++++
 core/src/services/dropbox/error.rs   |   2 +-
 5 files changed, 202 insertions(+), 2 deletions(-)

diff --git a/.env.example b/.env.example
index 2e71f7b88..e3c54bc69 100644
--- a/.env.example
+++ b/.env.example
@@ -127,3 +127,6 @@ OPENDAL_CACACHE_DATADIR=/tmp/opendal/cacache/
 OPENDAL_DROPBOX_TEST=false
 OPENDAL_DROPBOX_ROOT=/tmp/opendal/
 OPENDAL_DROPBOX_ACCESS_TOKEN=<access_token>
+OPENDAL_DROPBOX_REFRESH_TOKEN=<refresh_token>
+OPENDAL_DROPBOX_CLIENT_ID=<client_id>
+OPENDAL_DROPBOX_CLIENT_SECRET=<client_secret>
diff --git a/core/src/services/dropbox/backend.rs 
b/core/src/services/dropbox/backend.rs
index 4af8f01b6..b0513e21e 100644
--- a/core/src/services/dropbox/backend.rs
+++ b/core/src/services/dropbox/backend.rs
@@ -26,6 +26,7 @@ use super::core::DropboxCore;
 use super::error::parse_error;
 use super::writer::DropboxWriter;
 use crate::raw::*;
+use crate::services::dropbox::error::DropboxErrorResponse;
 use crate::*;
 
 #[derive(Clone, Debug)]
@@ -58,6 +59,9 @@ impl Accessor for DropboxBackend {
 
                 delete: true,
 
+                batch: true,
+                batch_delete: true,
+
                 ..Default::default()
             });
         ma
@@ -162,6 +166,121 @@ impl Accessor for DropboxBackend {
             _ => Err(parse_error(resp).await?),
         }
     }
+
+    async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
+        let ops = args.into_operation();
+        if ops.len() > 1000 {
+            return Err(Error::new(
+                ErrorKind::Unsupported,
+                "dropbox services only allow delete up to 1000 keys at once",
+            )
+            .with_context("length", ops.len().to_string()));
+        }
+
+        let paths = ops.into_iter().map(|(p, _)| p).collect::<Vec<_>>();
+
+        let resp = self.core.dropbox_delete_batch(paths).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                let (_parts, body) = resp.into_parts();
+                let bs = body.bytes().await?;
+                let decoded_response = 
serde_json::from_slice::<DropboxDeleteBatchResponse>(&bs)
+                    .map_err(new_json_deserialize_error)?;
+
+                match decoded_response.tag.as_str() {
+                    "complete" => {
+                        let entries = 
decoded_response.entries.unwrap_or_default();
+                        let results = 
handle_batch_delete_complete_result(entries);
+                        Ok(RpBatch::new(results))
+                    }
+                    "async_job_id" => {
+                        let job_id = decoded_response
+                            .async_job_id
+                            .expect("async_job_id should be present");
+                        loop {
+                            let resp = 
self.core.dropbox_delete_batch_check(job_id.clone()).await?;
+                            let status = resp.status();
+                            match status {
+                                StatusCode::OK => {
+                                    let bs = resp.into_body().bytes().await?;
+
+                                    let decoded_response =
+                                        
serde_json::from_slice::<DropboxDeleteBatchResponse>(&bs)
+                                            
.map_err(new_json_deserialize_error)?;
+                                    match decoded_response.tag.as_str() {
+                                        "in_progress" => {
+                                            continue;
+                                        }
+                                        "complete" => {
+                                            let entries =
+                                                
decoded_response.entries.unwrap_or_default();
+                                            let results =
+                                                
handle_batch_delete_complete_result(entries);
+                                            return Ok(RpBatch::new(results));
+                                        }
+                                        _ => {
+                                            return Err(Error::new(
+                                                ErrorKind::Unexpected,
+                                                &format!("delete batch check 
failed with unexpected tag {}", decoded_response.tag),
+                                            ));
+                                        }
+                                    }
+                                }
+                                _ => break Err(parse_error(resp).await?),
+                            }
+                        }
+                    }
+                    _ => Err(Error::new(
+                        ErrorKind::Unexpected,
+                        &format!(
+                            "delete batch failed with unexpected tag {}",
+                            decoded_response.tag
+                        ),
+                    )),
+                }
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+}
+
+pub fn handle_batch_delete_complete_result(
+    entries: Vec<DropboxDeleteBatchResponseEntry>,
+) -> Vec<(String, Result<BatchedReply>)> {
+    let mut results = Vec::with_capacity(entries.len());
+    for entry in entries {
+        let result = match entry.tag.as_str() {
+            // Only success response has metadata and then path,
+            // so we cannot tell which path failed.
+            "success" => {
+                let path = entry
+                    .metadata
+                    .expect("metadata should be present")
+                    .path_display;
+                (path, Ok(RpDelete::default().into()))
+            }
+            "failure" => {
+                let error = entry.error.expect("error should be present");
+                let err = Error::new(
+                    ErrorKind::Unexpected,
+                    &format!("delete failed with error {}", 
error.error_summary),
+                );
+                ("".to_string(), Err(err))
+            }
+            _ => (
+                "".to_string(),
+                Err(Error::new(
+                    ErrorKind::Unexpected,
+                    &format!("delete failed with unexpected tag {}", 
entry.tag),
+                )),
+            ),
+        };
+        results.push(result);
+    }
+    results
 }
 
 #[derive(Default, Debug, Deserialize)]
@@ -217,3 +336,21 @@ pub struct DropboxMetadataSharingInfo {
     pub traverse_only: Option<bool>,
     pub no_access: Option<bool>,
 }
+
+#[derive(Default, Debug, Deserialize)]
+#[serde(default)]
+pub struct DropboxDeleteBatchResponse {
+    #[serde(rename(deserialize = ".tag"))]
+    pub tag: String,
+    pub async_job_id: Option<String>,
+    pub entries: Option<Vec<DropboxDeleteBatchResponseEntry>>,
+}
+
+#[derive(Default, Debug, Deserialize)]
+#[serde(default)]
+pub struct DropboxDeleteBatchResponseEntry {
+    #[serde(rename(deserialize = ".tag"))]
+    pub tag: String,
+    pub metadata: Option<DropboxMetadataResponse>,
+    pub error: Option<DropboxErrorResponse>,
+}
diff --git a/core/src/services/dropbox/builder.rs 
b/core/src/services/dropbox/builder.rs
index bbe1470f8..d31b26a75 100644
--- a/core/src/services/dropbox/builder.rs
+++ b/core/src/services/dropbox/builder.rs
@@ -80,7 +80,7 @@ use crate::*;
 ///
 /// ## Via Builder
 ///
-/// ```
+/// ```rust
 /// use anyhow::Result;
 /// use opendal::raw::OpWrite;
 /// use opendal::services::Dropbox;
diff --git a/core/src/services/dropbox/core.rs 
b/core/src/services/dropbox/core.rs
index 07d67ecaa..41f33225b 100644
--- a/core/src/services/dropbox/core.rs
+++ b/core/src/services/dropbox/core.rs
@@ -130,6 +130,51 @@ impl DropboxCore {
         self.client.send(request).await
     }
 
+    pub async fn dropbox_delete_batch(
+        &self,
+        paths: Vec<String>,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let url = 
"https://api.dropboxapi.com/2/files/delete_batch".to_string();
+        let args = DropboxDeleteBatchArgs {
+            entries: paths
+                .into_iter()
+                .map(|path| DropboxDeleteBatchEntry {
+                    path: self.build_path(&path),
+                })
+                .collect(),
+        };
+
+        let bs = 
Bytes::from(serde_json::to_string(&args).map_err(new_json_serialize_error)?);
+
+        let mut request = Request::post(&url)
+            .header(CONTENT_TYPE, "application/json")
+            .header(CONTENT_LENGTH, bs.len())
+            .body(AsyncBody::Bytes(bs))
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut request).await?;
+        self.client.send(request).await
+    }
+
+    pub async fn dropbox_delete_batch_check(
+        &self,
+        async_job_id: String,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let url = 
"https://api.dropboxapi.com/2/files/delete_batch/check".to_string();
+        let args = DropboxDeleteBatchCheckArgs { async_job_id };
+
+        let bs = 
Bytes::from(serde_json::to_string(&args).map_err(new_json_serialize_error)?);
+
+        let mut request = Request::post(&url)
+            .header(CONTENT_TYPE, "application/json")
+            .header(CONTENT_LENGTH, bs.len())
+            .body(AsyncBody::Bytes(bs))
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut request).await?;
+        self.client.send(request).await
+    }
+
     pub async fn dropbox_create_folder(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
         let url = 
"https://api.dropboxapi.com/2/files/create_folder_v2".to_string();
         let args = DropboxCreateFolderArgs {
@@ -271,6 +316,21 @@ struct DropboxDeleteArgs {
     path: String,
 }
 
+#[derive(Clone, Debug, Deserialize, Serialize)]
+struct DropboxDeleteBatchEntry {
+    path: String,
+}
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+struct DropboxDeleteBatchArgs {
+    entries: Vec<DropboxDeleteBatchEntry>,
+}
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+struct DropboxDeleteBatchCheckArgs {
+    async_job_id: String,
+}
+
 #[derive(Clone, Debug, Deserialize, Serialize)]
 struct DropboxCreateFolderArgs {
     path: String,
diff --git a/core/src/services/dropbox/error.rs 
b/core/src/services/dropbox/error.rs
index 57e3c177b..8a00176bc 100644
--- a/core/src/services/dropbox/error.rs
+++ b/core/src/services/dropbox/error.rs
@@ -28,7 +28,7 @@ use crate::Result;
 #[derive(Default, Debug, Deserialize)]
 #[serde(default)]
 pub struct DropboxErrorResponse {
-    error_summary: String,
+    pub error_summary: String,
 }
 
 /// Parse error response into Error.

Reply via email to