This is an automated email from the ASF dual-hosted git repository.
suyanhanx pushed a commit to branch dropbox-batch-delete
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/dropbox-batch-delete by this
push:
new f162a70be use backon to retry
f162a70be is described below
commit f162a70be2df8c5bbdc10d368e3e8606d40bef28
Author: suyanhanx <[email protected]>
AuthorDate: Sat Jul 8 16:29:02 2023 +0800
use backon to retry
Signed-off-by: suyanhanx <[email protected]>
---
core/src/services/dropbox/backend.rs | 87 ++++++++----------------------------
core/src/services/dropbox/core.rs | 83 +++++++++++++++++++++++++++++++++-
2 files changed, 99 insertions(+), 71 deletions(-)
diff --git a/core/src/services/dropbox/backend.rs
b/core/src/services/dropbox/backend.rs
index b0513e21e..5567285d0 100644
--- a/core/src/services/dropbox/backend.rs
+++ b/core/src/services/dropbox/backend.rs
@@ -17,9 +17,13 @@
use std::fmt::Debug;
use std::sync::Arc;
+use std::time::Duration;
use async_trait::async_trait;
+use backon::ExponentialBuilder;
+use backon::Retryable;
use http::StatusCode;
+use once_cell::sync::Lazy;
use serde::Deserialize;
use super::core::DropboxCore;
@@ -29,6 +33,13 @@ use crate::raw::*;
use crate::services::dropbox::error::DropboxErrorResponse;
use crate::*;
+static BACKOFF: Lazy<ExponentialBuilder> = Lazy::new(|| {
+ ExponentialBuilder::default()
+ .with_max_delay(Duration::from_secs(10))
+ .with_max_times(10)
+ .with_jitter()
+});
+
#[derive(Clone, Debug)]
pub struct DropboxBackend {
pub core: Arc<DropboxCore>,
@@ -193,45 +204,19 @@ impl Accessor for DropboxBackend {
match decoded_response.tag.as_str() {
"complete" => {
let entries =
decoded_response.entries.unwrap_or_default();
- let results =
handle_batch_delete_complete_result(entries);
+ let results =
self.core.handle_batch_delete_complete_result(entries);
Ok(RpBatch::new(results))
}
"async_job_id" => {
let job_id = decoded_response
.async_job_id
.expect("async_job_id should be present");
- loop {
- let resp =
self.core.dropbox_delete_batch_check(job_id.clone()).await?;
- let status = resp.status();
- match status {
- StatusCode::OK => {
- let bs = resp.into_body().bytes().await?;
-
- let decoded_response =
-
serde_json::from_slice::<DropboxDeleteBatchResponse>(&bs)
-
.map_err(new_json_deserialize_error)?;
- match decoded_response.tag.as_str() {
- "in_progress" => {
- continue;
- }
- "complete" => {
- let entries =
-
decoded_response.entries.unwrap_or_default();
- let results =
-
handle_batch_delete_complete_result(entries);
- return Ok(RpBatch::new(results));
- }
- _ => {
- return Err(Error::new(
- ErrorKind::Unexpected,
- &format!("delete batch check
failed with unexpected tag {}", decoded_response.tag),
- ));
- }
- }
- }
- _ => break Err(parse_error(resp).await?),
- }
- }
+ let res = { ||
self.core.dropbox_delete_batch_check(job_id.clone()) }
+ .retry(&*BACKOFF)
+ .when(|e| e.is_temporary())
+ .await?;
+
+ Ok(res)
}
_ => Err(Error::new(
ErrorKind::Unexpected,
@@ -247,42 +232,6 @@ impl Accessor for DropboxBackend {
}
}
-pub fn handle_batch_delete_complete_result(
- entries: Vec<DropboxDeleteBatchResponseEntry>,
-) -> Vec<(String, Result<BatchedReply>)> {
- let mut results = Vec::with_capacity(entries.len());
- for entry in entries {
- let result = match entry.tag.as_str() {
- // Only success response has metadata and then path,
- // so we cannot tell which path failed.
- "success" => {
- let path = entry
- .metadata
- .expect("metadata should be present")
- .path_display;
- (path, Ok(RpDelete::default().into()))
- }
- "failure" => {
- let error = entry.error.expect("error should be present");
- let err = Error::new(
- ErrorKind::Unexpected,
- &format!("delete failed with error {}",
error.error_summary),
- );
- ("".to_string(), Err(err))
- }
- _ => (
- "".to_string(),
- Err(Error::new(
- ErrorKind::Unexpected,
- &format!("delete failed with unexpected tag {}",
entry.tag),
- )),
- ),
- };
- results.push(result);
- }
- results
-}
-
#[derive(Default, Debug, Deserialize)]
#[serde(default)]
pub struct DropboxMetadataResponse {
diff --git a/core/src/services/dropbox/core.rs
b/core/src/services/dropbox/core.rs
index 41f33225b..31a21cf54 100644
--- a/core/src/services/dropbox/core.rs
+++ b/core/src/services/dropbox/core.rs
@@ -28,17 +28,26 @@ use http::header::CONTENT_LENGTH;
use http::header::CONTENT_TYPE;
use http::Request;
use http::Response;
+use http::StatusCode;
use serde::Deserialize;
use serde::Serialize;
use tokio::sync::Mutex;
-use crate::raw::build_rooted_abs_path;
use crate::raw::new_json_deserialize_error;
use crate::raw::new_json_serialize_error;
use crate::raw::new_request_build_error;
use crate::raw::AsyncBody;
use crate::raw::HttpClient;
use crate::raw::IncomingAsyncBody;
+use crate::raw::build_rooted_abs_path;
+use crate::raw::BatchedReply;
+use crate::raw::RpBatch;
+use crate::raw::RpDelete;
+use crate::services::dropbox::backend::DropboxDeleteBatchResponse;
+use crate::services::dropbox::backend::DropboxDeleteBatchResponseEntry;
+use crate::services::dropbox::error::parse_error;
+use crate::types::Error;
+use crate::types::ErrorKind;
use crate::types::Result;
pub struct DropboxCore {
@@ -156,7 +165,7 @@ impl DropboxCore {
self.client.send(request).await
}
- pub async fn dropbox_delete_batch_check(
+ pub async fn dropbox_delete_batch_check_request(
&self,
async_job_id: String,
) -> Result<Response<IncomingAsyncBody>> {
@@ -175,6 +184,39 @@ impl DropboxCore {
self.client.send(request).await
}
+ pub async fn dropbox_delete_batch_check(&self, job_id: String) ->
Result<RpBatch> {
+ let resp =
self.dropbox_delete_batch_check_request(job_id.clone()).await?;
+ let status = resp.status();
+ match status {
+ StatusCode::OK => {
+ let bs = resp.into_body().bytes().await?;
+
+ let decoded_response =
serde_json::from_slice::<DropboxDeleteBatchResponse>(&bs)
+ .map_err(new_json_deserialize_error)?;
+ match decoded_response.tag.as_str() {
+ "in_progress" => Err(Error::new(
+ ErrorKind::Unexpected,
+ "delete batch job still in progress",
+ )
+ .set_temporary()),
+ "complete" => {
+ let entries =
decoded_response.entries.unwrap_or_default();
+ let results =
self.handle_batch_delete_complete_result(entries);
+ Ok(RpBatch::new(results))
+ }
+ _ => Err(Error::new(
+ ErrorKind::Unexpected,
+ &format!(
+ "delete batch check failed with unexpected tag {}",
+ decoded_response.tag
+ ),
+ )),
+ }
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
pub async fn dropbox_create_folder(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
let url =
"https://api.dropboxapi.com/2/files/create_folder_v2".to_string();
let args = DropboxCreateFolderArgs {
@@ -260,6 +302,43 @@ impl DropboxCore {
Ok(())
}
+
+ pub fn handle_batch_delete_complete_result(
+ &self,
+ entries: Vec<DropboxDeleteBatchResponseEntry>,
+ ) -> Vec<(String, Result<BatchedReply>)> {
+ let mut results = Vec::with_capacity(entries.len());
+ for entry in entries {
+ let result = match entry.tag.as_str() {
+ // Only success response has metadata and then path,
+ // so we cannot tell which path failed.
+ "success" => {
+ let path = entry
+ .metadata
+ .expect("metadata should be present")
+ .path_display;
+ (path, Ok(RpDelete::default().into()))
+ }
+ "failure" => {
+ let error = entry.error.expect("error should be present");
+ let err = Error::new(
+ ErrorKind::Unexpected,
+ &format!("delete failed with error {}",
error.error_summary),
+ );
+ ("".to_string(), Err(err))
+ }
+ _ => (
+ "".to_string(),
+ Err(Error::new(
+ ErrorKind::Unexpected,
+ &format!("delete failed with unexpected tag {}",
entry.tag),
+ )),
+ ),
+ };
+ results.push(result);
+ }
+ results
+ }
}
#[derive(Clone)]