This is an automated email from the ASF dual-hosted git repository. suyanhanx pushed a commit to branch gdrive in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 791226c1ac0beff5fb3638e234ff9947c9364af6 Author: suyanhanx <[email protected]> AuthorDate: Wed Jul 26 13:09:01 2023 +0800 stash Signed-off-by: suyanhanx <[email protected]> --- .env.example | 9 +- core/src/raw/path.rs | 2 + core/src/services/gdrive/backend.rs | 84 ++++++- core/src/services/gdrive/core.rs | 450 ++++++++++++++++++++++++++++++------ core/src/services/gdrive/writer.rs | 156 +++++++++++-- 5 files changed, 604 insertions(+), 97 deletions(-) diff --git a/.env.example b/.env.example index 62a3f9473..ef73d622b 100644 --- a/.env.example +++ b/.env.example @@ -148,4 +148,11 @@ OPENDAL_ETCD_USERNAME=<username> OPENDAL_ETCD_PASSWORD=<password> OPENDAL_ETCD_CA_PATH=<ca_path> OPENDAL_ETCD_CERT_PATH=<cert_path> -OPENDAL_ETCD_KEY_PATH=<key_path> \ No newline at end of file +OPENDAL_ETCD_KEY_PATH=<key_path> +# google drive +OPENDAL_GDRIVE_TEST=false +OPENDAL_GDRIVE_ROOT=/tmp/opendal/ +OPENDAL_GDRIVE_ACCESS_TOKEN=<access_token> +OPENDAL_GDRIVE_REFRESH_TOKEN=<refresh_token> +OPENDAL_GDRIVE_CLIENT_ID=<client_id> +OPENDAL_GDRIVE_CLIENT_SECRET=<client_secret> diff --git a/core/src/raw/path.rs b/core/src/raw/path.rs index cdc88d032..acb380524 100644 --- a/core/src/raw/path.rs +++ b/core/src/raw/path.rs @@ -49,6 +49,8 @@ pub fn build_rooted_abs_path(root: &str, path: &str) -> String { let p = root.to_string(); + println!("path: {}", path); + if path == "/" { p } else { diff --git a/core/src/services/gdrive/backend.rs b/core/src/services/gdrive/backend.rs index 72d5740e8..9321fb3c3 100644 --- a/core/src/services/gdrive/backend.rs +++ b/core/src/services/gdrive/backend.rs @@ -19,16 +19,16 @@ use std::fmt::Debug; use std::sync::Arc; use async_trait::async_trait; +use chrono::Utc; use http::StatusCode; use super::core::GdriveCore; use super::error::parse_error; use super::writer::GdriveWriter; use crate::raw::*; +use crate::services::gdrive::core::GdriveFile; use crate::types::Result; -use crate::Capability; -use crate::Error; -use crate::ErrorKind; +use crate::*; #[derive(Clone, Debug)] pub struct GdriveBackend { @@ -62,24 +62,74 @@ impl Accessor for GdriveBackend { ma.set_scheme(crate::Scheme::Gdrive) .set_root(&self.core.root) .set_full_capability(Capability { + stat: true, + read: true, + write: true, + + create_dir: true, + delete: true, + ..Default::default() }); ma } + async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> { + // Stat root always returns a DIR. + if path == "/" { + return Ok(RpStat::new(Metadata::new(EntryMode::DIR))); + } + + let resp = self.core.gdrive_stat(path).await?; + + let status = resp.status(); + + println!("status out: {:?}", status); + + match status { + StatusCode::OK => { + let meta = self.parse_metadata(resp.into_body().bytes().await?)?; + Ok(RpStat::new(meta)) + } + _ => Err(parse_error(resp).await?), + } + } + + async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> { + let resp = self.core.gdrive_create_dir(path).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => Ok(RpCreateDir::default()), + _ => Err(parse_error(resp).await?), + } + } + async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead, Self::Reader)> { - let resp = self.core.gdrive_get(path).await?; + // We need to request for metadata and body separately here. + // Request for metadata first to check if the file exists. + let resp = self.core.gdrive_stat(path).await?; let status = resp.status(); match status { StatusCode::OK => { - let meta = parse_into_metadata(path, resp.headers())?; - Ok((RpRead::with_metadata(meta), resp.into_body())) + let body = resp.into_body().bytes().await?; + let meta = self.parse_metadata(body)?; + + let resp = self.core.gdrive_get(path).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => Ok((RpRead::with_metadata(meta), resp.into_body())), + _ => Err(parse_error(resp).await?), + } } _ => Err(parse_error(resp).await?), } @@ -110,3 +160,25 @@ impl Accessor for GdriveBackend { } } } + +impl GdriveBackend { + pub(crate) fn parse_metadata(&self, body: bytes::Bytes) -> Result<Metadata> { + let metadata = + serde_json::from_slice::<GdriveFile>(&body).map_err(new_json_deserialize_error)?; + let mut meta = Metadata::new(match metadata.mime_type.as_str() { + "application/vnd.google-apps.folder" => EntryMode::DIR, + _ => EntryMode::FILE, + }); + meta = meta.with_content_length(match metadata.size.parse::<u64>() { + Ok(size) => size, + Err(_) => 0, + }); + meta = meta.with_last_modified( + metadata + .modified_time + .parse::<chrono::DateTime<Utc>>() + .unwrap_or_default(), + ); + Ok(meta) + } +} diff --git a/core/src/services/gdrive/core.rs b/core/src/services/gdrive/core.rs index b54504bca..cc9f544ec 100644 --- a/core/src/services/gdrive/core.rs +++ b/core/src/services/gdrive/core.rs @@ -20,12 +20,13 @@ use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; +use bytes; use http::header; -use http::request::Builder; use http::Request; use http::Response; use http::StatusCode; use serde::Deserialize; +use serde_json::json; use tokio::sync::Mutex; use super::error::parse_error; @@ -43,7 +44,13 @@ use crate::ErrorKind; pub struct GdriveCore { pub root: String, pub access_token: String, + pub client: HttpClient, + + /// Cache the mapping from path to file id + /// + /// Google Drive uses file id to identify a file. + /// As the path is immutable, we can cache the mapping from path to file id. pub path_cache: Arc<Mutex<HashMap<String, String>>>, } @@ -56,49 +63,20 @@ impl Debug for GdriveCore { } impl GdriveCore { - async fn get_abs_root_id(&self) -> Result<String> { - let root = "root"; - - if let Some(root_id) = self.path_cache.lock().await.get(root) { - return Ok(root_id.to_string()); - } - - let req = self - .sign(Request::get( - "https://www.googleapis.com/drive/v3/files/root", - )) - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - let resp = self.client.send(req).await?; - let status = resp.status(); - - match status { - StatusCode::OK => { - let resp_body = &resp.into_body().bytes().await?; - - let gdrive_file: GdriveFile = - serde_json::from_slice(resp_body).map_err(new_json_deserialize_error)?; - - let root_id = gdrive_file.id; - - let mut cache_guard = self.path_cache.lock().await; - cache_guard.insert(root.to_owned(), root_id.clone()); - - Ok(root_id) - } - _ => Err(parse_error(resp).await?), - } - } - + /// Get the file id by path. + /// Including file and folder. + /// + /// The path is rooted at the root of the Google Drive. + /// + /// # Notes + /// + /// - A path is a sequence of file names separated by slashes. + /// - A file only knows its parent id, but not its name. + /// - To find the file id of a file, we need to traverse the path from the root to the file. async fn get_file_id_by_path(&self, file_path: &str) -> Result<String> { let path = build_rooted_abs_path(&self.root, file_path); - if let Some(file_id) = self.path_cache.lock().await.get(&path) { - return Ok(file_id.to_string()); - } - - let mut parent_id = self.get_abs_root_id().await?; + let mut parent_id = "root".to_owned(); let file_path_items: Vec<&str> = path.split('/').filter(|&x| !x.is_empty()).collect(); for (i, item) in file_path_items.iter().enumerate() { @@ -110,37 +88,221 @@ impl GdriveCore { query += "and mimeType = 'application/vnd.google-apps.folder'"; } - let req = self - .sign(Request::get(format!( + let req = self.sign( + Request::get(format!( "https://www.googleapis.com/drive/v3/files?q={}", percent_encode_path(&query) - ))) + )) .body(AsyncBody::default()) - .map_err(new_request_build_error)?; + .map_err(new_request_build_error)?, + ); let resp = self.client.send(req).await?; let status = resp.status(); - if status == StatusCode::OK { - let resp_body = &resp.into_body().bytes().await?; + println!("status in: {:?}", status); + + match status { + StatusCode::OK => { + let resp_body = &resp.into_body().bytes().await?; + + let gdrive_file_list: GdriveFileList = + serde_json::from_slice(resp_body).map_err(new_json_deserialize_error)?; + + println!("gdrive_file_list: {:?}", gdrive_file_list.files.len()); - let gdrive_file_list: GdriveFileList = - serde_json::from_slice(resp_body).map_err(new_json_deserialize_error)?; + if gdrive_file_list.files.len() == 0 { + return Err(Error::new( + ErrorKind::NotFound, + &format!("path not found: {}", file_path), + )); + } - if gdrive_file_list.files.len() != 1 { - return Err(Error::new(ErrorKind::Unexpected, &format!("Please ensure that the file corresponding to the path exists and is unique. The response body is {}", String::from_utf8_lossy(resp_body)))); + if gdrive_file_list.files.len() > 1 { + return Err(Error::new(ErrorKind::Unexpected, &format!("please ensure that the file corresponding to the path exists and is unique. the response body is {}", String::from_utf8_lossy(resp_body)))); + } + + parent_id = gdrive_file_list.files[0].id.clone(); + } + _ => { + return Err(parse_error(resp).await?); } + } + } + + Ok(parent_id) + } + + /// Ensure the parent path exists. + /// If the parent path does not exist, create it. + /// + /// # Notes + /// + /// - The path is rooted at the root of the Google Drive. + /// - Will create the parent path recursively. + async fn ensure_parent_path(&self, path: &str) -> Result<String> { + let path = build_rooted_abs_path(&self.root, path); - parent_id = gdrive_file_list.files[0].id.clone(); - } else { - return Err(parse_error(resp).await?); + let mut parent: String = "root".to_owned(); + let mut file_path_items: Vec<&str> = path.split('/').filter(|&x| !x.is_empty()).collect(); + file_path_items.pop(); + + for (i, item) in file_path_items.iter().enumerate() { + let query = format!( + "name = '{}' and '{}' in parents and trashed = false and mimeType = 'application/vnd.google-apps.folder'", + item, parent + ); + + let req = self.sign( + Request::get(format!( + "https://www.googleapis.com/drive/v3/files?q={}", + percent_encode_path(&query) + )) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?, + ); + + let resp = self.client.send(req).await?; + let status = resp.status(); + + match status { + StatusCode::OK => { + let resp_body = &resp.into_body().bytes().await?; + + let gdrive_file_list: GdriveFileList = + serde_json::from_slice(resp_body).map_err(new_json_deserialize_error)?; + + if gdrive_file_list.files.len() != 1 { + let parent_name = file_path_items[i]; + let resp_body = self + .gdrive_create_folder(parent_name, Some(parent.to_owned())) + .await? + .into_body() + .bytes() + .await?; + let parent_meta: GdriveFile = serde_json::from_slice(&resp_body) + .map_err(new_json_deserialize_error)?; + + parent = parent_meta.id; + println!("parent: {}", &parent); + } else { + parent = gdrive_file_list.files[0].id.clone(); + } + } + StatusCode::NOT_FOUND => { + let parent_name = file_path_items[i]; + let res = self + .gdrive_create_folder(parent_name, Some(parent.to_owned())) + .await?; + + let status = res.status(); + + match status { + StatusCode::OK => { + let parent_id = res.into_body().bytes().await?; + parent = String::from_utf8_lossy(&parent_id).to_string(); + } + _ => { + println!("404 δΊ†"); + return Err(parse_error(res).await?); + } + } + } + _ => { + return Err(parse_error(resp).await?); + } } } - let mut cache_guard = self.path_cache.lock().await; - cache_guard.insert(path, parent_id.clone()); + Ok(parent.to_owned()) + } - Ok(parent_id) + pub async fn gdrive_search( + &self, + target: &str, + parent: &str, + ) -> Result<Response<IncomingAsyncBody>> { + let query = format!( + "name = '{}' and '{}' in parents and trashed = false &fields=files(id,name,mimeType)", + target, parent + ); + let url = format!( + "https://www.googleapis.com/drive/v3/files?q={}", + percent_encode_path(query.as_str()) + ); + + let req = self.sign( + Request::get(&url) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?, + ); + + self.client.send(req).await + } + + /// Create a folder. + /// Should provide the parent folder id. + /// Or will create the folder in the root folder. + pub async fn gdrive_create_folder( + &self, + name: &str, + parent: Option<String>, + ) -> Result<Response<IncomingAsyncBody>> { + println!( + "create folder: {} at {}", + name, + parent.clone().unwrap_or("root".to_owned()) + ); + let url = "https://www.googleapis.com/drive/v3/files"; + + let req = Request::post(url) + .header(header::CONTENT_TYPE, "application/json") + .body(AsyncBody::Bytes(bytes::Bytes::from( + serde_json::to_vec(&json!({ + "name": name, + "mimeType": "application/vnd.google-apps.folder", + // If the parent is not provided, the folder will be created in the root folder. + "parents": [parent.unwrap_or("root".to_owned())], + })) + .map_err(|e| { + Error::new( + ErrorKind::Unexpected, + &format!("failed to serialize json(create folder result): {}", e), + ) + })?, + ))) + .map_err(new_request_build_error)?; + + let req = self.sign(req); + + self.client.send(req).await + } + + /// Create a folder. + /// Will create the path recursively. + pub async fn gdrive_create_dir(&self, path: &str) -> Result<Response<IncomingAsyncBody>> { + let parent = self.ensure_parent_path(path).await?; + + let path = path.split('/').filter(|&x| !x.is_empty()).last().unwrap(); + + self.gdrive_create_folder(path, Some(parent)).await + } + + pub async fn gdrive_stat(&self, path: &str) -> Result<Response<IncomingAsyncBody>> { + let path_id = self.get_file_id_by_path(path).await?; + + // The file metadata in the Google Drive API is very complex. + // For now, we only need the file id, name, mime type and modified time. + let req = self.sign( + Request::get(&format!( + "https://www.googleapis.com/drive/v3/files/{}?fields=files(id,name,mimeType,modifiedTime)", + self.get_file_id_by_path(path_id.as_str()).await? + )) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?, + ); + + self.client.send(req).await } pub async fn gdrive_get(&self, path: &str) -> Result<Response<IncomingAsyncBody>> { @@ -149,10 +311,11 @@ impl GdriveCore { self.get_file_id_by_path(path).await? ); - let req = self - .sign(Request::get(&url)) - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; + let req = self.sign( + Request::get(&url) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?, + ); self.client.send(req).await } @@ -165,11 +328,11 @@ impl GdriveCore { body: AsyncBody, ) -> Result<Response<IncomingAsyncBody>> { let url = format!( - "https://www.googleapis.com/upload/drive/v3/files/{}", + "https://www.googleapis.com/upload/drive/v3/files/{}?uploadType=media", self.get_file_id_by_path(path).await? ); - let mut req = Request::patch(&url); + let mut req = Request::put(&url); if let Some(size) = size { req = req.header(header::CONTENT_LENGTH, size) @@ -179,7 +342,7 @@ impl GdriveCore { req = req.header(header::CONTENT_TYPE, mime) } - let req = self.sign(req).body(body).map_err(new_request_build_error)?; + let req = self.sign(req.body(body).map_err(new_request_build_error)?); self.client.send(req).await } @@ -190,29 +353,176 @@ impl GdriveCore { self.get_file_id_by_path(path).await? ); - let req = self - .sign(Request::delete(&url)) + let mut req = Request::delete(&url) .body(AsyncBody::Empty) .map_err(new_request_build_error)?; + req = self.sign(req); + self.client.send(req).await } - fn sign(&self, mut req: Builder) -> Builder { + /// Start an upload session. + /// + /// ## Notes + /// + /// - If we know the size of the file, we should provide it here. + pub async fn gdrive_upload_initial_request( + &self, + path: &str, + size: Option<u64>, + ) -> Result<Response<IncomingAsyncBody>> { + let url = "https://www.googleapis.com/upload/drive/v3/files?uploadType=resumable"; + + let file_name = path.split('/').filter(|&x| !x.is_empty()).last().unwrap(); + + let bs = serde_json::to_vec(&json!({ + "name": file_name + })).map_err( + |e| Error::new( + ErrorKind::Unexpected, + &format!("failed to serialize json(upload initial request): {}", e), + ) + )?; + + let mut req = Request::post(url) + .header(header::CONTENT_TYPE, "application/json; charset=UTF-8") + .header(header::CONTENT_LENGTH, bs.len()); + + if let Some(size) = size { + req = req.header("X-Upload-Content-Length", size); + } + + + let mut req = req.body(AsyncBody::Bytes(bytes::Bytes::from(bs))) + .map_err(new_request_build_error)?; + + req = self.sign(req); + + self.client.send(req).await + } + + /// Upload a part. + /// It's important to know the size of the part and the position of the part in the whole file. + /// + /// ## Notes + /// + /// - If don't know the whole size of the file, + /// it will be replaced by `*` in `content-range` header automatically. + /// - Please keep the whole size of the file consistent during the upload. + pub async fn gdrive_upload_part_request( + &self, + target: &str, + size: u64, + position: u64, + whole_size: Option<u64>, + body: AsyncBody, + ) -> Result<Response<IncomingAsyncBody>> { + let mut req = Request::put(target) + .header("Content-Length", size) + .header( + "Content-Range", + format!( + "bytes {}-{}/{}", + position, + position + size - 1, + match whole_size { + Some(ws) => ws.to_string(), + _ => "*".to_string(), + } + ), + ) + .body(body) + .map_err(new_request_build_error)?; + + req = self.sign(req); + + self.client.send(req).await + } + + /// Finish the upload. + /// + /// For Google Drive, to finish an upload session, we should send a request to the upload session + /// has received the whole file. + /// + /// So if we don't know the size of the file, when we have uploaded the whole file, we should + /// tell the server that the upload session has received the whole file. + /// + /// It could be done by sending a request to the upload session with the size of the file we have + /// uploaded. + /// So the server will know that the upload session has received the whole file. + /// We can count the size of all parts we have uploaded by ourselves. + /// + /// ## Notes + /// + /// **It's no need to call this if we know the size of the file.** + /// + /// When we know the file size, + /// we can indicate the total size in the Content-range header during the upload, + /// and this allows the server to know that it has received all parts, + /// like `Content-range: bytes 0-1023/1024`. + pub async fn gdrive_finish_upload_request( + &self, + target: &str, + size: u64, + ) -> Result<Response<IncomingAsyncBody>> { + let mut req = Request::put(target) + .header("Content-Length", 0) + .header("Content-Range", format!("bytes {}-{}/{}", size - 1, size - 1, size)) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + req = self.sign(req); + + self.client.send(req).await + } + + /// Cancel the upload. + /// Similar to the delete operation, but the target is an upload session. + pub async fn gdrive_cancel_upload_request( + &self, + target: &str, + ) -> Result<Response<IncomingAsyncBody>> { + let mut req = Request::delete(target) + .header("Content-Length", 0) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + req = self.sign(req); + + self.client.send(req).await + } + + fn sign(&self, mut req: Request<AsyncBody>) -> Request<AsyncBody> { let auth_header_content = format!("Bearer {}", self.access_token); - req = req.header(header::AUTHORIZATION, auth_header_content); + req.headers_mut().insert( + header::AUTHORIZATION, + auth_header_content + .parse() + .expect("failed to parse auth header"), + ); req } } +// This is the file struct returned by the Google Drive API. +// This is a complex struct, but we only add the fields we need. // refer to https://developers.google.com/drive/api/reference/rest/v3/files#File #[derive(Deserialize)] -struct GdriveFile { - id: String, +#[serde(rename_all = "camelCase")] +pub struct GdriveFile { + pub mime_type: String, + pub id: String, + pub name: String, + #[serde(default)] + pub size: String, + #[serde(default)] + pub modified_time: String, } // refer to https://developers.google.com/drive/api/reference/rest/v3/files/list #[derive(Deserialize)] +#[serde(rename_all = "camelCase")] struct GdriveFileList { files: Vec<GdriveFile>, } diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs index ba8f20b8f..f796eecb6 100644 --- a/core/src/services/gdrive/writer.rs +++ b/core/src/services/gdrive/writer.rs @@ -28,52 +28,168 @@ use crate::*; pub struct GdriveWriter { core: Arc<GdriveCore>, + op: OpWrite, path: String, + + target: Option<String>, + position: u64, + size: Option<u64>, } impl GdriveWriter { pub fn new(core: Arc<GdriveCore>, op: OpWrite, path: String) -> Self { - GdriveWriter { core, op, path } + GdriveWriter { + core, + op, + path, + position: 0, + size: None, + target: None, + } } -} -#[async_trait] -impl oio::Write for GdriveWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { + pub async fn initial_upload(&mut self) -> Result<()> { let resp = self .core - .gdrive_update( - &self.path, - Some(bs.len()), - self.op.content_type(), - AsyncBody::Bytes(bs), - ) + .gdrive_upload_initial_request(&self.path, None) .await?; let status = resp.status(); match status { StatusCode::OK => { - resp.into_body().consume().await?; - Ok(()) + let headers = resp.headers(); + + match headers.get("location") { + Some(location) => { + self.target = Some(location.to_str().map_err(|_| { + Error::new( + ErrorKind::Unexpected, + "initial upload failed: location header parse failed", + ) + })?.to_string()); + Ok(()) + } + _ => { + return Err(Error::new( + ErrorKind::Unexpected, + "initial upload failed: location header not found", + )) + } + } } _ => Err(parse_error(resp).await?), } } - async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { - Err(Error::new( - ErrorKind::Unsupported, - "Write::sink is not supported", - )) + pub async fn write_part(&mut self, size: u64, part: AsyncBody) -> Result<()> { + if let Some(target) = &self.target { + let resp = self + .core + .gdrive_upload_part_request(target, size, self.position, self.size, part) + .await?; + + println!("size: {}", size); + + let status = resp.status(); + + match status { + StatusCode::PERMANENT_REDIRECT => { + self.position = self.position + size; + + resp.into_body().consume().await?; + Ok(()) + } + _ => Err(parse_error(resp).await?), + } + } else { + Err(Error::new( + ErrorKind::Unexpected, + "write part failed: upload location not found", + )) + } + } + + pub async fn finish_upload(&self) -> Result<()> { + if let Some(target) = &self.target { + println!("final position: {}", self.position + 1); + println!("final size: {}", self.size.unwrap_or(self.position + 1)); + + let resp = self + .core + .gdrive_finish_upload_request( + target, + if let Some(size) = self.size { + if (self.position + 1) < size { + return Err(Error::new( + ErrorKind::Unexpected, + "finish upload failed: upload size mismatch", + )); + } + size + } else { + self.position + 1 + }, + ) + .await?; + + let status = resp.status(); + + match status { + StatusCode::OK | StatusCode::CREATED => { + resp.into_body().consume().await?; + Ok(()) + } + _ => Err(parse_error(resp).await?), + } + } else { + Ok(()) + } + } + + pub async fn abort_upload(&self) -> Result<()> { + if let Some(target) = &self.target { + let resp = self.core.gdrive_cancel_upload_request(target).await?; + + let status = resp.status(); + + match status { + StatusCode::OK => { + resp.into_body().consume().await?; + Ok(()) + } + _ => Err(parse_error(resp).await?), + } + } else { + Ok(()) + } + } +} + +#[async_trait] +impl oio::Write for GdriveWriter { + async fn write(&mut self, bs: Bytes) -> Result<()> { + if self.target.is_none() { + self.initial_upload().await?; + } + + self.write_part(bs.len() as u64, AsyncBody::Bytes(bs)).await + } + + async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { + if self.target.is_none() { + self.initial_upload().await?; + } + + self.write_part(size, AsyncBody::Stream(s)).await } async fn abort(&mut self) -> Result<()> { - Ok(()) + self.abort_upload().await } async fn close(&mut self) -> Result<()> { - Ok(()) + self.finish_upload().await } }
