This is an automated email from the ASF dual-hosted git repository.

suyanhanx pushed a commit to branch gdrive
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 7dff3c44fb7832ff98622f778483073f27ca47ae Author: suyanhanx <[email protected]> AuthorDate: Tue Aug 1 10:28:20 2023 +0800 stash Signed-off-by: suyanhanx <[email protected]> --- Cargo.lock | 1 + core/Cargo.toml | 1 + core/src/services/gdrive/backend.rs | 8 +-- core/src/services/gdrive/core.rs | 71 +++++++++++++++++++------ core/src/services/gdrive/writer.rs | 101 +++++++++++++++++++++++++++++------- 5 files changed, 143 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ea91cdd01..ec5c2aca9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4884,6 +4884,7 @@ dependencies = [ "js-sys", "log", "mime", + "mime_guess", "native-tls", "once_cell", "percent-encoding", diff --git a/core/Cargo.toml b/core/Cargo.toml index b46cf3494..313d568b0 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -249,6 +249,7 @@ redis = { version = "0.23.1", features = [ reqsign = { version = "0.14.1", default-features = false, optional = true } reqwest = { version = "0.11.18", features = [ "stream", + "multipart", ], default-features = false } rocksdb = { version = "0.21.0", default-features = false, optional = true } serde = { version = "1", features = ["derive"] } diff --git a/core/src/services/gdrive/backend.rs b/core/src/services/gdrive/backend.rs index 9321fb3c3..464d86312 100644 --- a/core/src/services/gdrive/backend.rs +++ b/core/src/services/gdrive/backend.rs @@ -165,14 +165,14 @@ impl GdriveBackend { pub(crate) fn parse_metadata(&self, body: bytes::Bytes) -> Result<Metadata> { let metadata = serde_json::from_slice::<GdriveFile>(&body).map_err(new_json_deserialize_error)?; + + println!("metadata: {:?}", metadata.size); + let mut meta = Metadata::new(match metadata.mime_type.as_str() { "application/vnd.google-apps.folder" => EntryMode::DIR, _ => EntryMode::FILE, }); - meta = meta.with_content_length(match metadata.size.parse::<u64>() { - Ok(size) => size, - Err(_) => 0, - }); + meta = meta.with_content_length(metadata.size.parse::<u64>().unwrap_or(0)); meta = 
meta.with_last_modified( metadata .modified_time diff --git a/core/src/services/gdrive/core.rs b/core/src/services/gdrive/core.rs index cc9f544ec..a93461d9e 100644 --- a/core/src/services/gdrive/core.rs +++ b/core/src/services/gdrive/core.rs @@ -76,18 +76,22 @@ impl GdriveCore { async fn get_file_id_by_path(&self, file_path: &str) -> Result<String> { let path = build_rooted_abs_path(&self.root, file_path); + println!("file path: {}", path); + let mut parent_id = "root".to_owned(); let file_path_items: Vec<&str> = path.split('/').filter(|&x| !x.is_empty()).collect(); for (i, item) in file_path_items.iter().enumerate() { let mut query = format!( - "name = '{}' and parents = '{}' and trashed = false", + "name = \"{}\" and \"{}\" in parents and trashed = false", item, parent_id ); if i != file_path_items.len() - 1 { - query += "and mimeType = 'application/vnd.google-apps.folder'"; + query += " and mimeType = 'application/vnd.google-apps.folder'"; } + println!("query: {}", query); + let req = self.sign( Request::get(format!( "https://www.googleapis.com/drive/v3/files?q={}", @@ -111,10 +115,10 @@ impl GdriveCore { println!("gdrive_file_list: {:?}", gdrive_file_list.files.len()); - if gdrive_file_list.files.len() == 0 { + if gdrive_file_list.files.is_empty() { return Err(Error::new( ErrorKind::NotFound, - &format!("path not found: {}", file_path), + &format!("path not found: {}", item), )); } @@ -149,7 +153,7 @@ impl GdriveCore { for (i, item) in file_path_items.iter().enumerate() { let query = format!( - "name = '{}' and '{}' in parents and trashed = false and mimeType = 'application/vnd.google-apps.folder'", + "name = \"{}\" and \"{}\" in parents and trashed = false and mimeType = 'application/vnd.google-apps.folder'", item, parent ); @@ -295,8 +299,8 @@ impl GdriveCore { // For now, we only need the file id, name, mime type and modified time. 
let req = self.sign( Request::get(&format!( - "https://www.googleapis.com/drive/v3/files/{}?fields=files(id,name,mimeType,modifiedTime)", - self.get_file_id_by_path(path_id.as_str()).await? + "https://www.googleapis.com/drive/v3/files/{}?fields=id,name,mimeType,size,modifiedTime", + path_id.as_str() )) .body(AsyncBody::Empty) .map_err(new_request_build_error)?, @@ -362,6 +366,39 @@ impl GdriveCore { self.client.send(req).await } + pub async fn gdrive_upload_simple_request( + &self, + path: &str, + size: u64, + bs: bytes::Bytes, + ) -> Result<Response<IncomingAsyncBody>> { + let parent = self.ensure_parent_path(path).await?; + + let url = "https://www.googleapis.com/upload/drive/v3/files?uploadType=media"; + + let file_name = path.split('/').filter(|&x| !x.is_empty()).last().unwrap(); + + let metadata = &json!({ + "name": file_name, + "parents": [parent], + }); + + let fd = reqwest::multipart::Form::new() + .text("metadata", metadata.to_string()) + .part("file", reqwest::multipart::Part::bytes(bs)); + + let mut req = Request::post(url) + .header(header::CONTENT_TYPE, "application/json; charset=UTF-8") + .header(header::CONTENT_LENGTH, size) + .header("X-Upload-Content-Length", size) + .body(fd) + .map_err(new_request_build_error)?; + + req = self.sign(req); + + self.client.send(req).await + } + /// Start an upload session. 
/// /// ## Notes @@ -372,18 +409,22 @@ impl GdriveCore { path: &str, size: Option<u64>, ) -> Result<Response<IncomingAsyncBody>> { + let parent = self.ensure_parent_path(path).await?; + let url = "https://www.googleapis.com/upload/drive/v3/files?uploadType=resumable"; let file_name = path.split('/').filter(|&x| !x.is_empty()).last().unwrap(); let bs = serde_json::to_vec(&json!({ - "name": file_name - })).map_err( - |e| Error::new( + "name": file_name, + "parents": [parent], + })) + .map_err(|e| { + Error::new( ErrorKind::Unexpected, &format!("failed to serialize json(upload initial request): {}", e), ) - )?; + })?; let mut req = Request::post(url) .header(header::CONTENT_TYPE, "application/json; charset=UTF-8") @@ -393,8 +434,8 @@ impl GdriveCore { req = req.header("X-Upload-Content-Length", size); } - - let mut req = req.body(AsyncBody::Bytes(bytes::Bytes::from(bs))) + let mut req = req + .body(AsyncBody::Bytes(bytes::Bytes::from(bs))) .map_err(new_request_build_error)?; req = self.sign(req); @@ -468,7 +509,7 @@ impl GdriveCore { ) -> Result<Response<IncomingAsyncBody>> { let mut req = Request::put(target) .header("Content-Length", 0) - .header("Content-Range", format!("bytes {}-{}/{}", size - 1, size - 1, size)) + .header("Content-Range", format!("bytes */{}", size)) .body(AsyncBody::Empty) .map_err(new_request_build_error)?; @@ -493,7 +534,7 @@ impl GdriveCore { self.client.send(req).await } - fn sign(&self, mut req: Request<AsyncBody>) -> Request<AsyncBody> { + fn sign<T>(&self, mut req: Request<T>) -> Request<T> { let auth_header_content = format!("Bearer {}", self.access_token); req.headers_mut().insert( header::AUTHORIZATION, diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs index f796eecb6..958df558d 100644 --- a/core/src/services/gdrive/writer.rs +++ b/core/src/services/gdrive/writer.rs @@ -34,6 +34,7 @@ pub struct GdriveWriter { target: Option<String>, position: u64, + written: u64, size: Option<u64>, } @@ -44,11 +45,33 
@@ impl GdriveWriter { op, path, position: 0, + written: 0, size: None, target: None, } } + /// Write a single chunk of data to the object. + /// + /// This is used for small objects. + /// And should overwrite the object if it already exists. + pub async fn write_oneshot(&self, bs: Bytes) -> Result<()> { + let resp = self + .core + .gdrive_upload_simple_request(&self.path, bs.len() as u64, bs) + .await?; + + let status = resp.status(); + + match status { + StatusCode::OK | StatusCode::CREATED => { + resp.into_body().consume().await?; + Ok(()) + } + _ => Err(parse_error(resp).await?), + } + } + pub async fn initial_upload(&mut self) -> Result<()> { let resp = self .core @@ -63,20 +86,23 @@ impl GdriveWriter { match headers.get("location") { Some(location) => { - self.target = Some(location.to_str().map_err(|_| { - Error::new( - ErrorKind::Unexpected, - "initial upload failed: location header parse failed", - ) - })?.to_string()); + self.target = Some( + location + .to_str() + .map_err(|_| { + Error::new( + ErrorKind::Unexpected, + "initial upload failed: location header parse failed", + ) + })? + .to_string(), + ); Ok(()) } - _ => { - return Err(Error::new( - ErrorKind::Unexpected, - "initial upload failed: location header not found", - )) - } + _ => Err(Error::new( + ErrorKind::Unexpected, + "initial upload failed: location header not found", + )), } } _ => Err(parse_error(resp).await?), @@ -96,7 +122,37 @@ impl GdriveWriter { match status { StatusCode::PERMANENT_REDIRECT => { - self.position = self.position + size; + let headers = resp.headers(); + + let range = headers.get("range").ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + "write part failed: range header not found", + ) + })?; + self.position = range + .to_str() + .map_err(|_| { + Error::new( + ErrorKind::Unexpected, + "write part failed: range header parse failed", + ) + })? 
+ .split('-') + .last() + .ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + "write part failed: range header parse failed", + ) + })? + .parse() + .map_err(|_| { + Error::new( + ErrorKind::Unexpected, + "write part failed: range header parse failed", + ) + })?; resp.into_body().consume().await?; Ok(()) @@ -170,19 +226,24 @@ impl GdriveWriter { #[async_trait] impl oio::Write for GdriveWriter { async fn write(&mut self, bs: Bytes) -> Result<()> { + if bs.is_empty() { + return Ok(()); + } + if self.target.is_none() { - self.initial_upload().await?; + if self.op.content_length().unwrap_or_default() == bs.len() as u64 && self.written == 0 + { + return self.write_oneshot(bs).await; + } else { + self.initial_upload().await?; + } } self.write_part(bs.len() as u64, AsyncBody::Bytes(bs)).await } - async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> { - if self.target.is_none() { - self.initial_upload().await?; - } - - self.write_part(size, AsyncBody::Stream(s)).await + async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> { + Ok(()) } async fn abort(&mut self) -> Result<()> {
