This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch stream-based-write
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 777b3d7528a987f3fe6ff17f91fb292023ed78ec Author: Xuanwo <[email protected]> AuthorDate: Wed Aug 30 19:00:30 2023 +0800 Make clippy happy Signed-off-by: Xuanwo <[email protected]> --- core/src/raw/http_util/multipart.rs | 29 +++-- core/src/services/azblob/writer.rs | 1 - core/src/services/dropbox/writer.rs | 1 - core/src/services/ftp/writer.rs | 1 - core/src/services/gcs/writer.rs | 174 +++++++++++++-------------- core/src/services/gdrive/writer.rs | 1 - core/src/services/ghac/writer.rs | 1 - core/src/services/ipmfs/backend.rs | 1 - core/src/services/ipmfs/writer.rs | 1 - core/src/services/onedrive/backend.rs | 98 ++++++++------- core/src/services/onedrive/graph_model.rs | 22 ++-- core/src/services/onedrive/writer.rs | 4 +- core/src/services/sftp/writer.rs | 1 - core/src/services/supabase/writer.rs | 1 - core/src/services/vercel_artifacts/writer.rs | 1 - core/src/services/wasabi/writer.rs | 1 - core/src/services/webdav/writer.rs | 1 - core/src/services/webhdfs/writer.rs | 1 - 18 files changed, 160 insertions(+), 180 deletions(-) diff --git a/core/src/raw/http_util/multipart.rs b/core/src/raw/http_util/multipart.rs index aa4232f93..6acc691b4 100644 --- a/core/src/raw/http_util/multipart.rs +++ b/core/src/raw/http_util/multipart.rs @@ -772,7 +772,7 @@ mod tests { .part(FormDataPart::new("Signature").content("0RavWzkygo6QX9caELEqKi9kDbU=")) .part(FormDataPart::new("file").header(CONTENT_TYPE, "image/jpeg".parse().unwrap()).content("...file content...")).part(FormDataPart::new("submit").content("Upload to Amazon S3")); - let mut body = multipart.build(); + let body = multipart.build(); let size = body.size(); let bs = body.collect().await?; assert_eq!(size, bs.len() as u64); @@ -1055,8 +1055,8 @@ content-length: 0 } /// This test is inspired by <https://cloud.google.com/storage/docs/batch> - #[test] - fn test_multipart_mixed_gcs_batch_metadata_response() { + #[tokio::test] + async fn test_multipart_mixed_gcs_batch_metadata_response() { let response = r#"--batch_pK7JBAk73-E=_AA5eFwv4m2Q= Content-Type: application/http Content-ID: <response-b29c5de2-0db4-490b-b421-6a51b598bd22+1> @@ -1101,22 +1101,11 @@ Content-Length: 846 --batch_pK7JBAk73-E=_AA5eFwv4m2Q=--"#.replace('\n', "\r\n"); - let multipart: Multipart<MixedPart> = Multipart::new() + let mut multipart: Multipart<MixedPart> = Multipart::new() .with_boundary("batch_pK7JBAk73-E=_AA5eFwv4m2Q=") .parse(Bytes::from(response)) .unwrap(); - let part0_bs = Bytes::from_static( - r#"{"kind": "storage#object","id": "example-bucket/obj1/1495822576643790","metadata": {"type": "tabby"}}"#.as_bytes()); - let part1_bs = Bytes::from_static( - r#"{"kind": "storage#object","id": "example-bucket/obj2/1495822576643790","metadata": {"type": "tuxedo"}}"# - .as_bytes() - ); - let part2_bs = Bytes::from_static( - r#"{"kind": "storage#object","id": "example-bucket/obj3/1495822576643790","metadata": {"type": "calico"}}"# - .as_bytes() - ); - assert_eq!(multipart.parts.len(), 3); assert_eq!(multipart.parts[0].part_headers, { @@ -1157,6 +1146,8 @@ Content-Length: 846 multipart.parts[0].status_code, Some(StatusCode::from_u16(200).unwrap()) ); + assert_eq!(multipart.parts[0].content.take().unwrap().collect().await.unwrap(), Bytes::from_static( + r#"{"kind": "storage#object","id": "example-bucket/obj1/1495822576643790","metadata": {"type": "tabby"}}"#.as_bytes())); assert_eq!(multipart.parts[1].part_headers, { let mut h = HeaderMap::new(); @@ -1196,6 +1187,10 @@ Content-Length: 846 multipart.parts[1].status_code, Some(StatusCode::from_u16(200).unwrap()) ); + 
assert_eq!(multipart.parts[1].content.take().unwrap().collect().await.unwrap(), Bytes::from_static( + r#"{"kind": "storage#object","id": "example-bucket/obj2/1495822576643790","metadata": {"type": "tuxedo"}}"# + .as_bytes() + )); assert_eq!(multipart.parts[2].part_headers, { let mut h = HeaderMap::new(); @@ -1235,5 +1230,9 @@ Content-Length: 846 multipart.parts[2].status_code, Some(StatusCode::from_u16(200).unwrap()) ); + assert_eq!(multipart.parts[2].content.take().unwrap().collect().await.unwrap(), Bytes::from_static( + r#"{"kind": "storage#object","id": "example-bucket/obj3/1495822576643790","metadata": {"type": "calico"}}"# + .as_bytes() + )); } } diff --git a/core/src/services/azblob/writer.rs b/core/src/services/azblob/writer.rs index 7e121cd05..446bd8f0c 100644 --- a/core/src/services/azblob/writer.rs +++ b/core/src/services/azblob/writer.rs @@ -27,7 +27,6 @@ use crate::raw::*; use crate::*; const X_MS_BLOB_TYPE: &str = "x-ms-blob-type"; -const X_MS_BLOB_APPEND_OFFSET: &str = "x-ms-blob-append-offset"; pub type AzblobWriters = oio::TwoWaysWriter<oio::OneShotWriter<AzblobWriter>, oio::AppendObjectWriter<AzblobWriter>>; diff --git a/core/src/services/dropbox/writer.rs b/core/src/services/dropbox/writer.rs index a43c89f5a..23c71ed0a 100644 --- a/core/src/services/dropbox/writer.rs +++ b/core/src/services/dropbox/writer.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use async_trait::async_trait; -use bytes::Bytes; use http::StatusCode; use super::core::DropboxCore; diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs index 734fd4f74..7d46f409b 100644 --- a/core/src/services/ftp/writer.rs +++ b/core/src/services/ftp/writer.rs @@ -16,7 +16,6 @@ // under the License. use async_trait::async_trait; -use bytes::Bytes; use futures::AsyncWriteExt; use super::backend::FtpBackend; diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs index add598ba6..517781e14 100644 --- a/core/src/services/gcs/writer.rs +++ b/core/src/services/gcs/writer.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use async_trait::async_trait; -use bytes::Bytes; use http::StatusCode; use super::core::GcsCore; @@ -35,13 +34,11 @@ pub struct GcsWriter { location: Option<String>, written: u64, buffer: oio::VectorCursor, - write_fixed_size: usize, } /// TODO we need to add buffer support for gcs. 
impl GcsWriter { pub fn new(core: Arc<GcsCore>, path: &str, op: OpWrite) -> Self { - let write_fixed_size = core.write_fixed_size; GcsWriter { core, path: path.to_string(), @@ -50,7 +47,6 @@ impl GcsWriter { location: None, written: 0, buffer: oio::VectorCursor::new(), - write_fixed_size, } } @@ -77,91 +73,91 @@ impl GcsWriter { } } - async fn initiate_upload(&self) -> Result<String> { - let resp = self.core.gcs_initiate_resumable_upload(&self.path).await?; - let status = resp.status(); - - match status { - StatusCode::OK => { - let bs = parse_location(resp.headers())?; - if let Some(location) = bs { - Ok(location.to_string()) - } else { - Err(Error::new( - ErrorKind::Unexpected, - "location is not in the response header", - )) - } - } - _ => Err(parse_error(resp).await?), - } - } - - async fn write_part(&self, location: &str, bs: Bytes) -> Result<()> { - let mut req = self.core.gcs_upload_in_resumable_upload( - location, - bs.len() as u64, - self.written, - false, - AsyncBody::Bytes(bs), - )?; - - self.core.sign(&mut req).await?; - - let resp = self.core.send(req).await?; - - let status = resp.status(); - match status { - StatusCode::OK | StatusCode::PERMANENT_REDIRECT => Ok(()), - _ => Err(parse_error(resp).await?), - } - } - - async fn write(&mut self, bs: Bytes) -> Result<()> { - let location = match &self.location { - Some(location) => location, - None => { - if self.op.content_length().unwrap_or_default() == bs.len() as u64 - && self.written == 0 - { - return self - .write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs)) - .await; - } else { - let location = self.initiate_upload().await?; - self.location = Some(location); - self.location.as_deref().unwrap() - } - } - }; - - // Ignore empty bytes - if bs.is_empty() { - return Ok(()); - } - - self.buffer.push(bs); - // Return directly if the buffer is not full - if self.buffer.len() <= self.write_fixed_size { - return Ok(()); - } - - let bs = self.buffer.peak_exact(self.write_fixed_size); - - match self.write_part(location, bs).await { - Ok(_) => { - self.buffer.take(self.write_fixed_size); - self.written += self.write_fixed_size as u64; - Ok(()) - } - Err(e) => { - // If the upload fails, we should pop the given bs to make sure - // write is re-enter safe. 
- self.buffer.pop(); - Err(e) - } - } - } + // async fn initiate_upload(&self) -> Result<String> { + // let resp = self.core.gcs_initiate_resumable_upload(&self.path).await?; + // let status = resp.status(); + // + // match status { + // StatusCode::OK => { + // let bs = parse_location(resp.headers())?; + // if let Some(location) = bs { + // Ok(location.to_string()) + // } else { + // Err(Error::new( + // ErrorKind::Unexpected, + // "location is not in the response header", + // )) + // } + // } + // _ => Err(parse_error(resp).await?), + // } + // } + + // async fn write_part(&self, location: &str, bs: Bytes) -> Result<()> { + // let mut req = self.core.gcs_upload_in_resumable_upload( + // location, + // bs.len() as u64, + // self.written, + // false, + // AsyncBody::Bytes(bs), + // )?; + // + // self.core.sign(&mut req).await?; + // + // let resp = self.core.send(req).await?; + // + // let status = resp.status(); + // match status { + // StatusCode::OK | StatusCode::PERMANENT_REDIRECT => Ok(()), + // _ => Err(parse_error(resp).await?), + // } + // } + + // async fn write(&mut self, bs: Bytes) -> Result<()> { + // let location = match &self.location { + // Some(location) => location, + // None => { + // if self.op.content_length().unwrap_or_default() == bs.len() as u64 + // && self.written == 0 + // { + // return self + // .write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs)) + // .await; + // } else { + // let location = self.initiate_upload().await?; + // self.location = Some(location); + // self.location.as_deref().unwrap() + // } + // } + // }; + // + // // Ignore empty bytes + // if bs.is_empty() { + // return Ok(()); + // } + // + // self.buffer.push(bs); + // // Return directly if the buffer is not full + // if self.buffer.len() <= self.write_fixed_size { + // return Ok(()); + // } + // + // let bs = self.buffer.peak_exact(self.write_fixed_size); + // + // match self.write_part(location, bs).await { + // Ok(_) => { + // self.buffer.take(self.write_fixed_size); + // self.written += self.write_fixed_size as u64; + // Ok(()) + // } + // Err(e) => { + // // If the upload fails, we should pop the given bs to make sure + // // write is re-enter safe. + // self.buffer.pop(); + // Err(e) + // } + // } + // } } #[async_trait] diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs index 9c67cebd4..e87c9410f 100644 --- a/core/src/services/gdrive/writer.rs +++ b/core/src/services/gdrive/writer.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use async_trait::async_trait; -use bytes::Bytes; use http::StatusCode; use super::core::GdriveCore; diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs index a9fcff2e5..bc6b57571 100644 --- a/core/src/services/ghac/writer.rs +++ b/core/src/services/ghac/writer.rs @@ -16,7 +16,6 @@ // under the License. use async_trait::async_trait; -use bytes::Bytes; use super::backend::GhacBackend; use super::error::parse_error; diff --git a/core/src/services/ipmfs/backend.rs b/core/src/services/ipmfs/backend.rs index c5fa4c7ba..8b2a19eb3 100644 --- a/core/src/services/ipmfs/backend.rs +++ b/core/src/services/ipmfs/backend.rs @@ -21,7 +21,6 @@ use std::str; use std::sync::Arc; use async_trait::async_trait; -use bytes::Bytes; use http::Request; use http::Response; use http::StatusCode; diff --git a/core/src/services/ipmfs/writer.rs b/core/src/services/ipmfs/writer.rs index 5e5359b37..514255ad0 100644 --- a/core/src/services/ipmfs/writer.rs +++ b/core/src/services/ipmfs/writer.rs @@ -16,7 +16,6 @@ // under the License. 
use async_trait::async_trait; -use bytes::Bytes; use http::StatusCode; use super::backend::IpmfsBackend; diff --git a/core/src/services/onedrive/backend.rs b/core/src/services/onedrive/backend.rs index 76429107c..9898c34f2 100644 --- a/core/src/services/onedrive/backend.rs +++ b/core/src/services/onedrive/backend.rs @@ -18,7 +18,6 @@ use std::fmt::Debug; use async_trait::async_trait; -use bytes::Bytes; use http::header; use http::Request; use http::Response; @@ -27,7 +26,6 @@ use http::StatusCode; use super::error::parse_error; use super::graph_model::CreateDirPayload; use super::graph_model::ItemType; -use super::graph_model::OneDriveUploadSessionCreationRequestBody; use super::graph_model::OnedriveGetItemBody; use super::pager::OnedrivePager; use super::writer::OneDriveWriter; @@ -202,7 +200,7 @@ impl Accessor for OnedriveBackend { } impl OnedriveBackend { - pub(crate) const BASE_URL: &'static str = "https://graph.microsoft.com/v1.0/me"; + // pub(crate) const BASE_URL: &'static str = "https://graph.microsoft.com/v1.0/me"; async fn onedrive_get_stat(&self, path: &str) -> Result<Response<IncomingAsyncBody>> { let path = build_rooted_abs_path(&self.root, path); @@ -290,53 +288,53 @@ impl OnedriveBackend { self.client.send(req).await } - pub(crate) async fn onedrive_chunked_upload( - &self, - url: &str, - content_type: Option<&str>, - offset: usize, - chunk_end: usize, - total_len: usize, - body: AsyncBody, - ) -> Result<Response<IncomingAsyncBody>> { - let mut req = Request::put(url); - - let auth_header_content = format!("Bearer {}", self.access_token); - req = req.header(header::AUTHORIZATION, auth_header_content); - - let range = format!("bytes {}-{}/{}", offset, chunk_end, total_len); - req = req.header("Content-Range".to_string(), range); - - let size = chunk_end - offset + 1; - req = req.header(header::CONTENT_LENGTH, size.to_string()); - - if let Some(mime) = content_type { - req = req.header(header::CONTENT_TYPE, mime) - } - - let req = req.body(body).map_err(new_request_build_error)?; - - self.client.send(req).await - } - - pub(crate) async fn onedrive_create_upload_session( - &self, - url: &str, - body: OneDriveUploadSessionCreationRequestBody, - ) -> Result<Response<IncomingAsyncBody>> { - let mut req = Request::post(url); - - let auth_header_content = format!("Bearer {}", self.access_token); - req = req.header(header::AUTHORIZATION, auth_header_content); - - req = req.header(header::CONTENT_TYPE, "application/json"); - - let body_bytes = serde_json::to_vec(&body).map_err(new_json_serialize_error)?; - let asyn_body = AsyncBody::Bytes(Bytes::from(body_bytes)); - let req = req.body(asyn_body).map_err(new_request_build_error)?; - - self.client.send(req).await - } + // pub(crate) async fn onedrive_chunked_upload( + // &self, + // url: &str, + // content_type: Option<&str>, + // offset: usize, + // chunk_end: usize, + // total_len: usize, + // body: AsyncBody, + // ) -> Result<Response<IncomingAsyncBody>> { + // let mut req = Request::put(url); + // + // let auth_header_content = format!("Bearer {}", self.access_token); + // req = req.header(header::AUTHORIZATION, auth_header_content); + // + // let range = format!("bytes {}-{}/{}", offset, chunk_end, total_len); + // req = req.header("Content-Range".to_string(), range); + // + // let size = chunk_end - offset + 1; + // req = req.header(header::CONTENT_LENGTH, size.to_string()); + // + // if let Some(mime) = content_type { + // req = req.header(header::CONTENT_TYPE, mime) + // } + // + // let req = 
req.body(body).map_err(new_request_build_error)?; + // + // self.client.send(req).await + // } + + // pub(crate) async fn onedrive_create_upload_session( + // &self, + // url: &str, + // body: OneDriveUploadSessionCreationRequestBody, + // ) -> Result<Response<IncomingAsyncBody>> { + // let mut req = Request::post(url); + // + // let auth_header_content = format!("Bearer {}", self.access_token); + // req = req.header(header::AUTHORIZATION, auth_header_content); + // + // req = req.header(header::CONTENT_TYPE, "application/json"); + // + // let body_bytes = serde_json::to_vec(&body).map_err(new_json_serialize_error)?; + // let asyn_body = AsyncBody::Bytes(Bytes::from(body_bytes)); + // let req = req.body(asyn_body).map_err(new_request_build_error)?; + // + // self.client.send(req).await + // } async fn onedrive_create_dir( &self, diff --git a/core/src/services/onedrive/graph_model.rs b/core/src/services/onedrive/graph_model.rs index e51c69010..5326513fa 100644 --- a/core/src/services/onedrive/graph_model.rs +++ b/core/src/services/onedrive/graph_model.rs @@ -132,17 +132,17 @@ pub struct OneDriveUploadSessionCreationRequestBody { item: FileUploadItem, } -impl OneDriveUploadSessionCreationRequestBody { - pub fn new(path: String) -> Self { - OneDriveUploadSessionCreationRequestBody { - item: FileUploadItem { - odata_type: "microsoft.graph.driveItemUploadableProperties".to_string(), - microsoft_graph_conflict_behavior: "replace".to_string(), - name: path, - }, - } - } -} +// impl OneDriveUploadSessionCreationRequestBody { +// pub fn new(path: String) -> Self { +// OneDriveUploadSessionCreationRequestBody { +// item: FileUploadItem { +// odata_type: "microsoft.graph.driveItemUploadableProperties".to_string(), +// microsoft_graph_conflict_behavior: "replace".to_string(), +// name: path, +// }, +// } +// } +// } #[test] fn test_parse_one_drive_json() { diff --git a/core/src/services/onedrive/writer.rs b/core/src/services/onedrive/writer.rs index c1abb2230..16c97c40c 100644 --- a/core/src/services/onedrive/writer.rs +++ b/core/src/services/onedrive/writer.rs @@ -32,10 +32,10 @@ pub struct OneDriveWriter { } impl OneDriveWriter { - const MAX_SIMPLE_SIZE: usize = 4 * 1024 * 1024; + // const MAX_SIMPLE_SIZE: usize = 4 * 1024 * 1024; // If your app splits a file into multiple byte ranges, the size of each byte range MUST be a multiple of 320 KiB (327,680 bytes). Using a fragment size that does not divide evenly by 320 KiB will result in errors committing some files. // https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_createuploadsession?view=odsp-graph-online#upload-bytes-to-the-upload-session - const CHUNK_SIZE_FACTOR: usize = 327_680; + // const CHUNK_SIZE_FACTOR: usize = 327_680; pub fn new(backend: OnedriveBackend, op: OpWrite, path: String) -> Self { OneDriveWriter { backend, op, path } diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs index 28a49856e..e14bc5ce8 100644 --- a/core/src/services/sftp/writer.rs +++ b/core/src/services/sftp/writer.rs @@ -16,7 +16,6 @@ // under the License. 
use async_trait::async_trait; -use bytes::Bytes; use openssh_sftp_client::file::File; use crate::raw::oio; diff --git a/core/src/services/supabase/writer.rs b/core/src/services/supabase/writer.rs index bda8dd3e4..7b4284e0f 100644 --- a/core/src/services/supabase/writer.rs +++ b/core/src/services/supabase/writer.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use async_trait::async_trait; -use bytes::Bytes; use http::StatusCode; use super::core::*; diff --git a/core/src/services/vercel_artifacts/writer.rs b/core/src/services/vercel_artifacts/writer.rs index 3e6746cd0..a4342bb0f 100644 --- a/core/src/services/vercel_artifacts/writer.rs +++ b/core/src/services/vercel_artifacts/writer.rs @@ -16,7 +16,6 @@ // under the License. use async_trait::async_trait; -use bytes::Bytes; use http::StatusCode; use super::backend::VercelArtifactsBackend; diff --git a/core/src/services/wasabi/writer.rs b/core/src/services/wasabi/writer.rs index d043d44fd..ac4df5ad2 100644 --- a/core/src/services/wasabi/writer.rs +++ b/core/src/services/wasabi/writer.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use async_trait::async_trait; -use bytes::Bytes; use http::StatusCode; use super::core::*; diff --git a/core/src/services/webdav/writer.rs b/core/src/services/webdav/writer.rs index 50a40d9d5..bcb890b7f 100644 --- a/core/src/services/webdav/writer.rs +++ b/core/src/services/webdav/writer.rs @@ -16,7 +16,6 @@ // under the License. use async_trait::async_trait; -use bytes::Bytes; use http::StatusCode; use super::backend::WebdavBackend; diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs index 3097395ae..4b667a0b9 100644 --- a/core/src/services/webhdfs/writer.rs +++ b/core/src/services/webhdfs/writer.rs @@ -16,7 +16,6 @@ // under the License. use async_trait::async_trait; -use bytes::Bytes; use http::StatusCode; use super::backend::WebhdfsBackend;
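
For readers skimming the multipart.rs hunk above: the test switches from `#[test]` to `#[tokio::test]` because part bodies are now consumed as async streams and collected with `.collect().await`, instead of being compared against pre-built `Bytes` values. The sketch below restates that pattern in isolation. It is a minimal illustration, not a drop-in test: it assumes the crate-internal `Multipart`/`MixedPart` types and the `response` fixture string from the test above.

```rust
use bytes::Bytes;

#[tokio::test]
async fn check_first_part_body() {
    // Parsing is unchanged from the original test; only the binding becomes
    // `mut` so that part bodies can be moved out and drained below.
    let mut multipart: Multipart<MixedPart> = Multipart::new()
        .with_boundary("batch_pK7JBAk73-E=_AA5eFwv4m2Q=")
        .parse(Bytes::from(response))
        .unwrap();

    // `content` is now a streaming body: take it out of the part, collect it
    // asynchronously, and only then compare against the expected bytes.
    let bs = multipart.parts[0]
        .content
        .take()
        .unwrap()
        .collect()
        .await
        .unwrap();

    assert_eq!(
        bs,
        Bytes::from_static(
            r#"{"kind": "storage#object","id": "example-bucket/obj1/1495822576643790","metadata": {"type": "tabby"}}"#.as_bytes()
        )
    );
}
```

The `mut` binding on `multipart` is what makes `content.take()` possible, since each body can only be collected once.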
