This is an automated email from the ASF dual-hosted git repository.

suyanhanx pushed a commit to branch dropbox-batch-delete
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/dropbox-batch-delete by this 
push:
     new f162a70be use backon to retry
f162a70be is described below

commit f162a70be2df8c5bbdc10d368e3e8606d40bef28
Author: suyanhanx <[email protected]>
AuthorDate: Sat Jul 8 16:29:02 2023 +0800

    use backon to retry
    
    Signed-off-by: suyanhanx <[email protected]>
---
 core/src/services/dropbox/backend.rs | 87 ++++++++----------------------------
 core/src/services/dropbox/core.rs    | 83 +++++++++++++++++++++++++++++++++-
 2 files changed, 99 insertions(+), 71 deletions(-)

diff --git a/core/src/services/dropbox/backend.rs 
b/core/src/services/dropbox/backend.rs
index b0513e21e..5567285d0 100644
--- a/core/src/services/dropbox/backend.rs
+++ b/core/src/services/dropbox/backend.rs
@@ -17,9 +17,13 @@
 
 use std::fmt::Debug;
 use std::sync::Arc;
+use std::time::Duration;
 
 use async_trait::async_trait;
+use backon::ExponentialBuilder;
+use backon::Retryable;
 use http::StatusCode;
+use once_cell::sync::Lazy;
 use serde::Deserialize;
 
 use super::core::DropboxCore;
@@ -29,6 +33,13 @@ use crate::raw::*;
 use crate::services::dropbox::error::DropboxErrorResponse;
 use crate::*;
 
