This is an automated email from the ASF dual-hosted git repository.

suyanhanx pushed a commit to branch gdrive
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 8582597fb6bb300ef956af24c436dc639f11d730
Author: suyanhanx <[email protected]>
AuthorDate: Sun Aug 6 17:59:13 2023 +0800

    stash
    
    Signed-off-by: suyanhanx <[email protected]>
---
 core/src/raw/http_util/client.rs    | 84 +++++++++++++++++++++++++++++++++++++
 core/src/services/gdrive/backend.rs |  4 +-
 core/src/services/gdrive/core.rs    | 25 ++++++++---
 core/src/services/gdrive/writer.rs  | 12 +++---
 4 files changed, 113 insertions(+), 12 deletions(-)

diff --git a/core/src/raw/http_util/client.rs b/core/src/raw/http_util/client.rs
index 9578af9ce..1738468eb 100644
--- a/core/src/raw/http_util/client.rs
+++ b/core/src/raw/http_util/client.rs
@@ -23,6 +23,7 @@ use std::str::FromStr;
 use futures::TryStreamExt;
 use http::Request;
 use http::Response;
+use reqwest::multipart::Form;
 
 use super::body::IncomingAsyncBody;
 use super::parse_content_length;
@@ -156,4 +157,87 @@ impl HttpClient {
 
         Ok(resp)
     }
+
+    /// Send a request in async way.
+    ///
+    /// For a request with form data, there could be different types of data
+    /// in the form, so we need to use `Form` to represent it.
+    pub async fn send_form_data(&self, req: Request<Form>) -> 
Result<Response<IncomingAsyncBody>> {
+        // Uri stores all string alike data in `Bytes` which means
+        // the clone here is cheap.
+        let uri = req.uri().clone();
+        let is_head = req.method() == http::Method::HEAD;
+
+        let (parts, body) = req.into_parts();
+
+        let mut req_builder = self
+            .client
+            .request(
+                parts.method,
+                reqwest::Url::from_str(&uri.to_string()).expect("input request 
url must be valid"),
+            )
+            .version(parts.version)
+            .headers(parts.headers);
+
+        // Because the inner data's type will be set when we
+        // append it to the form, we just re-use the `body` here.
+        req_builder = req_builder.multipart(body);
+
+        let mut resp = req_builder.send().await.map_err(|err| {
+            let is_temporary = !(
+                // Builder related error should not be retried.
+                err.is_builder() ||
+                // Error returned by RedirectPolicy.
+                //
+                // We don't set this by hand, just don't allow retry.
+                err.is_redirect() ||
+                 // We never use `Response::error_for_status`, just don't 
allow retry.
+                //
+                // Status should be checked by our services.
+                err.is_status()
+            );
+
+            let mut oerr = Error::new(ErrorKind::Unexpected, "send async 
request")
+                .with_operation("http_util::Client::send_async")
+                .with_context("url", uri.to_string())
+                .set_source(err);
+            if is_temporary {
+                oerr = oerr.set_temporary();
+            }
+
+            oerr
+        })?;
+
+        // Get content length from header so that we can check it.
+        // If the request method is HEAD, we will ignore this.
+        let content_length = if is_head {
+            None
+        } else {
+            parse_content_length(resp.headers()).expect("response content 
length must be valid")
+        };
+
+        let mut hr = Response::builder()
+            .version(resp.version())
+            .status(resp.status())
+            // Insert uri into response extension so that we can fetch
+            // it later.
+            .extension(uri.clone());
+        // Swap headers directly instead of copy the entire map.
+        mem::swap(hr.headers_mut().unwrap(), resp.headers_mut());
+
+        let stream = resp.bytes_stream().map_err(move |err| {
+            // If stream returns a body related error, we can convert
+            // it to interrupt so we can retry it.
+            Error::new(ErrorKind::Unexpected, "read data from http stream")
+                .map(|v| if err.is_body() { v.set_temporary() } else { v })
+                .with_context("url", uri.to_string())
+                .set_source(err)
+        });
+
+        let body = IncomingAsyncBody::new(Box::new(oio::into_stream(stream)), 
content_length);
+
+        let resp = hr.body(body).expect("response must build succeed");
+
+        Ok(resp)
+    }
 }
diff --git a/core/src/services/gdrive/backend.rs 
b/core/src/services/gdrive/backend.rs
index 464d86312..03fa4a26c 100644
--- a/core/src/services/gdrive/backend.rs
+++ b/core/src/services/gdrive/backend.rs
@@ -143,9 +143,11 @@ impl Accessor for GdriveBackend {
             ));
         }
 
