This is an automated email from the ASF dual-hosted git repository. suyanhanx pushed a commit to branch gdrive in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 8582597fb6bb300ef956af24c436dc639f11d730 Author: suyanhanx <[email protected]> AuthorDate: Sun Aug 6 17:59:13 2023 +0800 stash Signed-off-by: suyanhanx <[email protected]> --- core/src/raw/http_util/client.rs | 84 +++++++++++++++++++++++++++++++++++++ core/src/services/gdrive/backend.rs | 4 +- core/src/services/gdrive/core.rs | 25 ++++++++--- core/src/services/gdrive/writer.rs | 12 +++--- 4 files changed, 113 insertions(+), 12 deletions(-) diff --git a/core/src/raw/http_util/client.rs b/core/src/raw/http_util/client.rs index 9578af9ce..1738468eb 100644 --- a/core/src/raw/http_util/client.rs +++ b/core/src/raw/http_util/client.rs @@ -23,6 +23,7 @@ use std::str::FromStr; use futures::TryStreamExt; use http::Request; use http::Response; +use reqwest::multipart::Form; use super::body::IncomingAsyncBody; use super::parse_content_length; @@ -156,4 +157,87 @@ impl HttpClient { Ok(resp) } + + /// Send a request in async way. + /// + /// For a request with form data, there could be different types of data + /// in the form, so we need to use `Form` to represent it. + pub async fn send_form_data(&self, req: Request<Form>) -> Result<Response<IncomingAsyncBody>> { + // Uri stores all string alike data in `Bytes` which means + // the clone here is cheap. + let uri = req.uri().clone(); + let is_head = req.method() == http::Method::HEAD; + + let (parts, body) = req.into_parts(); + + let mut req_builder = self + .client + .request( + parts.method, + reqwest::Url::from_str(&uri.to_string()).expect("input request url must be valid"), + ) + .version(parts.version) + .headers(parts.headers); + + // Because the inner data's type will be set when we + // append it to the form, we just re-use the `body` here. + req_builder = req_builder.multipart(body); + + let mut resp = req_builder.send().await.map_err(|err| { + let is_temporary = !( + // Builder related error should not be retried. + err.is_builder() || + // Error returned by RedirectPolicy. + // + // We don't set this by hand, just don't allow retry. + err.is_redirect() || + // We never use `Response::error_for_status`, just don't allow retry. + // + // Status should be checked by our services. + err.is_status() + ); + + let mut oerr = Error::new(ErrorKind::Unexpected, "send async request") + .with_operation("http_util::Client::send_async") + .with_context("url", uri.to_string()) + .set_source(err); + if is_temporary { + oerr = oerr.set_temporary(); + } + + oerr + })?; + + // Get content length from header so that we can check it. + // If the request method is HEAD, we will ignore this. + let content_length = if is_head { + None + } else { + parse_content_length(resp.headers()).expect("response content length must be valid") + }; + + let mut hr = Response::builder() + .version(resp.version()) + .status(resp.status()) + // Insert uri into response extension so that we can fetch + // it later. + .extension(uri.clone()); + // Swap headers directly instead of copy the entire map. + mem::swap(hr.headers_mut().unwrap(), resp.headers_mut()); + + let stream = resp.bytes_stream().map_err(move |err| { + // If stream returns a body related error, we can convert + // it to interrupt so we can retry it. + Error::new(ErrorKind::Unexpected, "read data from http stream") + .map(|v| if err.is_body() { v.set_temporary() } else { v }) + .with_context("url", uri.to_string()) + .set_source(err) + }); + + let body = IncomingAsyncBody::new(Box::new(oio::into_stream(stream)), content_length); + + let resp = hr.body(body).expect("response must build succeed"); + + Ok(resp) + } } diff --git a/core/src/services/gdrive/backend.rs b/core/src/services/gdrive/backend.rs index 464d86312..03fa4a26c 100644 --- a/core/src/services/gdrive/backend.rs +++ b/core/src/services/gdrive/backend.rs @@ -143,9 +143,11 @@ impl Accessor for GdriveBackend { )); } + // As Google Drive allows files have the same name, we need to check if the file exists. + // If the file exists, we will keep its ID and update it. Ok(( RpWrite::default(), - GdriveWriter::new(self.core.clone(), args, String::from(path)), + GdriveWriter::new(self.core.clone(), args, String::from(path), None), )) } diff --git a/core/src/services/gdrive/core.rs b/core/src/services/gdrive/core.rs index a93461d9e..3b5b9ebb5 100644 --- a/core/src/services/gdrive/core.rs +++ b/core/src/services/gdrive/core.rs @@ -25,6 +25,8 @@ use http::header; use http::Request; use http::Response; use http::StatusCode; +use reqwest::multipart::Form; +use reqwest::multipart::Part; use serde::Deserialize; use serde_json::json; use tokio::sync::Mutex; @@ -370,11 +372,11 @@ impl GdriveCore { &self, path: &str, size: u64, - bs: bytes::Bytes, + body: AsyncBody, ) -> Result<Response<IncomingAsyncBody>> { let parent = self.ensure_parent_path(path).await?; - let url = "https://www.googleapis.com/upload/drive/v3/files?uploadType=media"; + let url = "https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart"; let file_name = path.split('/').filter(|&x| !x.is_empty()).last().unwrap(); @@ -383,9 +385,20 @@ impl GdriveCore { "parents": [parent], }); - let fd = reqwest::multipart::Form::new() - .text("metadata", metadata.to_string()) - .part("file", reqwest::multipart::Part::bytes(bs)); + let mut fd = Form::new().text("metadata", metadata.to_string()); + + match body { + AsyncBody::Bytes(bs) => { + fd = fd.part("file", Part::bytes(bs.to_vec())); + } + AsyncBody::Stream(bs) => { + fd = fd.part("file", Part::stream(reqwest::Body::wrap_stream(bs))); + } + AsyncBody::Empty => { + // We do nothing here. + fd = fd.part("file", Part::bytes(Vec::new())); + } + }; let mut req = Request::post(url) .header(header::CONTENT_TYPE, "application/json; charset=UTF-8") @@ -396,7 +409,7 @@ impl GdriveCore { req = self.sign(req); - self.client.send(req).await + self.client.send_form_data(req).await } /// Start an upload session. diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs index 958df558d..f4762dfb7 100644 --- a/core/src/services/gdrive/writer.rs +++ b/core/src/services/gdrive/writer.rs @@ -39,7 +39,7 @@ pub struct GdriveWriter { } impl GdriveWriter { - pub fn new(core: Arc<GdriveCore>, op: OpWrite, path: String) -> Self { + pub fn new(core: Arc<GdriveCore>, op: OpWrite, path: String, file_id: Option<String>) -> Self { GdriveWriter { core, op, @@ -47,7 +47,7 @@ impl GdriveWriter { position: 0, written: 0, size: None, - target: None, + target: file_id, } } @@ -55,10 +55,10 @@ impl GdriveWriter { /// /// This is used for small objects. /// And should overwrite the object if it already exists. - pub async fn write_oneshot(&self, bs: Bytes) -> Result<()> { + pub async fn write_oneshot(&self, size: u64, body: AsyncBody) -> Result<()> { let resp = self .core - .gdrive_upload_simple_request(&self.path, bs.len() as u64, bs) + .gdrive_upload_simple_request(&self.path, size, body) .await?; let status = resp.status(); @@ -233,7 +233,9 @@ impl oio::Write for GdriveWriter { if self.target.is_none() { if self.op.content_length().unwrap_or_default() == bs.len() as u64 && self.written == 0 { - return self.write_oneshot(bs).await; + return self + .write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs)) + .await; } else { self.initial_upload().await?; }
