This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch stream-based-write
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit b5afcb5bfa95f5a0a1ad75315fd1906eefdf8776
Author: Xuanwo <[email protected]>
AuthorDate: Wed Aug 30 18:46:03 2023 +0800

    Migrate onedrive
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/services/onedrive/backend.rs |   2 +-
 core/src/services/onedrive/writer.rs  | 189 ++++++++++++++++------------------
 2 files changed, 91 insertions(+), 100 deletions(-)

diff --git a/core/src/services/onedrive/backend.rs 
b/core/src/services/onedrive/backend.rs
index 09ef04765..76429107c 100644
--- a/core/src/services/onedrive/backend.rs
+++ b/core/src/services/onedrive/backend.rs
@@ -263,7 +263,7 @@ impl OnedriveBackend {
     pub async fn onedrive_upload_simple(
         &self,
         path: &str,
-        size: Option<usize>,
+        size: Option<u64>,
         content_type: Option<&str>,
         body: AsyncBody,
     ) -> Result<Response<IncomingAsyncBody>> {
diff --git a/core/src/services/onedrive/writer.rs 
b/core/src/services/onedrive/writer.rs
index 219855dd3..b616c46e8 100644
--- a/core/src/services/onedrive/writer.rs
+++ b/core/src/services/onedrive/writer.rs
@@ -24,6 +24,7 @@ use super::backend::OnedriveBackend;
 use super::error::parse_error;
 use super::graph_model::OneDriveUploadSessionCreationRequestBody;
 use super::graph_model::OneDriveUploadSessionCreationResponseBody;
+use crate::raw::oio::Stream;
 use crate::raw::*;
 use crate::*;
 
@@ -39,28 +40,16 @@ impl OneDriveWriter {
     // If your app splits a file into multiple byte ranges, the size of each 
byte range MUST be a multiple of 320 KiB (327,680 bytes). Using a fragment size 
that does not divide evenly by 320 KiB will result in errors committing some 
files.
     // 
https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_createuploadsession?view=odsp-graph-online#upload-bytes-to-the-upload-session
     const CHUNK_SIZE_FACTOR: usize = 327_680;
+
     pub fn new(backend: OnedriveBackend, op: OpWrite, path: String) -> Self {
         OneDriveWriter { backend, op, path }
     }
-
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        let size = bs.len();
-
-        if size <= Self::MAX_SIMPLE_SIZE {
-            self.write_simple(bs).await
-        } else {
-            self.write_chunked(bs).await
-        }
-    }
 }
 
 #[async_trait]
 impl oio::Write for OneDriveWriter {
-    async fn write(&mut self, _s: oio::Streamer) -> Result<()> {
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "Write::sink is not supported",
-        ))
+    async fn write(&mut self, s: oio::Streamer) -> Result<()> {
+        self.write_simple(s).await
     }
 
     async fn abort(&mut self) -> Result<()> {
@@ -73,14 +62,14 @@ impl oio::Write for OneDriveWriter {
 }
 
 impl OneDriveWriter {
-    async fn write_simple(&mut self, bs: Bytes) -> Result<()> {
+    async fn write_simple(&mut self, s: oio::Streamer) -> Result<()> {
         let resp = self
             .backend
             .onedrive_upload_simple(
                 &self.path,
-                Some(bs.len()),
+                Some(s.size()),
                 self.op.content_type(),
-                AsyncBody::Bytes(bs),
+                AsyncBody::Stream(s),
             )
             .await?;
 
@@ -97,85 +86,87 @@ impl OneDriveWriter {
         }
     }
 
-    pub(crate) async fn write_chunked(&self, total_bytes: Bytes) -> Result<()> 
{
-        // Upload large files via sessions: 
https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_createuploadsession?view=odsp-graph-online#upload-bytes-to-the-upload-session
-        // 1. Create an upload session
-        // 2. Upload the bytes of each chunk
-        // 3. Commit the session
-
-        let session_response = self.create_upload_session().await?;
-
-        let mut offset = 0;
-
-        let iter = total_bytes.chunks(OneDriveWriter::CHUNK_SIZE_FACTOR);
-
-        for chunk in iter {
-            let mut end = offset + OneDriveWriter::CHUNK_SIZE_FACTOR;
-            if end > total_bytes.len() {
-                end = total_bytes.len();
-            }
-            let total_len = total_bytes.len();
-            let chunk_end = end - 1;
-
-            let resp = self
-                .backend
-                .onedrive_chunked_upload(
-                    &session_response.upload_url,
-                    None,
-                    offset,
-                    chunk_end,
-                    total_len,
-                    AsyncBody::Bytes(Bytes::copy_from_slice(chunk)),
-                )
-                .await?;
-
-            let status = resp.status();
-
-            match status {
-                // Typical response code: 202 Accepted
-                // Reference: 
https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_put_content?view=odsp-graph-online#response
-                StatusCode::ACCEPTED | StatusCode::CREATED | StatusCode::OK => 
{
-                    resp.into_body().consume().await?;
-                }
-                _ => return Err(parse_error(resp).await?),
-            }
-
-            offset += OneDriveWriter::CHUNK_SIZE_FACTOR;
-        }
-
-        Ok(())
-    }
-
-    async fn create_upload_session(&self) -> 
Result<OneDriveUploadSessionCreationResponseBody> {
-        let file_name_from_path = self.path.split('/').last().ok_or_else(|| {
-            Error::new(
-                ErrorKind::Unexpected,
-                "connection string must have AccountName",
-            )
-        })?;
-        let url = format!(
-            "{}/drive/root:{}:/createUploadSession",
-            OnedriveBackend::BASE_URL,
-            percent_encode_path(&self.path)
-        );
-        let body = 
OneDriveUploadSessionCreationRequestBody::new(file_name_from_path.to_string());
-
-        let resp = self
-            .backend
-            .onedrive_create_upload_session(&url, body)
-            .await?;
-
-        let status = resp.status();
-
-        match status {
-            // Reference: 
https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_createuploadsession?view=odsp-graph-online#response
-            StatusCode::OK => {
-                let bs = resp.into_body().bytes().await?;
-                let result: OneDriveUploadSessionCreationResponseBody =
-                    
serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
-                Ok(result)
-            }
-            _ => Err(parse_error(resp).await?),
-        }
-    }
+    // TODO: We should migrate to opendal's framework of uploading large files.
+    //
+    // pub(crate) async fn write_chunked(&self, total_bytes: Bytes) -> 
Result<()> {
+    //     // Upload large files via sessions: 
https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_createuploadsession?view=odsp-graph-online#upload-bytes-to-the-upload-session
+    //     // 1. Create an upload session
+    //     // 2. Upload the bytes of each chunk
+    //     // 3. Commit the session
+    //
+    //     let session_response = self.create_upload_session().await?;
+    //
+    //     let mut offset = 0;
+    //
+    //     let iter = total_bytes.chunks(OneDriveWriter::CHUNK_SIZE_FACTOR);
+    //
+    //     for chunk in iter {
+    //         let mut end = offset + OneDriveWriter::CHUNK_SIZE_FACTOR;
+    //         if end > total_bytes.len() {
+    //             end = total_bytes.len();
+    //         }
+    //         let total_len = total_bytes.len();
+    //         let chunk_end = end - 1;
+    //
+    //         let resp = self
+    //             .backend
+    //             .onedrive_chunked_upload(
+    //                 &session_response.upload_url,
+    //                 None,
+    //                 offset,
+    //                 chunk_end,
+    //                 total_len,
+    //                 AsyncBody::Bytes(Bytes::copy_from_slice(chunk)),
+    //             )
+    //             .await?;
+    //
+    //         let status = resp.status();
+    //
+    //         match status {
+    //             // Typical response code: 202 Accepted
+    //             // Reference: 
https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_put_content?view=odsp-graph-online#response
+    //             StatusCode::ACCEPTED | StatusCode::CREATED | StatusCode::OK 
=> {
+    //                 resp.into_body().consume().await?;
+    //             }
+    //             _ => return Err(parse_error(resp).await?),
+    //         }
+    //
+    //         offset += OneDriveWriter::CHUNK_SIZE_FACTOR;
+    //     }
+    //
+    //     Ok(())
+    // }
+    //
+    // async fn create_upload_session(&self) -> 
Result<OneDriveUploadSessionCreationResponseBody> {
+    //     let file_name_from_path = self.path.split('/').last().ok_or_else(|| 
{
+    //         Error::new(
+    //             ErrorKind::Unexpected,
+    //             "connection string must have AccountName",
+    //         )
+    //     })?;
+    //     let url = format!(
+    //         "{}/drive/root:{}:/createUploadSession",
+    //         OnedriveBackend::BASE_URL,
+    //         percent_encode_path(&self.path)
+    //     );
+    //     let body = 
OneDriveUploadSessionCreationRequestBody::new(file_name_from_path.to_string());
+    //
+    //     let resp = self
+    //         .backend
+    //         .onedrive_create_upload_session(&url, body)
+    //         .await?;
+    //
+    //     let status = resp.status();
+    //
+    //     match status {
+    //         // Reference: 
https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_createuploadsession?view=odsp-graph-online#response
+    //         StatusCode::OK => {
+    //             let bs = resp.into_body().bytes().await?;
+    //             let result: OneDriveUploadSessionCreationResponseBody =
+    //                 
serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
+    //             Ok(result)
+    //         }
+    //         _ => Err(parse_error(resp).await?),
+    //     }
+    // }
 }

Reply via email to