This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch stream-based-write in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit faa619f24229b8f40c0c76be63d9989b73f74a47 Author: Xuanwo <[email protected]> AuthorDate: Wed Aug 30 18:38:03 2023 +0800 Migrate gdrive Signed-off-by: Xuanwo <[email protected]> --- core/src/raw/http_util/multipart.rs | 45 +++++++++---------------------------- core/src/services/gcs/core.rs | 2 +- core/src/services/gdrive/core.rs | 8 +++---- core/src/services/gdrive/writer.rs | 23 +++++++++---------- 4 files changed, 26 insertions(+), 52 deletions(-) diff --git a/core/src/raw/http_util/multipart.rs b/core/src/raw/http_util/multipart.rs index 309ae3daa..aa4232f93 100644 --- a/core/src/raw/http_util/multipart.rs +++ b/core/src/raw/http_util/multipart.rs @@ -246,7 +246,6 @@ pub trait Part: Sized + 'static { pub struct FormDataPart { headers: HeaderMap, - content_length: u64, content: Streamer, } @@ -266,7 +265,6 @@ impl FormDataPart { Self { headers, - content_length: 0, content: Box::new(oio::Cursor::new()), } } @@ -281,14 +279,12 @@ impl FormDataPart { pub fn content(mut self, content: impl Into<Bytes>) -> Self { let content = content.into(); - self.content_length = content.len() as u64; self.content = Box::new(oio::Cursor::from(content)); self } /// Set the stream content for this part. - pub fn stream(mut self, size: u64, content: Streamer) -> Self { - self.content_length = size; + pub fn stream(mut self, content: Streamer) -> Self { self.content = content; self } @@ -312,7 +308,7 @@ impl Part for FormDataPart { let bs = bs.freeze(); // pre-content + content + post-content (b`\r\n`) - let total_size = bs.len() as u64 + self.content_length + 2; + let total_size = bs.len() as u64 + self.content.size() + 2; FormDataPartStream { size: total_size, @@ -383,7 +379,6 @@ pub struct MixedPart { /// Common version: Version, headers: HeaderMap, - content_length: u64, content: Option<Streamer>, /// Request only @@ -408,7 +403,6 @@ impl MixedPart { version: Version::HTTP_11, headers: HeaderMap::new(), - content_length: 0, content: None, uri: Some(uri), @@ -426,21 +420,10 @@ impl MixedPart { let (parts, body) = req.into_parts(); - let (content_length, content) = match body { - AsyncBody::Empty => (0, None), - AsyncBody::Bytes(bs) => ( - bs.len() as u64, - Some(Box::new(oio::Cursor::from(bs)) as Streamer), - ), - AsyncBody::Stream(stream) => { - let len = parts - .headers - .get(CONTENT_LENGTH) - .and_then(|v| v.to_str().ok()) - .and_then(|v| v.parse::<u64>().ok()) - .expect("the content length of a mixed part must be valid"); - (len, Some(stream)) - } + let content = match body { + AsyncBody::Empty => None, + AsyncBody::Bytes(bs) => Some(Box::new(oio::Cursor::from(bs)) as Streamer), + AsyncBody::Stream(stream) => Some(stream), }; Self { @@ -457,7 +440,6 @@ impl MixedPart { ), version: parts.version, headers: parts.headers, - content_length, content, method: Some(parts.method), @@ -475,7 +457,8 @@ impl MixedPart { mem::swap(builder.headers_mut().unwrap(), &mut self.headers); let body = if let Some(stream) = self.content { - IncomingAsyncBody::new(stream, Some(self.content_length)) + let size = stream.size(); + IncomingAsyncBody::new(stream, Some(size)) } else { IncomingAsyncBody::new(Box::new(oio::into_stream(0, stream::empty())), Some(0)) }; @@ -513,14 +496,12 @@ impl MixedPart { pub fn content(mut self, content: impl Into<Bytes>) -> Self { let content = content.into(); - self.content_length = content.len() as u64; self.content = Some(Box::new(oio::Cursor::from(content))); self } /// Set the stream content for this part. - pub fn stream(mut self, size: u64, content: Streamer) -> Self { - self.content_length = size; + pub fn stream(mut self, content: Streamer) -> Self { self.content = Some(content); self } @@ -594,8 +575,8 @@ impl Part for MixedPart { // pre-content + content + post-content; let mut total_size = bs.len() as u64; - if self.content.is_some() { - total_size += self.content_length + 2; + if let Some(stream) = &self.content { + total_size += stream.size() + 2; } MixedPartStream { @@ -676,7 +657,6 @@ impl Part for MixedPart { part_headers, version: Version::HTTP_11, headers, - content_length: body_bytes.len() as u64, content: Some(Box::new(oio::Cursor::from(body_bytes))), method: None, @@ -1171,7 +1151,6 @@ Content-Length: 846 h }); - assert_eq!(multipart.parts[0].content_length, part0_bs.len() as u64); assert_eq!(multipart.parts[0].uri, None); assert_eq!(multipart.parts[0].method, None); assert_eq!( @@ -1211,7 +1190,6 @@ Content-Length: 846 h }); - assert_eq!(multipart.parts[1].content_length, part1_bs.len() as u64); assert_eq!(multipart.parts[1].uri, None); assert_eq!(multipart.parts[1].method, None); assert_eq!( @@ -1251,7 +1229,6 @@ Content-Length: 846 h }); - assert_eq!(multipart.parts[2].content_length, part2_bs.len() as u64); assert_eq!(multipart.parts[2].uri, None); assert_eq!(multipart.parts[2].method, None); assert_eq!( diff --git a/core/src/services/gcs/core.rs b/core/src/services/gcs/core.rs index 9a5a35c48..c7d0551be 100644 --- a/core/src/services/gcs/core.rs +++ b/core/src/services/gcs/core.rs @@ -282,7 +282,7 @@ impl GcsCore { media_part = media_part.content(bytes); } AsyncBody::Stream(stream) => { - media_part = media_part.stream(size.unwrap(), stream); + media_part = media_part.stream(stream); } } diff --git a/core/src/services/gdrive/core.rs b/core/src/services/gdrive/core.rs index 24ff1cf89..2551349b4 100644 --- a/core/src/services/gdrive/core.rs +++ b/core/src/services/gdrive/core.rs @@ -366,7 +366,7 @@ impl GdriveCore { &self, path: &str, size: u64, - body: bytes::Bytes, + stream: oio::Streamer, ) -> Result<Response<IncomingAsyncBody>> { let parent = self.ensure_parent_path(path).await?; @@ -396,7 +396,7 @@ impl GdriveCore { header::CONTENT_TYPE, "application/octet-stream".parse().unwrap(), ) - .content(body), + .stream(stream), ); let mut req = multipart.apply(req)?; @@ -410,7 +410,7 @@ impl GdriveCore { &self, file_id: &str, size: u64, - body: bytes::Bytes, + stream: oio::Streamer, ) -> Result<Response<IncomingAsyncBody>> { let url = format!( "https://www.googleapis.com/upload/drive/v3/files/{}?uploadType=media", @@ -421,7 +421,7 @@ impl GdriveCore { .header(header::CONTENT_TYPE, "application/octet-stream") .header(header::CONTENT_LENGTH, size) .header("X-Upload-Content-Length", size) - .body(AsyncBody::Bytes(body)) + .body(AsyncBody::Stream(stream)) .map_err(new_request_build_error)?; self.sign(&mut req).await?; diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs index 12dd1cc43..9c67cebd4 100644 --- a/core/src/services/gdrive/writer.rs +++ b/core/src/services/gdrive/writer.rs @@ -23,6 +23,7 @@ use http::StatusCode; use super::core::GdriveCore; use super::error::parse_error; +use crate::raw::oio::Stream; use crate::raw::*; use crate::services::gdrive::core::GdriveFile; use crate::*; @@ -49,7 +50,7 @@ impl GdriveWriter { /// /// This is used for small objects. /// And should overwrite the object if it already exists. - pub async fn write_create(&mut self, size: u64, body: Bytes) -> Result<()> { + pub async fn write_create(&mut self, size: u64, body: oio::Streamer) -> Result<()> { let resp = self .core .gdrive_upload_simple_request(&self.path, size, body) @@ -72,13 +73,13 @@ impl GdriveWriter { } } - pub async fn write_overwrite(&self, size: u64, body: Bytes) -> Result<()> { + pub async fn write_overwrite(&self, size: u64, stream: oio::Streamer) -> Result<()> { let file_id = self.file_id.as_ref().ok_or_else(|| { Error::new(ErrorKind::Unexpected, "file_id is required for overwrite") })?; let resp = self .core - .gdrive_upload_overwrite_simple_request(file_id, size, body) + .gdrive_upload_overwrite_simple_request(file_id, size, stream) .await?; let status = resp.status(); @@ -91,20 +92,16 @@ impl GdriveWriter { _ => Err(parse_error(resp).await?), } } - - async fn write(&mut self, bs: Bytes) -> Result<()> { - if self.file_id.is_none() { - self.write_create(bs.len() as u64, bs).await - } else { - self.write_overwrite(bs.len() as u64, bs).await - } - } } #[async_trait] impl oio::Write for GdriveWriter { - async fn write(&mut self, _s: oio::Streamer) -> Result<()> { - Err(Error::new(ErrorKind::Unsupported, "sink is not supported")) + async fn write(&mut self, s: oio::Streamer) -> Result<()> { + if self.file_id.is_none() { + self.write_create(s.size(), s).await + } else { + self.write_overwrite(s.size(), s).await + } } async fn abort(&mut self) -> Result<()> {
