This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new c25a6ac3 feat(services/onedrive): Implement `list`, `create_dir`, 
`stat` and uploading large files (#2231)
c25a6ac3 is described below

commit c25a6ac3c2d9033274505df625b68172b02b2655
Author: Daohan Chong <[email protected]>
AuthorDate: Tue May 9 23:55:36 2023 -0700

    feat(services/onedrive): Implement `list`, `create_dir`, `stat` and 
uploading large files (#2231)
    
    * Revert "update"
    
    This reverts commit 0d8e16cd7407d8d59bbffd85c44c94e5ab9c2ae0.
    
    * add license
    
    * implement pager
    
    * finish all implementation of list op
    
    * finish op
    
    * handle err
    
    * fix path
    
    * fix path
    
    * improve
    
    * improve code
    
    * fix all clippy
    
    * enable opendal behavior test
    
    * add example
    
    * clean up
    
    * update upload session
    
    * implement chunked upload
    
    * improve upload
    
    * update status code
    
    * update
    
    * fix
    
    * fix the entry of onedrive tests
    
    * fix get item body
    
    * fix
    
    * add content len
    
    * add delete op
    
    * simplify
    
    * create_dir [wip]
    
    * fix create dir
    
    * handle 404 of stat
    
    * false
    
    * update
    
    * update
    
    * fix delete
    
    * delete non exist
    
    * fix create [replace]
    
    * fix create dir
    
    * disable scan
    
    * clean up
    
    * cleanup
    
    * clean up models
    
    * update
    
    * update
    
    * nest use
    
    * clean up
    
    * fmt
    
    * split get to get / get content
    
    * update
    
    * inline build_request_url()
    
    * use onedrive_chunked_upload
    
    * clean up
    
    * new struct CreateDirPayload
    
    * use get_parent
    
    * add onedrive_create_dir
    
    * refactor and clean up post requests
    
    * fix
    
    * use onedrive_upload_simple
    
    * add list_with_delimiter_slash
    
    * fix
---
 .env.example                              |   3 +
 core/src/services/onedrive/backend.rs     | 262 +++++++++++++++++++++++---
 core/src/services/onedrive/builder.rs     |   3 +-
 core/src/services/onedrive/graph_model.rs | 298 ++++++++++++++++++++++++++++++
 core/src/services/onedrive/mod.rs         |   2 +
 core/src/services/onedrive/pager.rs       | 140 ++++++++++++++
 core/src/services/onedrive/writer.rs      | 113 ++++++++++-
 core/tests/behavior/main.rs               |   2 +
 8 files changed, 788 insertions(+), 35 deletions(-)

diff --git a/.env.example b/.env.example
index 893a4b93..f4835d52 100644
--- a/.env.example
+++ b/.env.example
@@ -95,3 +95,6 @@ OPENDAL_SUPABASE_KEY=<service_key>
 # vercel artifacts
 OPENDAL_VERCEL_ARTIFACTS_TEST=false
 OPENDAL_VERCEL_ARTIFACTS_ACCESS_TOKEN=<token>
+# onedrive
+OPENDAL_ONEDRIVE_TEST=false
+OPENDAL_ONEDRIVE_ACCESS_TOKEN=<access_token>
diff --git a/core/src/services/onedrive/backend.rs 
b/core/src/services/onedrive/backend.rs
index 6e76dccb..0bf36f32 100644
--- a/core/src/services/onedrive/backend.rs
+++ b/core/src/services/onedrive/backend.rs
@@ -18,31 +18,33 @@
 use std::fmt::Debug;
 
 use async_trait::async_trait;
+use bytes::Bytes;
 use http::header;
 use http::Request;
 use http::Response;
 use http::StatusCode;
 
 use super::error::parse_error;
+use super::graph_model::CreateDirPayload;
+use super::graph_model::ItemType;
+use super::graph_model::OneDriveUploadSessionCreationRequestBody;
+use super::graph_model::OnedriveGetItemBody;
+use super::pager::OnedrivePager;
 use super::writer::OneDriveWriter;
-use crate::ops::OpRead;
-use crate::ops::OpWrite;
-use crate::raw::build_rooted_abs_path;
-use crate::raw::new_request_build_error;
-use crate::raw::parse_into_metadata;
-use crate::raw::parse_location;
-use crate::raw::percent_encode_path;
-use crate::raw::Accessor;
-use crate::raw::AccessorInfo;
-use crate::raw::AsyncBody;
-use crate::raw::HttpClient;
-use crate::raw::IncomingAsyncBody;
-use crate::raw::RpRead;
-use crate::raw::RpWrite;
-use crate::types::Result;
-use crate::Capability;
-use crate::Error;
-use crate::ErrorKind;
+use crate::ops::OpCreateDir;
+use crate::raw::get_parent;
+use crate::raw::RpCreateDir;
+use crate::{
+    ops::{OpDelete, OpList, OpRead, OpStat, OpWrite},
+    raw::{
+        build_abs_path, build_rooted_abs_path, get_basename, 
new_json_deserialize_error,
+        new_json_serialize_error, new_request_build_error, 
parse_datetime_from_rfc3339,
+        parse_into_metadata, parse_location, percent_encode_path, Accessor, 
AccessorInfo,
+        AsyncBody, HttpClient, IncomingAsyncBody, RpDelete, RpList, RpRead, 
RpStat, RpWrite,
+    },
+    types::Result,
+    Capability, EntryMode, Error, ErrorKind, Metadata,
+};
 
 #[derive(Clone)]
 pub struct OnedriveBackend {
@@ -76,7 +78,7 @@ impl Accessor for OnedriveBackend {
     type BlockingReader = ();
     type Writer = OneDriveWriter;
     type BlockingWriter = ();
-    type Pager = ();
+    type Pager = OnedrivePager;
     type BlockingPager = ();
 
     fn info(&self) -> AccessorInfo {
@@ -85,11 +87,12 @@ impl Accessor for OnedriveBackend {
             .set_root(&self.root)
             .set_capability(Capability {
                 read: true,
-                read_can_next: true,
                 write: true,
+                stat: true,
+                delete: true,
+                create_dir: true,
                 list: true,
-                copy: true,
-                rename: true,
+                list_with_delimiter_slash: true,
                 ..Default::default()
             });
 
@@ -97,14 +100,13 @@ impl Accessor for OnedriveBackend {
     }
 
     async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
-        let resp = self.onedrive_get(path).await?;
+        let resp = self.onedrive_get_content(path).await?;
 
         let status = resp.status();
 
         if status.is_redirection() {
             let headers = resp.headers();
             let location = parse_location(headers)?;
-
             match location {
                 None => {
                     return Err(Error::new(
@@ -145,14 +147,135 @@ impl Accessor for OnedriveBackend {
             OneDriveWriter::new(self.clone(), args, path),
         ))
     }
+
+    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
+        // Stat root always returns a DIR.
+        if path == "/" {
+            return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
+        }
+
+        let resp = self.onedrive_get_stat(path).await?;
+        let status = resp.status();
+
+        if status.is_success() {
+            let bytes = resp.into_body().bytes().await?;
+            let decoded_response = 
serde_json::from_slice::<OnedriveGetItemBody>(&bytes)
+                .map_err(new_json_deserialize_error)?;
+
+            let entry_mode: EntryMode = match decoded_response.item_type {
+                ItemType::Folder { .. } => EntryMode::DIR,
+                ItemType::File { .. } => EntryMode::FILE,
+            };
+
+            let mut meta = Metadata::new(entry_mode);
+            meta.set_etag(&decoded_response.e_tag);
+
+            let last_modified = decoded_response.last_modified_date_time;
+            let date_utc_last_modified = 
parse_datetime_from_rfc3339(&last_modified)?;
+            meta.set_last_modified(date_utc_last_modified);
+
+            meta.set_content_length(decoded_response.size);
+
+            Ok(RpStat::new(meta))
+        } else {
+            match status {
+                StatusCode::NOT_FOUND if path.ends_with('/') => {
+                    Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
+                }
+                _ => Err(parse_error(resp).await?),
+            }
+        }
+    }
+
+    /// Delete operation
+    /// Documentation: 
https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_delete?view=odsp-graph-online
+    async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
+        let resp = self.onedrive_delete(path).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::NO_CONTENT | StatusCode::NOT_FOUND => 
Ok(RpDelete::default()),
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn list(&self, path: &str, _op_list: OpList) -> Result<(RpList, 
Self::Pager)> {
+        let pager: OnedrivePager = OnedrivePager::new(self.root.clone(), 
path.into(), self.clone());
+
+        Ok((RpList::default(), pager))
+    }
+
+    async fn create_dir(&self, path: &str, _: OpCreateDir) -> 
Result<RpCreateDir> {
+        let path = build_rooted_abs_path(&self.root, path);
+        let path_before_last_slash = get_parent(&path);
+        let encoded_path = percent_encode_path(path_before_last_slash);
+
+        let uri = format!(
+            "https://graph.microsoft.com/v1.0/me/drive/root:{}:/children",
+            encoded_path
+        );
+
+        let folder_name = get_basename(&path);
+        let folder_name = folder_name.strip_suffix('/').unwrap_or(folder_name);
+
+        let body = CreateDirPayload::new(folder_name.to_string());
+
+        let response = self.onedrive_create_dir(&uri, body).await?;
+
+        let status = response.status();
+        match status {
+            StatusCode::CREATED | StatusCode::OK => Ok(RpCreateDir::default()),
+            _ => Err(parse_error(response).await?),
+        }
+    }
 }
 
 impl OnedriveBackend {
-    async fn onedrive_get(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
+    pub(crate) const BASE_URL: &'static str = 
"https://graph.microsoft.com/v1.0/me";
+
+    async fn onedrive_get_stat(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
         let path = build_rooted_abs_path(&self.root, path);
         let url: String = format!(
-            "https://graph.microsoft.com/v1.0/me/drive/root:{}:/content",
+            "https://graph.microsoft.com/v1.0/me/drive/root:{}{}",
+            percent_encode_path(&path),
+            ""
+        );
+
+        let mut req = Request::get(&url);
+
+        let auth_header_content = format!("Bearer {}", self.access_token);
+        req = req.header(header::AUTHORIZATION, auth_header_content);
+
+        let req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.client.send(req).await
+    }
+
+    pub(crate) async fn onedrive_get_next_list_page(
+        &self,
+        url: &str,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let mut req = Request::get(url);
+
+        let auth_header_content = format!("Bearer {}", self.access_token);
+        req = req.header(header::AUTHORIZATION, auth_header_content);
+
+        let req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.client.send(req).await
+    }
+
+    async fn onedrive_get_content(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
+        let path = build_rooted_abs_path(&self.root, path);
+        let url: String = format!(
+            "https://graph.microsoft.com/v1.0/me/drive/root:{}{}",
             percent_encode_path(&path),
+            ":/content"
         );
 
         let mut req = Request::get(&url);
@@ -180,7 +303,7 @@ impl OnedriveBackend {
         self.client.send(req).await
     }
 
-    pub async fn onedrive_put(
+    pub async fn onedrive_upload_simple(
         &self,
         path: &str,
         size: Option<usize>,
@@ -209,4 +332,89 @@ impl OnedriveBackend {
 
         self.client.send(req).await
     }
+
+    pub(crate) async fn onedrive_chunked_upload(
+        &self,
+        url: &str,
+        content_type: Option<&str>,
+        offset: usize,
+        chunk_end: usize,
+        total_len: usize,
+        body: AsyncBody,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let mut req = Request::put(url);
+
+        let auth_header_content = format!("Bearer {}", self.access_token);
+        req = req.header(header::AUTHORIZATION, auth_header_content);
+
+        let range = format!("bytes {}-{}/{}", offset, chunk_end, total_len);
+        req = req.header("Content-Range".to_string(), range);
+
+        let size = chunk_end - offset + 1;
+        req = req.header(header::CONTENT_LENGTH, size.to_string());
+
+        if let Some(mime) = content_type {
+            req = req.header(header::CONTENT_TYPE, mime)
+        }
+
+        let req = req.body(body).map_err(new_request_build_error)?;
+
+        self.client.send(req).await
+    }
+
+    pub(crate) async fn onedrive_create_upload_session(
+        &self,
+        url: &str,
+        body: OneDriveUploadSessionCreationRequestBody,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let mut req = Request::post(url);
+
+        let auth_header_content = format!("Bearer {}", self.access_token);
+        req = req.header(header::AUTHORIZATION, auth_header_content);
+
+        req = req.header(header::CONTENT_TYPE, "application/json");
+
+        let body_bytes = 
serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
+        let asyn_body = AsyncBody::Bytes(Bytes::from(body_bytes));
+        let req = req.body(asyn_body).map_err(new_request_build_error)?;
+
+        self.client.send(req).await
+    }
+
+    async fn onedrive_create_dir(
+        &self,
+        url: &str,
+        body: CreateDirPayload,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let mut req = Request::post(url);
+
+        let auth_header_content = format!("Bearer {}", self.access_token);
+        req = req.header(header::AUTHORIZATION, auth_header_content);
+        req = req.header(header::CONTENT_TYPE, "application/json");
+
+        let body_bytes = 
serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
+        let async_body = AsyncBody::Bytes(bytes::Bytes::from(body_bytes));
+        let req = req.body(async_body).map_err(new_request_build_error)?;
+
+        self.client.send(req).await
+    }
+
+    pub(crate) async fn onedrive_delete(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
+        let path = build_abs_path(&self.root, path);
+        let url = format!(
+            "https://graph.microsoft.com/v1.0/me/drive/root:/{}",
+            percent_encode_path(&path)
+        );
+
+        let mut req = Request::delete(&url);
+
+        let auth_header_content = format!("Bearer {}", self.access_token);
+        req = req.header(header::AUTHORIZATION, auth_header_content);
+
+        let req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.client.send(req).await
+    }
 }
diff --git a/core/src/services/onedrive/builder.rs 
b/core/src/services/onedrive/builder.rs
index beab47a0..63f00a8a 100644
--- a/core/src/services/onedrive/builder.rs
+++ b/core/src/services/onedrive/builder.rs
@@ -35,9 +35,9 @@ use crate::*;
 ///
 /// - [x] read
 /// - [x] write
+/// - [x] list
 /// - [ ] copy
 /// - [ ] rename
-/// - [ ] list
 /// - [ ] ~~scan~~
 /// - [ ] ~~presign~~
 /// - [ ] blocking
@@ -45,7 +45,6 @@ use crate::*;
 /// # Notes
 ///
 /// Currently, only OneDrive Personal is supported.
-/// For uploading, only files under 4MB are supported via the Simple Upload 
API 
(<https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_put_content?view=odsp-graph-online>).
 ///
 /// # Configuration
 ///
diff --git a/core/src/services/onedrive/graph_model.rs 
b/core/src/services/onedrive/graph_model.rs
new file mode 100644
index 00000000..59cff4f2
--- /dev/null
+++ b/core/src/services/onedrive/graph_model.rs
@@ -0,0 +1,298 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use serde::{Deserialize, Serialize};
+
+use std::collections::HashMap;
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct GraphApiOnedriveListResponse {
+    #[serde(rename = "@odata.count")]
+    pub odata_count: usize,
+    #[serde(rename = "@odata.nextLink")]
+    pub next_link: Option<String>,
+    pub value: Vec<OneDriveItem>,
+}
+
+/// DriveItem representation
+/// 
https://learn.microsoft.com/en-us/onedrive/developer/rest-api/resources/list?view=odsp-graph-online#json-representation
+#[derive(Debug, Serialize, Deserialize)]
+pub struct OneDriveItem {
+    pub name: String,
+
+    #[serde(rename = "parentReference")]
+    pub parent_reference: ParentReference,
+
+    #[serde(flatten)]
+    pub item_type: ItemType,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct ParentReference {
+    pub path: String,
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
+#[serde(untagged)]
+pub enum ItemType {
+    Folder {
+        folder: Folder,
+        #[serde(rename = "specialFolder")]
+        special_folder: Option<HashMap<String, String>>,
+    },
+    File {
+        file: File,
+    },
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub struct OnedriveGetItemBody {
+    #[serde(rename = "cTag")]
+    pub(crate) c_tag: String,
+    #[serde(rename = "eTag")]
+    pub(crate) e_tag: String,
+    id: String,
+    #[serde(rename = "lastModifiedDateTime")]
+    pub(crate) last_modified_date_time: String,
+    pub(crate) name: String,
+    pub(crate) size: u64,
+
+    #[serde(flatten)]
+    pub(crate) item_type: ItemType,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub struct File {
+    #[serde(rename = "mimeType")]
+    mime_type: String,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub struct Folder {
+    #[serde(rename = "childCount")]
+    child_count: i64,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CreateDirPayload {
+    // folder: String,
+    #[serde(rename = "@microsoft.graph.conflictBehavior")]
+    conflict_behavior: String,
+    name: String,
+    folder: EmptyStruct,
+}
+
+impl CreateDirPayload {
+    pub fn new(name: String) -> Self {
+        Self {
+            conflict_behavior: "replace".to_string(),
+            name,
+            folder: EmptyStruct {},
+        }
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+struct EmptyStruct {}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+struct FileUploadItem {
+    #[serde(rename = "@odata.type")]
+    odata_type: String,
+    #[serde(rename = "@microsoft.graph.conflictBehavior")]
+    microsoft_graph_conflict_behavior: String,
+    name: String,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct OneDriveUploadSessionCreationResponseBody {
+    #[serde(rename = "uploadUrl")]
+    pub upload_url: String,
+    #[serde(rename = "expirationDateTime")]
+    pub expiration_date_time: String,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct OneDriveUploadSessionCreationRequestBody {
+    item: FileUploadItem,
+}
+
+impl OneDriveUploadSessionCreationRequestBody {
+    pub fn new(path: String) -> Self {
+        OneDriveUploadSessionCreationRequestBody {
+            item: FileUploadItem {
+                odata_type: 
"microsoft.graph.driveItemUploadableProperties".to_string(),
+                microsoft_graph_conflict_behavior: "replace".to_string(),
+                name: path,
+            },
+        }
+    }
+}
+
+#[test]
+fn test_parse_one_drive_json() {
+    let data = r#"{
+        "@odata.context": 
"https://graph.microsoft.com/v1.0/$metadata#users('user_id')/drive/root/children",
+        "@odata.count": 1,
+        "value": [
+            {
+                "createdDateTime": "2020-01-01T00:00:00Z",
+                "cTag": "cTag",
+                "eTag": "eTag",
+                "id": "id",
+                "lastModifiedDateTime": "2020-01-01T00:00:00Z",
+                "name": "name",
+                "size": 0,
+                "webUrl": "webUrl",
+                "reactions": {
+                    "like": 0
+                },
+                "parentReference": {
+                    "driveId": "driveId",
+                    "driveType": "driveType",
+                    "id": "id",
+                    "path": "/drive/root:"
+                },
+                "fileSystemInfo": {
+                    "createdDateTime": "2020-01-01T00:00:00Z",
+                    "lastModifiedDateTime": "2020-01-01T00:00:00Z"
+                },
+                "folder": {
+                    "childCount": 0
+                },
+                "specialFolder": {
+                    "name": "name"
+                }
+            },
+            {
+                "createdDateTime": "2018-12-30T05:32:55.46Z",
+                "cTag": "sample",
+                "eTag": "sample",
+                "id": "ID!102",
+                "lastModifiedDateTime": "2018-12-30T05:33:23.557Z",
+                "name": "Getting started with OneDrive.pdf",
+                "size": 1025867,
+                "reactions": {
+                    "commentCount": 0
+                },
+                "createdBy": {
+                    "user": {
+                        "displayName": "Foo bar",
+                        "id": "ID"
+                    }
+                },
+                "lastModifiedBy": {
+                    "user": {
+                        "displayName": "Foo bar",
+                        "id": "32217fc1154aec3d"
+                    }
+                },
+                "parentReference": {
+                    "driveId": "32217fc1154aec3d",
+                    "driveType": "personal",
+                    "id": "32217FC1154AEC3D!101",
+                    "path": "/drive/root:"
+                },
+                "file": {
+                    "mimeType": "application/pdf",
+                },
+                "fileSystemInfo": {
+                    "createdDateTime": "2018-12-30T05:32:55.46Z",
+                    "lastModifiedDateTime": "2018-12-30T05:32:55.46Z"
+                }
+            }
+        ]
+    }"#;
+
+    let response: GraphApiOnedriveListResponse = 
serde_json::from_str(data).unwrap();
+    assert_eq!(response.odata_count, 1);
+    assert_eq!(response.value.len(), 2);
+    let item = &response.value[0];
+    assert_eq!(item.name, "name");
+}
+
+#[test]
+fn test_parse_folder_single() {
+    let response_json = r#"
+    {
+        "@odata.context": 
"https://graph.microsoft.com/v1.0/$metadata#users('great.cat%40outlook.com')/drive/root/children",
+        "@odata.count": 1,
+        "value": [
+          {
+            "createdDateTime": "2023-02-01T00:51:02.803Z",
+            "cTag": "sample",
+            "eTag": "sample",
+            "id": "ID!3003",
+            "lastModifiedDateTime": "2023-02-01T00:51:10.703Z",
+            "name": "misc",
+            "size": 1084627,
+            "webUrl": "sample",
+            "reactions": {
+              "commentCount": 0
+            },
+            "createdBy": {
+              "application": {
+                "displayName": "OneDrive",
+                "id": "481710a4"
+              },
+              "user": {
+                "displayName": "Foo bar",
+                "id": "01"
+              }
+            },
+            "lastModifiedBy": {
+              "application": {
+                "displayName": "OneDrive",
+                "id": "481710a4"
+              },
+              "user": {
+                "displayName": "Foo bar",
+                "id": "02"
+              }
+            },
+            "parentReference": {
+              "driveId": "ID",
+              "driveType": "personal",
+              "id": "ID!101",
+              "path": "/drive/root:"
+            },
+            "fileSystemInfo": {
+              "createdDateTime": "2023-02-01T00:51:02.803Z",
+              "lastModifiedDateTime": "2023-02-01T00:51:02.803Z"
+            },
+            "folder": {
+              "childCount": 9,
+              "view": {
+                "viewType": "thumbnails",
+                "sortBy": "name",
+                "sortOrder": "ascending"
+              }
+            }
+          }
+        ]
+      }"#;
+
+    let response: GraphApiOnedriveListResponse = 
serde_json::from_str(response_json).unwrap();
+    assert_eq!(response.odata_count, 1);
+    assert_eq!(response.value.len(), 1);
+    let item = &response.value[0];
+    if let ItemType::Folder { folder, .. } = &item.item_type {
+        assert_eq!(folder.child_count, serde_json::Value::Number(9.into()));
+    } else {
+        panic!("item_type is not folder");
+    }
+}
diff --git a/core/src/services/onedrive/mod.rs 
b/core/src/services/onedrive/mod.rs
index 0b58fc8c..8a016001 100644
--- a/core/src/services/onedrive/mod.rs
+++ b/core/src/services/onedrive/mod.rs
@@ -18,6 +18,8 @@
 mod backend;
 mod builder;
 mod error;
+mod graph_model;
 
 pub use builder::OnedriveBuilder as Onedrive;
+mod pager;
 mod writer;
diff --git a/core/src/services/onedrive/pager.rs 
b/core/src/services/onedrive/pager.rs
new file mode 100644
index 00000000..8a29c7cd
--- /dev/null
+++ b/core/src/services/onedrive/pager.rs
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{
+    raw::{
+        build_rel_path, build_rooted_abs_path, new_json_deserialize_error,
+        oio::{self},
+        percent_encode_path, IncomingAsyncBody,
+    },
+    EntryMode, Metadata,
+};
+
+use super::{
+    backend::OnedriveBackend,
+    error::parse_error,
+    graph_model::{GraphApiOnedriveListResponse, ItemType},
+};
+use crate::Result;
+use async_trait::async_trait;
+use http::Response;
+
+pub struct OnedrivePager {
+    root: String,
+    path: String,
+    backend: OnedriveBackend,
+    next_link: Option<String>,
+    done: bool,
+}
+
+impl OnedrivePager {
+    const DRIVE_ROOT_PREFIX: &'static str = "/drive/root:";
+
+    pub(crate) fn new(root: String, path: String, backend: OnedriveBackend) -> 
Self {
+        Self {
+            root,
+            path,
+            backend,
+            next_link: None,
+            done: false,
+        }
+    }
+}
+
+#[async_trait]
+impl oio::Page for OnedrivePager {
+    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
+        if self.done {
+            return Ok(None);
+        }
+        let response = self.onedrive_get().await?;
+
+        let status_code = response.status();
+        if !status_code.is_success() {
+            if status_code == http::StatusCode::NOT_FOUND {
+                return Ok(None);
+            }
+            let error = parse_error(response).await?;
+            return Err(error);
+        }
+
+        let bytes = response.into_body().bytes().await?;
+        let decoded_response = 
serde_json::from_slice::<GraphApiOnedriveListResponse>(&bytes)
+            .map_err(new_json_deserialize_error)?;
+
+        if let Some(next_link) = decoded_response.next_link {
+            self.next_link = Some(next_link);
+        } else {
+            self.done = true;
+        }
+
+        let entries: Vec<oio::Entry> = decoded_response
+            .value
+            .into_iter()
+            .map(|drive_item| {
+                let name = drive_item.name;
+                let parent_path = drive_item.parent_reference.path;
+                let parent_path = parent_path
+                    .strip_prefix(Self::DRIVE_ROOT_PREFIX)
+                    .unwrap_or("");
+
+                let path = format!("{}/{}", parent_path, name);
+
+                let normalized_path = build_rel_path(&self.root, &path);
+
+                let entry: oio::Entry = match drive_item.item_type {
+                    ItemType::Folder { .. } => {
+                        let normalized_path = format!("{}/", normalized_path);
+                        oio::Entry::new(&normalized_path, 
Metadata::new(EntryMode::DIR))
+                    }
+                    ItemType::File { .. } => {
+                        oio::Entry::new(&normalized_path, 
Metadata::new(EntryMode::FILE))
+                    }
+                };
+                entry
+            })
+            .collect();
+
+        Ok(Some(entries))
+    }
+}
+
+impl OnedrivePager {
+    async fn onedrive_get(&mut self) -> Result<Response<IncomingAsyncBody>> {
+        let request_url = if let Some(next_link) = &self.next_link {
+            let next_link_clone = next_link.clone();
+            self.next_link = None;
+            next_link_clone
+        } else {
+            let path = build_rooted_abs_path(&self.root, &self.path);
+            let url: String = if path == "." || path == "/" {
+                
"https://graph.microsoft.com/v1.0/me/drive/root/children".to_string()
+            } else {
+                // According to OneDrive API examples, the path should not end 
with a slash.
+                // Reference: 
<https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_list_children?view=odsp-graph-online>
+                let path = path.strip_suffix('/').unwrap_or("");
+                format!(
+                    
"https://graph.microsoft.com/v1.0/me/drive/root:{}:/children",
+                    percent_encode_path(path),
+                )
+            };
+            url
+        };
+
+        self.backend.onedrive_get_next_list_page(&request_url).await
+    }
+}
diff --git a/core/src/services/onedrive/writer.rs 
b/core/src/services/onedrive/writer.rs
index 4ac6062a..5779b4b6 100644
--- a/core/src/services/onedrive/writer.rs
+++ b/core/src/services/onedrive/writer.rs
@@ -16,11 +16,14 @@
 // under the License.
 
 use async_trait::async_trait;
-use bytes::Bytes;
+use bytes::{Buf, Bytes};
 use http::StatusCode;
 
 use super::backend::OnedriveBackend;
 use super::error::parse_error;
+use super::graph_model::{
+    OneDriveUploadSessionCreationRequestBody, 
OneDriveUploadSessionCreationResponseBody,
+};
 use crate::ops::OpWrite;
 use crate::raw::*;
 use crate::*;
@@ -33,6 +36,10 @@ pub struct OneDriveWriter {
 }
 
 impl OneDriveWriter {
+    const MAX_SIMPLE_SIZE: usize = 4 * 1024 * 1024;
+    // If your app splits a file into multiple byte ranges, the size of each 
byte range MUST be a multiple of 320 KiB (327,680 bytes). Using a fragment size 
that does not divide evenly by 320 KiB will result in errors committing some 
files.
+    // 
https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_createuploadsession?view=odsp-graph-online#upload-bytes-to-the-upload-session
+    const CHUNK_SIZE_FACTOR: usize = 327_680;
     pub fn new(backend: OnedriveBackend, op: OpWrite, path: String) -> Self {
         OneDriveWriter { backend, op, path }
     }
@@ -41,9 +48,29 @@ impl OneDriveWriter {
 #[async_trait]
 impl oio::Write for OneDriveWriter {
     async fn write(&mut self, bs: Bytes) -> Result<()> {
+        let size = bs.len();
+
+        if size <= Self::MAX_SIMPLE_SIZE {
+            self.write_simple(bs).await
+        } else {
+            self.write_chunked(bs).await
+        }
+    }
+
+    async fn abort(&mut self) -> Result<()> {
+        Ok(())
+    }
+
+    async fn close(&mut self) -> Result<()> {
+        Ok(())
+    }
+}
+
+impl OneDriveWriter {
+    async fn write_simple(&mut self, bs: Bytes) -> Result<()> {
         let resp = self
             .backend
-            .onedrive_put(
+            .onedrive_upload_simple(
                 &self.path,
                 Some(bs.len()),
                 self.op.content_type(),
@@ -56,7 +83,7 @@ impl oio::Write for OneDriveWriter {
         match status {
             // Typical response code: 201 Created
             // Reference: https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_put_content?view=odsp-graph-online#response
-            StatusCode::CREATED => {
+            StatusCode::CREATED | StatusCode::OK => {
                 resp.into_body().consume().await?;
                 Ok(())
             }
@@ -64,11 +91,85 @@ impl oio::Write for OneDriveWriter {
         }
     }
 
-    async fn abort(&mut self) -> Result<()> {
+    pub(crate) async fn write_chunked(&self, total_bytes: Bytes) -> Result<()> {
+        // Upload large files via sessions: https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_createuploadsession?view=odsp-graph-online#upload-bytes-to-the-upload-session
+        // 1. Create an upload session
+        // 2. Upload the bytes of each chunk
+        // 3. Commit the session
+
+        let session_response = self.create_upload_session().await?;
+
+        let mut offset = 0;
+
+        let iter = total_bytes.chunks(OneDriveWriter::CHUNK_SIZE_FACTOR);
+
+        for chunk in iter {
+            let mut end = offset + OneDriveWriter::CHUNK_SIZE_FACTOR;
+            if end > total_bytes.len() {
+                end = total_bytes.len();
+            }
+            let total_len = total_bytes.len();
+            let chunk_end = end - 1;
+
+            let resp = self
+                .backend
+                .onedrive_chunked_upload(
+                    &session_response.upload_url,
+                    None,
+                    offset,
+                    chunk_end,
+                    total_len,
+                    AsyncBody::Bytes(Bytes::copy_from_slice(chunk)),
+                )
+                .await?;
+
+            let status = resp.status();
+
+            match status {
+                // Typical response code: 202 Accepted
+                // Reference: https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_put_content?view=odsp-graph-online#response
+                StatusCode::ACCEPTED | StatusCode::CREATED | StatusCode::OK => {
+                    resp.into_body().consume().await?;
+                }
+                _ => return Err(parse_error(resp).await?),
+            }
+
+            offset += OneDriveWriter::CHUNK_SIZE_FACTOR;
+        }
+
         Ok(())
     }
 
-    async fn close(&mut self) -> Result<()> {
-        Ok(())
+    async fn create_upload_session(&self) -> Result<OneDriveUploadSessionCreationResponseBody> {
+        let file_name_from_path = self.path.split('/').last().ok_or_else(|| {
+            Error::new(
+                ErrorKind::Unexpected,
+                "connection string must have AccountName",
+            )
+        })?;
+        let url = format!(
+            "{}/drive/root:{}:/createUploadSession",
+            OnedriveBackend::BASE_URL,
+            percent_encode_path(&self.path)
+        );
+        let body = OneDriveUploadSessionCreationRequestBody::new(file_name_from_path.to_string());
+
+        let resp = self
+            .backend
+            .onedrive_create_upload_session(&url, body)
+            .await?;
+
+        let status = resp.status();
+
+        match status {
+            // Reference: https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_createuploadsession?view=odsp-graph-online#response
+            StatusCode::OK => {
+                let bs = resp.into_body().bytes().await?;
+                let result: OneDriveUploadSessionCreationResponseBody =
+                    serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
+                Ok(result)
+            }
+            _ => Err(parse_error(resp).await?),
+        }
     }
 }
diff --git a/core/tests/behavior/main.rs b/core/tests/behavior/main.rs
index 692ad5c7..fd477f05 100644
--- a/core/tests/behavior/main.rs
+++ b/core/tests/behavior/main.rs
@@ -141,3 +141,5 @@ behavior_tests!(VercelArtifacts);
 behavior_tests!(Webdav);
 #[cfg(feature = "services-webhdfs")]
 behavior_tests!(Webhdfs);
+#[cfg(feature = "services-onedrive")]
+behavior_tests!(Onedrive);


Reply via email to