This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch stream-based-write
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit faa619f24229b8f40c0c76be63d9989b73f74a47
Author: Xuanwo <[email protected]>
AuthorDate: Wed Aug 30 18:38:03 2023 +0800

    Migrate gdrive
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/raw/http_util/multipart.rs | 45 +++++++++----------------------------
 core/src/services/gcs/core.rs       |  2 +-
 core/src/services/gdrive/core.rs    |  8 +++----
 core/src/services/gdrive/writer.rs  | 23 +++++++++----------
 4 files changed, 26 insertions(+), 52 deletions(-)

diff --git a/core/src/raw/http_util/multipart.rs b/core/src/raw/http_util/multipart.rs
index 309ae3daa..aa4232f93 100644
--- a/core/src/raw/http_util/multipart.rs
+++ b/core/src/raw/http_util/multipart.rs
@@ -246,7 +246,6 @@ pub trait Part: Sized + 'static {
 pub struct FormDataPart {
     headers: HeaderMap,
 
-    content_length: u64,
     content: Streamer,
 }
 
@@ -266,7 +265,6 @@ impl FormDataPart {
 
         Self {
             headers,
-            content_length: 0,
             content: Box::new(oio::Cursor::new()),
         }
     }
@@ -281,14 +279,12 @@ impl FormDataPart {
     pub fn content(mut self, content: impl Into<Bytes>) -> Self {
         let content = content.into();
 
-        self.content_length = content.len() as u64;
         self.content = Box::new(oio::Cursor::from(content));
         self
     }
 
     /// Set the stream content for this part.
-    pub fn stream(mut self, size: u64, content: Streamer) -> Self {
-        self.content_length = size;
+    pub fn stream(mut self, content: Streamer) -> Self {
         self.content = content;
         self
     }
@@ -312,7 +308,7 @@ impl Part for FormDataPart {
         let bs = bs.freeze();
 
         // pre-content + content + post-content (b`\r\n`)
-        let total_size = bs.len() as u64 + self.content_length + 2;
+        let total_size = bs.len() as u64 + self.content.size() + 2;
 
         FormDataPartStream {
             size: total_size,
@@ -383,7 +379,6 @@ pub struct MixedPart {
     /// Common
     version: Version,
     headers: HeaderMap,
-    content_length: u64,
     content: Option<Streamer>,
 
     /// Request only
@@ -408,7 +403,6 @@ impl MixedPart {
 
             version: Version::HTTP_11,
             headers: HeaderMap::new(),
-            content_length: 0,
             content: None,
 
             uri: Some(uri),
@@ -426,21 +420,10 @@ impl MixedPart {
 
         let (parts, body) = req.into_parts();
 
-        let (content_length, content) = match body {
-            AsyncBody::Empty => (0, None),
-            AsyncBody::Bytes(bs) => (
-                bs.len() as u64,
-                Some(Box::new(oio::Cursor::from(bs)) as Streamer),
-            ),
-            AsyncBody::Stream(stream) => {
-                let len = parts
-                    .headers
-                    .get(CONTENT_LENGTH)
-                    .and_then(|v| v.to_str().ok())
-                    .and_then(|v| v.parse::<u64>().ok())
-                    .expect("the content length of a mixed part must be valid");
-                (len, Some(stream))
-            }
+        let content = match body {
+            AsyncBody::Empty => None,
+            AsyncBody::Bytes(bs) => Some(Box::new(oio::Cursor::from(bs)) as Streamer),
+            AsyncBody::Stream(stream) => Some(stream),
         };
 
         Self {
@@ -457,7 +440,6 @@ impl MixedPart {
             ),
             version: parts.version,
             headers: parts.headers,
-            content_length,
             content,
 
             method: Some(parts.method),
@@ -475,7 +457,8 @@ impl MixedPart {
         mem::swap(builder.headers_mut().unwrap(), &mut self.headers);
 
         let body = if let Some(stream) = self.content {
-            IncomingAsyncBody::new(stream, Some(self.content_length))
+            let size = stream.size();
+            IncomingAsyncBody::new(stream, Some(size))
         } else {
             IncomingAsyncBody::new(Box::new(oio::into_stream(0, stream::empty())), Some(0))
         };
@@ -513,14 +496,12 @@ impl MixedPart {
     pub fn content(mut self, content: impl Into<Bytes>) -> Self {
         let content = content.into();
 
-        self.content_length = content.len() as u64;
         self.content = Some(Box::new(oio::Cursor::from(content)));
         self
     }
 
     /// Set the stream content for this part.
-    pub fn stream(mut self, size: u64, content: Streamer) -> Self {
-        self.content_length = size;
+    pub fn stream(mut self, content: Streamer) -> Self {
         self.content = Some(content);
         self
     }
@@ -594,8 +575,8 @@ impl Part for MixedPart {
         // pre-content + content + post-content;
         let mut total_size = bs.len() as u64;
 
-        if self.content.is_some() {
-            total_size += self.content_length + 2;
+        if let Some(stream) = &self.content {
+            total_size += stream.size() + 2;
         }
 
         MixedPartStream {
@@ -676,7 +657,6 @@ impl Part for MixedPart {
             part_headers,
             version: Version::HTTP_11,
             headers,
-            content_length: body_bytes.len() as u64,
             content: Some(Box::new(oio::Cursor::from(body_bytes))),
 
             method: None,
@@ -1171,7 +1151,6 @@ Content-Length: 846
 
             h
         });
-        assert_eq!(multipart.parts[0].content_length, part0_bs.len() as u64);
         assert_eq!(multipart.parts[0].uri, None);
         assert_eq!(multipart.parts[0].method, None);
         assert_eq!(
@@ -1211,7 +1190,6 @@ Content-Length: 846
 
             h
         });
-        assert_eq!(multipart.parts[1].content_length, part1_bs.len() as u64);
         assert_eq!(multipart.parts[1].uri, None);
         assert_eq!(multipart.parts[1].method, None);
         assert_eq!(
@@ -1251,7 +1229,6 @@ Content-Length: 846
 
             h
         });
-        assert_eq!(multipart.parts[2].content_length, part2_bs.len() as u64);
         assert_eq!(multipart.parts[2].uri, None);
         assert_eq!(multipart.parts[2].method, None);
         assert_eq!(
diff --git a/core/src/services/gcs/core.rs b/core/src/services/gcs/core.rs
index 9a5a35c48..c7d0551be 100644
--- a/core/src/services/gcs/core.rs
+++ b/core/src/services/gcs/core.rs
@@ -282,7 +282,7 @@ impl GcsCore {
                     media_part = media_part.content(bytes);
                 }
                 AsyncBody::Stream(stream) => {
-                    media_part = media_part.stream(size.unwrap(), stream);
+                    media_part = media_part.stream(stream);
                 }
             }
 
diff --git a/core/src/services/gdrive/core.rs b/core/src/services/gdrive/core.rs
index 24ff1cf89..2551349b4 100644
--- a/core/src/services/gdrive/core.rs
+++ b/core/src/services/gdrive/core.rs
@@ -366,7 +366,7 @@ impl GdriveCore {
         &self,
         path: &str,
         size: u64,
-        body: bytes::Bytes,
+        stream: oio::Streamer,
     ) -> Result<Response<IncomingAsyncBody>> {
         let parent = self.ensure_parent_path(path).await?;
 
@@ -396,7 +396,7 @@ impl GdriveCore {
                         header::CONTENT_TYPE,
                         "application/octet-stream".parse().unwrap(),
                     )
-                    .content(body),
+                    .stream(stream),
             );
 
         let mut req = multipart.apply(req)?;
@@ -410,7 +410,7 @@ impl GdriveCore {
         &self,
         file_id: &str,
         size: u64,
-        body: bytes::Bytes,
+        stream: oio::Streamer,
     ) -> Result<Response<IncomingAsyncBody>> {
         let url = format!(
             "https://www.googleapis.com/upload/drive/v3/files/{}?uploadType=media",
@@ -421,7 +421,7 @@ impl GdriveCore {
             .header(header::CONTENT_TYPE, "application/octet-stream")
             .header(header::CONTENT_LENGTH, size)
             .header("X-Upload-Content-Length", size)
-            .body(AsyncBody::Bytes(body))
+            .body(AsyncBody::Stream(stream))
             .map_err(new_request_build_error)?;
 
         self.sign(&mut req).await?;
diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs
index 12dd1cc43..9c67cebd4 100644
--- a/core/src/services/gdrive/writer.rs
+++ b/core/src/services/gdrive/writer.rs
@@ -23,6 +23,7 @@ use http::StatusCode;
 
 use super::core::GdriveCore;
 use super::error::parse_error;
+use crate::raw::oio::Stream;
 use crate::raw::*;
 use crate::services::gdrive::core::GdriveFile;
 use crate::*;
@@ -49,7 +50,7 @@ impl GdriveWriter {
     ///
     /// This is used for small objects.
     /// And should overwrite the object if it already exists.
-    pub async fn write_create(&mut self, size: u64, body: Bytes) -> Result<()> {
+    pub async fn write_create(&mut self, size: u64, body: oio::Streamer) -> Result<()> {
         let resp = self
             .core
             .gdrive_upload_simple_request(&self.path, size, body)
@@ -72,13 +73,13 @@ impl GdriveWriter {
         }
     }
 
-    pub async fn write_overwrite(&self, size: u64, body: Bytes) -> Result<()> {
+    pub async fn write_overwrite(&self, size: u64, stream: oio::Streamer) -> Result<()> {
         let file_id = self.file_id.as_ref().ok_or_else(|| {
             Error::new(ErrorKind::Unexpected, "file_id is required for overwrite")
         })?;
         let resp = self
             .core
-            .gdrive_upload_overwrite_simple_request(file_id, size, body)
+            .gdrive_upload_overwrite_simple_request(file_id, size, stream)
             .await?;
 
         let status = resp.status();
@@ -91,20 +92,16 @@ impl GdriveWriter {
             _ => Err(parse_error(resp).await?),
         }
     }
-
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        if self.file_id.is_none() {
-            self.write_create(bs.len() as u64, bs).await
-        } else {
-            self.write_overwrite(bs.len() as u64, bs).await
-        }
-    }
 }
 
 #[async_trait]
 impl oio::Write for GdriveWriter {
-    async fn write(&mut self, _s: oio::Streamer) -> Result<()> {
-        Err(Error::new(ErrorKind::Unsupported, "sink is not supported"))
+    async fn write(&mut self, s: oio::Streamer) -> Result<()> {
+        if self.file_id.is_none() {
+            self.write_create(s.size(), s).await
+        } else {
+            self.write_overwrite(s.size(), s).await
+        }
     }
 
     async fn abort(&mut self) -> Result<()> {

Reply via email to