This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new a2634c4dd feat(services/dropbox): impl create_dir and polish error
handling (#2600)
a2634c4dd is described below
commit a2634c4dd65a336df2bf6575a821743120010fc4
Author: Suyan <[email protected]>
AuthorDate: Thu Jul 6 20:01:57 2023 +0800
feat(services/dropbox): impl create_dir and polish error handling (#2600)
* test(services/dropbox): enable CI test for dropbox
Signed-off-by: suyanhanx <[email protected]>
* try
Signed-off-by: suyanhanx <[email protected]>
* try
Signed-off-by: suyanhanx <[email protected]>
* fix
Signed-off-by: suyanhanx <[email protected]>
* support create_dir
Signed-off-by: suyanhanx <[email protected]>
* fix root not affected
Signed-off-by: suyanhanx <[email protected]>
* remove nextest
Signed-off-by: suyanhanx <[email protected]>
* fix
Signed-off-by: suyanhanx <[email protected]>
* fix
Signed-off-by: suyanhanx <[email protected]>
* tip for test run
Signed-off-by: suyanhanx <[email protected]>
* make fmt happy
Signed-off-by: suyanhanx <[email protected]>
* make clippy happy
Signed-off-by: suyanhanx <[email protected]>
* remove CI config file
Signed-off-by: suyanhanx <[email protected]>
* polish
Signed-off-by: suyanhanx <[email protected]>
---------
Signed-off-by: suyanhanx <[email protected]>
---
.env.example | 4 +++
core/src/services/dropbox/backend.rs | 52 +++++++++++++++++++++++++++++++-----
core/src/services/dropbox/builder.rs | 1 +
core/src/services/dropbox/core.rs | 44 ++++++++++++++++++++++++++----
core/src/services/dropbox/error.rs | 30 +++++++++++++++++++--
5 files changed, 117 insertions(+), 14 deletions(-)
diff --git a/.env.example b/.env.example
index 4057e64ec..2e71f7b88 100644
--- a/.env.example
+++ b/.env.example
@@ -123,3 +123,7 @@ OPENDAL_REDB_TABLE=redb-table
# cacache
OPENDAL_CACACHE_TEST=false
OPENDAL_CACACHE_DATADIR=/tmp/opendal/cacache/
+#dropbox
+OPENDAL_DROPBOX_TEST=false
+OPENDAL_DROPBOX_ROOT=/tmp/opendal/
+OPENDAL_DROPBOX_ACCESS_TOKEN=<access_token>
diff --git a/core/src/services/dropbox/backend.rs
b/core/src/services/dropbox/backend.rs
index e9a2fecf1..f0ae8cde0 100644
--- a/core/src/services/dropbox/backend.rs
+++ b/core/src/services/dropbox/backend.rs
@@ -48,14 +48,36 @@ impl Accessor for DropboxBackend {
ma.set_scheme(Scheme::Dropbox)
.set_root(&self.core.root)
.set_capability(Capability {
+ stat: true,
+
read: true,
+
write: true,
+
+ create_dir: true,
+
delete: true,
+
..Default::default()
});
ma
}
+ async fn create_dir(&self, path: &str, _args: OpCreateDir) ->
Result<RpCreateDir> {
+ let resp = self.core.dropbox_create_folder(path).await?;
+ let status = resp.status();
+ match status {
+ StatusCode::OK => Ok(RpCreateDir::default()),
+ _ => {
+ let err = parse_error(resp).await?;
+ match err.kind() {
+ ErrorKind::AlreadyExists => Ok(RpCreateDir::default()),
+ _ => Err(err),
+ }
+ }
+ }
+ }
+
async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead,
Self::Reader)> {
let resp = self.core.dropbox_get(path).await?;
let status = resp.status();
@@ -88,7 +110,13 @@ impl Accessor for DropboxBackend {
match status {
StatusCode::OK => Ok(RpDelete::default()),
- _ => Err(parse_error(resp).await?),
+ _ => {
+ let err = parse_error(resp).await?;
+ match err.kind() {
+ ErrorKind::NotFound => Ok(RpDelete::default()),
+ _ => Err(err),
+ }
+ }
}
}
@@ -109,12 +137,22 @@ impl Accessor for DropboxBackend {
_ => EntryMode::Unknown,
};
let mut metadata = Metadata::new(entry_mode);
- let last_modified = decoded_response.client_modified;
- let date_utc_last_modified =
parse_datetime_from_rfc3339(&last_modified)?;
- metadata.set_last_modified(date_utc_last_modified);
- if decoded_response.size.is_some() {
- let size = decoded_response.size.unwrap();
- metadata.set_content_length(size);
+ // Only set last_modified and size if entry_mode is FILE,
because Dropbox API
+ // returns last_modified and size only for files.
+ // FYI:
https://www.dropbox.com/developers/documentation/http/documentation#files-get_metadata
+ if entry_mode == EntryMode::FILE {
+ let date_utc_last_modified =
+
parse_datetime_from_rfc3339(&decoded_response.client_modified)?;
+ metadata.set_last_modified(date_utc_last_modified);
+
+ if let Some(size) = decoded_response.size {
+ metadata.set_content_length(size);
+ } else {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ &format!("no size found for file {}", path),
+ ));
+ }
}
Ok(RpStat::new(metadata))
}
diff --git a/core/src/services/dropbox/builder.rs
b/core/src/services/dropbox/builder.rs
index b8fd8efae..373c83a9c 100644
--- a/core/src/services/dropbox/builder.rs
+++ b/core/src/services/dropbox/builder.rs
@@ -114,6 +114,7 @@ impl Builder for DropboxBuilder {
fn from_map(map: HashMap<String, String>) -> Self {
let mut builder = Self::default();
+ map.get("root").map(|v| builder.root(v));
map.get("access_token").map(|v| builder.access_token(v));
builder
}
diff --git a/core/src/services/dropbox/core.rs
b/core/src/services/dropbox/core.rs
index ad4655a32..790d69fcd 100644
--- a/core/src/services/dropbox/core.rs
+++ b/core/src/services/dropbox/core.rs
@@ -49,6 +49,13 @@ impl Debug for DropboxCore {
}
impl DropboxCore {
+ fn build_path(&self, path: &str) -> String {
+ let path = build_rooted_abs_path(&self.root, path);
+ // For dropbox, even the path is a directory,
+ // we still need to remove the trailing slash.
+ path.trim_end_matches('/').to_string()
+ }
+
pub async fn dropbox_get(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
let url: String =
"https://content.dropboxapi.com/2/files/download".to_string();
let download_args = DropboxDownloadArgs {
@@ -59,6 +66,7 @@ impl DropboxCore {
let request = self
.build_auth_header(Request::post(&url))
.header("Dropbox-API-Arg", request_payload)
+ .header(header::CONTENT_LENGTH, 0)
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;
self.client.send(request).await
@@ -80,9 +88,11 @@ impl DropboxCore {
if let Some(size) = size {
request_builder = request_builder.header(header::CONTENT_LENGTH,
size);
}
- if let Some(mime) = content_type {
- request_builder = request_builder.header(header::CONTENT_TYPE,
mime);
- }
+ request_builder = request_builder.header(
+ header::CONTENT_TYPE,
+ content_type.unwrap_or("application/octet-stream"),
+ );
+
let request = self
.build_auth_header(request_builder)
.header(
@@ -98,7 +108,24 @@ impl DropboxCore {
pub async fn dropbox_delete(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
let url = "https://api.dropboxapi.com/2/files/delete_v2".to_string();
let args = DropboxDeleteArgs {
- path: build_rooted_abs_path(&self.root, path),
+ path: self.build_path(path),
+ };
+
+ let bs =
Bytes::from(serde_json::to_string(&args).map_err(new_json_serialize_error)?);
+
+ let request = self
+ .build_auth_header(Request::post(&url))
+ .header(header::CONTENT_TYPE, "application/json")
+ .header(header::CONTENT_LENGTH, bs.len())
+ .body(AsyncBody::Bytes(bs))
+ .map_err(new_request_build_error)?;
+ self.client.send(request).await
+ }
+
+ pub async fn dropbox_create_folder(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
+ let url =
"https://api.dropboxapi.com/2/files/create_folder_v2".to_string();
+ let args = DropboxCreateFolderArgs {
+ path: self.build_path(path),
};
let bs =
Bytes::from(serde_json::to_string(&args).map_err(new_json_serialize_error)?);
@@ -106,6 +133,7 @@ impl DropboxCore {
let request = self
.build_auth_header(Request::post(&url))
.header(header::CONTENT_TYPE, "application/json")
+ .header(header::CONTENT_LENGTH, bs.len())
.body(AsyncBody::Bytes(bs))
.map_err(new_request_build_error)?;
self.client.send(request).await
@@ -114,7 +142,7 @@ impl DropboxCore {
pub async fn dropbox_get_metadata(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
let url =
"https://api.dropboxapi.com/2/files/get_metadata".to_string();
let args = DropboxMetadataArgs {
- path: build_rooted_abs_path(&self.root, path),
+ path: self.build_path(path),
..Default::default()
};
@@ -123,6 +151,7 @@ impl DropboxCore {
let request = self
.build_auth_header(Request::post(&url))
.header(header::CONTENT_TYPE, "application/json")
+ .header(header::CONTENT_LENGTH, bs.len())
.body(AsyncBody::Bytes(bs))
.map_err(new_request_build_error)?;
self.client.send(request).await
@@ -154,6 +183,11 @@ struct DropboxDeleteArgs {
path: String,
}
+#[derive(Clone, Debug, Deserialize, Serialize)]
+struct DropboxCreateFolderArgs {
+ path: String,
+}
+
#[derive(Clone, Debug, Deserialize, Serialize)]
struct DropboxMetadataArgs {
include_deleted: bool,
diff --git a/core/src/services/dropbox/error.rs
b/core/src/services/dropbox/error.rs
index db878f6a0..5e5831b15 100644
--- a/core/src/services/dropbox/error.rs
+++ b/core/src/services/dropbox/error.rs
@@ -38,12 +38,38 @@ pub async fn parse_error(resp: Response<IncomingAsyncBody>)
-> Result<Error> {
| StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true),
_ => (ErrorKind::Unexpected, false),
};
+
let dropbox_error =
serde_json::from_slice::<DropboxErrorResponse>(&bs).map_err(new_json_deserialize_error);
match dropbox_error {
Ok(dropbox_error) => {
- let mut err = Error::new(kind,
dropbox_error.error_summary.as_ref())
- .with_context("response", format!("{parts:?}"));
+ // We cannot get the error type from the response header when the
status code is 409.
+ // Because Dropbox API v2 will put error summary in the response
body,
+ // we need to parse it to get the correct error type and then
error kind.
+ // See
https://www.dropbox.com/developers/documentation/http/documentation#error-handling
+ let error_summary = dropbox_error.error_summary.as_str();
+
+ let mut err = Error::new(
+ match parts.status {
+ // 409 Conflict means that Endpoint-specific error.
+ // Look to the JSON response body for the specifics of the
error.
+ StatusCode::CONFLICT => {
+ if error_summary.contains("path/not_found")
+ || error_summary.contains("path_lookup/not_found")
+ {
+ ErrorKind::NotFound
+ } else if error_summary.contains("path/conflict") {
+ ErrorKind::AlreadyExists
+ } else {
+ ErrorKind::Unexpected
+ }
+ }
+ // Otherwise, we can get the error type from the response
status code.
+ _ => kind,
+ },
+ error_summary,
+ )
+ .with_context("response", format!("{parts:?}"));
if retryable {
err = err.set_temporary();