This is an automated email from the ASF dual-hosted git repository.

suyanhanx pushed a commit to branch gdrive
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 1f5f1bf9a3b15c15e79db12d4b36b166693a741f
Author: suyanhanx <[email protected]>
AuthorDate: Mon Aug 21 17:54:52 2023 +0800

    test pass
    
    Signed-off-by: suyanhanx <[email protected]>
---
 core/src/services/gdrive/backend.rs |  75 +++++++++++---
 core/src/services/gdrive/core.rs    | 106 +++++++++-----------
 core/src/services/gdrive/writer.rs  | 195 +++++-------------------------------
 3 files changed, 137 insertions(+), 239 deletions(-)

diff --git a/core/src/services/gdrive/backend.rs b/core/src/services/gdrive/backend.rs
index 03fa4a26c..be49b8ddf 100644
--- a/core/src/services/gdrive/backend.rs
+++ b/core/src/services/gdrive/backend.rs
@@ -27,6 +27,7 @@ use super::error::parse_error;
 use super::writer::GdriveWriter;
 use crate::raw::*;
 use crate::services::gdrive::core::GdriveFile;
+use crate::services::gdrive::core::GdriveFileList;
 use crate::types::Result;
 use crate::*;
 
@@ -59,7 +60,7 @@ impl Accessor for GdriveBackend {
 
     fn info(&self) -> AccessorInfo {
         let mut ma = AccessorInfo::default();
-        ma.set_scheme(crate::Scheme::Gdrive)
+        ma.set_scheme(Scheme::Gdrive)
             .set_root(&self.core.root)
             .set_full_capability(Capability {
                 stat: true,
@@ -88,8 +89,6 @@ impl Accessor for GdriveBackend {
 
         let status = resp.status();
 
-        println!("status out: {:?}", status);
-
         match status {
             StatusCode::OK => {
                 let meta = 
self.parse_metadata(resp.into_body().bytes().await?)?;
@@ -100,7 +99,28 @@ impl Accessor for GdriveBackend {
     }
 
     async fn create_dir(&self, path: &str, _args: OpCreateDir) -> 
Result<RpCreateDir> {
-        let resp = self.core.gdrive_create_dir(path).await?;
+        let parent = self.core.ensure_parent_path(path).await?;
+
+        let path = path.split('/').filter(|&x| !x.is_empty()).last().unwrap();
+
+        // As Google Drive allows files have the same name, we need to check 
if the folder exists.
+        let resp = self.core.gdrive_search_folder(path, &parent).await?;
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                let body = resp.into_body().bytes().await?;
+                let meta = serde_json::from_slice::<GdriveFileList>(&body)
+                    .map_err(new_json_deserialize_error)?;
+
+                if !meta.files.is_empty() {
+                    return Ok(RpCreateDir::default());
+                }
+            }
+            _ => return Err(parse_error(resp).await?),
+        }
+
+        let resp = self.core.gdrive_create_folder(path, Some(parent)).await?;
 
         let status = resp.status();
 
@@ -145,20 +165,51 @@ impl Accessor for GdriveBackend {
 
         // As Google Drive allows files have the same name, we need to check 
if the file exists.
         // If the file exists, we will keep its ID and update it.
+        let mut file_id: Option<String> = None;
+
+        let resp = self.core.gdrive_stat(path).await;
+        // We don't care about the error here.
+        // As long as the file doesn't exist, we will create a new one.
+        if resp.is_ok() {
+            let resp = resp.unwrap();
+            let status = resp.status();
+
+            if status == StatusCode::OK {
+                let body = resp.into_body().bytes().await?;
+                let meta = serde_json::from_slice::<GdriveFile>(&body)
+                    .map_err(new_json_deserialize_error)?;
+
+                file_id = if meta.id.is_empty() {
+                    None
+                } else {
+                    Some(meta.id)
+                };
+            }
+        }
+
         Ok((
             RpWrite::default(),
-            GdriveWriter::new(self.core.clone(), args, String::from(path), 
None),
+            GdriveWriter::new(self.core.clone(), String::from(path), file_id),
         ))
     }
 
     async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
-        let resp = self.core.gdrive_delete(path).await?;
-
-        let status = resp.status();
+        let resp = self.core.gdrive_delete(path).await;
+        if resp.is_err() {
+            let e = resp.err().unwrap();
+            if e.kind() == ErrorKind::NotFound {
+                return Ok(RpDelete::default());
+            } else {
+                return Err(e);
+            }
+        } else {
+            let resp = resp.unwrap();
+            let status = resp.status();
 
-        match status {
-            StatusCode::NO_CONTENT => Ok(RpDelete::default()),
-            _ => Err(parse_error(resp).await?),
+            match status {
+                StatusCode::NO_CONTENT => Ok(RpDelete::default()),
+                _ => Err(parse_error(resp).await?),
+            }
         }
     }
 }
@@ -168,8 +219,6 @@ impl GdriveBackend {
         let metadata =
             
serde_json::from_slice::<GdriveFile>(&body).map_err(new_json_deserialize_error)?;
 
-        println!("metadata: {:?}", metadata.size);
-
         let mut meta = Metadata::new(match metadata.mime_type.as_str() {
             "application/vnd.google-apps.folder" => EntryMode::DIR,
             _ => EntryMode::FILE,
diff --git a/core/src/services/gdrive/core.rs b/core/src/services/gdrive/core.rs
index 3b5b9ebb5..642467b9f 100644
--- a/core/src/services/gdrive/core.rs
+++ b/core/src/services/gdrive/core.rs
@@ -25,20 +25,12 @@ use http::header;
 use http::Request;
 use http::Response;
 use http::StatusCode;
-use reqwest::multipart::Form;
-use reqwest::multipart::Part;
 use serde::Deserialize;
 use serde_json::json;
 use tokio::sync::Mutex;
 
 use super::error::parse_error;
-use crate::raw::build_rooted_abs_path;
-use crate::raw::new_json_deserialize_error;
-use crate::raw::new_request_build_error;
-use crate::raw::percent_encode_path;
-use crate::raw::AsyncBody;
-use crate::raw::HttpClient;
-use crate::raw::IncomingAsyncBody;
+use crate::raw::*;
 use crate::types::Result;
 use crate::Error;
 use crate::ErrorKind;
@@ -75,11 +67,9 @@ impl GdriveCore {
     /// - A path is a sequence of file names separated by slashes.
     /// - A file only knows its parent id, but not its name.
     /// - To find the file id of a file, we need to traverse the path from the 
root to the file.
-    async fn get_file_id_by_path(&self, file_path: &str) -> Result<String> {
+    pub(crate) async fn get_file_id_by_path(&self, file_path: &str) -> 
Result<String> {
         let path = build_rooted_abs_path(&self.root, file_path);
 
-        println!("file path: {}", path);
-
         let mut parent_id = "root".to_owned();
         let file_path_items: Vec<&str> = path.split('/').filter(|&x| 
!x.is_empty()).collect();
 
@@ -92,8 +82,6 @@ impl GdriveCore {
                 query += " and mimeType = 'application/vnd.google-apps.folder'";
             }
 
-            println!("query: {}", query);
-
             let req = self.sign(
                 Request::get(format!(
                     "https://www.googleapis.com/drive/v3/files?q={}",
@@ -106,8 +94,6 @@ impl GdriveCore {
             let resp = self.client.send(req).await?;
             let status = resp.status();
 
-            println!("status in: {:?}", status);
-
             match status {
                 StatusCode::OK => {
                     let resp_body = &resp.into_body().bytes().await?;
@@ -115,8 +101,6 @@ impl GdriveCore {
                     let gdrive_file_list: GdriveFileList =
                         
serde_json::from_slice(resp_body).map_err(new_json_deserialize_error)?;
 
-                    println!("gdrive_file_list: {:?}", 
gdrive_file_list.files.len());
-
                     if gdrive_file_list.files.is_empty() {
                         return Err(Error::new(
                             ErrorKind::NotFound,
@@ -146,7 +130,7 @@ impl GdriveCore {
     ///
     /// - The path is rooted at the root of the Google Drive.
     /// - Will create the parent path recursively.
-    async fn ensure_parent_path(&self, path: &str) -> Result<String> {
+    pub(crate) async fn ensure_parent_path(&self, path: &str) -> 
Result<String> {
         let path = build_rooted_abs_path(&self.root, path);
 
         let mut parent: String = "root".to_owned();
@@ -190,7 +174,6 @@ impl GdriveCore {
                             .map_err(new_json_deserialize_error)?;
 
                         parent = parent_meta.id;
-                        println!("parent: {}", &parent);
                     } else {
                         parent = gdrive_file_list.files[0].id.clone();
                     }
@@ -209,7 +192,6 @@ impl GdriveCore {
                             parent = 
String::from_utf8_lossy(&parent_id).to_string();
                         }
                         _ => {
-                            println!("404 了");
                             return Err(parse_error(res).await?);
                         }
                     }
@@ -223,13 +205,13 @@ impl GdriveCore {
         Ok(parent.to_owned())
     }
 
-    pub async fn gdrive_search(
+    pub async fn gdrive_search_folder(
         &self,
         target: &str,
         parent: &str,
     ) -> Result<Response<IncomingAsyncBody>> {
         let query = format!(
-            "name = '{}' and '{}' in parents and trashed = false &fields=files(id,name,mimeType)",
+            "name = '{}' and '{}' in parents and trashed = false and mimeType = 'application/vnd.google-apps.folder'",
             target, parent
         );
         let url = format!(
@@ -254,11 +236,6 @@ impl GdriveCore {
         name: &str,
         parent: Option<String>,
     ) -> Result<Response<IncomingAsyncBody>> {
-        println!(
-            "create folder: {} at {}",
-            name,
-            parent.clone().unwrap_or("root".to_owned())
-        );
         let url = "https://www.googleapis.com/drive/v3/files";
 
         let req = Request::post(url)
@@ -284,16 +261,6 @@ impl GdriveCore {
         self.client.send(req).await
     }
 
-    /// Create a folder.
-    /// Will create the path recursively.
-    pub async fn gdrive_create_dir(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
-        let parent = self.ensure_parent_path(path).await?;
-
-        let path = path.split('/').filter(|&x| !x.is_empty()).last().unwrap();
-
-        self.gdrive_create_folder(path, Some(parent)).await
-    }
-
     pub async fn gdrive_stat(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
         let path_id = self.get_file_id_by_path(path).await?;
 
@@ -372,7 +339,7 @@ impl GdriveCore {
         &self,
         path: &str,
         size: u64,
-        body: AsyncBody,
+        body: bytes::Bytes,
     ) -> Result<Response<IncomingAsyncBody>> {
         let parent = self.ensure_parent_path(path).await?;
 
@@ -385,31 +352,54 @@ impl GdriveCore {
             "parents": [parent],
         });
 
-        let mut fd = Form::new().text("metadata", metadata.to_string());
+        let req = Request::post(url).header("X-Upload-Content-Length", size);
 
-        match body {
-            AsyncBody::Bytes(bs) => {
-                fd = fd.part("file", Part::bytes(bs.to_vec()));
-            }
-            AsyncBody::Stream(bs) => {
-                fd = fd.part("file", 
Part::stream(reqwest::Body::wrap_stream(bs)));
-            }
-            AsyncBody::Empty => {
-                // We do nothing here.
-                fd = fd.part("file", Part::bytes(Vec::new()));
-            }
-        };
+        let multipart = Multipart::new()
+            .part(
+                FormDataPart::new("metadata")
+                    .header(
+                        header::CONTENT_TYPE,
+                        "application/json; charset=UTF-8".parse().unwrap(),
+                    )
+                    .content(metadata.to_string()),
+            )
+            .part(
+                FormDataPart::new("file")
+                    .header(
+                        header::CONTENT_TYPE,
+                        "application/octet-stream".parse().unwrap(),
+                    )
+                    .content(body),
+            );
 
-        let mut req = Request::post(url)
-            .header(header::CONTENT_TYPE, "application/json; charset=UTF-8")
+        let mut req = multipart.apply(req)?;
+
+        req = self.sign(req);
+
+        self.client.send(req).await
+    }
+
+    pub async fn gdrive_upload_overwrite_simple_request(
+        &self,
+        file_id: &str,
+        size: u64,
+        body: bytes::Bytes,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let url = format!(
+            "https://www.googleapis.com/upload/drive/v3/files/{}?uploadType=media",
+            file_id
+        );
+
+        let mut req = Request::patch(url)
+            .header(header::CONTENT_TYPE, "application/octet-stream")
             .header(header::CONTENT_LENGTH, size)
             .header("X-Upload-Content-Length", size)
-            .body(fd)
+            .body(AsyncBody::Bytes(body))
             .map_err(new_request_build_error)?;
 
         req = self.sign(req);
 
-        self.client.send_form_data(req).await
+        self.client.send(req).await
     }
 
     /// Start an upload session.
@@ -577,6 +567,6 @@ pub struct GdriveFile {
 // refer to 
https://developers.google.com/drive/api/reference/rest/v3/files/list
 #[derive(Deserialize)]
 #[serde(rename_all = "camelCase")]
-struct GdriveFileList {
-    files: Vec<GdriveFile>,
+pub(crate) struct GdriveFileList {
+    pub(crate) files: Vec<GdriveFile>,
 }
diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs
index f4762dfb7..48d88f3ec 100644
--- a/core/src/services/gdrive/writer.rs
+++ b/core/src/services/gdrive/writer.rs
@@ -24,30 +24,24 @@ use http::StatusCode;
 use super::core::GdriveCore;
 use super::error::parse_error;
 use crate::raw::*;
+use crate::services::gdrive::core::GdriveFile;
 use crate::*;
 
 pub struct GdriveWriter {
     core: Arc<GdriveCore>,
 
-    op: OpWrite,
     path: String,
 
-    target: Option<String>,
-    position: u64,
-    written: u64,
-    size: Option<u64>,
+    file_id: Option<String>,
 }
 
 impl GdriveWriter {
-    pub fn new(core: Arc<GdriveCore>, op: OpWrite, path: String, file_id: 
Option<String>) -> Self {
+    pub fn new(core: Arc<GdriveCore>, path: String, file_id: Option<String>) 
-> Self {
         GdriveWriter {
             core,
-            op,
             path,
-            position: 0,
-            written: 0,
-            size: None,
-            target: file_id,
+
+            file_id,
         }
     }
 
@@ -55,7 +49,7 @@ impl GdriveWriter {
     ///
     /// This is used for small objects.
     /// And should overwrite the object if it already exists.
-    pub async fn write_oneshot(&self, size: u64, body: AsyncBody) -> 
Result<()> {
+    pub async fn write_create(&mut self, size: u64, body: Bytes) -> Result<()> 
{
         let resp = self
             .core
             .gdrive_upload_simple_request(&self.path, size, body)
@@ -65,194 +59,59 @@ impl GdriveWriter {
 
         match status {
             StatusCode::OK | StatusCode::CREATED => {
-                resp.into_body().consume().await?;
+                let bs = resp.into_body().bytes().await?;
+
+                let file = serde_json::from_slice::<GdriveFile>(&bs)
+                    .map_err(new_json_deserialize_error)?;
+
+                self.file_id = Some(file.id);
+
                 Ok(())
             }
             _ => Err(parse_error(resp).await?),
         }
     }
 
-    pub async fn initial_upload(&mut self) -> Result<()> {
+    pub async fn write_overwrite(&self, size: u64, body: Bytes) -> Result<()> {
+        let file_id = self.file_id.as_ref().ok_or_else(|| {
+            Error::new(ErrorKind::Unexpected, "file_id is required for overwrite")
+        })?;
         let resp = self
             .core
-            .gdrive_upload_initial_request(&self.path, None)
+            .gdrive_upload_overwrite_simple_request(file_id, size, body)
             .await?;
 
         let status = resp.status();
 
         match status {
             StatusCode::OK => {
-                let headers = resp.headers();
-
-                match headers.get("location") {
-                    Some(location) => {
-                        self.target = Some(
-                            location
-                                .to_str()
-                                .map_err(|_| {
-                                    Error::new(
-                                        ErrorKind::Unexpected,
-                                        "initial upload failed: location header parse failed",
-                                    )
-                                })?
-                                .to_string(),
-                        );
-                        Ok(())
-                    }
-                    _ => Err(Error::new(
-                        ErrorKind::Unexpected,
-                        "initial upload failed: location header not found",
-                    )),
-                }
+                resp.into_body().consume().await?;
+                Ok(())
             }
             _ => Err(parse_error(resp).await?),
         }
     }
-
-    pub async fn write_part(&mut self, size: u64, part: AsyncBody) -> 
Result<()> {
-        if let Some(target) = &self.target {
-            let resp = self
-                .core
-                .gdrive_upload_part_request(target, size, self.position, 
self.size, part)
-                .await?;
-
-            println!("size: {}", size);
-
-            let status = resp.status();
-
-            match status {
-                StatusCode::PERMANENT_REDIRECT => {
-                    let headers = resp.headers();
-
-                    let range = headers.get("range").ok_or_else(|| {
-                        Error::new(
-                            ErrorKind::Unexpected,
-                            "write part failed: range header not found",
-                        )
-                    })?;
-                    self.position = range
-                        .to_str()
-                        .map_err(|_| {
-                            Error::new(
-                                ErrorKind::Unexpected,
-                                "write part failed: range header parse failed",
-                            )
-                        })?
-                        .split('-')
-                        .last()
-                        .ok_or_else(|| {
-                            Error::new(
-                                ErrorKind::Unexpected,
-                                "write part failed: range header parse failed",
-                            )
-                        })?
-                        .parse()
-                        .map_err(|_| {
-                            Error::new(
-                                ErrorKind::Unexpected,
-                                "write part failed: range header parse failed",
-                            )
-                        })?;
-
-                    resp.into_body().consume().await?;
-                    Ok(())
-                }
-                _ => Err(parse_error(resp).await?),
-            }
-        } else {
-            Err(Error::new(
-                ErrorKind::Unexpected,
-                "write part failed: upload location not found",
-            ))
-        }
-    }
-
-    pub async fn finish_upload(&self) -> Result<()> {
-        if let Some(target) = &self.target {
-            println!("final position: {}", self.position + 1);
-            println!("final size: {}", self.size.unwrap_or(self.position + 1));
-
-            let resp = self
-                .core
-                .gdrive_finish_upload_request(
-                    target,
-                    if let Some(size) = self.size {
-                        if (self.position + 1) < size {
-                            return Err(Error::new(
-                                ErrorKind::Unexpected,
-                                "finish upload failed: upload size mismatch",
-                            ));
-                        }
-                        size
-                    } else {
-                        self.position + 1
-                    },
-                )
-                .await?;
-
-            let status = resp.status();
-
-            match status {
-                StatusCode::OK | StatusCode::CREATED => {
-                    resp.into_body().consume().await?;
-                    Ok(())
-                }
-                _ => Err(parse_error(resp).await?),
-            }
-        } else {
-            Ok(())
-        }
-    }
-
-    pub async fn abort_upload(&self) -> Result<()> {
-        if let Some(target) = &self.target {
-            let resp = self.core.gdrive_cancel_upload_request(target).await?;
-
-            let status = resp.status();
-
-            match status {
-                StatusCode::OK => {
-                    resp.into_body().consume().await?;
-                    Ok(())
-                }
-                _ => Err(parse_error(resp).await?),
-            }
-        } else {
-            Ok(())
-        }
-    }
 }
 
 #[async_trait]
 impl oio::Write for GdriveWriter {
     async fn write(&mut self, bs: Bytes) -> Result<()> {
-        if bs.is_empty() {
-            return Ok(());
-        }
-
-        if self.target.is_none() {
-            if self.op.content_length().unwrap_or_default() == bs.len() as u64 
&& self.written == 0
-            {
-                return self
-                    .write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs))
-                    .await;
-            } else {
-                self.initial_upload().await?;
-            }
+        if self.file_id.is_none() {
+            self.write_create(bs.len() as u64, bs).await
+        } else {
+            self.write_overwrite(bs.len() as u64, bs).await
         }
-
-        self.write_part(bs.len() as u64, AsyncBody::Bytes(bs)).await
     }
 
     async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
-        Ok(())
+        Err(Error::new(ErrorKind::Unsupported, "sink is not supported"))
     }
 
     async fn abort(&mut self) -> Result<()> {
-        self.abort_upload().await
+        Ok(())
     }
 
     async fn close(&mut self) -> Result<()> {
-        self.finish_upload().await
+        Ok(())
     }
 }

Reply via email to