This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 45f042504 feat(service/dropbox): impl batch delete (#2606)
45f042504 is described below

commit 45f042504146ef0871489007103032a90d7a8f59
Author: Suyan <[email protected]>
AuthorDate: Sat Jul 8 19:40:22 2023 +0800

    feat(service/dropbox): impl batch delete (#2606)
    
    * refresh token flow
    
    Signed-off-by: suyanhanx <[email protected]>
    
    * impl batch_delete
    
    Signed-off-by: suyanhanx <[email protected]>
    
    * enable CI test
    
    Signed-off-by: suyanhanx <[email protected]>
    
    * add too_many_write_operations as retriable
    
    Signed-off-by: suyanhanx <[email protected]>
    
    * use backon to retry
    
    Signed-off-by: suyanhanx <[email protected]>
    
    * make fmt happy
    
    Signed-off-by: suyanhanx <[email protected]>
    
    ---------
    
    Signed-off-by: suyanhanx <[email protected]>
---
 .env.example                               |   3 +
 .github/workflows/service_test_dropbox.yml |  66 +++++++++++++
 core/src/services/dropbox/backend.rs       |  86 +++++++++++++++++
 core/src/services/dropbox/builder.rs       |  49 +++++++---
 core/src/services/dropbox/core.rs          | 146 ++++++++++++++++++++++++++++-
 core/src/services/dropbox/error.rs         |   4 +-
 6 files changed, 338 insertions(+), 16 deletions(-)

diff --git a/.env.example b/.env.example
index 2e71f7b88..e3c54bc69 100644
--- a/.env.example
+++ b/.env.example
@@ -127,3 +127,6 @@ OPENDAL_CACACHE_DATADIR=/tmp/opendal/cacache/
 OPENDAL_DROPBOX_TEST=false
 OPENDAL_DROPBOX_ROOT=/tmp/opendal/
 OPENDAL_DROPBOX_ACCESS_TOKEN=<access_token>
+OPENDAL_DROPBOX_REFRESH_TOKEN=<refresh_token>
+OPENDAL_DROPBOX_CLIENT_ID=<client_id>
+OPENDAL_DROPBOX_CLIENT_SECRET=<client_secret>
diff --git a/.github/workflows/service_test_dropbox.yml 
b/.github/workflows/service_test_dropbox.yml
new file mode 100644
index 000000000..0bc02c895
--- /dev/null
+++ b/.github/workflows/service_test_dropbox.yml
@@ -0,0 +1,66 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: Service Test Dropbox
+
+on:
+  push:
+    branches:
+      - main
+  pull_request:
+    branches:
+      - main
+    paths:
+      - "core/src/**"
+      - "core/tests/**"
+      - "!core/src/docs/**"
+      - "!core/src/services/**"
+      - "core/src/services/dropbox/**"
+      - ".github/workflows/service_test_dropbox.yml"
+
+concurrency:
+  group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }}
+  cancel-in-progress: true
+
+jobs:
+  dropbox:
+    runs-on: ubuntu-latest
+    if: github.event_name == 'push' || 
!github.event.pull_request.head.repo.fork
+    steps:
+      - uses: actions/checkout@v3
+      - name: Setup Rust toolchain
+        uses: ./.github/actions/setup
+
+      - name: Load secret
+        id: op-load-secret
+        uses: 1password/load-secrets-action@v1
+        with:
+          export-env: true
+        env:
+          OP_SERVICE_ACCOUNT_TOKEN: ${{ secrets.OP_SERVICE_ACCOUNT_TOKEN }}
+          OPENDAL_DROPBOX_TEST: op://services/dropbox/test
+          OPENDAL_DROPBOX_ROOT: op://services/dropbox/root
+          OPENDAL_DROPBOX_REFRESH_TOKEN: op://services/dropbox/refresh_token
+          OPENDAL_DROPBOX_CLIENT_ID: op://services/dropbox/client_id
+          OPENDAL_DROPBOX_CLIENT_SECRET: op://services/dropbox/client_secret
+
+      - name: Test
+        shell: bash
+        working-directory: core
+        # It's easy for Dropbox to trigger the too_many_write_operations error,
+        # so we run the tests one by one.
+        run: cargo test dropbox --features=services-dropbox -- --test-threads=1
diff --git a/core/src/services/dropbox/backend.rs 
b/core/src/services/dropbox/backend.rs
index 4af8f01b6..5567285d0 100644
--- a/core/src/services/dropbox/backend.rs
+++ b/core/src/services/dropbox/backend.rs
@@ -17,17 +17,29 @@
 
 use std::fmt::Debug;
 use std::sync::Arc;