+static BACKOFF: Lazy<ExponentialBuilder> = Lazy::new(|| {
+    ExponentialBuilder::default()
+        .with_max_delay(Duration::from_secs(10))
+        .with_max_times(10)
+        .with_jitter()
+});
+
 #[derive(Clone, Debug)]
 pub struct DropboxBackend {
     pub core: Arc<DropboxCore>,
@@ -193,45 +204,19 @@ impl Accessor for DropboxBackend {
                 match decoded_response.tag.as_str() {
                     "complete" => {
                         let entries = 
decoded_response.entries.unwrap_or_default();
-                        let results = 
handle_batch_delete_complete_result(entries);
+                        let results = 
self.core.handle_batch_delete_complete_result(entries);
                         Ok(RpBatch::new(results))
                     }
                     "async_job_id" => {
                         let job_id = decoded_response
                             .async_job_id
                             .expect("async_job_id should be present");
-                        loop {
-                            let resp = 
self.core.dropbox_delete_batch_check(job_id.clone()).await?;
-                            let status = resp.status();
-                            match status {
-                                StatusCode::OK => {
-                                    let bs = resp.into_body().bytes().await?;
-
-                                    let decoded_response =
-                                        
serde_json::from_slice::<DropboxDeleteBatchResponse>(&bs)
-                                            
.map_err(new_json_deserialize_error)?;
-                                    match decoded_response.tag.as_str() {
-                                        "in_progress" => {
-                                            continue;
-                                        }
-                                        "complete" => {
-                                            let entries =
-                                                
decoded_response.entries.unwrap_or_default();
-                                            let results =
-                                                
handle_batch_delete_complete_result(entries);
-                                            return Ok(RpBatch::new(results));
-                                        }
-                                        _ => {
-                                            return Err(Error::new(
-                                                ErrorKind::Unexpected,
-                                                &format!("delete batch check 
failed with unexpected tag {}", decoded_response.tag),
-                                            ));
-                                        }
-                                    }
-                                }
-                                _ => break Err(parse_error(resp).await?),
-                            }
-                        }
+                        let res = { || 
self.core.dropbox_delete_batch_check(job_id.clone()) }
+                            .retry(&*BACKOFF)
+                            .when(|e| e.is_temporary())
+                            .await?;
+
+                        Ok(res)
                     }
                     _ => Err(Error::new(
                         ErrorKind::Unexpected,
@@ -247,42 +232,6 @@ impl Accessor for DropboxBackend {
     }
 }
 
-pub fn handle_batch_delete_complete_result(
-    entries: Vec<DropboxDeleteBatchResponseEntry>,
-) -> Vec<(String, Result<BatchedReply>)> {
-    let mut results = Vec::with_capacity(entries.len());
-    for entry in entries {
-        let result = match entry.tag.as_str() {
-            // Only success response has metadata and then path,
-            // so we cannot tell which path failed.
-            "success" => {
-                let path = entry
-                    .metadata
-                    .expect("metadata should be present")
-                    .path_display;
-                (path, Ok(RpDelete::default().into()))
-            }
-            "failure" => {
-                let error = entry.error.expect("error should be present");
-                let err = Error::new(
-                    ErrorKind::Unexpected,
-                    &format!("delete failed with error {}", 
error.error_summary),
-                );
-                ("".to_string(), Err(err))
-            }
-            _ => (
-                "".to_string(),
-                Err(Error::new(
-                    ErrorKind::Unexpected,
-                    &format!("delete failed with unexpected tag {}", 
entry.tag),
-                )),
-            ),
-        };
-        results.push(result);
-    }
-    results
-}
-
 #[derive(Default, Debug, Deserialize)]
 #[serde(default)]
 pub struct DropboxMetadataResponse {
diff --git a/core/src/services/dropbox/core.rs 
b/core/src/services/dropbox/core.rs
index 41f33225b..31a21cf54 100644
--- a/core/src/services/dropbox/core.rs
+++ b/core/src/services/dropbox/core.rs
@@ -28,17 +28,26 @@ use http::header::CONTENT_LENGTH;
 use http::header::CONTENT_TYPE;
 use http::Request;
 use http::Response;
+use http::StatusCode;
 use serde::Deserialize;
 use serde::Serialize;
 use tokio::sync::Mutex;
 
-use crate::raw::build_rooted_abs_path;
 use crate::raw::new_json_deserialize_error;
 use crate::raw::new_json_serialize_error;
 use crate::raw::new_request_build_error;
 use crate::raw::AsyncBody;
 use crate::raw::HttpClient;
 use crate::raw::IncomingAsyncBody;
+use crate::raw::build_rooted_abs_path;
+use crate::raw::BatchedReply;
+use crate::raw::RpBatch;
+use crate::raw::RpDelete;
+use crate::services::dropbox::backend::DropboxDeleteBatchResponse;
+use crate::services::dropbox::backend::DropboxDeleteBatchResponseEntry;
+use crate::services::dropbox::error::parse_error;
+use crate::types::Error;
+use crate::types::ErrorKind;
 use crate::types::Result;
 
 pub struct DropboxCore {
@@ -156,7 +165,7 @@ impl DropboxCore {
         self.client.send(request).await
     }
 
-    pub async fn dropbox_delete_batch_check(
+    pub async fn dropbox_delete_batch_check_request(
         &self,
         async_job_id: String,
     ) -> Result<Response<IncomingAsyncBody>> {
@@ -175,6 +184,39 @@ impl DropboxCore {
         self.client.send(request).await
     }
 
+    pub async fn dropbox_delete_batch_check(&self, job_id: String) -> 
Result<RpBatch> {
+        let resp = 
self.dropbox_delete_batch_check_request(job_id.clone()).await?;
+        let status = resp.status();
+        match status {
+            StatusCode::OK => {
+                let bs = resp.into_body().bytes().await?;
+
+                let decoded_response = 
serde_json::from_slice::<DropboxDeleteBatchResponse>(&bs)
+                    .map_err(new_json_deserialize_error)?;
+                match decoded_response.tag.as_str() {
+                    "in_progress" => Err(Error::new(
+                        ErrorKind::Unexpected,
+                        "delete batch job still in progress",
+                    )
+                    .set_temporary()),
+                    "complete" => {
+                        let entries = 
decoded_response.entries.unwrap_or_default();
+                        let results = 
self.handle_batch_delete_complete_result(entries);
+                        Ok(RpBatch::new(results))
+                    }
+                    _ => Err(Error::new(
+                        ErrorKind::Unexpected,
+                        &format!(
+                            "delete batch check failed with unexpected tag {}",
+                            decoded_response.tag
+                        ),
+                    )),
+                }
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
     pub async fn dropbox_create_folder(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
         let url = 
"https://api.dropboxapi.com/2/files/create_folder_v2".to_string();
         let args = DropboxCreateFolderArgs {
@@ -260,6 +302,43 @@ impl DropboxCore {
 
         Ok(())
     }
+
+    pub fn handle_batch_delete_complete_result(
+        &self,
+        entries: Vec<DropboxDeleteBatchResponseEntry>,
+    ) -> Vec<(String, Result<BatchedReply>)> {
+        let mut results = Vec::with_capacity(entries.len());
+        for entry in entries {
+            let result = match entry.tag.as_str() {
+                // Only success response has metadata and then path,
+                // so we cannot tell which path failed.
+                "success" => {
+                    let path = entry
+                        .metadata
+                        .expect("metadata should be present")
+                        .path_display;
+                    (path, Ok(RpDelete::default().into()))
+                }
+                "failure" => {
+                    let error = entry.error.expect("error should be present");
+                    let err = Error::new(
+                        ErrorKind::Unexpected,
+                        &format!("delete failed with error {}", 
error.error_summary),
+                    );
+                    ("".to_string(), Err(err))
+                }
+                _ => (
+                    "".to_string(),
+                    Err(Error::new(
+                        ErrorKind::Unexpected,
+                        &format!("delete failed with unexpected tag {}", 
entry.tag),
+                    )),
+                ),
+            };
+            results.push(result);
+        }
+        results
+    }
 }
 
 #[derive(Clone)]

Reply via email to