This is an automated email from the ASF dual-hosted git repository.

suyanhanx pushed a commit to branch gdrive
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 7dff3c44fb7832ff98622f778483073f27ca47ae
Author: suyanhanx <[email protected]>
AuthorDate: Tue Aug 1 10:28:20 2023 +0800

    stash
    
    Signed-off-by: suyanhanx <[email protected]>
---
 Cargo.lock                          |   1 +
 core/Cargo.toml                     |   1 +
 core/src/services/gdrive/backend.rs |   8 +--
 core/src/services/gdrive/core.rs    |  71 +++++++++++++++++++------
 core/src/services/gdrive/writer.rs  | 101 +++++++++++++++++++++++++++++-------
 5 files changed, 143 insertions(+), 39 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index ea91cdd01..ec5c2aca9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4884,6 +4884,7 @@ dependencies = [
  "js-sys",
  "log",
  "mime",
+ "mime_guess",
  "native-tls",
  "once_cell",
  "percent-encoding",
diff --git a/core/Cargo.toml b/core/Cargo.toml
index b46cf3494..313d568b0 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -249,6 +249,7 @@ redis = { version = "0.23.1", features = [
 reqsign = { version = "0.14.1", default-features = false, optional = true }
 reqwest = { version = "0.11.18", features = [
   "stream",
+  "multipart",
 ], default-features = false }
 rocksdb = { version = "0.21.0", default-features = false, optional = true }
 serde = { version = "1", features = ["derive"] }
diff --git a/core/src/services/gdrive/backend.rs 
b/core/src/services/gdrive/backend.rs
index 9321fb3c3..464d86312 100644
--- a/core/src/services/gdrive/backend.rs
+++ b/core/src/services/gdrive/backend.rs
@@ -165,14 +165,14 @@ impl GdriveBackend {
     pub(crate) fn parse_metadata(&self, body: bytes::Bytes) -> 
Result<Metadata> {
         let metadata =
             
serde_json::from_slice::<GdriveFile>(&body).map_err(new_json_deserialize_error)?;
+
+        println!("metadata: {:?}", metadata.size);
+
         let mut meta = Metadata::new(match metadata.mime_type.as_str() {
             "application/vnd.google-apps.folder" => EntryMode::DIR,
             _ => EntryMode::FILE,
         });
-        meta = meta.with_content_length(match metadata.size.parse::<u64>() {
-            Ok(size) => size,
-            Err(_) => 0,
-        });
+        meta = 
meta.with_content_length(metadata.size.parse::<u64>().unwrap_or(0));
         meta = meta.with_last_modified(
             metadata
                 .modified_time
diff --git a/core/src/services/gdrive/core.rs b/core/src/services/gdrive/core.rs
index cc9f544ec..a93461d9e 100644
--- a/core/src/services/gdrive/core.rs
+++ b/core/src/services/gdrive/core.rs
@@ -76,18 +76,22 @@ impl GdriveCore {
     async fn get_file_id_by_path(&self, file_path: &str) -> Result<String> {
         let path = build_rooted_abs_path(&self.root, file_path);
 
+        println!("file path: {}", path);
+
         let mut parent_id = "root".to_owned();
         let file_path_items: Vec<&str> = path.split('/').filter(|&x| 
!x.is_empty()).collect();
 
         for (i, item) in file_path_items.iter().enumerate() {
             let mut query = format!(
-                "name = '{}' and parents = '{}' and trashed = false",
+                "name = \"{}\" and \"{}\" in parents and trashed = false",
                 item, parent_id
             );
             if i != file_path_items.len() - 1 {
-                query += "and mimeType = 'application/vnd.google-apps.folder'";
+                query += " and mimeType = 
'application/vnd.google-apps.folder'";
             }
 
+            println!("query: {}", query);
+
             let req = self.sign(
                 Request::get(format!(
                     "https://www.googleapis.com/drive/v3/files?q={}";,
@@ -111,10 +115,10 @@ impl GdriveCore {
 
                     println!("gdrive_file_list: {:?}", 
gdrive_file_list.files.len());
 
-                    if gdrive_file_list.files.len() == 0 {
+                    if gdrive_file_list.files.is_empty() {
                         return Err(Error::new(
                             ErrorKind::NotFound,
-                            &format!("path not found: {}", file_path),
+                            &format!("path not found: {}", item),
                         ));
                     }
 
@@ -149,7 +153,7 @@ impl GdriveCore {
 
         for (i, item) in file_path_items.iter().enumerate() {
             let query = format!(
-                "name = '{}' and '{}' in parents and trashed = false and 
mimeType = 'application/vnd.google-apps.folder'",
+                "name = \"{}\" and \"{}\" in parents and trashed = false and 
mimeType = 'application/vnd.google-apps.folder'",
                 item, parent
             );
 
@@ -295,8 +299,8 @@ impl GdriveCore {
         // For now, we only need the file id, name, mime type and modified 
time.
         let req = self.sign(
             Request::get(&format!(
-                
"https://www.googleapis.com/drive/v3/files/{}?fields=files(id,name,mimeType,modifiedTime)",
-                self.get_file_id_by_path(path_id.as_str()).await?
+                
"https://www.googleapis.com/drive/v3/files/{}?fields=id,name,mimeType,size,modifiedTime";,
+                path_id.as_str()
             ))
                 .body(AsyncBody::Empty)
                 .map_err(new_request_build_error)?,
@@ -362,6 +366,39 @@ impl GdriveCore {
         self.client.send(req).await
     }
 
+    pub async fn gdrive_upload_simple_request(
+        &self,
+        path: &str,
+        size: u64,
+        bs: bytes::Bytes,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let parent = self.ensure_parent_path(path).await?;
+
+        let url = 
"https://www.googleapis.com/upload/drive/v3/files?uploadType=media";;
+
+        let file_name = path.split('/').filter(|&x| 
!x.is_empty()).last().unwrap();
+
+        let metadata = &json!({
+            "name": file_name,
+            "parents": [parent],
+        });
+
+        let fd = reqwest::multipart::Form::new()
+            .text("metadata", metadata.to_string())
+            .part("file", reqwest::multipart::Part::bytes(bs));
+
+        let mut req = Request::post(url)
+            .header(header::CONTENT_TYPE, "application/json; charset=UTF-8")
+            .header(header::CONTENT_LENGTH, size)
+            .header("X-Upload-Content-Length", size)
+            .body(fd)
+            .map_err(new_request_build_error)?;
+
+        req = self.sign(req);
+
+        self.client.send(req).await
+    }
+
     /// Start an upload session.
     ///
     /// ## Notes
@@ -372,18 +409,22 @@ impl GdriveCore {
         path: &str,
         size: Option<u64>,
     ) -> Result<Response<IncomingAsyncBody>> {
+        let parent = self.ensure_parent_path(path).await?;
+
         let url = 
"https://www.googleapis.com/upload/drive/v3/files?uploadType=resumable";;
 
         let file_name = path.split('/').filter(|&x| 
!x.is_empty()).last().unwrap();
 
         let bs = serde_json::to_vec(&json!({
-            "name": file_name
-        })).map_err(
-            |e| Error::new(
+            "name": file_name,
+            "parents": [parent],
+        }))
+        .map_err(|e| {
+            Error::new(
                 ErrorKind::Unexpected,
                 &format!("failed to serialize json(upload initial request): 
{}", e),
             )
-        )?;
+        })?;
 
         let mut req = Request::post(url)
             .header(header::CONTENT_TYPE, "application/json; charset=UTF-8")
@@ -393,8 +434,8 @@ impl GdriveCore {
             req = req.header("X-Upload-Content-Length", size);
         }
 
-
-            let mut req = req.body(AsyncBody::Bytes(bytes::Bytes::from(bs)))
+        let mut req = req
+            .body(AsyncBody::Bytes(bytes::Bytes::from(bs)))
             .map_err(new_request_build_error)?;
 
         req = self.sign(req);
@@ -468,7 +509,7 @@ impl GdriveCore {
     ) -> Result<Response<IncomingAsyncBody>> {
         let mut req = Request::put(target)
             .header("Content-Length", 0)
-            .header("Content-Range", format!("bytes {}-{}/{}", size - 1, size 
- 1, size))
+            .header("Content-Range", format!("bytes */{}", size))
             .body(AsyncBody::Empty)
             .map_err(new_request_build_error)?;
 
@@ -493,7 +534,7 @@ impl GdriveCore {
         self.client.send(req).await
     }
 
-    fn sign(&self, mut req: Request<AsyncBody>) -> Request<AsyncBody> {
+    fn sign<T>(&self, mut req: Request<T>) -> Request<T> {
         let auth_header_content = format!("Bearer {}", self.access_token);
         req.headers_mut().insert(
             header::AUTHORIZATION,
diff --git a/core/src/services/gdrive/writer.rs 
b/core/src/services/gdrive/writer.rs
index f796eecb6..958df558d 100644
--- a/core/src/services/gdrive/writer.rs
+++ b/core/src/services/gdrive/writer.rs
@@ -34,6 +34,7 @@ pub struct GdriveWriter {
 
     target: Option<String>,
     position: u64,
+    written: u64,
     size: Option<u64>,
 }
 
@@ -44,11 +45,33 @@ impl GdriveWriter {
             op,
             path,
             position: 0,
+            written: 0,
             size: None,
             target: None,
         }
     }
 
+    /// Write a single chunk of data to the object.
+    ///
+    /// This is used for small objects.
+    /// And should overwrite the object if it already exists.
+    pub async fn write_oneshot(&self, bs: Bytes) -> Result<()> {
+        let resp = self
+            .core
+            .gdrive_upload_simple_request(&self.path, bs.len() as u64, bs)
+            .await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK | StatusCode::CREATED => {
+                resp.into_body().consume().await?;
+                Ok(())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
     pub async fn initial_upload(&mut self) -> Result<()> {
         let resp = self
             .core
@@ -63,20 +86,23 @@ impl GdriveWriter {
 
                 match headers.get("location") {
                     Some(location) => {
-                        self.target = Some(location.to_str().map_err(|_| {
-                            Error::new(
-                                ErrorKind::Unexpected,
-                                "initial upload failed: location header parse 
failed",
-                            )
-                        })?.to_string());
+                        self.target = Some(
+                            location
+                                .to_str()
+                                .map_err(|_| {
+                                    Error::new(
+                                        ErrorKind::Unexpected,
+                                        "initial upload failed: location 
header parse failed",
+                                    )
+                                })?
+                                .to_string(),
+                        );
                         Ok(())
                     }
-                    _ => {
-                        return Err(Error::new(
-                            ErrorKind::Unexpected,
-                            "initial upload failed: location header not found",
-                        ))
-                    }
+                    _ => Err(Error::new(
+                        ErrorKind::Unexpected,
+                        "initial upload failed: location header not found",
+                    )),
                 }
             }
             _ => Err(parse_error(resp).await?),
@@ -96,7 +122,37 @@ impl GdriveWriter {
 
             match status {
                 StatusCode::PERMANENT_REDIRECT => {
-                    self.position = self.position + size;
+                    let headers = resp.headers();
+
+                    let range = headers.get("range").ok_or_else(|| {
+                        Error::new(
+                            ErrorKind::Unexpected,
+                            "write part failed: range header not found",
+                        )
+                    })?;
+                    self.position = range
+                        .to_str()
+                        .map_err(|_| {
+                            Error::new(
+                                ErrorKind::Unexpected,
+                                "write part failed: range header parse failed",
+                            )
+                        })?
+                        .split('-')
+                        .last()
+                        .ok_or_else(|| {
+                            Error::new(
+                                ErrorKind::Unexpected,
+                                "write part failed: range header parse failed",
+                            )
+                        })?
+                        .parse()
+                        .map_err(|_| {
+                            Error::new(
+                                ErrorKind::Unexpected,
+                                "write part failed: range header parse failed",
+                            )
+                        })?;
 
                     resp.into_body().consume().await?;
                     Ok(())
@@ -170,19 +226,24 @@ impl GdriveWriter {
 #[async_trait]
 impl oio::Write for GdriveWriter {
     async fn write(&mut self, bs: Bytes) -> Result<()> {
+        if bs.is_empty() {
+            return Ok(());
+        }
+
         if self.target.is_none() {
-            self.initial_upload().await?;
+            if self.op.content_length().unwrap_or_default() == bs.len() as u64 
&& self.written == 0
+            {
+                return self.write_oneshot(bs).await;
+            } else {
+                self.initial_upload().await?;
+            }
         }
 
         self.write_part(bs.len() as u64, AsyncBody::Bytes(bs)).await
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
-        if self.target.is_none() {
-            self.initial_upload().await?;
-        }
-
-        self.write_part(size, AsyncBody::Stream(s)).await
+    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
+        Ok(())
     }
 
     async fn abort(&mut self) -> Result<()> {

Reply via email to