+use std::time::Duration;
 
 use async_trait::async_trait;
+use backon::ExponentialBuilder;
+use backon::Retryable;
 use http::StatusCode;
+use once_cell::sync::Lazy;
 use serde::Deserialize;
 
 use super::core::DropboxCore;
 use super::error::parse_error;
 use super::writer::DropboxWriter;
 use crate::raw::*;
+use crate::services::dropbox::error::DropboxErrorResponse;
 use crate::*;
 
+static BACKOFF: Lazy<ExponentialBuilder> = Lazy::new(|| {
+    ExponentialBuilder::default()
+        .with_max_delay(Duration::from_secs(10))
+        .with_max_times(10)
+        .with_jitter()
+});
+
 #[derive(Clone, Debug)]
 pub struct DropboxBackend {
     pub core: Arc<DropboxCore>,
@@ -58,6 +70,9 @@ impl Accessor for DropboxBackend {
 
                 delete: true,
 
+                batch: true,
+                batch_delete: true,
+
                 ..Default::default()
             });
         ma
@@ -162,6 +177,59 @@ impl Accessor for DropboxBackend {
             _ => Err(parse_error(resp).await?),
         }
     }
+
+    async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
+        let ops = args.into_operation();
+        if ops.len() > 1000 {
+            return Err(Error::new(
+                ErrorKind::Unsupported,
+                "dropbox services only allow delete up to 1000 keys at once",
+            )
+            .with_context("length", ops.len().to_string()));
+        }
+
+        let paths = ops.into_iter().map(|(p, _)| p).collect::<Vec<_>>();
+
+        let resp = self.core.dropbox_delete_batch(paths).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                let (_parts, body) = resp.into_parts();
+                let bs = body.bytes().await?;
+                let decoded_response = 
serde_json::from_slice::<DropboxDeleteBatchResponse>(&bs)
+                    .map_err(new_json_deserialize_error)?;
+
+                match decoded_response.tag.as_str() {
+                    "complete" => {
+                        let entries = 
decoded_response.entries.unwrap_or_default();
+                        let results = 
self.core.handle_batch_delete_complete_result(entries);
+                        Ok(RpBatch::new(results))
+                    }
+                    "async_job_id" => {
+                        let job_id = decoded_response
+                            .async_job_id
+                            .expect("async_job_id should be present");
+                        let res = { || 
self.core.dropbox_delete_batch_check(job_id.clone()) }
+                            .retry(&*BACKOFF)
+                            .when(|e| e.is_temporary())
+                            .await?;
+
+                        Ok(res)
+                    }
+                    _ => Err(Error::new(
+                        ErrorKind::Unexpected,
+                        &format!(
+                            "delete batch failed with unexpected tag {}",
+                            decoded_response.tag
+                        ),
+                    )),
+                }
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
 }
 
 #[derive(Default, Debug, Deserialize)]
@@ -217,3 +285,21 @@ pub struct DropboxMetadataSharingInfo {
     pub traverse_only: Option<bool>,
     pub no_access: Option<bool>,
 }
+
+#[derive(Default, Debug, Deserialize)]
+#[serde(default)]
+pub struct DropboxDeleteBatchResponse {
+    #[serde(rename(deserialize = ".tag"))]
+    pub tag: String,
+    pub async_job_id: Option<String>,
+    pub entries: Option<Vec<DropboxDeleteBatchResponseEntry>>,
+}
+
+#[derive(Default, Debug, Deserialize)]
+#[serde(default)]
+pub struct DropboxDeleteBatchResponseEntry {
+    #[serde(rename(deserialize = ".tag"))]
+    pub tag: String,
+    pub metadata: Option<DropboxMetadataResponse>,
+    pub error: Option<DropboxErrorResponse>,
+}
diff --git a/core/src/services/dropbox/builder.rs 
b/core/src/services/dropbox/builder.rs
index 2827d86a9..d31b26a75 100644
--- a/core/src/services/dropbox/builder.rs
+++ b/core/src/services/dropbox/builder.rs
@@ -49,16 +49,38 @@ use crate::*;
 ///
 /// # Configuration
 ///
