This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 41e921cf15 refactor(aliyun-drive): rewrite writer part (#4744)
41e921cf15 is described below

commit 41e921cf15706f65c6fc4e2ce3d1a2feb1b62f76
Author: Hanchin Hsieh <[email protected]>
AuthorDate: Mon Jun 17 18:06:23 2024 +0800

    refactor(aliyun-drive): rewrite writer part (#4744)
    
    refactor(aliyun-drive): remove optional rapid-upload
---
 core/Cargo.toml                           |   2 +-
 core/src/services/aliyun_drive/backend.rs |  17 ----
 core/src/services/aliyun_drive/core.rs    |   3 +-
 core/src/services/aliyun_drive/docs.md    |   5 -
 core/src/services/aliyun_drive/error.rs   |   1 +
 core/src/services/aliyun_drive/writer.rs  | 160 +++++++++---------------------
 6 files changed, 48 insertions(+), 140 deletions(-)

diff --git a/core/Cargo.toml b/core/Cargo.toml
index 73c7fea926..d49b9de04e 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -193,7 +193,7 @@ services-vercel-blob = []
 services-webdav = []
 services-webhdfs = []
 services-yandex-disk = []
-services-aliyun-drive = ["dep:sha1"]
+services-aliyun-drive = []
 
 [lib]
 bench = false
diff --git a/core/src/services/aliyun_drive/backend.rs b/core/src/services/aliyun_drive/backend.rs
index 766c059955..9974757bca 100644
--- a/core/src/services/aliyun_drive/backend.rs
+++ b/core/src/services/aliyun_drive/backend.rs
@@ -75,12 +75,6 @@ pub struct AliyunDriveConfig {
     ///
     /// Fallback to default if not set or no other drives can be found.
     pub drive_type: String,
-    /// rapid_upload of this backend.
-    ///
-    /// Skip uploading files that are already in the drive by hashing their content.
-    ///
-    /// Only works under the write_once operation.
-    pub rapid_upload: bool,
 }
 
 impl Debug for AliyunDriveConfig {
@@ -160,13 +154,6 @@ impl AliyunDriveBuilder {
         self
     }
 
-    /// Set rapid_upload of this backend.
-    pub fn rapid_upload(&mut self, rapid_upload: bool) -> &mut Self {
-        self.config.rapid_upload = rapid_upload;
-
-        self
-    }
-
     /// Specify the http client that used by this service.
     ///
     /// # Notes
@@ -243,15 +230,11 @@ impl Builder for AliyunDriveBuilder {
         };
         debug!("backend use drive_type {:?}", drive_type);
 
-        let rapid_upload = self.config.rapid_upload;
-        debug!("backend use rapid_upload {}", rapid_upload);
-
         Ok(AliyunDriveBackend {
             core: Arc::new(AliyunDriveCore {
                 endpoint: "https://openapi.alipan.com".to_string(),
                 root,
                 drive_type,
-                rapid_upload,
                 signer: Arc::new(Mutex::new(AliyunDriveSigner {
                     drive_id: None,
                     sign,
diff --git a/core/src/services/aliyun_drive/core.rs b/core/src/services/aliyun_drive/core.rs
index 51f30665c7..00ac06b926 100644
--- a/core/src/services/aliyun_drive/core.rs
+++ b/core/src/services/aliyun_drive/core.rs
@@ -63,7 +63,6 @@ pub struct AliyunDriveCore {
     pub endpoint: String,
     pub root: String,
     pub drive_type: DriveType,
-    pub rapid_upload: bool,
 
     pub signer: Arc<Mutex<AliyunDriveSigner>>,
     pub client: HttpClient,
@@ -475,8 +474,8 @@ pub struct UploadUrlResponse {
 pub struct CreateResponse {
     pub file_id: String,
     pub upload_id: Option<String>,
-    pub rapid_upload: Option<bool>,
     pub part_info_list: Option<Vec<PartInfo>>,
+    pub exist: Option<bool>,
 }
 
 #[derive(Serialize, Deserialize)]
diff --git a/core/src/services/aliyun_drive/docs.md b/core/src/services/aliyun_drive/docs.md
index b356ebe3f3..e963fd8a11 100644
--- a/core/src/services/aliyun_drive/docs.md
+++ b/core/src/services/aliyun_drive/docs.md
@@ -21,7 +21,6 @@ This service can be used to:
 - `client_secret`: Set the client_secret for backend.
 - `refresh_token`: Set the refresh_token for backend.
 - `drive_type`: Set the drive_type for backend.
-- `rapid_upload`: Set the rapid_upload for backend.
 
 Refer to [`AliyunDriveBuilder`]`s  public API docs for more information.
 
@@ -54,10 +53,6 @@ async fn main() -> Result<()> {
     //
     // Fallback to the default type if no other types found.
     builder.drive_type("resource");
-    // Set the rapid_upload.
-    //
-    // Works only under the write_once operation for now.
-    builder.rapid_upload(true);
 
     let op: Operator = Operator::new(builder)?.finish();
 
diff --git a/core/src/services/aliyun_drive/error.rs b/core/src/services/aliyun_drive/error.rs
index fa343e4f6f..ec2ad85e08 100644
--- a/core/src/services/aliyun_drive/error.rs
+++ b/core/src/services/aliyun_drive/error.rs
@@ -41,6 +41,7 @@ pub async fn parse_error(res: Response<Buffer>) -> Result<Error> {
             Some(code) if code == "PreHashMatched" => (ErrorKind::IsSameFile, false),
             _ => (ErrorKind::Unexpected, false),
         },
+        409 => (ErrorKind::AlreadyExists, false),
         429 => match code {
             Some(code) if code == "TooManyRequests" => (ErrorKind::RateLimited, true),
             _ => (ErrorKind::Unexpected, false),
diff --git a/core/src/services/aliyun_drive/writer.rs b/core/src/services/aliyun_drive/writer.rs
index 174c52a12c..c1a3efa947 100644
--- a/core/src/services/aliyun_drive/writer.rs
+++ b/core/src/services/aliyun_drive/writer.rs
@@ -15,25 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::sync::Arc;
-
-use base64::engine::general_purpose;
-use base64::Engine;
+use super::core::{AliyunDriveCore, UploadUrlResponse};
+use crate::{
+    raw::*,
+    services::aliyun_drive::core::{CheckNameMode, CreateResponse, CreateType},
+    *,
+};
 use bytes::Buf;
-use md5::Digest;
-use md5::Md5;
-use sha1::Sha1;
+use std::sync::Arc;
 use tokio::sync::RwLock;
 
-use super::core::AliyunDriveCore;
-use super::core::RapidUpload;
-use super::core::UploadUrlResponse;
-use crate::raw::*;
-use crate::services::aliyun_drive::core::CheckNameMode;
-use crate::services::aliyun_drive::core::CreateResponse;
-use crate::services::aliyun_drive::core::CreateType;
-use crate::*;
-
 pub type AliyunDriveWriters = oio::MultipartWriter<AliyunDriveWriter>;
 
 pub struct AliyunDriveWriter {
@@ -72,65 +63,11 @@ impl AliyunDriveWriter {
         Ok(file_id.clone())
     }
 
-    async fn get_rapid_upload(
-        &self,
-        size: Option<u64>,
-        body: Option<crate::Buffer>,
-        pre_hash: bool,
-    ) -> Result<Option<RapidUpload>> {
-        let Some(size) = size else {
-            return Ok(None);
-        };
-        let Some(body) = body else {
-            return Ok(None);
-        };
-        if pre_hash && size > 1024 * 100 {
-            return Ok(Some(RapidUpload {
-                pre_hash: Some(format!(
-                    "{:x}",
-                    Sha1::new_with_prefix(body.slice(0..1024).to_vec()).finalize()
-                )),
-                content_hash: None,
-                proof_code: None,
-            }));
-        }
-        let (token, _) = self.core.get_token_and_drive().await?;
-        let Ok(index) = u64::from_str_radix(
-            &format!("{:x}", Md5::new_with_prefix(token.unwrap()).finalize())[0..16],
-            16,
-        ) else {
-            return Err(Error::new(
-                ErrorKind::Unexpected,
-                "cannot parse hexadecimal",
-            ));
-        };
-        let size = size as usize;
-        let index = index as usize % size;
-        let (range_start, range_end) = if index + 8 > size {
-            (index, size)
-        } else {
-            (index, index + 8)
-        };
-
-        Ok(Some(RapidUpload {
-            pre_hash: None,
-            content_hash: Some(format!(
-                "{:x}",
-                Sha1::new_with_prefix(body.to_vec()).finalize()
-            )),
-            proof_code: Some(
-                general_purpose::STANDARD.encode(body.to_bytes().slice(range_start..range_end)),
-            ),
-        }))
-    }
-
     async fn write(
         &self,
-        size: Option<u64>,
-        body: Option<crate::Buffer>,
-        pre_hash: bool,
+        body: Option<Buffer>,
         upload_url: Option<&str>,
-    ) -> Result<(bool, Option<String>)> {
+    ) -> Result<Option<String>> {
         if let Some(upload_url) = upload_url {
             let Some(body) = body else {
                 return Err(Error::new(
@@ -138,25 +75,27 @@ impl AliyunDriveWriter {
                     "cannot upload without body",
                 ));
             };
-            self.core.upload(upload_url, body).await?;
-            return Ok((false, None));
+            if let Err(err) = self.core.upload(upload_url, body).await {
+                if err.kind() != ErrorKind::AlreadyExists {
+                    return Err(err);
+                }
+            };
+            return Ok(None);
         }
 
         let res = self
             .core
-            .create_with_rapid_upload(
+            .create(
                 Some(&self.parent_file_id),
                 &self.name,
                 CreateType::File,
                 CheckNameMode::Refuse,
-                size,
-                self.get_rapid_upload(size, body.clone(), pre_hash).await?,
             )
             .await;
 
         let res = match res {
             Err(err) if err.kind() == ErrorKind::IsSameFile => {
-                return Ok((true, None));
+                return Ok(None);
             }
             Err(err) => {
                 return Err(err);
@@ -167,8 +106,11 @@ impl AliyunDriveWriter {
         let output: CreateResponse =
             serde_json::from_reader(res.reader()).map_err(new_json_deserialize_error)?;
         self.write_file_id(output.file_id).await;
+        if output.exist.is_some_and(|x| x) {
+            return Err(Error::new(ErrorKind::AlreadyExists, "file exists"));
+        }
 
-        if output.upload_id.is_some() && output.rapid_upload.is_some_and(|x| !x) {
+        if output.upload_id.is_some() {
             if let Some(body) = body {
                 let Some(part_info_list) = output.part_info_list else {
                     return Err(Error::new(ErrorKind::Unexpected, "cannot find upload_url"));
@@ -176,50 +118,41 @@ impl AliyunDriveWriter {
                 if part_info_list.is_empty() {
                     return Err(Error::new(ErrorKind::Unexpected, "cannot find upload_url"));
                 }
-                self.core
-                    .upload(&part_info_list[0].upload_url, body)
-                    .await?;
+                if let Err(err) = self.core.upload(&part_info_list[0].upload_url, body).await {
+                    if err.kind() != ErrorKind::AlreadyExists {
+                        return Err(err);
+                    }
+                }
             }
         }
 
-        Ok((false, output.upload_id))
+        Ok(output.upload_id)
     }
-}
-
-impl oio::MultipartWrite for AliyunDriveWriter {
-    async fn write_once(&self, size: u64, body: crate::Buffer) -> Result<()> {
-        let upload_id = if self.core.rapid_upload {
-            let (rapid, mut upload_id) = self
-                .write(Some(size), Some(body.clone()), true, None)
-                .await?;
 
-            let size = if rapid { Some(size) } else { None };
-            let (_, new_upload_id) = self.write(size, Some(body), false, None).await?;
+    async fn complete(&self, upload_id: &str) -> Result<Buffer> {
+        let file_id = self.read_file_id().await?;
 
-            if new_upload_id.is_some() {
-                upload_id = new_upload_id;
-            }
+        self.core.complete(&file_id, upload_id).await
+    }
 
-            let Some(upload_id) = upload_id else {
-                return Err(Error::new(ErrorKind::Unexpected, "cannot find upload_id"));
-            };
-            upload_id
-        } else {
-            let upload_id = self.initiate_part().await?;
-            self.write_part(&upload_id, 0, size, body).await?;
-            upload_id
-        };
+    async fn delete(&self) -> Result<()> {
         let file_id = self.read_file_id().await?;
 
-        self.core.complete(&file_id, &upload_id).await?;
+        self.core.delete_path(&file_id).await
+    }
+}
+
+impl oio::MultipartWrite for AliyunDriveWriter {
+    async fn write_once(&self, size: u64, body: crate::Buffer) -> Result<()> {
+        let upload_id = self.initiate_part().await?;
+        self.write_part(&upload_id, 0, size, body).await?;
 
+        self.complete(&upload_id).await?;
         Ok(())
     }
 
     async fn initiate_part(&self) -> Result<String> {
-        let (_, upload_id) = self.write(None, None, false, None).await?;
-
-        let Some(upload_id) = upload_id else {
+        let Some(upload_id) = self.write(None, None).await? else {
             return Err(Error::new(ErrorKind::Unsupported, "cannot find upload_id"));
         };
 
@@ -250,7 +183,7 @@ impl oio::MultipartWrite for AliyunDriveWriter {
         if part_info_list.is_empty() {
             return Err(Error::new(ErrorKind::Unexpected, "cannot find upload_url"));
         }
-        self.write(None, Some(body), false, Some(&part_info_list[0].upload_url))
+        self.write(Some(body), Some(&part_info_list[0].upload_url))
             .await?;
 
         Ok(oio::MultipartPart {
@@ -261,15 +194,12 @@ impl oio::MultipartWrite for AliyunDriveWriter {
     }
 
     async fn complete_part(&self, upload_id: &str, _parts: &[oio::MultipartPart]) -> Result<()> {
-        let file_id = self.read_file_id().await?;
-        self.core.complete(&file_id, upload_id).await?;
+        self.complete(upload_id).await?;
 
         Ok(())
     }
 
     async fn abort_part(&self, _upload_id: &str) -> Result<()> {
-        let file_id = self.read_file_id().await?;
-
-        self.core.delete_path(&file_id).await
+        self.delete().await
     }
 }

Reply via email to