This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch stream-based-write in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit b5afcb5bfa95f5a0a1ad75315fd1906eefdf8776 Author: Xuanwo <[email protected]> AuthorDate: Wed Aug 30 18:46:03 2023 +0800 Migrate onedrive Signed-off-by: Xuanwo <[email protected]> --- core/src/services/onedrive/backend.rs | 2 +- core/src/services/onedrive/writer.rs | 189 ++++++++++++++++------------------ 2 files changed, 91 insertions(+), 100 deletions(-) diff --git a/core/src/services/onedrive/backend.rs b/core/src/services/onedrive/backend.rs index 09ef04765..76429107c 100644 --- a/core/src/services/onedrive/backend.rs +++ b/core/src/services/onedrive/backend.rs @@ -263,7 +263,7 @@ impl OnedriveBackend { pub async fn onedrive_upload_simple( &self, path: &str, - size: Option<usize>, + size: Option<u64>, content_type: Option<&str>, body: AsyncBody, ) -> Result<Response<IncomingAsyncBody>> { diff --git a/core/src/services/onedrive/writer.rs b/core/src/services/onedrive/writer.rs index 219855dd3..b616c46e8 100644 --- a/core/src/services/onedrive/writer.rs +++ b/core/src/services/onedrive/writer.rs @@ -24,6 +24,7 @@ use super::backend::OnedriveBackend; use super::error::parse_error; use super::graph_model::OneDriveUploadSessionCreationRequestBody; use super::graph_model::OneDriveUploadSessionCreationResponseBody; +use crate::raw::oio::Stream; use crate::raw::*; use crate::*; @@ -39,28 +40,16 @@ impl OneDriveWriter { // If your app splits a file into multiple byte ranges, the size of each byte range MUST be a multiple of 320 KiB (327,680 bytes). Using a fragment size that does not divide evenly by 320 KiB will result in errors committing some files. // https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_createuploadsession?view=odsp-graph-online#upload-bytes-to-the-upload-session const CHUNK_SIZE_FACTOR: usize = 327_680; + pub fn new(backend: OnedriveBackend, op: OpWrite, path: String) -> Self { OneDriveWriter { backend, op, path } } - - async fn write(&mut self, bs: Bytes) -> Result<()> { - let size = bs.len(); - - if size <= Self::MAX_SIMPLE_SIZE { - self.write_simple(bs).await - } else { - self.write_chunked(bs).await - } - } } #[async_trait] impl oio::Write for OneDriveWriter { - async fn write(&mut self, _s: oio::Streamer) -> Result<()> { - Err(Error::new( - ErrorKind::Unsupported, - "Write::sink is not supported", - )) + async fn write(&mut self, s: oio::Streamer) -> Result<()> { + self.write_simple(s).await } async fn abort(&mut self) -> Result<()> { @@ -73,14 +62,14 @@ impl oio::Write for OneDriveWriter { } impl OneDriveWriter { - async fn write_simple(&mut self, bs: Bytes) -> Result<()> { + async fn write_simple(&mut self, s: oio::Streamer) -> Result<()> { let resp = self .backend .onedrive_upload_simple( &self.path, - Some(bs.len()), + Some(s.size()), self.op.content_type(), - AsyncBody::Bytes(bs), + AsyncBody::Stream(s), ) .await?; @@ -97,85 +86,87 @@ impl OneDriveWriter { } } - pub(crate) async fn write_chunked(&self, total_bytes: Bytes) -> Result<()> { - // Upload large files via sessions: https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_createuploadsession?view=odsp-graph-online#upload-bytes-to-the-upload-session - // 1. Create an upload session - // 2. Upload the bytes of each chunk - // 3. Commit the session - - let session_response = self.create_upload_session().await?; - - let mut offset = 0; - - let iter = total_bytes.chunks(OneDriveWriter::CHUNK_SIZE_FACTOR); - - for chunk in iter { - let mut end = offset + OneDriveWriter::CHUNK_SIZE_FACTOR; - if end > total_bytes.len() { - end = total_bytes.len(); - } - let total_len = total_bytes.len(); - let chunk_end = end - 1; - - let resp = self - .backend - .onedrive_chunked_upload( - &session_response.upload_url, - None, - offset, - chunk_end, - total_len, - AsyncBody::Bytes(Bytes::copy_from_slice(chunk)), - ) - .await?; - - let status = resp.status(); - - match status { - // Typical response code: 202 Accepted - // Reference: https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_put_content?view=odsp-graph-online#response - StatusCode::ACCEPTED | StatusCode::CREATED | StatusCode::OK => { - resp.into_body().consume().await?; - } - _ => return Err(parse_error(resp).await?), - } - - offset += OneDriveWriter::CHUNK_SIZE_FACTOR; - } - - Ok(()) - } - - async fn create_upload_session(&self) -> Result<OneDriveUploadSessionCreationResponseBody> { - let file_name_from_path = self.path.split('/').last().ok_or_else(|| { - Error::new( - ErrorKind::Unexpected, - "connection string must have AccountName", - ) - })?; - let url = format!( - "{}/drive/root:{}:/createUploadSession", - OnedriveBackend::BASE_URL, - percent_encode_path(&self.path) - ); - let body = OneDriveUploadSessionCreationRequestBody::new(file_name_from_path.to_string()); - - let resp = self - .backend - .onedrive_create_upload_session(&url, body) - .await?; - - let status = resp.status(); - - match status { - // Reference: https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_createuploadsession?view=odsp-graph-online#response - StatusCode::OK => { - let bs = resp.into_body().bytes().await?; - let result: OneDriveUploadSessionCreationResponseBody = - serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?; - Ok(result) - } - _ => Err(parse_error(resp).await?), - } - } + // TODO: We should migrate to opendal's framework of uploading large files. + // + // pub(crate) async fn write_chunked(&self, total_bytes: Bytes) -> Result<()> { + // // Upload large files via sessions: https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_createuploadsession?view=odsp-graph-online#upload-bytes-to-the-upload-session + // // 1. Create an upload session + // // 2. Upload the bytes of each chunk + // // 3. Commit the session + // + // let session_response = self.create_upload_session().await?; + // + // let mut offset = 0; + // + // let iter = total_bytes.chunks(OneDriveWriter::CHUNK_SIZE_FACTOR); + // + // for chunk in iter { + // let mut end = offset + OneDriveWriter::CHUNK_SIZE_FACTOR; + // if end > total_bytes.len() { + // end = total_bytes.len(); + // } + // let total_len = total_bytes.len(); + // let chunk_end = end - 1; + // + // let resp = self + // .backend + // .onedrive_chunked_upload( + // &session_response.upload_url, + // None, + // offset, + // chunk_end, + // total_len, + // AsyncBody::Bytes(Bytes::copy_from_slice(chunk)), + // ) + // .await?; + // + // let status = resp.status(); + // + // match status { + // // Typical response code: 202 Accepted + // // Reference: https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_put_content?view=odsp-graph-online#response + // StatusCode::ACCEPTED | StatusCode::CREATED | StatusCode::OK => { + // resp.into_body().consume().await?; + // } + // _ => return Err(parse_error(resp).await?), + // } + // + // offset += OneDriveWriter::CHUNK_SIZE_FACTOR; + // } + // + // Ok(()) + // } + // + // async fn create_upload_session(&self) -> Result<OneDriveUploadSessionCreationResponseBody> { + // let file_name_from_path = self.path.split('/').last().ok_or_else(|| { + // Error::new( + // ErrorKind::Unexpected, + // "connection string must have AccountName", + // ) + // })?; + // let url = format!( + // "{}/drive/root:{}:/createUploadSession", + // OnedriveBackend::BASE_URL, + // percent_encode_path(&self.path) + // ); + // let body = OneDriveUploadSessionCreationRequestBody::new(file_name_from_path.to_string()); + // + // let resp = self + // .backend + // .onedrive_create_upload_session(&url, body) + // .await?; + // + // let status = resp.status(); + // + // match status { + // // Reference: https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_createuploadsession?view=odsp-graph-online#response + // StatusCode::OK => { + // let bs = resp.into_body().bytes().await?; + // let result: OneDriveUploadSessionCreationResponseBody = + // serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?; + // Ok(result) + // } + // _ => Err(parse_error(resp).await?), + // } + // } }
