This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 41e921cf15 refactor(aliyun-drive): rewrite writer part (#4744)
41e921cf15 is described below
commit 41e921cf15706f65c6fc4e2ce3d1a2feb1b62f76
Author: Hanchin Hsieh <[email protected]>
AuthorDate: Mon Jun 17 18:06:23 2024 +0800
refactor(aliyun-drive): rewrite writer part (#4744)
refactor(aliyun-drive): remove optional rapid-upload
---
core/Cargo.toml | 2 +-
core/src/services/aliyun_drive/backend.rs | 17 ----
core/src/services/aliyun_drive/core.rs | 3 +-
core/src/services/aliyun_drive/docs.md | 5 -
core/src/services/aliyun_drive/error.rs | 1 +
core/src/services/aliyun_drive/writer.rs | 160 +++++++++---------------------
6 files changed, 48 insertions(+), 140 deletions(-)
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 73c7fea926..d49b9de04e 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -193,7 +193,7 @@ services-vercel-blob = []
services-webdav = []
services-webhdfs = []
services-yandex-disk = []
-services-aliyun-drive = ["dep:sha1"]
+services-aliyun-drive = []
[lib]
bench = false
diff --git a/core/src/services/aliyun_drive/backend.rs
b/core/src/services/aliyun_drive/backend.rs
index 766c059955..9974757bca 100644
--- a/core/src/services/aliyun_drive/backend.rs
+++ b/core/src/services/aliyun_drive/backend.rs
@@ -75,12 +75,6 @@ pub struct AliyunDriveConfig {
///
/// Fallback to default if not set or no other drives can be found.
pub drive_type: String,
- /// rapid_upload of this backend.
- ///
- /// Skip uploading files that are already in the drive by hashing their
content.
- ///
- /// Only works under the write_once operation.
- pub rapid_upload: bool,
}
impl Debug for AliyunDriveConfig {
@@ -160,13 +154,6 @@ impl AliyunDriveBuilder {
self
}
- /// Set rapid_upload of this backend.
- pub fn rapid_upload(&mut self, rapid_upload: bool) -> &mut Self {
- self.config.rapid_upload = rapid_upload;
-
- self
- }
-
/// Specify the http client that used by this service.
///
/// # Notes
@@ -243,15 +230,11 @@ impl Builder for AliyunDriveBuilder {
};
debug!("backend use drive_type {:?}", drive_type);
- let rapid_upload = self.config.rapid_upload;
- debug!("backend use rapid_upload {}", rapid_upload);
-
Ok(AliyunDriveBackend {
core: Arc::new(AliyunDriveCore {
endpoint: "https://openapi.alipan.com".to_string(),
root,
drive_type,
- rapid_upload,
signer: Arc::new(Mutex::new(AliyunDriveSigner {
drive_id: None,
sign,
diff --git a/core/src/services/aliyun_drive/core.rs
b/core/src/services/aliyun_drive/core.rs
index 51f30665c7..00ac06b926 100644
--- a/core/src/services/aliyun_drive/core.rs
+++ b/core/src/services/aliyun_drive/core.rs
@@ -63,7 +63,6 @@ pub struct AliyunDriveCore {
pub endpoint: String,
pub root: String,
pub drive_type: DriveType,
- pub rapid_upload: bool,
pub signer: Arc<Mutex<AliyunDriveSigner>>,
pub client: HttpClient,
@@ -475,8 +474,8 @@ pub struct UploadUrlResponse {
pub struct CreateResponse {
pub file_id: String,
pub upload_id: Option<String>,
- pub rapid_upload: Option<bool>,
pub part_info_list: Option<Vec<PartInfo>>,
+ pub exist: Option<bool>,
}
#[derive(Serialize, Deserialize)]
diff --git a/core/src/services/aliyun_drive/docs.md
b/core/src/services/aliyun_drive/docs.md
index b356ebe3f3..e963fd8a11 100644
--- a/core/src/services/aliyun_drive/docs.md
+++ b/core/src/services/aliyun_drive/docs.md
@@ -21,7 +21,6 @@ This service can be used to:
- `client_secret`: Set the client_secret for backend.
- `refresh_token`: Set the refresh_token for backend.
- `drive_type`: Set the drive_type for backend.
-- `rapid_upload`: Set the rapid_upload for backend.
Refer to [`AliyunDriveBuilder`]'s public API docs for more information.
@@ -54,10 +53,6 @@ async fn main() -> Result<()> {
//
// Fallback to the default type if no other types found.
builder.drive_type("resource");
- // Set the rapid_upload.
- //
- // Works only under the write_once operation for now.
- builder.rapid_upload(true);
let op: Operator = Operator::new(builder)?.finish();
diff --git a/core/src/services/aliyun_drive/error.rs
b/core/src/services/aliyun_drive/error.rs
index fa343e4f6f..ec2ad85e08 100644
--- a/core/src/services/aliyun_drive/error.rs
+++ b/core/src/services/aliyun_drive/error.rs
@@ -41,6 +41,7 @@ pub async fn parse_error(res: Response<Buffer>) ->
Result<Error> {
Some(code) if code == "PreHashMatched" => (ErrorKind::IsSameFile,
false),
_ => (ErrorKind::Unexpected, false),
},
+ 409 => (ErrorKind::AlreadyExists, false),
429 => match code {
Some(code) if code == "TooManyRequests" =>
(ErrorKind::RateLimited, true),
_ => (ErrorKind::Unexpected, false),
diff --git a/core/src/services/aliyun_drive/writer.rs
b/core/src/services/aliyun_drive/writer.rs
index 174c52a12c..c1a3efa947 100644
--- a/core/src/services/aliyun_drive/writer.rs
+++ b/core/src/services/aliyun_drive/writer.rs
@@ -15,25 +15,16 @@
// specific language governing permissions and limitations
// under the License.
-use std::sync::Arc;
-
-use base64::engine::general_purpose;
-use base64::Engine;
+use super::core::{AliyunDriveCore, UploadUrlResponse};
+use crate::{
+ raw::*,
+ services::aliyun_drive::core::{CheckNameMode, CreateResponse, CreateType},
+ *,
+};
use bytes::Buf;
-use md5::Digest;
-use md5::Md5;
-use sha1::Sha1;
+use std::sync::Arc;
use tokio::sync::RwLock;
-use super::core::AliyunDriveCore;
-use super::core::RapidUpload;
-use super::core::UploadUrlResponse;
-use crate::raw::*;
-use crate::services::aliyun_drive::core::CheckNameMode;
-use crate::services::aliyun_drive::core::CreateResponse;
-use crate::services::aliyun_drive::core::CreateType;
-use crate::*;
-
pub type AliyunDriveWriters = oio::MultipartWriter<AliyunDriveWriter>;
pub struct AliyunDriveWriter {
@@ -72,65 +63,11 @@ impl AliyunDriveWriter {
Ok(file_id.clone())
}
- async fn get_rapid_upload(
- &self,
- size: Option<u64>,
- body: Option<crate::Buffer>,
- pre_hash: bool,
- ) -> Result<Option<RapidUpload>> {
- let Some(size) = size else {
- return Ok(None);
- };
- let Some(body) = body else {
- return Ok(None);
- };
- if pre_hash && size > 1024 * 100 {
- return Ok(Some(RapidUpload {
- pre_hash: Some(format!(
- "{:x}",
-
Sha1::new_with_prefix(body.slice(0..1024).to_vec()).finalize()
- )),
- content_hash: None,
- proof_code: None,
- }));
- }
- let (token, _) = self.core.get_token_and_drive().await?;
- let Ok(index) = u64::from_str_radix(
- &format!("{:x}",
Md5::new_with_prefix(token.unwrap()).finalize())[0..16],
- 16,
- ) else {
- return Err(Error::new(
- ErrorKind::Unexpected,
- "cannot parse hexadecimal",
- ));
- };
- let size = size as usize;
- let index = index as usize % size;
- let (range_start, range_end) = if index + 8 > size {
- (index, size)
- } else {
- (index, index + 8)
- };
-
- Ok(Some(RapidUpload {
- pre_hash: None,
- content_hash: Some(format!(
- "{:x}",
- Sha1::new_with_prefix(body.to_vec()).finalize()
- )),
- proof_code: Some(
-
general_purpose::STANDARD.encode(body.to_bytes().slice(range_start..range_end)),
- ),
- }))
- }
-
async fn write(
&self,
- size: Option<u64>,
- body: Option<crate::Buffer>,
- pre_hash: bool,
+ body: Option<Buffer>,
upload_url: Option<&str>,
- ) -> Result<(bool, Option<String>)> {
+ ) -> Result<Option<String>> {
if let Some(upload_url) = upload_url {
let Some(body) = body else {
return Err(Error::new(
@@ -138,25 +75,27 @@ impl AliyunDriveWriter {
"cannot upload without body",
));
};
- self.core.upload(upload_url, body).await?;
- return Ok((false, None));
+ if let Err(err) = self.core.upload(upload_url, body).await {
+ if err.kind() != ErrorKind::AlreadyExists {
+ return Err(err);
+ }
+ };
+ return Ok(None);
}
let res = self
.core
- .create_with_rapid_upload(
+ .create(
Some(&self.parent_file_id),
&self.name,
CreateType::File,
CheckNameMode::Refuse,
- size,
- self.get_rapid_upload(size, body.clone(), pre_hash).await?,
)
.await;
let res = match res {
Err(err) if err.kind() == ErrorKind::IsSameFile => {
- return Ok((true, None));
+ return Ok(None);
}
Err(err) => {
return Err(err);
@@ -167,8 +106,11 @@ impl AliyunDriveWriter {
let output: CreateResponse =
serde_json::from_reader(res.reader()).map_err(new_json_deserialize_error)?;
self.write_file_id(output.file_id).await;
+ if output.exist.is_some_and(|x| x) {
+ return Err(Error::new(ErrorKind::AlreadyExists, "file exists"));
+ }
- if output.upload_id.is_some() && output.rapid_upload.is_some_and(|x|
!x) {
+ if output.upload_id.is_some() {
if let Some(body) = body {
let Some(part_info_list) = output.part_info_list else {
return Err(Error::new(ErrorKind::Unexpected, "cannot find
upload_url"));
@@ -176,50 +118,41 @@ impl AliyunDriveWriter {
if part_info_list.is_empty() {
return Err(Error::new(ErrorKind::Unexpected, "cannot find
upload_url"));
}
- self.core
- .upload(&part_info_list[0].upload_url, body)
- .await?;
+ if let Err(err) =
self.core.upload(&part_info_list[0].upload_url, body).await {
+ if err.kind() != ErrorKind::AlreadyExists {
+ return Err(err);
+ }
+ }
}
}
- Ok((false, output.upload_id))
+ Ok(output.upload_id)
}
-}
-
-impl oio::MultipartWrite for AliyunDriveWriter {
- async fn write_once(&self, size: u64, body: crate::Buffer) -> Result<()> {
- let upload_id = if self.core.rapid_upload {
- let (rapid, mut upload_id) = self
- .write(Some(size), Some(body.clone()), true, None)
- .await?;
- let size = if rapid { Some(size) } else { None };
- let (_, new_upload_id) = self.write(size, Some(body), false,
None).await?;
+ async fn complete(&self, upload_id: &str) -> Result<Buffer> {
+ let file_id = self.read_file_id().await?;
- if new_upload_id.is_some() {
- upload_id = new_upload_id;
- }
+ self.core.complete(&file_id, upload_id).await
+ }
- let Some(upload_id) = upload_id else {
- return Err(Error::new(ErrorKind::Unexpected, "cannot find
upload_id"));
- };
- upload_id
- } else {
- let upload_id = self.initiate_part().await?;
- self.write_part(&upload_id, 0, size, body).await?;
- upload_id
- };
+ async fn delete(&self) -> Result<()> {
let file_id = self.read_file_id().await?;
- self.core.complete(&file_id, &upload_id).await?;
+ self.core.delete_path(&file_id).await
+ }
+}
+
+impl oio::MultipartWrite for AliyunDriveWriter {
+ async fn write_once(&self, size: u64, body: crate::Buffer) -> Result<()> {
+ let upload_id = self.initiate_part().await?;
+ self.write_part(&upload_id, 0, size, body).await?;
+ self.complete(&upload_id).await?;
Ok(())
}
async fn initiate_part(&self) -> Result<String> {
- let (_, upload_id) = self.write(None, None, false, None).await?;
-
- let Some(upload_id) = upload_id else {
+ let Some(upload_id) = self.write(None, None).await? else {
return Err(Error::new(ErrorKind::Unsupported, "cannot find
upload_id"));
};
@@ -250,7 +183,7 @@ impl oio::MultipartWrite for AliyunDriveWriter {
if part_info_list.is_empty() {
return Err(Error::new(ErrorKind::Unexpected, "cannot find
upload_url"));
}
- self.write(None, Some(body), false,
Some(&part_info_list[0].upload_url))
+ self.write(Some(body), Some(&part_info_list[0].upload_url))
.await?;
Ok(oio::MultipartPart {
@@ -261,15 +194,12 @@ impl oio::MultipartWrite for AliyunDriveWriter {
}
async fn complete_part(&self, upload_id: &str, _parts:
&[oio::MultipartPart]) -> Result<()> {
- let file_id = self.read_file_id().await?;
- self.core.complete(&file_id, upload_id).await?;
+ self.complete(upload_id).await?;
Ok(())
}
async fn abort_part(&self, _upload_id: &str) -> Result<()> {
- let file_id = self.read_file_id().await?;
-
- self.core.delete_path(&file_id).await
+ self.delete().await
}
}