-/// - `access_token`: set the access_token for google drive api
-/// - `root`: Set the work directory for backend
+/// - `root`: Set the work directory for this backend.
 ///
-/// You can refer to [`DropboxBuilder`]'s docs for more information
+/// ## Credentials related
+///
+/// ### Just provide Access Token (Temporary)
+///
+/// - `access_token`: set the access_token for this backend.
+/// Please notice its expiration.
+///
+/// ### Or provide Client ID and Client Secret and refresh token (Long Term)
+///
+/// If you want OpenDAL to refresh the access token automatically,
+/// please provide the following fields:
+///
+/// - `refresh_token`: set the refresh_token for dropbox api
+/// - `client_id`: set the client_id for dropbox api
+/// - `client_secret`: set the client_secret for dropbox api
+///
+/// OpenDAL is a library, so it cannot do the first step of OAuth2 for you.
+/// You need to get an authorization code from the user by calling Dropbox's 
authorize url
+/// and exchange it for a refresh token.
+///
+/// Please refer to [Dropbox OAuth2 
Guide](https://www.dropbox.com/developers/reference/oauth-guide)
+/// for more information.
+///
+/// You can refer to [`DropboxBuilder`]'s docs for more information.
 ///
 /// # Example
 ///
 /// ## Via Builder
 ///
-/// ```
+/// ```rust
 /// use anyhow::Result;
 /// use opendal::raw::OpWrite;
 /// use opendal::services::Dropbox;
@@ -66,8 +88,8 @@ use crate::*;
 ///
 /// #[tokio::main]
 /// async fn main() -> Result<()> {
-///     // create backend builder
 ///     let mut builder = Dropbox::default();
+///     builder.root("/test");
 ///     builder.access_token("<token>");
 ///
 ///     let op: Operator = Operator::new(builder)?.finish();
@@ -78,7 +100,9 @@ use crate::*;
 #[derive(Default)]
 pub struct DropboxBuilder {
     root: Option<String>,
+
     access_token: Option<String>,
+
     refresh_token: Option<String>,
     client_id: Option<String>,
     client_secret: Option<String>,
@@ -105,15 +129,16 @@ impl DropboxBuilder {
     ///
     /// You can get the access token from [Dropbox App 
Console](https://www.dropbox.com/developers/apps)
     ///
-    /// NOTE: this token will be expired in 4 hours. If you are trying to use 
dropbox services in a long time, please set a refresh_token instead.
+    /// NOTE: this token will expire in 4 hours.
+    /// If you are trying to use the Dropbox service for a long time, please 
set a refresh_token instead.
     pub fn access_token(&mut self, access_token: &str) -> &mut Self {
         self.access_token = Some(access_token.to_string());
         self
     }
 
-    /// Refersh token is used for long term access to the Dropbox API.
+    /// Refresh token is used for long term access to the Dropbox API.
     ///
-    /// You can get the refresh token via OAuth2.0 Flow of dropbox.
+    /// You can get the refresh token via OAuth 2.0 Flow of Dropbox.
     ///
     /// OpenDAL will use this refresh token to get a new access token when the 
old one is expired.
     pub fn refresh_token(&mut self, refresh_token: &str) -> &mut Self {
@@ -121,17 +146,17 @@ impl DropboxBuilder {
         self
     }
 
-    /// Set the client id for dropbox.
+    /// Set the client id for Dropbox.
     ///
-    /// This is required for OAuth2.0 Flow with refresh token.
+    /// This is required for OAuth 2.0 Flow to refresh the access token.
     pub fn client_id(&mut self, client_id: &str) -> &mut Self {
         self.client_id = Some(client_id.to_string());
         self
     }
 
-    /// Set the client secret for dropbox.
+    /// Set the client secret for Dropbox.
     ///
-    /// This is required for OAuth2.0 Flow with refresh token.
+    /// This is required for OAuth 2.0 Flow to refresh the access token.
     pub fn client_secret(&mut self, client_secret: &str) -> &mut Self {
         self.client_secret = Some(client_secret.to_string());
         self
diff --git a/core/src/services/dropbox/core.rs 
b/core/src/services/dropbox/core.rs
index bbb2e5208..4f5be56bf 100644
--- a/core/src/services/dropbox/core.rs
+++ b/core/src/services/dropbox/core.rs
@@ -28,6 +28,7 @@ use http::header::CONTENT_LENGTH;
 use http::header::CONTENT_TYPE;
 use http::Request;
 use http::Response;
+use http::StatusCode;
 use serde::Deserialize;
 use serde::Serialize;
 use tokio::sync::Mutex;
@@ -37,8 +38,16 @@ use crate::raw::new_json_deserialize_error;
 use crate::raw::new_json_serialize_error;
 use crate::raw::new_request_build_error;
 use crate::raw::AsyncBody;
+use crate::raw::BatchedReply;
 use crate::raw::HttpClient;
 use crate::raw::IncomingAsyncBody;
+use crate::raw::RpBatch;
+use crate::raw::RpDelete;
+use crate::services::dropbox::backend::DropboxDeleteBatchResponse;
+use crate::services::dropbox::backend::DropboxDeleteBatchResponseEntry;
+use crate::services::dropbox::error::parse_error;
+use crate::types::Error;
+use crate::types::ErrorKind;
 use crate::types::Result;
 
 pub struct DropboxCore {
@@ -130,6 +139,86 @@ impl DropboxCore {
         self.client.send(request).await
     }
 
+    pub async fn dropbox_delete_batch(
+        &self,
+        paths: Vec<String>,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let url = 
"https://api.dropboxapi.com/2/files/delete_batch".to_string();
+        let args = DropboxDeleteBatchArgs {
+            entries: paths
+                .into_iter()
+                .map(|path| DropboxDeleteBatchEntry {
+                    path: self.build_path(&path),
+                })
+                .collect(),
+        };
+
+        let bs = 
Bytes::from(serde_json::to_string(&args).map_err(new_json_serialize_error)?);
+
+        let mut request = Request::post(&url)
+            .header(CONTENT_TYPE, "application/json")
+            .header(CONTENT_LENGTH, bs.len())
+            .body(AsyncBody::Bytes(bs))
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut request).await?;
+        self.client.send(request).await
+    }
+
+    pub async fn dropbox_delete_batch_check_request(
+        &self,
+        async_job_id: String,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let url = 
"https://api.dropboxapi.com/2/files/delete_batch/check".to_string();
+        let args = DropboxDeleteBatchCheckArgs { async_job_id };
+
+        let bs = 
Bytes::from(serde_json::to_string(&args).map_err(new_json_serialize_error)?);
+
+        let mut request = Request::post(&url)
+            .header(CONTENT_TYPE, "application/json")
+            .header(CONTENT_LENGTH, bs.len())
+            .body(AsyncBody::Bytes(bs))
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut request).await?;
+        self.client.send(request).await
+    }
+
+    pub async fn dropbox_delete_batch_check(&self, job_id: String) -> 
Result<RpBatch> {
+        let resp = self
+            .dropbox_delete_batch_check_request(job_id.clone())
+            .await?;
+        let status = resp.status();
+        match status {
+            StatusCode::OK => {
+                let bs = resp.into_body().bytes().await?;
+
+                let decoded_response = 
serde_json::from_slice::<DropboxDeleteBatchResponse>(&bs)
+                    .map_err(new_json_deserialize_error)?;
+                match decoded_response.tag.as_str() {
+                    "in_progress" => Err(Error::new(
+                        ErrorKind::Unexpected,
+                        "delete batch job still in progress",
+                    )
+                    .set_temporary()),
+                    "complete" => {
+                        let entries = 
decoded_response.entries.unwrap_or_default();
+                        let results = 
self.handle_batch_delete_complete_result(entries);
+                        Ok(RpBatch::new(results))
+                    }
+                    _ => Err(Error::new(
+                        ErrorKind::Unexpected,
+                        &format!(
+                            "delete batch check failed with unexpected tag {}",
+                            decoded_response.tag
+                        ),
+                    )),
+                }
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
     pub async fn dropbox_create_folder(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
         let url = 
"https://api.dropboxapi.com/2/files/create_folder_v2".to_string();
         let args = DropboxCreateFolderArgs {
@@ -175,7 +264,7 @@ impl DropboxCore {
         if !signer.access_token.is_empty() && signer.expires_in > Utc::now() {
             let value = format!("Bearer {}", signer.access_token)
                 .parse()
-                .expect("token must be valid header");
+                .expect("token must be valid header value");
             req.headers_mut().insert(header::AUTHORIZATION, value);
             return Ok(());
         }
@@ -202,7 +291,6 @@ impl DropboxCore {
             serde_json::from_slice(&body).map_err(new_json_deserialize_error)?;
 
         // Update signer after token refreshed.
-
         signer.access_token = token.access_token.clone();
 
         // Refresh it 2 minutes earlier.
@@ -211,11 +299,48 @@ impl DropboxCore {
 
         let value = format!("Bearer {}", token.access_token)
             .parse()
-            .expect("token must be valid header");
+            .expect("token must be valid header value");
         req.headers_mut().insert(header::AUTHORIZATION, value);
 
         Ok(())
     }
+
+    pub fn handle_batch_delete_complete_result(
+        &self,
+        entries: Vec<DropboxDeleteBatchResponseEntry>,
+    ) -> Vec<(String, Result<BatchedReply>)> {
+        let mut results = Vec::with_capacity(entries.len());
+        for entry in entries {
+            let result = match entry.tag.as_str() {
+                // Only a success response carries metadata (and thus the path),
+                // so we cannot tell which path failed.
+                "success" => {
+                    let path = entry
+                        .metadata
+                        .expect("metadata should be present")
+                        .path_display;
+                    (path, Ok(RpDelete::default().into()))
+                }
+                "failure" => {
+                    let error = entry.error.expect("error should be present");
+                    let err = Error::new(
+                        ErrorKind::Unexpected,
+                        &format!("delete failed with error {}", 
error.error_summary),
+                    );
+                    ("".to_string(), Err(err))
+                }
+                _ => (
+                    "".to_string(),
+                    Err(Error::new(
+                        ErrorKind::Unexpected,
+                        &format!("delete failed with unexpected tag {}", 
entry.tag),
+                    )),
+                ),
+            };
+            results.push(result);
+        }
+        results
+    }
 }
 
 #[derive(Clone)]
@@ -272,6 +397,21 @@ struct DropboxDeleteArgs {
     path: String,
 }
 
+#[derive(Clone, Debug, Deserialize, Serialize)]
+struct DropboxDeleteBatchEntry {
+    path: String,
+}
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+struct DropboxDeleteBatchArgs {
+    entries: Vec<DropboxDeleteBatchEntry>,
+}
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+struct DropboxDeleteBatchCheckArgs {
+    async_job_id: String,
+}
+
 #[derive(Clone, Debug, Deserialize, Serialize)]
 struct DropboxCreateFolderArgs {
     path: String,
diff --git a/core/src/services/dropbox/error.rs 
b/core/src/services/dropbox/error.rs
index 57e3c177b..b27722636 100644
--- a/core/src/services/dropbox/error.rs
+++ b/core/src/services/dropbox/error.rs
@@ -28,7 +28,7 @@ use crate::Result;
 #[derive(Default, Debug, Deserialize)]
 #[serde(default)]
 pub struct DropboxErrorResponse {
-    error_summary: String,
+    pub error_summary: String,
 }
 
 /// Parse error response into Error.
@@ -78,6 +78,8 @@ pub fn parse_dropbox_error_summary(summary: &str) -> 
Option<(ErrorKind, bool)> {
         Some((ErrorKind::NotFound, false))
     } else if summary.starts_with("path/conflict") {
         Some((ErrorKind::AlreadyExists, false))
+    } else if summary.starts_with("too_many_write_operations") {
+        Some((ErrorKind::RateLimited, true))
     } else {
         None
     }

Reply via email to