This is an automated email from the ASF dual-hosted git repository. suyanhanx pushed a commit to branch gdrive in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 1f5f1bf9a3b15c15e79db12d4b36b166693a741f Author: suyanhanx <[email protected]> AuthorDate: Mon Aug 21 17:54:52 2023 +0800 test pass Signed-off-by: suyanhanx <[email protected]> --- core/src/services/gdrive/backend.rs | 75 +++++++++++--- core/src/services/gdrive/core.rs | 106 +++++++++----------- core/src/services/gdrive/writer.rs | 195 +++++------------------------------- 3 files changed, 137 insertions(+), 239 deletions(-) diff --git a/core/src/services/gdrive/backend.rs b/core/src/services/gdrive/backend.rs index 03fa4a26c..be49b8ddf 100644 --- a/core/src/services/gdrive/backend.rs +++ b/core/src/services/gdrive/backend.rs @@ -27,6 +27,7 @@ use super::error::parse_error; use super::writer::GdriveWriter; use crate::raw::*; use crate::services::gdrive::core::GdriveFile; +use crate::services::gdrive::core::GdriveFileList; use crate::types::Result; use crate::*; @@ -59,7 +60,7 @@ impl Accessor for GdriveBackend { fn info(&self) -> AccessorInfo { let mut ma = AccessorInfo::default(); - ma.set_scheme(crate::Scheme::Gdrive) + ma.set_scheme(Scheme::Gdrive) .set_root(&self.core.root) .set_full_capability(Capability { stat: true, @@ -88,8 +89,6 @@ impl Accessor for GdriveBackend { let status = resp.status(); - println!("status out: {:?}", status); - match status { StatusCode::OK => { let meta = self.parse_metadata(resp.into_body().bytes().await?)?; @@ -100,7 +99,28 @@ impl Accessor for GdriveBackend { } async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> { - let resp = self.core.gdrive_create_dir(path).await?; + let parent = self.core.ensure_parent_path(path).await?; + + let path = path.split('/').filter(|&x| !x.is_empty()).last().unwrap(); + + // As Google Drive allows files have the same name, we need to check if the folder exists. + let resp = self.core.gdrive_search_folder(path, &parent).await?; + let status = resp.status(); + + match status { + StatusCode::OK => { + let body = resp.into_body().bytes().await?; + let meta = serde_json::from_slice::<GdriveFileList>(&body) + .map_err(new_json_deserialize_error)?; + + if !meta.files.is_empty() { + return Ok(RpCreateDir::default()); + } + } + _ => return Err(parse_error(resp).await?), + } + + let resp = self.core.gdrive_create_folder(path, Some(parent)).await?; let status = resp.status(); @@ -145,20 +165,51 @@ impl Accessor for GdriveBackend { // As Google Drive allows files have the same name, we need to check if the file exists. // If the file exists, we will keep its ID and update it. + let mut file_id: Option<String> = None; + + let resp = self.core.gdrive_stat(path).await; + // We don't care about the error here. + // As long as the file doesn't exist, we will create a new one. + if resp.is_ok() { + let resp = resp.unwrap(); + let status = resp.status(); + + if status == StatusCode::OK { + let body = resp.into_body().bytes().await?; + let meta = serde_json::from_slice::<GdriveFile>(&body) + .map_err(new_json_deserialize_error)?; + + file_id = if meta.id.is_empty() { + None + } else { + Some(meta.id) + }; + } + } + Ok(( RpWrite::default(), - GdriveWriter::new(self.core.clone(), args, String::from(path), None), + GdriveWriter::new(self.core.clone(), String::from(path), file_id), )) } async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> { - let resp = self.core.gdrive_delete(path).await?; - - let status = resp.status(); + let resp = self.core.gdrive_delete(path).await; + if resp.is_err() { + let e = resp.err().unwrap(); + if e.kind() == ErrorKind::NotFound { + return Ok(RpDelete::default()); + } else { + return Err(e); + } + } else { + let resp = resp.unwrap(); + let status = resp.status(); - match status { - StatusCode::NO_CONTENT => Ok(RpDelete::default()), - _ => Err(parse_error(resp).await?), + match status { + StatusCode::NO_CONTENT => Ok(RpDelete::default()), + _ => Err(parse_error(resp).await?), + } } } } @@ -168,8 +219,6 @@ impl GdriveBackend { let metadata = serde_json::from_slice::<GdriveFile>(&body).map_err(new_json_deserialize_error)?; - println!("metadata: {:?}", metadata.size); - let mut meta = Metadata::new(match metadata.mime_type.as_str() { "application/vnd.google-apps.folder" => EntryMode::DIR, _ => EntryMode::FILE, diff --git a/core/src/services/gdrive/core.rs b/core/src/services/gdrive/core.rs index 3b5b9ebb5..642467b9f 100644 --- a/core/src/services/gdrive/core.rs +++ b/core/src/services/gdrive/core.rs @@ -25,20 +25,12 @@ use http::header; use http::Request; use http::Response; use http::StatusCode; -use reqwest::multipart::Form; -use reqwest::multipart::Part; use serde::Deserialize; use serde_json::json; use tokio::sync::Mutex; use super::error::parse_error; -use crate::raw::build_rooted_abs_path; -use crate::raw::new_json_deserialize_error; -use crate::raw::new_request_build_error; -use crate::raw::percent_encode_path; -use crate::raw::AsyncBody; -use crate::raw::HttpClient; -use crate::raw::IncomingAsyncBody; +use crate::raw::*; use crate::types::Result; use crate::Error; use crate::ErrorKind; @@ -75,11 +67,9 @@ impl GdriveCore { /// - A path is a sequence of file names separated by slashes. /// - A file only knows its parent id, but not its name. /// - To find the file id of a file, we need to traverse the path from the root to the file. - async fn get_file_id_by_path(&self, file_path: &str) -> Result<String> { + pub(crate) async fn get_file_id_by_path(&self, file_path: &str) -> Result<String> { let path = build_rooted_abs_path(&self.root, file_path); - println!("file path: {}", path); - let mut parent_id = "root".to_owned(); let file_path_items: Vec<&str> = path.split('/').filter(|&x| !x.is_empty()).collect(); @@ -92,8 +82,6 @@ impl GdriveCore { query += " and mimeType = 'application/vnd.google-apps.folder'"; } - println!("query: {}", query); - let req = self.sign( Request::get(format!( "https://www.googleapis.com/drive/v3/files?q={}", @@ -106,8 +94,6 @@ impl GdriveCore { let resp = self.client.send(req).await?; let status = resp.status(); - println!("status in: {:?}", status); - match status { StatusCode::OK => { let resp_body = &resp.into_body().bytes().await?; @@ -115,8 +101,6 @@ impl GdriveCore { let gdrive_file_list: GdriveFileList = serde_json::from_slice(resp_body).map_err(new_json_deserialize_error)?; - println!("gdrive_file_list: {:?}", gdrive_file_list.files.len()); - if gdrive_file_list.files.is_empty() { return Err(Error::new( ErrorKind::NotFound, @@ -146,7 +130,7 @@ impl GdriveCore { /// /// - The path is rooted at the root of the Google Drive. /// - Will create the parent path recursively. - async fn ensure_parent_path(&self, path: &str) -> Result<String> { + pub(crate) async fn ensure_parent_path(&self, path: &str) -> Result<String> { let path = build_rooted_abs_path(&self.root, path); let mut parent: String = "root".to_owned(); @@ -190,7 +174,6 @@ impl GdriveCore { .map_err(new_json_deserialize_error)?; parent = parent_meta.id; - println!("parent: {}", &parent); } else { parent = gdrive_file_list.files[0].id.clone(); } @@ -209,7 +192,6 @@ impl GdriveCore { parent = String::from_utf8_lossy(&parent_id).to_string(); } _ => { - println!("404 δΊ†"); return Err(parse_error(res).await?); } } @@ -223,13 +205,13 @@ impl GdriveCore { Ok(parent.to_owned()) } - pub async fn gdrive_search( + pub async fn gdrive_search_folder( &self, target: &str, parent: &str, ) -> Result<Response<IncomingAsyncBody>> { let query = format!( - "name = '{}' and '{}' in parents and trashed = false &fields=files(id,name,mimeType)", + "name = '{}' and '{}' in parents and trashed = false and mimeType = 'application/vnd.google-apps.folder'", target, parent ); let url = format!( @@ -254,11 +236,6 @@ impl GdriveCore { name: &str, parent: Option<String>, ) -> Result<Response<IncomingAsyncBody>> { - println!( - "create folder: {} at {}", - name, - parent.clone().unwrap_or("root".to_owned()) - ); let url = "https://www.googleapis.com/drive/v3/files"; let req = Request::post(url) @@ -284,16 +261,6 @@ impl GdriveCore { self.client.send(req).await } - /// Create a folder. - /// Will create the path recursively. - pub async fn gdrive_create_dir(&self, path: &str) -> Result<Response<IncomingAsyncBody>> { - let parent = self.ensure_parent_path(path).await?; - - let path = path.split('/').filter(|&x| !x.is_empty()).last().unwrap(); - - self.gdrive_create_folder(path, Some(parent)).await - } - pub async fn gdrive_stat(&self, path: &str) -> Result<Response<IncomingAsyncBody>> { let path_id = self.get_file_id_by_path(path).await?; @@ -372,7 +339,7 @@ impl GdriveCore { &self, path: &str, size: u64, - body: AsyncBody, + body: bytes::Bytes, ) -> Result<Response<IncomingAsyncBody>> { let parent = self.ensure_parent_path(path).await?; @@ -385,31 +352,54 @@ impl GdriveCore { "parents": [parent], }); - let mut fd = Form::new().text("metadata", metadata.to_string()); + let req = Request::post(url).header("X-Upload-Content-Length", size); - match body { - AsyncBody::Bytes(bs) => { - fd = fd.part("file", Part::bytes(bs.to_vec())); - } - AsyncBody::Stream(bs) => { - fd = fd.part("file", Part::stream(reqwest::Body::wrap_stream(bs))); - } - AsyncBody::Empty => { - // We do nothing here. - fd = fd.part("file", Part::bytes(Vec::new())); - } - }; + let multipart = Multipart::new() + .part( + FormDataPart::new("metadata") + .header( + header::CONTENT_TYPE, + "application/json; charset=UTF-8".parse().unwrap(), + ) + .content(metadata.to_string()), + ) + .part( + FormDataPart::new("file") + .header( + header::CONTENT_TYPE, + "application/octet-stream".parse().unwrap(), + ) + .content(body), + ); - let mut req = Request::post(url) - .header(header::CONTENT_TYPE, "application/json; charset=UTF-8") + let mut req = multipart.apply(req)?; + + req = self.sign(req); + + self.client.send(req).await + } + + pub async fn gdrive_upload_overwrite_simple_request( + &self, + file_id: &str, + size: u64, + body: bytes::Bytes, + ) -> Result<Response<IncomingAsyncBody>> { + let url = format!( + "https://www.googleapis.com/upload/drive/v3/files/{}?uploadType=media", + file_id + ); + + let mut req = Request::patch(url) + .header(header::CONTENT_TYPE, "application/octet-stream") .header(header::CONTENT_LENGTH, size) .header("X-Upload-Content-Length", size) - .body(fd) + .body(AsyncBody::Bytes(body)) .map_err(new_request_build_error)?; req = self.sign(req); - self.client.send_form_data(req).await + self.client.send(req).await } /// Start an upload session. @@ -577,6 +567,6 @@ pub struct GdriveFile { // refer to https://developers.google.com/drive/api/reference/rest/v3/files/list #[derive(Deserialize)] #[serde(rename_all = "camelCase")] -struct GdriveFileList { - files: Vec<GdriveFile>, +pub(crate) struct GdriveFileList { + pub(crate) files: Vec<GdriveFile>, } diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs index f4762dfb7..48d88f3ec 100644 --- a/core/src/services/gdrive/writer.rs +++ b/core/src/services/gdrive/writer.rs @@ -24,30 +24,24 @@ use http::StatusCode; use super::core::GdriveCore; use super::error::parse_error; use crate::raw::*; +use crate::services::gdrive::core::GdriveFile; use crate::*; pub struct GdriveWriter { core: Arc<GdriveCore>, - op: OpWrite, path: String, - target: Option<String>, - position: u64, - written: u64, - size: Option<u64>, + file_id: Option<String>, } impl GdriveWriter { - pub fn new(core: Arc<GdriveCore>, op: OpWrite, path: String, file_id: Option<String>) -> Self { + pub fn new(core: Arc<GdriveCore>, path: String, file_id: Option<String>) -> Self { GdriveWriter { core, - op, path, - position: 0, - written: 0, - size: None, - target: file_id, + + file_id, } } @@ -55,7 +49,7 @@ impl GdriveWriter { /// /// This is used for small objects. /// And should overwrite the object if it already exists. - pub async fn write_oneshot(&self, size: u64, body: AsyncBody) -> Result<()> { + pub async fn write_create(&mut self, size: u64, body: Bytes) -> Result<()> { let resp = self .core .gdrive_upload_simple_request(&self.path, size, body) @@ -65,194 +59,59 @@ impl GdriveWriter { match status { StatusCode::OK | StatusCode::CREATED => { - resp.into_body().consume().await?; + let bs = resp.into_body().bytes().await?; + + let file = serde_json::from_slice::<GdriveFile>(&bs) + .map_err(new_json_deserialize_error)?; + + self.file_id = Some(file.id); + Ok(()) } _ => Err(parse_error(resp).await?), } } - pub async fn initial_upload(&mut self) -> Result<()> { + pub async fn write_overwrite(&self, size: u64, body: Bytes) -> Result<()> { + let file_id = self.file_id.as_ref().ok_or_else(|| { + Error::new(ErrorKind::Unexpected, "file_id is required for overwrite") + })?; let resp = self .core - .gdrive_upload_initial_request(&self.path, None) + .gdrive_upload_overwrite_simple_request(file_id, size, body) .await?; let status = resp.status(); match status { StatusCode::OK => { - let headers = resp.headers(); - - match headers.get("location") { - Some(location) => { - self.target = Some( - location - .to_str() - .map_err(|_| { - Error::new( - ErrorKind::Unexpected, - "initial upload failed: location header parse failed", - ) - })? - .to_string(), - ); - Ok(()) - } - _ => Err(Error::new( - ErrorKind::Unexpected, - "initial upload failed: location header not found", - )), - } + resp.into_body().consume().await?; + Ok(()) } _ => Err(parse_error(resp).await?), } } - - pub async fn write_part(&mut self, size: u64, part: AsyncBody) -> Result<()> { - if let Some(target) = &self.target { - let resp = self - .core - .gdrive_upload_part_request(target, size, self.position, self.size, part) - .await?; - - println!("size: {}", size); - - let status = resp.status(); - - match status { - StatusCode::PERMANENT_REDIRECT => { - let headers = resp.headers(); - - let range = headers.get("range").ok_or_else(|| { - Error::new( - ErrorKind::Unexpected, - "write part failed: range header not found", - ) - })?; - self.position = range - .to_str() - .map_err(|_| { - Error::new( - ErrorKind::Unexpected, - "write part failed: range header parse failed", - ) - })? - .split('-') - .last() - .ok_or_else(|| { - Error::new( - ErrorKind::Unexpected, - "write part failed: range header parse failed", - ) - })? - .parse() - .map_err(|_| { - Error::new( - ErrorKind::Unexpected, - "write part failed: range header parse failed", - ) - })?; - - resp.into_body().consume().await?; - Ok(()) - } - _ => Err(parse_error(resp).await?), - } - } else { - Err(Error::new( - ErrorKind::Unexpected, - "write part failed: upload location not found", - )) - } - } - - pub async fn finish_upload(&self) -> Result<()> { - if let Some(target) = &self.target { - println!("final position: {}", self.position + 1); - println!("final size: {}", self.size.unwrap_or(self.position + 1)); - - let resp = self - .core - .gdrive_finish_upload_request( - target, - if let Some(size) = self.size { - if (self.position + 1) < size { - return Err(Error::new( - ErrorKind::Unexpected, - "finish upload failed: upload size mismatch", - )); - } - size - } else { - self.position + 1 - }, - ) - .await?; - - let status = resp.status(); - - match status { - StatusCode::OK | StatusCode::CREATED => { - resp.into_body().consume().await?; - Ok(()) - } - _ => Err(parse_error(resp).await?), - } - } else { - Ok(()) - } - } - - pub async fn abort_upload(&self) -> Result<()> { - if let Some(target) = &self.target { - let resp = self.core.gdrive_cancel_upload_request(target).await?; - - let status = resp.status(); - - match status { - StatusCode::OK => { - resp.into_body().consume().await?; - Ok(()) - } - _ => Err(parse_error(resp).await?), - } - } else { - Ok(()) - } - } } #[async_trait] impl oio::Write for GdriveWriter { async fn write(&mut self, bs: Bytes) -> Result<()> { - if bs.is_empty() { - return Ok(()); - } - - if self.target.is_none() { - if self.op.content_length().unwrap_or_default() == bs.len() as u64 && self.written == 0 - { - return self - .write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs)) - .await; - } else { - self.initial_upload().await?; - } + if self.file_id.is_none() { + self.write_create(bs.len() as u64, bs).await + } else { + self.write_overwrite(bs.len() as u64, bs).await } - - self.write_part(bs.len() as u64, AsyncBody::Bytes(bs)).await } async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { - Ok(()) + Err(Error::new(ErrorKind::Unsupported, "sink is not supported")) } async fn abort(&mut self) -> Result<()> { - self.abort_upload().await + Ok(()) } async fn close(&mut self) -> Result<()> { - self.finish_upload().await + Ok(()) } }