+        // As Google Drive allows files have the same name, we need to check 
if the file exists.
+        // If the file exists, we will keep its ID and update it.
         Ok((
             RpWrite::default(),
-            GdriveWriter::new(self.core.clone(), args, String::from(path)),
+            GdriveWriter::new(self.core.clone(), args, String::from(path), 
None),
         ))
     }
 
diff --git a/core/src/services/gdrive/core.rs b/core/src/services/gdrive/core.rs
index a93461d9e..3b5b9ebb5 100644
--- a/core/src/services/gdrive/core.rs
+++ b/core/src/services/gdrive/core.rs
@@ -25,6 +25,8 @@ use http::header;
 use http::Request;
 use http::Response;
 use http::StatusCode;
+use reqwest::multipart::Form;
+use reqwest::multipart::Part;
 use serde::Deserialize;
 use serde_json::json;
 use tokio::sync::Mutex;
@@ -370,11 +372,11 @@ impl GdriveCore {
         &self,
         path: &str,
         size: u64,
-        bs: bytes::Bytes,
+        body: AsyncBody,
     ) -> Result<Response<IncomingAsyncBody>> {
         let parent = self.ensure_parent_path(path).await?;
 
-        let url = 
"https://www.googleapis.com/upload/drive/v3/files?uploadType=media";;
+        let url = 
"https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart";;
 
         let file_name = path.split('/').filter(|&x| 
!x.is_empty()).last().unwrap();
 
@@ -383,9 +385,20 @@ impl GdriveCore {
             "parents": [parent],
         });
 
-        let fd = reqwest::multipart::Form::new()
-            .text("metadata", metadata.to_string())
-            .part("file", reqwest::multipart::Part::bytes(bs));
+        let mut fd = Form::new().text("metadata", metadata.to_string());
+
+        match body {
+            AsyncBody::Bytes(bs) => {
+                fd = fd.part("file", Part::bytes(bs.to_vec()));
+            }
+            AsyncBody::Stream(bs) => {
+                fd = fd.part("file", 
Part::stream(reqwest::Body::wrap_stream(bs)));
+            }
+            AsyncBody::Empty => {
+                // We do nothing here.
+                fd = fd.part("file", Part::bytes(Vec::new()));
+            }
+        };
 
         let mut req = Request::post(url)
             .header(header::CONTENT_TYPE, "application/json; charset=UTF-8")
@@ -396,7 +409,7 @@ impl GdriveCore {
 
         req = self.sign(req);
 
-        self.client.send(req).await
+        self.client.send_form_data(req).await
     }
 
     /// Start an upload session.
diff --git a/core/src/services/gdrive/writer.rs 
b/core/src/services/gdrive/writer.rs
index 958df558d..f4762dfb7 100644
--- a/core/src/services/gdrive/writer.rs
+++ b/core/src/services/gdrive/writer.rs
@@ -39,7 +39,7 @@ pub struct GdriveWriter {
 }
 
 impl GdriveWriter {
-    pub fn new(core: Arc<GdriveCore>, op: OpWrite, path: String) -> Self {
+    pub fn new(core: Arc<GdriveCore>, op: OpWrite, path: String, file_id: 
Option<String>) -> Self {
         GdriveWriter {
             core,
             op,
@@ -47,7 +47,7 @@ impl GdriveWriter {
             position: 0,
             written: 0,
             size: None,
-            target: None,
+            target: file_id,
         }
     }
 
@@ -55,10 +55,10 @@ impl GdriveWriter {
     ///
     /// This is used for small objects.
     /// And should overwrite the object if it already exists.
-    pub async fn write_oneshot(&self, bs: Bytes) -> Result<()> {
+    pub async fn write_oneshot(&self, size: u64, body: AsyncBody) -> 
Result<()> {
         let resp = self
             .core
-            .gdrive_upload_simple_request(&self.path, bs.len() as u64, bs)
+            .gdrive_upload_simple_request(&self.path, size, body)
             .await?;
 
         let status = resp.status();
@@ -233,7 +233,9 @@ impl oio::Write for GdriveWriter {
         if self.target.is_none() {
             if self.op.content_length().unwrap_or_default() == bs.len() as u64 
&& self.written == 0
             {
-                return self.write_oneshot(bs).await;
+                return self
+                    .write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs))
+                    .await;
             } else {
                 self.initial_upload().await?;
             }

Reply via email to