This is an automated email from the ASF dual-hosted git repository.

suyanhanx pushed a commit to branch gdrive
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 791226c1ac0beff5fb3638e234ff9947c9364af6
Author: suyanhanx <[email protected]>
AuthorDate: Wed Jul 26 13:09:01 2023 +0800

    stash
    
    Signed-off-by: suyanhanx <[email protected]>
---
 .env.example                        |   9 +-
 core/src/raw/path.rs                |   2 +
 core/src/services/gdrive/backend.rs |  84 ++++++-
 core/src/services/gdrive/core.rs    | 450 ++++++++++++++++++++++++++++++------
 core/src/services/gdrive/writer.rs  | 156 +++++++++++--
 5 files changed, 604 insertions(+), 97 deletions(-)

diff --git a/.env.example b/.env.example
index 62a3f9473..ef73d622b 100644
--- a/.env.example
+++ b/.env.example
@@ -148,4 +148,11 @@ OPENDAL_ETCD_USERNAME=<username>
 OPENDAL_ETCD_PASSWORD=<password>
 OPENDAL_ETCD_CA_PATH=<ca_path>
 OPENDAL_ETCD_CERT_PATH=<cert_path>
-OPENDAL_ETCD_KEY_PATH=<key_path>
\ No newline at end of file
+OPENDAL_ETCD_KEY_PATH=<key_path>
+# google drive
+OPENDAL_GDRIVE_TEST=false
+OPENDAL_GDRIVE_ROOT=/tmp/opendal/
+OPENDAL_GDRIVE_ACCESS_TOKEN=<access_token>
+OPENDAL_GDRIVE_REFRESH_TOKEN=<refresh_token>
+OPENDAL_GDRIVE_CLIENT_ID=<client_id>
+OPENDAL_GDRIVE_CLIENT_SECRET=<client_secret>
diff --git a/core/src/raw/path.rs b/core/src/raw/path.rs
index cdc88d032..acb380524 100644
--- a/core/src/raw/path.rs
+++ b/core/src/raw/path.rs
@@ -49,6 +49,8 @@ pub fn build_rooted_abs_path(root: &str, path: &str) -> 
String {
 
     let p = root.to_string();
 
+    println!("path: {}", path);
+
     if path == "/" {
         p
     } else {
diff --git a/core/src/services/gdrive/backend.rs 
b/core/src/services/gdrive/backend.rs
index 72d5740e8..9321fb3c3 100644
--- a/core/src/services/gdrive/backend.rs
+++ b/core/src/services/gdrive/backend.rs
@@ -19,16 +19,16 @@ use std::fmt::Debug;
 use std::sync::Arc;
 
 use async_trait::async_trait;
+use chrono::Utc;
 use http::StatusCode;
 
 use super::core::GdriveCore;
 use super::error::parse_error;
 use super::writer::GdriveWriter;
 use crate::raw::*;
+use crate::services::gdrive::core::GdriveFile;
 use crate::types::Result;
-use crate::Capability;
-use crate::Error;
-use crate::ErrorKind;
+use crate::*;
 
 #[derive(Clone, Debug)]
 pub struct GdriveBackend {
@@ -62,24 +62,74 @@ impl Accessor for GdriveBackend {
         ma.set_scheme(crate::Scheme::Gdrive)
             .set_root(&self.core.root)
             .set_full_capability(Capability {
+                stat: true,
+
                 read: true,
+
                 write: true,
+
+                create_dir: true,
+
                 delete: true,
+
                 ..Default::default()
             });
 
         ma
     }
 
+    /// Fetch the metadata of `path`.
+    ///
+    /// The root path `/` is always reported as a directory without sending a
+    /// request. Any other path is looked up through `GdriveCore::gdrive_stat`
+    /// and the JSON body of a `200 OK` response is converted by
+    /// `parse_metadata`. Every other status is turned into an error by
+    /// `parse_error`.
+    async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
+        // Stat root always returns a DIR.
+        if path == "/" {
+            return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
+        }
+
+        let resp = self.core.gdrive_stat(path).await?;
+
+        match resp.status() {
+            StatusCode::OK => {
+                let body = resp.into_body().bytes().await?;
+                Ok(RpStat::new(self.parse_metadata(body)?))
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    /// Create the directory at `path` via `GdriveCore::gdrive_create_dir`.
+    ///
+    /// A `200 OK` response yields the default `RpCreateDir`; any other status
+    /// is converted into an error by `parse_error`.
+    async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
+        let response = self.core.gdrive_create_dir(path).await?;
+
+        if response.status() == StatusCode::OK {
+            Ok(RpCreateDir::default())
+        } else {
+            Err(parse_error(response).await?)
+        }
+    }
+
     async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
-        let resp = self.core.gdrive_get(path).await?;
+        // We need to request for metadata and body separately here.
+        // Request for metadata first to check if the file exists.
+        let resp = self.core.gdrive_stat(path).await?;
 
         let status = resp.status();
 
         match status {
             StatusCode::OK => {
-                let meta = parse_into_metadata(path, resp.headers())?;
-                Ok((RpRead::with_metadata(meta), resp.into_body()))
+                let body = resp.into_body().bytes().await?;
+                let meta = self.parse_metadata(body)?;
+
+                let resp = self.core.gdrive_get(path).await?;
+
+                let status = resp.status();
+
+                match status {
+                    StatusCode::OK => Ok((RpRead::with_metadata(meta), 
resp.into_body())),
+                    _ => Err(parse_error(resp).await?),
+                }
             }
             _ => Err(parse_error(resp).await?),
         }
@@ -110,3 +160,25 @@ impl Accessor for GdriveBackend {
         }
     }
 }
+
+impl GdriveBackend {
+    /// Convert the JSON description of a Drive file into `Metadata`.
+    ///
+    /// - The entry is a directory when its MIME type is
+    ///   `application/vnd.google-apps.folder`, otherwise a file.
+    /// - A size that does not parse as `u64` is reported as `0`.
+    /// - A modification time that does not parse falls back to the default
+    ///   `DateTime<Utc>` value.
+    ///
+    /// Returns an error only when `body` is not valid JSON for `GdriveFile`.
+    pub(crate) fn parse_metadata(&self, body: bytes::Bytes) -> Result<Metadata> {
+        let file: GdriveFile =
+            serde_json::from_slice(&body).map_err(new_json_deserialize_error)?;
+
+        let mode = if file.mime_type == "application/vnd.google-apps.folder" {
+            EntryMode::DIR
+        } else {
+            EntryMode::FILE
+        };
+        let content_length = file.size.parse::<u64>().unwrap_or(0);
+        let last_modified = file
+            .modified_time
+            .parse::<chrono::DateTime<Utc>>()
+            .unwrap_or_default();
+
+        Ok(Metadata::new(mode)
+            .with_content_length(content_length)
+            .with_last_modified(last_modified))
+    }
+}
diff --git a/core/src/services/gdrive/core.rs b/core/src/services/gdrive/core.rs
index b54504bca..cc9f544ec 100644
--- a/core/src/services/gdrive/core.rs
+++ b/core/src/services/gdrive/core.rs
@@ -20,12 +20,13 @@ use std::fmt::Debug;
 use std::fmt::Formatter;
 use std::sync::Arc;
 
+use bytes;
 use http::header;
-use http::request::Builder;
 use http::Request;
 use http::Response;
 use http::StatusCode;
 use serde::Deserialize;
+use serde_json::json;
 use tokio::sync::Mutex;
 
 use super::error::parse_error;
@@ -43,7 +44,13 @@ use crate::ErrorKind;
 pub struct GdriveCore {
     pub root: String,
     pub access_token: String,
+
     pub client: HttpClient,
+
+    /// Cache the mapping from path to file id
+    ///
+    /// Google Drive uses file id to identify a file.
+    /// As the path is immutable, we can cache the mapping from path to file 
id.
     pub path_cache: Arc<Mutex<HashMap<String, String>>>,
 }
 
@@ -56,49 +63,20 @@ impl Debug for GdriveCore {
 }
 
 impl GdriveCore {
-    async fn get_abs_root_id(&self) -> Result<String> {
-        let root = "root";
-
-        if let Some(root_id) = self.path_cache.lock().await.get(root) {
-            return Ok(root_id.to_string());
-        }
-
-        let req = self
-            .sign(Request::get(
-                "https://www.googleapis.com/drive/v3/files/root";,
-            ))
-            .body(AsyncBody::Empty)
-            .map_err(new_request_build_error)?;
-
-        let resp = self.client.send(req).await?;
-        let status = resp.status();
-
-        match status {
-            StatusCode::OK => {
-                let resp_body = &resp.into_body().bytes().await?;
-
-                let gdrive_file: GdriveFile =
-                    
serde_json::from_slice(resp_body).map_err(new_json_deserialize_error)?;
-
-                let root_id = gdrive_file.id;
-
-                let mut cache_guard = self.path_cache.lock().await;
-                cache_guard.insert(root.to_owned(), root_id.clone());
-
-                Ok(root_id)
-            }
-            _ => Err(parse_error(resp).await?),
-        }
-    }
-
+    /// Get the file id by path.
+    /// Including file and folder.
+    ///
+    /// The path is rooted at the root of the Google Drive.
+    ///
+    /// # Notes
+    ///
+    /// - A path is a sequence of file names separated by slashes.
+    /// - A file only knows its parent id, but not its name.
+    /// - To find the file id of a file, we need to traverse the path from the 
root to the file.
     async fn get_file_id_by_path(&self, file_path: &str) -> Result<String> {
         let path = build_rooted_abs_path(&self.root, file_path);
 
-        if let Some(file_id) = self.path_cache.lock().await.get(&path) {
-            return Ok(file_id.to_string());
-        }
-
-        let mut parent_id = self.get_abs_root_id().await?;
+        let mut parent_id = "root".to_owned();
         let file_path_items: Vec<&str> = path.split('/').filter(|&x| 
!x.is_empty()).collect();
 
         for (i, item) in file_path_items.iter().enumerate() {
@@ -110,37 +88,221 @@ impl GdriveCore {
                 query += "and mimeType = 'application/vnd.google-apps.folder'";
             }
 
-            let req = self
-                .sign(Request::get(format!(
+            let req = self.sign(
+                Request::get(format!(
                     "https://www.googleapis.com/drive/v3/files?q={}";,
                     percent_encode_path(&query)
-                )))
+                ))
                 .body(AsyncBody::default())
-                .map_err(new_request_build_error)?;
+                .map_err(new_request_build_error)?,
+            );
 
             let resp = self.client.send(req).await?;
             let status = resp.status();
 
-            if status == StatusCode::OK {
-                let resp_body = &resp.into_body().bytes().await?;
+            println!("status in: {:?}", status);
+
+            match status {
+                StatusCode::OK => {
+                    let resp_body = &resp.into_body().bytes().await?;
+
+                    let gdrive_file_list: GdriveFileList =
+                        
serde_json::from_slice(resp_body).map_err(new_json_deserialize_error)?;
+
+                    println!("gdrive_file_list: {:?}", 
gdrive_file_list.files.len());
 
-                let gdrive_file_list: GdriveFileList =
-                    
serde_json::from_slice(resp_body).map_err(new_json_deserialize_error)?;
+                    if gdrive_file_list.files.len() == 0 {
+                        return Err(Error::new(
+                            ErrorKind::NotFound,
+                            &format!("path not found: {}", file_path),
+                        ));
+                    }
 
-                if gdrive_file_list.files.len() != 1 {
-                    return Err(Error::new(ErrorKind::Unexpected, 
&format!("Please ensure that the file corresponding to the path exists and is 
unique. The response body is {}", String::from_utf8_lossy(resp_body))));
+                    if gdrive_file_list.files.len() > 1 {
+                        return Err(Error::new(ErrorKind::Unexpected, 
&format!("please ensure that the file corresponding to the path exists and is 
unique. the response body is {}", String::from_utf8_lossy(resp_body))));
+                    }
+
+                    parent_id = gdrive_file_list.files[0].id.clone();
+                }
+                _ => {
+                    return Err(parse_error(resp).await?);
                 }
+            }
+        }
+
+        Ok(parent_id)
+    }
+
+    /// Ensure the parent path exists.
+    /// If the parent path does not exist, create it.
+    ///
+    /// Returns the file id of the direct parent folder of `path`, or the
+    /// literal alias `root` when `path` has no parent component.
+    ///
+    /// # Notes
+    ///
+    /// - The path is rooted at the root of the Google Drive.
+    /// - Will create the parent path recursively.
+    /// - Several folders with the same name under the same parent are
+    ///   reported as an error instead of creating yet another duplicate.
+    async fn ensure_parent_path(&self, path: &str) -> Result<String> {
+        let path = build_rooted_abs_path(&self.root, path);
 
-                parent_id = gdrive_file_list.files[0].id.clone();
-            } else {
-                return Err(parse_error(resp).await?);
+        let mut parent: String = "root".to_owned();
+        let mut file_path_items: Vec<&str> = path.split('/').filter(|&x| !x.is_empty()).collect();
+        // The last component is the entry itself; only its ancestors are ensured.
+        file_path_items.pop();
+
+        for &item in &file_path_items {
+            let query = format!(
+                "name = '{}' and '{}' in parents and trashed = false and mimeType = 'application/vnd.google-apps.folder'",
+                item, parent
+            );
+
+            let req = self.sign(
+                Request::get(format!(
+                    "https://www.googleapis.com/drive/v3/files?q={}",
+                    percent_encode_path(&query)
+                ))
+                .body(AsyncBody::Empty)
+                .map_err(new_request_build_error)?,
+            );
+
+            let resp = self.client.send(req).await?;
+
+            match resp.status() {
+                StatusCode::OK => {
+                    let resp_body = resp.into_body().bytes().await?;
+
+                    let gdrive_file_list: GdriveFileList =
+                        serde_json::from_slice(&resp_body).map_err(new_json_deserialize_error)?;
+
+                    parent = match gdrive_file_list.files.as_slice() {
+                        // Nothing there yet: create this level and descend into it.
+                        [] => self.create_folder_and_get_id(item, &parent).await?,
+                        [folder] => folder.id.clone(),
+                        _ => {
+                            return Err(Error::new(
+                                ErrorKind::Unexpected,
+                                &format!(
+                                    "more than one folder named {} exists under parent {}",
+                                    item, parent
+                                ),
+                            ));
+                        }
+                    };
+                }
+                StatusCode::NOT_FOUND => {
+                    parent = self.create_folder_and_get_id(item, &parent).await?;
+                }
+                _ => {
+                    return Err(parse_error(resp).await?);
+                }
             }
         }
 
-        let mut cache_guard = self.path_cache.lock().await;
-        cache_guard.insert(path, parent_id.clone());
+        Ok(parent)
+    }
+
+    /// Create the folder `name` below the folder with id `parent` and return
+    /// the id of the new folder.
+    ///
+    /// The id is taken from the JSON body of a `200 OK` response; any other
+    /// status is converted into an error by `parse_error`.
+    async fn create_folder_and_get_id(&self, name: &str, parent: &str) -> Result<String> {
+        let resp = self
+            .gdrive_create_folder(name, Some(parent.to_owned()))
+            .await?;
+
+        match resp.status() {
+            StatusCode::OK => {
+                let body = resp.into_body().bytes().await?;
+                let folder: GdriveFile =
+                    serde_json::from_slice(&body).map_err(new_json_deserialize_error)?;
+
+                Ok(folder.id)
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
 
-        Ok(parent_id)
+    /// Search for the non-trashed entries named `target` whose parent is the
+    /// folder with id `parent`.
+    ///
+    /// Only `id`, `name` and `mimeType` of each match are requested. The raw
+    /// response is returned; the caller is responsible for checking its status.
+    pub async fn gdrive_search(
+        &self,
+        target: &str,
+        parent: &str,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let query = format!(
+            "name = '{}' and '{}' in parents and trashed = false",
+            target, parent
+        );
+        // `fields` is a separate URL parameter: it must stay outside of the
+        // percent-encoded `q` expression, otherwise its `&` and `=` get encoded
+        // and it is sent as part of the search expression.
+        let url = format!(
+            "https://www.googleapis.com/drive/v3/files?q={}&fields=files(id,name,mimeType)",
+            percent_encode_path(query.as_str())
+        );
+
+        let req = self.sign(
+            Request::get(&url)
+                .body(AsyncBody::Empty)
+                .map_err(new_request_build_error)?,
+        );
+
+        self.client.send(req).await
+    }
+
+    /// Create a folder.
+    /// Should provide the parent folder id.
+    /// Or will create the folder in the root folder.
+    ///
+    /// The raw response is returned; the caller is responsible for checking
+    /// its status and for reading the created folder from its body.
+    pub async fn gdrive_create_folder(
+        &self,
+        name: &str,
+        parent: Option<String>,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let url = "https://www.googleapis.com/drive/v3/files";
+
+        // If the parent is not provided, the folder will be created in the root folder.
+        let parent = parent.unwrap_or_else(|| "root".to_owned());
+
+        let content = serde_json::to_vec(&json!({
+            "name": name,
+            "mimeType": "application/vnd.google-apps.folder",
+            "parents": [parent],
+        }))
+        .map_err(|e| {
+            Error::new(
+                ErrorKind::Unexpected,
+                &format!("failed to serialize json(create folder result): {}", e),
+            )
+        })?;
+
+        let req = Request::post(url)
+            .header(header::CONTENT_TYPE, "application/json")
+            .body(AsyncBody::Bytes(bytes::Bytes::from(content)))
+            .map_err(new_request_build_error)?;
+
+        let req = self.sign(req);
+
+        self.client.send(req).await
+    }
+
+    /// Create a folder.
+    /// Will create the path recursively.
+    ///
+    /// # Errors
+    ///
+    /// Returns an `Unexpected` error when `path` has no non-empty component
+    /// (for example `/`), because there is no folder name to create.
+    pub async fn gdrive_create_dir(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
+        // The last non-empty component is the name of the folder to create.
+        let name = path.split('/').rfind(|x| !x.is_empty()).ok_or_else(|| {
+            Error::new(
+                ErrorKind::Unexpected,
+                &format!("cannot create a folder without a name: {}", path),
+            )
+        })?;
+
+        let parent = self.ensure_parent_path(path).await?;
+
+        self.gdrive_create_folder(name, Some(parent)).await
+    }
+
+    /// Request the metadata of the entry at `path`.
+    ///
+    /// The path is resolved to a file id first; a path that does not resolve
+    /// makes this function fail before any metadata request is sent. The raw
+    /// response is returned; the caller is responsible for checking its status.
+    pub async fn gdrive_stat(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
+        let file_id = self.get_file_id_by_path(path).await?;
+
+        // The file metadata in the Google Drive API is very complex.
+        // For now, we only need the file id, name, mime type, size and modified time.
+        // `files.get` returns a single file resource, so the fields are listed
+        // directly instead of being wrapped in `files(...)`.
+        let url = format!(
+            "https://www.googleapis.com/drive/v3/files/{}?fields=id,name,mimeType,size,modifiedTime",
+            file_id
+        );
+
+        let req = self.sign(
+            Request::get(&url)
+                .body(AsyncBody::Empty)
+                .map_err(new_request_build_error)?,
+        );
+
+        self.client.send(req).await
+    }
 
     pub async fn gdrive_get(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
@@ -149,10 +311,11 @@ impl GdriveCore {
             self.get_file_id_by_path(path).await?
         );
 
-        let req = self
-            .sign(Request::get(&url))
-            .body(AsyncBody::Empty)
-            .map_err(new_request_build_error)?;
+        let req = self.sign(
+            Request::get(&url)
+                .body(AsyncBody::Empty)
+                .map_err(new_request_build_error)?,
+        );
 
         self.client.send(req).await
     }
@@ -165,11 +328,11 @@ impl GdriveCore {
         body: AsyncBody,
     ) -> Result<Response<IncomingAsyncBody>> {
         let url = format!(
-            "https://www.googleapis.com/upload/drive/v3/files/{}";,
+            
"https://www.googleapis.com/upload/drive/v3/files/{}?uploadType=media";,
             self.get_file_id_by_path(path).await?
         );
 
-        let mut req = Request::patch(&url);
+        let mut req = Request::put(&url);
 
         if let Some(size) = size {
             req = req.header(header::CONTENT_LENGTH, size)
@@ -179,7 +342,7 @@ impl GdriveCore {
             req = req.header(header::CONTENT_TYPE, mime)
         }
 
-        let req = self.sign(req).body(body).map_err(new_request_build_error)?;
+        let req = self.sign(req.body(body).map_err(new_request_build_error)?);
 
         self.client.send(req).await
     }
@@ -190,29 +353,176 @@ impl GdriveCore {
             self.get_file_id_by_path(path).await?
         );
 
-        let req = self
-            .sign(Request::delete(&url))
+        let mut req = Request::delete(&url)
             .body(AsyncBody::Empty)
             .map_err(new_request_build_error)?;
 
+        req = self.sign(req);
+
         self.client.send(req).await
     }
 
-    fn sign(&self, mut req: Builder) -> Builder {
+    /// Start an upload session.
+    ///
+    /// The missing parent folders of `path` are created first, and the new file
+    /// is registered below its direct parent. The raw response is returned; the
+    /// caller is responsible for checking its status.
+    ///
+    /// ## Notes
+    ///
+    /// - If we know the size of the file, we should provide it here.
+    ///
+    /// ## Errors
+    ///
+    /// Returns an `Unexpected` error when `path` has no non-empty component,
+    /// because there is no file name to upload to.
+    pub async fn gdrive_upload_initial_request(
+        &self,
+        path: &str,
+        size: Option<u64>,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let url = "https://www.googleapis.com/upload/drive/v3/files?uploadType=resumable";
+
+        let file_name = path.split('/').rfind(|x| !x.is_empty()).ok_or_else(|| {
+            Error::new(
+                ErrorKind::Unexpected,
+                &format!("cannot upload a file without a name: {}", path),
+            )
+        })?;
+
+        // Without `parents` the file would be created in the Drive root folder,
+        // no matter which directories `path` contains.
+        let parent = self.ensure_parent_path(path).await?;
+
+        let bs = serde_json::to_vec(&json!({
+            "name": file_name,
+            "parents": [parent],
+        }))
+        .map_err(|e| {
+            Error::new(
+                ErrorKind::Unexpected,
+                &format!("failed to serialize json(upload initial request): {}", e),
+            )
+        })?;
+
+        let mut req = Request::post(url)
+            .header(header::CONTENT_TYPE, "application/json; charset=UTF-8")
+            .header(header::CONTENT_LENGTH, bs.len());
+
+        if let Some(size) = size {
+            req = req.header("X-Upload-Content-Length", size);
+        }
+
+        let req = req
+            .body(AsyncBody::Bytes(bytes::Bytes::from(bs)))
+            .map_err(new_request_build_error)?;
+
+        let req = self.sign(req);
+
+        self.client.send(req).await
+    }
+
+    /// Upload a part.
+    /// It's important to know the size of the part and the position of the
+    /// part in the whole file.
+    ///
+    /// `target` is the upload session URL, `size` the length of `body` in bytes
+    /// and `position` the offset of its first byte in the whole file. The raw
+    /// response is returned; the caller is responsible for checking its status.
+    ///
+    /// ## Notes
+    ///
+    /// - If we don't know the whole size of the file,
+    ///   it will be replaced by `*` in the `Content-Range` header automatically.
+    /// - Please keep the whole size of the file consistent during the upload.
+    /// - An empty part carries no byte range, so it is sent as `bytes */<total>`.
+    pub async fn gdrive_upload_part_request(
+        &self,
+        target: &str,
+        size: u64,
+        position: u64,
+        whole_size: Option<u64>,
+        body: AsyncBody,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let total = whole_size.map_or_else(|| "*".to_string(), |ws| ws.to_string());
+
+        // `position + size - 1` underflows for an empty part at position 0, so
+        // the `first-last` form is only built when there is at least one byte.
+        let content_range = if size == 0 {
+            format!("bytes */{}", total)
+        } else {
+            format!("bytes {}-{}/{}", position, position + size - 1, total)
+        };
+
+        let req = Request::put(target)
+            .header("Content-Length", size)
+            .header("Content-Range", content_range)
+            .body(body)
+            .map_err(new_request_build_error)?;
+
+        let req = self.sign(req);
+
+        self.client.send(req).await
+    }
+
+    /// Finish the upload.
+    ///
+    /// For Google Drive, an upload session is finished once the server knows
+    /// that it has received the whole file.
+    ///
+    /// So if we don't know the size of the file up front, we have to tell the
+    /// server the total size after the last part has been uploaded. This is
+    /// done by sending an empty request to the upload session that only states
+    /// the total size, which we can count ourselves from the parts we uploaded.
+    ///
+    /// `target` is the upload session URL and `size` the total number of bytes
+    /// of the file. The raw response is returned; the caller is responsible for
+    /// checking its status.
+    ///
+    /// ## Notes
+    ///
+    /// **There is no need to call this if we know the size of the file.**
+    ///
+    /// When we know the file size,
+    /// we can indicate the total size in the `Content-Range` header during the
+    /// upload, and this allows the server to know that it has received all
+    /// parts, like `Content-Range: bytes 0-1023/1024`.
+    pub async fn gdrive_finish_upload_request(
+        &self,
+        target: &str,
+        size: u64,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        // The request has no body, so it must not claim a byte range: only the
+        // total size is stated. This also avoids `size - 1` underflowing for an
+        // empty file.
+        let req = Request::put(target)
+            .header("Content-Length", 0)
+            .header("Content-Range", format!("bytes */{}", size))
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        let req = self.sign(req);
+
+        self.client.send(req).await
+    }
+
+    /// Cancel the upload.
+    ///
+    /// Sends an empty, signed `DELETE` request to the upload session URL
+    /// `target`, much like deleting a file. The raw response is returned; the
+    /// caller is responsible for checking its status.
+    pub async fn gdrive_cancel_upload_request(
+        &self,
+        target: &str,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let unsigned = Request::delete(target)
+            .header("Content-Length", 0)
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.client.send(self.sign(unsigned)).await
+    }
+
+    /// Attach the `Authorization: Bearer <access_token>` header to `req` and
+    /// return the request.
+    ///
+    /// An `Authorization` header that is already present is replaced.
+    ///
+    /// # Panics
+    ///
+    /// Panics when the access token cannot be parsed into a header value.
+    fn sign(&self, mut req: Request<AsyncBody>) -> Request<AsyncBody> {
         let auth_header_content = format!("Bearer {}", self.access_token);
-        req = req.header(header::AUTHORIZATION, auth_header_content);
+        req.headers_mut().insert(
+            header::AUTHORIZATION,
+            auth_header_content
+                .parse()
+                .expect("failed to parse auth header"),
+        );
         req
     }
 }
 
+// This is the file struct returned by the Google Drive API.
+// This is a complex struct, but we only add the fields we need.
+// JSON keys are camelCase (`mimeType`, `modifiedTime`); `size` and
+// `modifiedTime` become empty strings when the response omits them.
 // refer to https://developers.google.com/drive/api/reference/rest/v3/files#File
 #[derive(Deserialize)]
-struct GdriveFile {
-    id: String,
+#[serde(rename_all = "camelCase")]
+pub struct GdriveFile {
+    // MIME type; `application/vnd.google-apps.folder` marks a folder.
+    pub mime_type: String,
+    // File id, used to address the file in requests.
+    pub id: String,
+    // File name.
+    pub name: String,
+    // Size as a string; empty when absent from the response.
+    #[serde(default)]
+    pub size: String,
+    // Modification time as a string; empty when absent from the response.
+    #[serde(default)]
+    pub modified_time: String,
 }
 
+// The response body of a file listing; only the `files` array is read.
 // refer to https://developers.google.com/drive/api/reference/rest/v3/files/list
 #[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
 struct GdriveFileList {
     files: Vec<GdriveFile>,
 }
diff --git a/core/src/services/gdrive/writer.rs 
b/core/src/services/gdrive/writer.rs
index ba8f20b8f..f796eecb6 100644
--- a/core/src/services/gdrive/writer.rs
+++ b/core/src/services/gdrive/writer.rs
@@ -28,52 +28,168 @@ use crate::*;
 
+/// Writer that uploads the content of one path through an upload session.
 pub struct GdriveWriter {
     core: Arc<GdriveCore>,
+
     op: OpWrite,
     path: String,
+
+    /// URL of the upload session; `None` until the session has been started.
+    target: Option<String>,
+    /// Number of bytes uploaded so far, i.e. the offset of the next part.
+    position: u64,
+    /// Total size of the file when known; `new` leaves it as `None`.
+    size: Option<u64>,
 }
 
 impl GdriveWriter {
+    /// Create a writer for `path` that has not started an upload session yet:
+    /// nothing uploaded, no session URL and no known total size.
     pub fn new(core: Arc<GdriveCore>, op: OpWrite, path: String) -> Self {
-        GdriveWriter { core, op, path }
+        GdriveWriter {
+            core,
+            op,
+            path,
+            position: 0,
+            size: None,
+            target: None,
+        }
     }
-}
 
-#[async_trait]
-impl oio::Write for GdriveWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
+    /// Start an upload session for `self.path` and remember its URL.
+    ///
+    /// The session URL is read from the `location` header of a `200 OK`
+    /// response and stored in `self.target`. A missing or unreadable header is
+    /// an `Unexpected` error; any other status is converted by `parse_error`.
+    pub async fn initial_upload(&mut self) -> Result<()> {
         let resp = self
             .core
-            .gdrive_update(
-                &self.path,
-                Some(bs.len()),
-                self.op.content_type(),
-                AsyncBody::Bytes(bs),
-            )
+            .gdrive_upload_initial_request(&self.path, None)
             .await?;
 
         let status = resp.status();
 
         match status {
             StatusCode::OK => {
-                resp.into_body().consume().await?;
-                Ok(())
+                let location = resp.headers().get("location").ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::Unexpected,
+                        "initial upload failed: location header not found",
+                    )
+                })?;
+                let session_url = location.to_str().map_err(|_| {
+                    Error::new(
+                        ErrorKind::Unexpected,
+                        "initial upload failed: location header parse failed",
+                    )
+                })?;
+
+                self.target = Some(session_url.to_string());
+                Ok(())
             }
             _ => Err(parse_error(resp).await?),
         }
     }
 
-    async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "Write::sink is not supported",
-        ))
+    /// Upload the next part of the file to the running upload session.
+    ///
+    /// `size` is the length of `part` in bytes. After a `308 Permanent
+    /// Redirect` response (the only status treated as success here)
+    /// `self.position` is advanced by `size`. Any other status is converted
+    /// into an error by `parse_error`.
+    ///
+    /// Returns an `Unexpected` error when no upload session has been started.
+    pub async fn write_part(&mut self, size: u64, part: AsyncBody) -> Result<()> {
+        let target = self.target.as_deref().ok_or_else(|| {
+            Error::new(
+                ErrorKind::Unexpected,
+                "write part failed: upload location not found",
+            )
+        })?;
+
+        let resp = self
+            .core
+            .gdrive_upload_part_request(target, size, self.position, self.size, part)
+            .await?;
+
+        match resp.status() {
+            StatusCode::PERMANENT_REDIRECT => {
+                self.position += size;
+
+                resp.into_body().consume().await?;
+                Ok(())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    /// Finish the running upload session by telling the server the total size.
+    ///
+    /// Does nothing when no session has been started. When the total size is
+    /// known up front it is used, provided that at least that many bytes have
+    /// been uploaded; otherwise the number of uploaded bytes is the total size.
+    /// `200 OK` and `201 Created` are success; any other status is converted
+    /// into an error by `parse_error`.
+    pub async fn finish_upload(&self) -> Result<()> {
+        let target = match &self.target {
+            Some(target) => target,
+            None => return Ok(()),
+        };
+
+        // `position` starts at 0 and grows by the size of every uploaded part,
+        // so it already is the number of bytes uploaded, not the last offset.
+        let uploaded = self.position;
+
+        let total_size = match self.size {
+            Some(size) if uploaded < size => {
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    "finish upload failed: upload size mismatch",
+                ));
+            }
+            Some(size) => size,
+            None => uploaded,
+        };
+
+        let resp = self
+            .core
+            .gdrive_finish_upload_request(target, total_size)
+            .await?;
+
+        match resp.status() {
+            StatusCode::OK | StatusCode::CREATED => {
+                resp.into_body().consume().await?;
+                Ok(())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    /// Cancel the running upload session.
+    ///
+    /// Does nothing when no session has been started. `200 OK` is success; any
+    /// other status is converted into an error by `parse_error`.
+    pub async fn abort_upload(&self) -> Result<()> {
+        let session_url = match &self.target {
+            Some(url) => url,
+            None => return Ok(()),
+        };
+
+        let response = self.core.gdrive_cancel_upload_request(session_url).await?;
+
+        if response.status() == StatusCode::OK {
+            response.into_body().consume().await?;
+            Ok(())
+        } else {
+            Err(parse_error(response).await?)
+        }
+    }
+}
+
+#[async_trait]
+impl oio::Write for GdriveWriter {
+    /// Upload `bs` as the next part, starting the upload session first when
+    /// none is running yet.
+    async fn write(&mut self, bs: Bytes) -> Result<()> {
+        if self.target.is_none() {
+            self.initial_upload().await?;
+        }
+
+        self.write_part(bs.len() as u64, AsyncBody::Bytes(bs)).await
+    }
+
+    /// Upload the stream `s` of `size` bytes as the next part, starting the
+    /// upload session first when none is running yet.
+    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
+        if self.target.is_none() {
+            self.initial_upload().await?;
+        }
+
+        self.write_part(size, AsyncBody::Stream(s)).await
     }
 
+    /// Cancel the upload session, if one has been started.
     async fn abort(&mut self) -> Result<()> {
-        Ok(())
+        self.abort_upload().await
     }
 
+    /// Finish the upload session, if one has been started.
     async fn close(&mut self) -> Result<()> {
-        Ok(())
+        self.finish_upload().await
     }
 }

Reply via email to