This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 45f042504 feat(service/dropbox): impl batch delete (#2606)
45f042504 is described below
commit 45f042504146ef0871489007103032a90d7a8f59
Author: Suyan <[email protected]>
AuthorDate: Sat Jul 8 19:40:22 2023 +0800
feat(service/dropbox): impl batch delete (#2606)
* refresh token flow
Signed-off-by: suyanhanx <[email protected]>
* impl batch_delete
Signed-off-by: suyanhanx <[email protected]>
* enable CI test
Signed-off-by: suyanhanx <[email protected]>
* add too_many_write_operations as retriable
Signed-off-by: suyanhanx <[email protected]>
* use backon to retry
Signed-off-by: suyanhanx <[email protected]>
* make fmt happy
Signed-off-by: suyanhanx <[email protected]>
---------
Signed-off-by: suyanhanx <[email protected]>
---
.env.example | 3 +
.github/workflows/service_test_dropbox.yml | 66 +++++++++++++
core/src/services/dropbox/backend.rs | 86 +++++++++++++++++
core/src/services/dropbox/builder.rs | 49 +++++++---
core/src/services/dropbox/core.rs | 146 ++++++++++++++++++++++++++++-
core/src/services/dropbox/error.rs | 4 +-
6 files changed, 338 insertions(+), 16 deletions(-)
diff --git a/.env.example b/.env.example
index 2e71f7b88..e3c54bc69 100644
--- a/.env.example
+++ b/.env.example
@@ -127,3 +127,6 @@ OPENDAL_CACACHE_DATADIR=/tmp/opendal/cacache/
OPENDAL_DROPBOX_TEST=false
OPENDAL_DROPBOX_ROOT=/tmp/opendal/
OPENDAL_DROPBOX_ACCESS_TOKEN=<access_token>
+OPENDAL_DROPBOX_REFRESH_TOKEN=<refresh_token>
+OPENDAL_DROPBOX_CLIENT_ID=<client_id>
+OPENDAL_DROPBOX_CLIENT_SECRET=<client_secret>
diff --git a/.github/workflows/service_test_dropbox.yml
b/.github/workflows/service_test_dropbox.yml
new file mode 100644
index 000000000..0bc02c895
--- /dev/null
+++ b/.github/workflows/service_test_dropbox.yml
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: Service Test Dropbox
+
+on:
+ push:
+ branches:
+ - main
+ pull_request:
+ branches:
+ - main
+ paths:
+ - "core/src/**"
+ - "core/tests/**"
+ - "!core/src/docs/**"
+ - "!core/src/services/**"
+ - "core/src/services/dropbox/**"
+ - ".github/workflows/service_test_dropbox.yml"
+
+concurrency:
+ group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }}
+ cancel-in-progress: true
+
+jobs:
+ dropbox:
+ runs-on: ubuntu-latest
+ if: github.event_name == 'push' ||
!github.event.pull_request.head.repo.fork
+ steps:
+ - uses: actions/checkout@v3
+ - name: Setup Rust toolchain
+ uses: ./.github/actions/setup
+
+ - name: Load secret
+ id: op-load-secret
+ uses: 1password/load-secrets-action@v1
+ with:
+ export-env: true
+ env:
+ OP_SERVICE_ACCOUNT_TOKEN: ${{ secrets.OP_SERVICE_ACCOUNT_TOKEN }}
+ OPENDAL_DROPBOX_TEST: op://services/dropbox/test
+ OPENDAL_DROPBOX_ROOT: op://services/dropbox/root
+ OPENDAL_DROPBOX_REFRESH_TOKEN: op://services/dropbox/refresh_token
+ OPENDAL_DROPBOX_CLIENT_ID: op://services/dropbox/client_id
+ OPENDAL_DROPBOX_CLIENT_SECRET: op://services/dropbox/client_secret
+
+ - name: Test
+ shell: bash
+ working-directory: core
+ # It's easy for Dropbox to trigger the too_many_write_operations error,
+ # so we run the tests one at a time.
+ run: cargo test dropbox --features=services-dropbox -- --test-threads=1
diff --git a/core/src/services/dropbox/backend.rs
b/core/src/services/dropbox/backend.rs
index 4af8f01b6..5567285d0 100644
--- a/core/src/services/dropbox/backend.rs
+++ b/core/src/services/dropbox/backend.rs
@@ -17,17 +17,29 @@
use std::fmt::Debug;
use std::sync::Arc;
+use std::time::Duration;
use async_trait::async_trait;
+use backon::ExponentialBuilder;
+use backon::Retryable;
use http::StatusCode;
+use once_cell::sync::Lazy;
use serde::Deserialize;
use super::core::DropboxCore;
use super::error::parse_error;
use super::writer::DropboxWriter;
use crate::raw::*;
+use crate::services::dropbox::error::DropboxErrorResponse;
use crate::*;
+/// Retry policy applied when polling an asynchronous `delete_batch` job:
+/// exponential backoff with jitter, each delay capped at 10 seconds and
+/// `max_times` set to 10.
+static BACKOFF: Lazy<ExponentialBuilder> = Lazy::new(|| {
+    let max_delay = Duration::from_secs(10);
+    ExponentialBuilder::default()
+        .with_jitter()
+        .with_max_delay(max_delay)
+        .with_max_times(10)
+});
+
#[derive(Clone, Debug)]
pub struct DropboxBackend {
pub core: Arc<DropboxCore>,
@@ -58,6 +70,9 @@ impl Accessor for DropboxBackend {
delete: true,
+ batch: true,
+ batch_delete: true,
+
..Default::default()
});
ma
@@ -162,6 +177,59 @@ impl Accessor for DropboxBackend {
_ => Err(parse_error(resp).await?),
}
}
+
+    /// Delete up to 1000 paths with a single Dropbox `delete_batch` call.
+    ///
+    /// Only the path of each operation in `args` is forwarded to Dropbox;
+    /// the operation payload itself is ignored.
+    ///
+    /// Dropbox answers either with the finished result (tag `complete`) or
+    /// with an `async_job_id`. In the latter case the job is polled through
+    /// `dropbox_delete_batch_check`, retried with `BACKOFF` for as long as
+    /// the check fails with a temporary error.
+    ///
+    /// # Errors
+    ///
+    /// - `Unsupported` when more than 1000 operations are given.
+    /// - `Unexpected` when the response carries an unknown tag, or the
+    ///   `async_job_id` tag without the job id itself.
+    /// - Whatever `parse_error` produces for a non-200 response.
+    async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
+        let ops = args.into_operation();
+        if ops.len() > 1000 {
+            return Err(Error::new(
+                ErrorKind::Unsupported,
+                "dropbox services only allow delete up to 1000 keys at once",
+            )
+            .with_context("length", ops.len().to_string()));
+        }
+
+        let paths = ops.into_iter().map(|(p, _)| p).collect::<Vec<_>>();
+
+        let resp = self.core.dropbox_delete_batch(paths).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                let (_parts, body) = resp.into_parts();
+                let bs = body.bytes().await?;
+                let decoded_response = serde_json::from_slice::<DropboxDeleteBatchResponse>(&bs)
+                    .map_err(new_json_deserialize_error)?;
+
+                match decoded_response.tag.as_str() {
+                    "complete" => {
+                        let entries = decoded_response.entries.unwrap_or_default();
+                        let results = self.core.handle_batch_delete_complete_result(entries);
+                        Ok(RpBatch::new(results))
+                    }
+                    "async_job_id" => {
+                        // The payload comes from the network: a missing job id
+                        // must surface as an error, not as a panic.
+                        let job_id = decoded_response.async_job_id.ok_or_else(|| {
+                            Error::new(
+                                ErrorKind::Unexpected,
+                                "delete batch returned async_job_id tag without a job id",
+                            )
+                        })?;
+                        let res = { || self.core.dropbox_delete_batch_check(job_id.clone()) }
+                            .retry(&*BACKOFF)
+                            .when(|e| e.is_temporary())
+                            .await?;
+
+                        Ok(res)
+                    }
+                    _ => Err(Error::new(
+                        ErrorKind::Unexpected,
+                        &format!(
+                            "delete batch failed with unexpected tag {}",
+                            decoded_response.tag
+                        ),
+                    )),
+                }
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
}
#[derive(Default, Debug, Deserialize)]
@@ -217,3 +285,21 @@ pub struct DropboxMetadataSharingInfo {
pub traverse_only: Option<bool>,
pub no_access: Option<bool>,
}
+
+/// Body returned by Dropbox for `delete_batch` and `delete_batch/check`.
+///
+/// All fields fall back to their default when absent from the JSON.
+#[derive(Debug, Default, Deserialize)]
+#[serde(default)]
+pub struct DropboxDeleteBatchResponse {
+    /// Value of the JSON `.tag` field; callers in this service match on
+    /// `complete`, `async_job_id` and `in_progress`.
+    #[serde(rename(deserialize = ".tag"))]
+    pub tag: String,
+    /// Job id to poll, read when `tag` is `async_job_id`.
+    pub async_job_id: Option<String>,
+    /// Per-path results, read when `tag` is `complete`.
+    pub entries: Option<Vec<DropboxDeleteBatchResponseEntry>>,
+}
+
+/// One element of `DropboxDeleteBatchResponse::entries`.
+///
+/// All fields fall back to their default when absent from the JSON.
+#[derive(Debug, Default, Deserialize)]
+#[serde(default)]
+pub struct DropboxDeleteBatchResponseEntry {
+    /// Value of the JSON `.tag` field; `success` and `failure` are the
+    /// values handled by this service.
+    #[serde(rename(deserialize = ".tag"))]
+    pub tag: String,
+    /// Metadata of the deleted entry, read for `success` entries.
+    pub metadata: Option<DropboxMetadataResponse>,
+    /// Error details, read for `failure` entries.
+    pub error: Option<DropboxErrorResponse>,
+}
diff --git a/core/src/services/dropbox/builder.rs
b/core/src/services/dropbox/builder.rs
index 2827d86a9..d31b26a75 100644
--- a/core/src/services/dropbox/builder.rs
+++ b/core/src/services/dropbox/builder.rs
@@ -49,16 +49,38 @@ use crate::*;
///
/// # Configuration
///
-/// - `access_token`: set the access_token for google drive api
-/// - `root`: Set the work directory for backend
+/// - `root`: Set the work directory for this backend.
///
-/// You can refer to [`DropboxBuilder`]'s docs for more information
+/// ## Credentials related
+///
+/// ### Just provide Access Token (Temporary)
+///
+/// - `access_token`: set the access_token for this backend.
+/// Please notice its expiration.
+///
+/// ### Or provide Client ID and Client Secret and refresh token (Long Term)
+///
+/// If you want OpenDAL to refresh the access token automatically,
+/// please provide the following fields:
+///
+/// - `refresh_token`: set the refresh_token for dropbox api
+/// - `client_id`: set the client_id for dropbox api
+/// - `client_secret`: set the client_secret for dropbox api
+///
+/// OpenDAL is a library, so it cannot perform the first step of OAuth2 for you.
+/// You need to obtain an authorization code from the user via Dropbox's authorize URL
+/// and exchange it for a refresh token.
+///
+/// Please refer to [Dropbox OAuth2
Guide](https://www.dropbox.com/developers/reference/oauth-guide)
+/// for more information.
+///
+/// You can refer to [`DropboxBuilder`]'s docs for more information.
///
/// # Example
///
/// ## Via Builder
///
-/// ```
+/// ```rust
/// use anyhow::Result;
/// use opendal::raw::OpWrite;
/// use opendal::services::Dropbox;
@@ -66,8 +88,8 @@ use crate::*;
///
/// #[tokio::main]
/// async fn main() -> Result<()> {
-/// // create backend builder
/// let mut builder = Dropbox::default();
+/// builder.root("/test");
/// builder.access_token("<token>");
///
/// let op: Operator = Operator::new(builder)?.finish();
@@ -78,7 +100,9 @@ use crate::*;
#[derive(Default)]
pub struct DropboxBuilder {
root: Option<String>,
+
access_token: Option<String>,
+
refresh_token: Option<String>,
client_id: Option<String>,
client_secret: Option<String>,
@@ -105,15 +129,16 @@ impl DropboxBuilder {
///
/// You can get the access token from [Dropbox App
Console](https://www.dropbox.com/developers/apps)
///
- /// NOTE: this token will be expired in 4 hours. If you are trying to use
dropbox services in a long time, please set a refresh_token instead.
+ /// NOTE: this token expires in 4 hours.
+ /// If you need to use the Dropbox service for a long time, please set a refresh_token instead.
pub fn access_token(&mut self, access_token: &str) -> &mut Self {
self.access_token = Some(access_token.to_string());
self
}
- /// Refersh token is used for long term access to the Dropbox API.
+ /// Refresh token is used for long term access to the Dropbox API.
///
- /// You can get the refresh token via OAuth2.0 Flow of dropbox.
+ /// You can get the refresh token via OAuth 2.0 Flow of Dropbox.
///
/// OpenDAL will use this refresh token to get a new access token when the
old one is expired.
pub fn refresh_token(&mut self, refresh_token: &str) -> &mut Self {
@@ -121,17 +146,17 @@ impl DropboxBuilder {
self
}
- /// Set the client id for dropbox.
+ /// Set the client id for Dropbox.
///
- /// This is required for OAuth2.0 Flow with refresh token.
+ /// This is required for OAuth 2.0 Flow to refresh the access token.
pub fn client_id(&mut self, client_id: &str) -> &mut Self {
self.client_id = Some(client_id.to_string());
self
}
- /// Set the client secret for dropbox.
+ /// Set the client secret for Dropbox.
///
- /// This is required for OAuth2.0 Flow with refresh token.
+ /// This is required for OAuth 2.0 Flow to refresh the access token.
pub fn client_secret(&mut self, client_secret: &str) -> &mut Self {
self.client_secret = Some(client_secret.to_string());
self
diff --git a/core/src/services/dropbox/core.rs
b/core/src/services/dropbox/core.rs
index bbb2e5208..4f5be56bf 100644
--- a/core/src/services/dropbox/core.rs
+++ b/core/src/services/dropbox/core.rs
@@ -28,6 +28,7 @@ use http::header::CONTENT_LENGTH;
use http::header::CONTENT_TYPE;
use http::Request;
use http::Response;
+use http::StatusCode;
use serde::Deserialize;
use serde::Serialize;
use tokio::sync::Mutex;
@@ -37,8 +38,16 @@ use crate::raw::new_json_deserialize_error;
use crate::raw::new_json_serialize_error;
use crate::raw::new_request_build_error;
use crate::raw::AsyncBody;
+use crate::raw::BatchedReply;
use crate::raw::HttpClient;
use crate::raw::IncomingAsyncBody;
+use crate::raw::RpBatch;
+use crate::raw::RpDelete;
+use crate::services::dropbox::backend::DropboxDeleteBatchResponse;
+use crate::services::dropbox::backend::DropboxDeleteBatchResponseEntry;
+use crate::services::dropbox::error::parse_error;
+use crate::types::Error;
+use crate::types::ErrorKind;
use crate::types::Result;
pub struct DropboxCore {
@@ -130,6 +139,86 @@ impl DropboxCore {
self.client.send(request).await
}
+    /// Send a signed `POST /2/files/delete_batch` request for `paths`.
+    ///
+    /// Each path is passed through `build_path` before being serialized
+    /// into the JSON body. The raw response is returned; interpreting the
+    /// status and payload is left to the caller.
+    pub async fn dropbox_delete_batch(
+        &self,
+        paths: Vec<String>,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let entries: Vec<DropboxDeleteBatchEntry> = paths
+            .iter()
+            .map(|p| DropboxDeleteBatchEntry {
+                path: self.build_path(p),
+            })
+            .collect();
+
+        let payload = serde_json::to_string(&DropboxDeleteBatchArgs { entries })
+            .map_err(new_json_serialize_error)?;
+        let body = Bytes::from(payload);
+
+        let mut req = Request::post("https://api.dropboxapi.com/2/files/delete_batch")
+            .header(CONTENT_TYPE, "application/json")
+            .header(CONTENT_LENGTH, body.len())
+            .body(AsyncBody::Bytes(body))
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+        self.client.send(req).await
+    }
+
+    /// Send a signed `POST /2/files/delete_batch/check` request for
+    /// `async_job_id`.
+    ///
+    /// The raw response is returned; interpreting the status and payload is
+    /// left to the caller.
+    pub async fn dropbox_delete_batch_check_request(
+        &self,
+        async_job_id: String,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let payload = serde_json::to_string(&DropboxDeleteBatchCheckArgs { async_job_id })
+            .map_err(new_json_serialize_error)?;
+        let body = Bytes::from(payload);
+
+        let mut req = Request::post("https://api.dropboxapi.com/2/files/delete_batch/check")
+            .header(CONTENT_TYPE, "application/json")
+            .header(CONTENT_LENGTH, body.len())
+            .body(AsyncBody::Bytes(body))
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+        self.client.send(req).await
+    }
+
+    /// Query `delete_batch/check` once for the job identified by `job_id`.
+    ///
+    /// Returns the per-path results when the job reports `complete`.
+    ///
+    /// # Errors
+    ///
+    /// - An `Unexpected` error marked temporary while the job reports
+    ///   `in_progress`, so a caller can retry on `is_temporary()`.
+    /// - `Unexpected` for any other tag.
+    /// - Whatever `parse_error` produces for a non-200 response.
+    pub async fn dropbox_delete_batch_check(&self, job_id: String) -> Result<RpBatch> {
+        let resp = self.dropbox_delete_batch_check_request(job_id).await?;
+        if resp.status() != StatusCode::OK {
+            return Err(parse_error(resp).await?);
+        }
+
+        let bs = resp.into_body().bytes().await?;
+        let decoded: DropboxDeleteBatchResponse =
+            serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?;
+
+        match decoded.tag.as_str() {
+            "in_progress" => Err(Error::new(
+                ErrorKind::Unexpected,
+                "delete batch job still in progress",
+            )
+            .set_temporary()),
+            "complete" => {
+                let entries = decoded.entries.unwrap_or_default();
+                Ok(RpBatch::new(
+                    self.handle_batch_delete_complete_result(entries),
+                ))
+            }
+            other => Err(Error::new(
+                ErrorKind::Unexpected,
+                &format!("delete batch check failed with unexpected tag {}", other),
+            )),
+        }
+    }
+
pub async fn dropbox_create_folder(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
let url =
"https://api.dropboxapi.com/2/files/create_folder_v2".to_string();
let args = DropboxCreateFolderArgs {
@@ -175,7 +264,7 @@ impl DropboxCore {
if !signer.access_token.is_empty() && signer.expires_in > Utc::now() {
let value = format!("Bearer {}", signer.access_token)
.parse()
- .expect("token must be valid header");
+ .expect("token must be valid header value");
req.headers_mut().insert(header::AUTHORIZATION, value);
return Ok(());
}
@@ -202,7 +291,6 @@ impl DropboxCore {
serde_json::from_slice(&body).map_err(new_json_deserialize_error)?;
// Update signer after token refreshed.
-
signer.access_token = token.access_token.clone();
// Refresh it 2 minutes earlier.
@@ -211,11 +299,48 @@ impl DropboxCore {
let value = format!("Bearer {}", token.access_token)
.parse()
- .expect("token must be valid header");
+ .expect("token must be valid header value");
req.headers_mut().insert(header::AUTHORIZATION, value);
Ok(())
}
+
+    /// Convert the entries of a completed `delete_batch` job into batch
+    /// results, one element per entry and in the same order.
+    ///
+    /// - `success` entries yield `(path_display, Ok(..))`.
+    /// - `failure` entries yield `("", Err(..))` built from the entry's
+    ///   `error_summary`.
+    /// - Any other tag, or an entry lacking the field its tag requires,
+    ///   yields `("", Err(..))`.
+    ///
+    /// Every error produced here has kind `ErrorKind::Unexpected`. The
+    /// function never panics on malformed entries, since they originate
+    /// from a network response.
+    pub fn handle_batch_delete_complete_result(
+        &self,
+        entries: Vec<DropboxDeleteBatchResponseEntry>,
+    ) -> Vec<(String, Result<BatchedReply>)> {
+        let mut results = Vec::with_capacity(entries.len());
+        for entry in entries {
+            let result = match entry.tag.as_str() {
+                // Only success response has metadata and then path,
+                // so we cannot tell which path failed.
+                "success" => match entry.metadata {
+                    Some(metadata) => (metadata.path_display, Ok(RpDelete::default().into())),
+                    None => (
+                        String::new(),
+                        Err(Error::new(
+                            ErrorKind::Unexpected,
+                            "delete succeeded but the response entry has no metadata",
+                        )),
+                    ),
+                },
+                "failure" => {
+                    let err = match entry.error {
+                        Some(error) => Error::new(
+                            ErrorKind::Unexpected,
+                            &format!("delete failed with error {}", error.error_summary),
+                        ),
+                        None => Error::new(
+                            ErrorKind::Unexpected,
+                            "delete failed but the response entry has no error details",
+                        ),
+                    };
+                    (String::new(), Err(err))
+                }
+                _ => (
+                    String::new(),
+                    Err(Error::new(
+                        ErrorKind::Unexpected,
+                        &format!("delete failed with unexpected tag {}", entry.tag),
+                    )),
+                ),
+            };
+            results.push(result);
+        }
+        results
+    }
}
#[derive(Clone)]
@@ -272,6 +397,21 @@ struct DropboxDeleteArgs {
path: String,
}
+/// One element of the `entries` array in a `delete_batch` request body.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+struct DropboxDeleteBatchEntry {
+    /// Path to delete, as produced by `build_path`.
+    path: String,
+}
+
+/// JSON request body sent to `delete_batch`.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+struct DropboxDeleteBatchArgs {
+    /// The paths to delete, one entry per path.
+    entries: Vec<DropboxDeleteBatchEntry>,
+}
+
+/// JSON request body sent to `delete_batch/check`.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+struct DropboxDeleteBatchCheckArgs {
+    /// Id of the asynchronous delete job to query.
+    async_job_id: String,
+}
+
#[derive(Clone, Debug, Deserialize, Serialize)]
struct DropboxCreateFolderArgs {
path: String,
diff --git a/core/src/services/dropbox/error.rs
b/core/src/services/dropbox/error.rs
index 57e3c177b..b27722636 100644
--- a/core/src/services/dropbox/error.rs
+++ b/core/src/services/dropbox/error.rs
@@ -28,7 +28,7 @@ use crate::Result;
#[derive(Default, Debug, Deserialize)]
#[serde(default)]
pub struct DropboxErrorResponse {
- error_summary: String,
+ pub error_summary: String,
}
/// Parse error response into Error.
@@ -78,6 +78,8 @@ pub fn parse_dropbox_error_summary(summary: &str) ->
Option<(ErrorKind, bool)> {
Some((ErrorKind::NotFound, false))
} else if summary.starts_with("path/conflict") {
Some((ErrorKind::AlreadyExists, false))
+ } else if summary.starts_with("too_many_write_operations") {
+ Some((ErrorKind::RateLimited, true))
} else {
None
}