This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new c25a6ac3 feat(services/onedrive): Implement `list`, `create_dir`,
`stat` and uploading large files (#2231)
c25a6ac3 is described below
commit c25a6ac3c2d9033274505df625b68172b02b2655
Author: Daohan Chong <[email protected]>
AuthorDate: Tue May 9 23:55:36 2023 -0700
feat(services/onedrive): Implement `list`, `create_dir`, `stat` and
uploading large files (#2231)
* Revert "update"
This reverts commit 0d8e16cd7407d8d59bbffd85c44c94e5ab9c2ae0.
* add license
* implement pager
* finish all implementation of list op
* finish op
* handle err
* fix path
* fix path
* improve
* improve code
* fix all clippy
* enable opendal behavior test
* add example
* clean up
* update upload session
* implement chunked upload
* improve upload
* update status code
* update
* fix
* fix the entry of onedrive tests
* fix get item body
* fix
* add content len
* add delete op
* simplify
* create_dir [wip]
* fix create dir
* handle 404 of stat
* false
* update
* update
* fix delete
* delete non exist
* fix create [replace]
* fix create dir
* disable scan
* clean up
* cleanup
* clean up models
* update
* update
* nest use
* clean up
* fmt
* split get to get / get content
* update
* inline build_request_url()
* use onedrive_chunked_upload
* clean up
* new struct CreateDirPayload
* use get_parent
* add onedrive_create_dir
* refactor and clean up post requests
* fix
* use onedrive_upload_simple
* add list_with_delimiter_slash
* fix
---
.env.example | 3 +
core/src/services/onedrive/backend.rs | 262 +++++++++++++++++++++++---
core/src/services/onedrive/builder.rs | 3 +-
core/src/services/onedrive/graph_model.rs | 298 ++++++++++++++++++++++++++++++
core/src/services/onedrive/mod.rs | 2 +
core/src/services/onedrive/pager.rs | 140 ++++++++++++++
core/src/services/onedrive/writer.rs | 113 ++++++++++-
core/tests/behavior/main.rs | 2 +
8 files changed, 788 insertions(+), 35 deletions(-)
diff --git a/.env.example b/.env.example
index 893a4b93..f4835d52 100644
--- a/.env.example
+++ b/.env.example
@@ -95,3 +95,6 @@ OPENDAL_SUPABASE_KEY=<service_key>
# vercel artifacts
OPENDAL_VERCEL_ARTIFACTS_TEST=false
OPENDAL_VERCEL_ARTIFACTS_ACCESS_TOKEN=<token>
+# onedrive
+OPENDAL_ONEDRIVE_TEST=false
+OPENDAL_ONEDRIVE_ACCESS_TOKEN=<access_token>
diff --git a/core/src/services/onedrive/backend.rs
b/core/src/services/onedrive/backend.rs
index 6e76dccb..0bf36f32 100644
--- a/core/src/services/onedrive/backend.rs
+++ b/core/src/services/onedrive/backend.rs
@@ -18,31 +18,33 @@
use std::fmt::Debug;
use async_trait::async_trait;
+use bytes::Bytes;
use http::header;
use http::Request;
use http::Response;
use http::StatusCode;
use super::error::parse_error;
+use super::graph_model::CreateDirPayload;
+use super::graph_model::ItemType;
+use super::graph_model::OneDriveUploadSessionCreationRequestBody;
+use super::graph_model::OnedriveGetItemBody;
+use super::pager::OnedrivePager;
use super::writer::OneDriveWriter;
-use crate::ops::OpRead;
-use crate::ops::OpWrite;
-use crate::raw::build_rooted_abs_path;
-use crate::raw::new_request_build_error;
-use crate::raw::parse_into_metadata;
-use crate::raw::parse_location;
-use crate::raw::percent_encode_path;
-use crate::raw::Accessor;
-use crate::raw::AccessorInfo;
-use crate::raw::AsyncBody;
-use crate::raw::HttpClient;
-use crate::raw::IncomingAsyncBody;
-use crate::raw::RpRead;
-use crate::raw::RpWrite;
-use crate::types::Result;
-use crate::Capability;
-use crate::Error;
-use crate::ErrorKind;
+use crate::ops::OpCreateDir;
+use crate::raw::get_parent;
+use crate::raw::RpCreateDir;
+use crate::{
+ ops::{OpDelete, OpList, OpRead, OpStat, OpWrite},
+ raw::{
+ build_abs_path, build_rooted_abs_path, get_basename,
new_json_deserialize_error,
+ new_json_serialize_error, new_request_build_error,
parse_datetime_from_rfc3339,
+ parse_into_metadata, parse_location, percent_encode_path, Accessor,
AccessorInfo,
+ AsyncBody, HttpClient, IncomingAsyncBody, RpDelete, RpList, RpRead,
RpStat, RpWrite,
+ },
+ types::Result,
+ Capability, EntryMode, Error, ErrorKind, Metadata,
+};
#[derive(Clone)]
pub struct OnedriveBackend {
@@ -76,7 +78,7 @@ impl Accessor for OnedriveBackend {
type BlockingReader = ();
type Writer = OneDriveWriter;
type BlockingWriter = ();
- type Pager = ();
+ type Pager = OnedrivePager;
type BlockingPager = ();
fn info(&self) -> AccessorInfo {
@@ -85,11 +87,12 @@ impl Accessor for OnedriveBackend {
.set_root(&self.root)
.set_capability(Capability {
read: true,
- read_can_next: true,
write: true,
+ stat: true,
+ delete: true,
+ create_dir: true,
list: true,
- copy: true,
- rename: true,
+ list_with_delimiter_slash: true,
..Default::default()
});
@@ -97,14 +100,13 @@ impl Accessor for OnedriveBackend {
}
async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead,
Self::Reader)> {
- let resp = self.onedrive_get(path).await?;
+ let resp = self.onedrive_get_content(path).await?;
let status = resp.status();
if status.is_redirection() {
let headers = resp.headers();
let location = parse_location(headers)?;
-
match location {
None => {
return Err(Error::new(
@@ -145,14 +147,135 @@ impl Accessor for OnedriveBackend {
OneDriveWriter::new(self.clone(), args, path),
))
}
+
+    /// Fetch metadata for `path` from its Graph `driveItem` resource.
+    ///
+    /// The root (`/`) is answered locally as a directory without any request.
+    /// A `404` for a path ending in `/` is also reported as a directory; any
+    /// other non-success status is turned into an error via `parse_error`.
+    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
+        if path == "/" {
+            return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
+        }
+
+        let resp = self.onedrive_get_stat(path).await?;
+        let status = resp.status();
+
+        if !status.is_success() {
+            return match status {
+                StatusCode::NOT_FOUND if path.ends_with('/') => {
+                    Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
+                }
+                _ => Err(parse_error(resp).await?),
+            };
+        }
+
+        let body = resp.into_body().bytes().await?;
+        let item: OnedriveGetItemBody =
+            serde_json::from_slice(&body).map_err(new_json_deserialize_error)?;
+
+        let mode = match item.item_type {
+            ItemType::Folder { .. } => EntryMode::DIR,
+            ItemType::File { .. } => EntryMode::FILE,
+        };
+        let last_modified = parse_datetime_from_rfc3339(&item.last_modified_date_time)?;
+
+        let mut meta = Metadata::new(mode);
+        meta.set_etag(&item.e_tag);
+        meta.set_last_modified(last_modified);
+        meta.set_content_length(item.size);
+
+        Ok(RpStat::new(meta))
+    }
+
+    /// Delete the item at `path`.
+    ///
+    /// Both `204 No Content` and `404 Not Found` count as success, so deleting
+    /// an item that does not exist is not an error.
+    ///
+    /// Documentation: <https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_delete?view=odsp-graph-online>
+    async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
+        let resp = self.onedrive_delete(path).await?;
+
+        if matches!(resp.status(), StatusCode::NO_CONTENT | StatusCode::NOT_FOUND) {
+            Ok(RpDelete::default())
+        } else {
+            Err(parse_error(resp).await?)
+        }
+    }
+
+    /// List the children of `path`.
+    ///
+    /// No request is made here; `OnedrivePager` fetches pages as it is polled.
+    async fn list(&self, path: &str, _op_list: OpList) -> Result<(RpList, Self::Pager)> {
+        Ok((
+            RpList::default(),
+            OnedrivePager::new(self.root.clone(), path.to_string(), self.clone()),
+        ))
+    }
+
+    /// Create the directory `path` by POSTing a folder item into its parent's
+    /// `children` collection.
+    ///
+    /// The folder name is the last path segment with any trailing `/` removed.
+    /// `201 Created` and `200 OK` are both treated as success.
+    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
+        let abs_path = build_rooted_abs_path(&self.root, path);
+
+        let uri = format!(
+            "https://graph.microsoft.com/v1.0/me/drive/root:{}:/children",
+            percent_encode_path(get_parent(&abs_path))
+        );
+
+        let basename = get_basename(&abs_path);
+        let name = basename.strip_suffix('/').unwrap_or(basename);
+        let payload = CreateDirPayload::new(name.to_string());
+
+        let resp = self.onedrive_create_dir(&uri, payload).await?;
+        match resp.status() {
+            StatusCode::CREATED | StatusCode::OK => Ok(RpCreateDir::default()),
+            _ => Err(parse_error(resp).await?),
+        }
+    }
}
impl OnedriveBackend {
- async fn onedrive_get(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
+    /// Microsoft Graph v1.0 base URL (`.../v1.0/me`).
+    ///
+    /// The writer prefixes `createUploadSession` URLs with it; the other request
+    /// helpers in this impl still spell the full URL out inline.
+    pub(crate) const BASE_URL: &'static str = "https://graph.microsoft.com/v1.0/me";
+
+    /// GET the `driveItem` resource for `path` (resolved against the backend root).
+    ///
+    /// Unlike `onedrive_get_content`, no `:/content` suffix is appended; `stat`
+    /// decodes the response body as `OnedriveGetItemBody`.
+    async fn onedrive_get_stat(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
+        let abs_path = build_rooted_abs_path(&self.root, path);
+        let url = format!(
+            "https://graph.microsoft.com/v1.0/me/drive/root:{}",
+            percent_encode_path(&abs_path)
+        );
+
+        let req = Request::get(&url)
+            .header(header::AUTHORIZATION, format!("Bearer {}", self.access_token))
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.client.send(req).await
+    }
+
+    /// Send an authenticated GET to the absolute `url`.
+    ///
+    /// `OnedrivePager` uses this for the first `children` page as well as for
+    /// `@odata.nextLink` continuations.
+    pub(crate) async fn onedrive_get_next_list_page(
+        &self,
+        url: &str,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let bearer = format!("Bearer {}", self.access_token);
+        let req = Request::get(url)
+            .header(header::AUTHORIZATION, bearer)
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.client.send(req).await
+    }
+
+ async fn onedrive_get_content(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
+ let path = build_rooted_abs_path(&self.root, path);
+ let url: String = format!(
+ "https://graph.microsoft.com/v1.0/me/drive/root:{}{}",
percent_encode_path(&path),
+ ":/content"
);
let mut req = Request::get(&url);
@@ -180,7 +303,7 @@ impl OnedriveBackend {
self.client.send(req).await
}
- pub async fn onedrive_put(
+ pub async fn onedrive_upload_simple(
&self,
path: &str,
size: Option<usize>,
@@ -209,4 +332,89 @@ impl OnedriveBackend {
self.client.send(req).await
}
+
+    /// PUT one byte range of an upload session to `url`.
+    ///
+    /// `offset` and `chunk_end` are inclusive positions inside a file of
+    /// `total_len` bytes. They are sent as
+    /// `Content-Range: bytes {offset}-{chunk_end}/{total_len}`, and `Content-Length`
+    /// is `chunk_end - offset + 1`, so callers must ensure `chunk_end >= offset`.
+    /// `Content-Type` is only set when `content_type` is `Some`.
+    pub(crate) async fn onedrive_chunked_upload(
+        &self,
+        url: &str,
+        content_type: Option<&str>,
+        offset: usize,
+        chunk_end: usize,
+        total_len: usize,
+        body: AsyncBody,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let content_range = format!("bytes {}-{}/{}", offset, chunk_end, total_len);
+        let content_length = (chunk_end - offset + 1).to_string();
+
+        let mut req = Request::put(url)
+            .header(header::AUTHORIZATION, format!("Bearer {}", self.access_token))
+            .header(header::CONTENT_RANGE, content_range)
+            .header(header::CONTENT_LENGTH, content_length);
+
+        if let Some(mime) = content_type {
+            req = req.header(header::CONTENT_TYPE, mime);
+        }
+
+        let req = req.body(body).map_err(new_request_build_error)?;
+        self.client.send(req).await
+    }
+
+    /// POST `body` as JSON to the `createUploadSession` `url` and return the raw response.
+    ///
+    /// Serialization failures are reported before any request is built.
+    pub(crate) async fn onedrive_create_upload_session(
+        &self,
+        url: &str,
+        body: OneDriveUploadSessionCreationRequestBody,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let payload = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
+
+        let req = Request::post(url)
+            .header(header::AUTHORIZATION, format!("Bearer {}", self.access_token))
+            .header(header::CONTENT_TYPE, "application/json")
+            .body(AsyncBody::Bytes(Bytes::from(payload)))
+            .map_err(new_request_build_error)?;
+
+        self.client.send(req).await
+    }
+
+    /// POST the folder `body` as JSON to `url` (a parent's `children` collection)
+    /// and return the raw response.
+    async fn onedrive_create_dir(
+        &self,
+        url: &str,
+        body: CreateDirPayload,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let json = serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
+
+        let req = Request::post(url)
+            .header(header::AUTHORIZATION, format!("Bearer {}", self.access_token))
+            .header(header::CONTENT_TYPE, "application/json")
+            .body(AsyncBody::Bytes(Bytes::from(json)))
+            .map_err(new_request_build_error)?;
+
+        self.client.send(req).await
+    }
+
+    /// Send an authenticated `DELETE` for the item at `path` under the backend root.
+    ///
+    /// NOTE(review): unlike the other helpers this uses `build_abs_path` and
+    /// inserts a `/` after `root:` — presumably because that result carries no
+    /// leading slash; confirm.
+    pub(crate) async fn onedrive_delete(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
+        let abs_path = build_abs_path(&self.root, path);
+        let url = format!(
+            "https://graph.microsoft.com/v1.0/me/drive/root:/{}",
+            percent_encode_path(&abs_path)
+        );
+
+        let req = Request::delete(&url)
+            .header(header::AUTHORIZATION, format!("Bearer {}", self.access_token))
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.client.send(req).await
+    }
}
diff --git a/core/src/services/onedrive/builder.rs
b/core/src/services/onedrive/builder.rs
index beab47a0..63f00a8a 100644
--- a/core/src/services/onedrive/builder.rs
+++ b/core/src/services/onedrive/builder.rs
@@ -35,9 +35,9 @@ use crate::*;
///
/// - [x] read
/// - [x] write
+/// - [x] list
/// - [ ] copy
/// - [ ] rename
-/// - [ ] list
/// - [ ] ~~scan~~
/// - [ ] ~~presign~~
/// - [ ] blocking
@@ -45,7 +45,6 @@ use crate::*;
/// # Notes
///
/// Currently, only OneDrive Personal is supported.
-/// For uploading, only files under 4MB are supported via the Simple Upload
API
(<https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_put_content?view=odsp-graph-online>).
///
/// # Configuration
///
diff --git a/core/src/services/onedrive/graph_model.rs
b/core/src/services/onedrive/graph_model.rs
new file mode 100644
index 00000000..59cff4f2
--- /dev/null
+++ b/core/src/services/onedrive/graph_model.rs
@@ -0,0 +1,298 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use serde::{Deserialize, Serialize};
+
+use std::collections::HashMap;
+
+/// One page of a `children` listing returned by the Graph API.
+#[derive(Debug, Serialize, Deserialize)]
+pub struct GraphApiOnedriveListResponse {
+    /// `@odata.count`; non-optional, so a page without it fails to decode.
+    #[serde(rename = "@odata.count")]
+    pub odata_count: usize,
+    /// `@odata.nextLink`: absolute URL of the following page. `OnedrivePager`
+    /// treats its absence as the end of the listing.
+    #[serde(rename = "@odata.nextLink")]
+    pub next_link: Option<String>,
+    /// Items on this page.
+    pub value: Vec<OneDriveItem>,
+}
+
+/// DriveItem representation (only the subset of fields needed for listing;
+/// unknown JSON fields are ignored).
+///
+/// <https://learn.microsoft.com/en-us/onedrive/developer/rest-api/resources/list?view=odsp-graph-online#json-representation>
+#[derive(Debug, Serialize, Deserialize)]
+pub struct OneDriveItem {
+    /// Item name (last path segment).
+    pub name: String,
+
+    #[serde(rename = "parentReference")]
+    pub parent_reference: ParentReference,
+
+    /// Folder/file facet, decoded from the same JSON object.
+    #[serde(flatten)]
+    pub item_type: ItemType,
+}
+
+/// Reference to an item's parent.
+#[derive(Debug, Serialize, Deserialize)]
+pub struct ParentReference {
+    /// Parent location, e.g. `/drive/root:` for top-level items (as in the tests
+    /// of this file); `OnedrivePager` strips that prefix.
+    pub path: String,
+}
+
+/// Whether an item is a folder or a file.
+///
+/// `untagged`: serde tries `Folder` first (requires a `folder` object), then
+/// `File` (requires a `file` object).
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
+#[serde(untagged)]
+pub enum ItemType {
+    Folder {
+        folder: Folder,
+        #[serde(rename = "specialFolder")]
+        special_folder: Option<HashMap<String, String>>,
+    },
+    File {
+        file: File,
+    },
+}
+
+/// Metadata of a single item, decoded by `stat`.
+///
+/// NOTE(review): every non-`Option` field is required, so a response missing
+/// e.g. `cTag` fails to decode — confirm all item kinds carry these fields.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub struct OnedriveGetItemBody {
+    #[serde(rename = "cTag")]
+    pub(crate) c_tag: String,
+    /// Exposed as the entry's etag.
+    #[serde(rename = "eTag")]
+    pub(crate) e_tag: String,
+    id: String,
+    /// RFC 3339 timestamp, parsed into the entry's last-modified time.
+    #[serde(rename = "lastModifiedDateTime")]
+    pub(crate) last_modified_date_time: String,
+    pub(crate) name: String,
+    /// Reported as the entry's content length.
+    pub(crate) size: u64,
+
+    /// Folder/file facet, decoded from the same JSON object.
+    #[serde(flatten)]
+    pub(crate) item_type: ItemType,
+}
+
+/// `file` facet present on file items.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub struct File {
+    #[serde(rename = "mimeType")]
+    mime_type: String,
+}
+
+/// `folder` facet present on folder items.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
+pub struct Folder {
+    /// Presumably the number of direct children — confirm against the Graph docs.
+    #[serde(rename = "childCount")]
+    child_count: i64,
+}
+
+/// JSON body for creating a folder through `POST .../children`.
+///
+/// Serializes (in field order) as
+/// `{"@microsoft.graph.conflictBehavior": ..., "name": ..., "folder": {}}`;
+/// the empty `folder` object is the same facet `ItemType::Folder` keys on.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CreateDirPayload {
+    #[serde(rename = "@microsoft.graph.conflictBehavior")]
+    conflict_behavior: String,
+    name: String,
+    folder: EmptyStruct,
+}
+
+impl CreateDirPayload {
+    /// Build a payload for a folder called `name` with conflict behavior `replace`.
+    pub fn new(name: String) -> Self {
+        Self {
+            conflict_behavior: "replace".to_string(),
+            name,
+            folder: EmptyStruct {},
+        }
+    }
+}
+
+/// Field-less struct; serializes as `{}`.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+struct EmptyStruct {}
+
+/// `item` properties sent when creating an upload session.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+struct FileUploadItem {
+    #[serde(rename = "@odata.type")]
+    odata_type: String,
+    #[serde(rename = "@microsoft.graph.conflictBehavior")]
+    microsoft_graph_conflict_behavior: String,
+    name: String,
+}
+
+/// Response of `createUploadSession`.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct OneDriveUploadSessionCreationResponseBody {
+    /// URL that the writer PUTs each byte range to.
+    #[serde(rename = "uploadUrl")]
+    pub upload_url: String,
+    /// Session expiry as returned by the service (kept as a raw string).
+    #[serde(rename = "expirationDateTime")]
+    pub expiration_date_time: String,
+}
+
+/// Request body for `createUploadSession`: `{"item": {...}}`.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct OneDriveUploadSessionCreationRequestBody {
+    item: FileUploadItem,
+}
+
+impl OneDriveUploadSessionCreationRequestBody {
+    /// Build a request whose item is named `path` with conflict behavior `replace`.
+    ///
+    /// Despite the parameter name, `path` becomes the item's `name`; the writer
+    /// passes only the final path segment.
+    pub fn new(path: String) -> Self {
+        OneDriveUploadSessionCreationRequestBody {
+            item: FileUploadItem {
+                odata_type: "microsoft.graph.driveItemUploadableProperties".to_string(),
+                microsoft_graph_conflict_behavior: "replace".to_string(),
+                name: path,
+            },
+        }
+    }
+}
+
+/// Decodes a listing page with a special folder followed by a file.
+///
+/// The fixture must be strict JSON: serde_json rejects trailing commas, and the
+/// previous fixture had one after `"mimeType"`, which made `unwrap()` panic.
+#[test]
+fn test_parse_one_drive_json() {
+    let data = r#"{
+        "@odata.context": "https://graph.microsoft.com/v1.0/$metadata#users('user_id')/drive/root/children",
+        "@odata.count": 1,
+        "value": [
+            {
+                "createdDateTime": "2020-01-01T00:00:00Z",
+                "cTag": "cTag",
+                "eTag": "eTag",
+                "id": "id",
+                "lastModifiedDateTime": "2020-01-01T00:00:00Z",
+                "name": "name",
+                "size": 0,
+                "webUrl": "webUrl",
+                "reactions": {
+                    "like": 0
+                },
+                "parentReference": {
+                    "driveId": "driveId",
+                    "driveType": "driveType",
+                    "id": "id",
+                    "path": "/drive/root:"
+                },
+                "fileSystemInfo": {
+                    "createdDateTime": "2020-01-01T00:00:00Z",
+                    "lastModifiedDateTime": "2020-01-01T00:00:00Z"
+                },
+                "folder": {
+                    "childCount": 0
+                },
+                "specialFolder": {
+                    "name": "name"
+                }
+            },
+            {
+                "createdDateTime": "2018-12-30T05:32:55.46Z",
+                "cTag": "sample",
+                "eTag": "sample",
+                "id": "ID!102",
+                "lastModifiedDateTime": "2018-12-30T05:33:23.557Z",
+                "name": "Getting started with OneDrive.pdf",
+                "size": 1025867,
+                "reactions": {
+                    "commentCount": 0
+                },
+                "createdBy": {
+                    "user": {
+                        "displayName": "Foo bar",
+                        "id": "ID"
+                    }
+                },
+                "lastModifiedBy": {
+                    "user": {
+                        "displayName": "Foo bar",
+                        "id": "32217fc1154aec3d"
+                    }
+                },
+                "parentReference": {
+                    "driveId": "32217fc1154aec3d",
+                    "driveType": "personal",
+                    "id": "32217FC1154AEC3D!101",
+                    "path": "/drive/root:"
+                },
+                "file": {
+                    "mimeType": "application/pdf"
+                },
+                "fileSystemInfo": {
+                    "createdDateTime": "2018-12-30T05:32:55.46Z",
+                    "lastModifiedDateTime": "2018-12-30T05:32:55.46Z"
+                }
+            }
+        ]
+    }"#;
+
+    let response: GraphApiOnedriveListResponse = serde_json::from_str(data).unwrap();
+    assert_eq!(response.odata_count, 1);
+    assert_eq!(response.value.len(), 2);
+
+    let folder = &response.value[0];
+    assert_eq!(folder.name, "name");
+    assert!(matches!(folder.item_type, ItemType::Folder { .. }));
+
+    let file = &response.value[1];
+    assert_eq!(file.name, "Getting started with OneDrive.pdf");
+    assert!(matches!(file.item_type, ItemType::File { .. }));
+}
+
+/// Decodes a listing page holding a single folder with extra facets (`view`,
+/// `createdBy.application`) that the models ignore.
+#[test]
+fn test_parse_folder_single() {
+    let response_json = r#"
+    {
+        "@odata.context": "https://graph.microsoft.com/v1.0/$metadata#users('great.cat%40outlook.com')/drive/root/children",
+        "@odata.count": 1,
+        "value": [
+            {
+                "createdDateTime": "2023-02-01T00:51:02.803Z",
+                "cTag": "sample",
+                "eTag": "sample",
+                "id": "ID!3003",
+                "lastModifiedDateTime": "2023-02-01T00:51:10.703Z",
+                "name": "misc",
+                "size": 1084627,
+                "webUrl": "sample",
+                "reactions": {
+                    "commentCount": 0
+                },
+                "createdBy": {
+                    "application": {
+                        "displayName": "OneDrive",
+                        "id": "481710a4"
+                    },
+                    "user": {
+                        "displayName": "Foo bar",
+                        "id": "01"
+                    }
+                },
+                "lastModifiedBy": {
+                    "application": {
+                        "displayName": "OneDrive",
+                        "id": "481710a4"
+                    },
+                    "user": {
+                        "displayName": "Foo bar",
+                        "id": "02"
+                    }
+                },
+                "parentReference": {
+                    "driveId": "ID",
+                    "driveType": "personal",
+                    "id": "ID!101",
+                    "path": "/drive/root:"
+                },
+                "fileSystemInfo": {
+                    "createdDateTime": "2023-02-01T00:51:02.803Z",
+                    "lastModifiedDateTime": "2023-02-01T00:51:02.803Z"
+                },
+                "folder": {
+                    "childCount": 9,
+                    "view": {
+                        "viewType": "thumbnails",
+                        "sortBy": "name",
+                        "sortOrder": "ascending"
+                    }
+                }
+            }
+        ]
+    }"#;
+
+    let response: GraphApiOnedriveListResponse = serde_json::from_str(response_json).unwrap();
+    assert_eq!(response.odata_count, 1);
+    assert_eq!(response.value.len(), 1);
+
+    let item = &response.value[0];
+    assert_eq!(item.name, "misc");
+    match &item.item_type {
+        // `child_count` is a plain i64; compare it directly rather than via `serde_json::Value`.
+        ItemType::Folder { folder, .. } => assert_eq!(folder.child_count, 9),
+        other => panic!("item_type is not folder: {:?}", other),
+    }
+}
diff --git a/core/src/services/onedrive/mod.rs
b/core/src/services/onedrive/mod.rs
index 0b58fc8c..8a016001 100644
--- a/core/src/services/onedrive/mod.rs
+++ b/core/src/services/onedrive/mod.rs
@@ -18,6 +18,8 @@
mod backend;
mod builder;
mod error;
+mod graph_model;
pub use builder::OnedriveBuilder as Onedrive;
+mod pager;
mod writer;
diff --git a/core/src/services/onedrive/pager.rs
b/core/src/services/onedrive/pager.rs
new file mode 100644
index 00000000..8a29c7cd
--- /dev/null
+++ b/core/src/services/onedrive/pager.rs
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{
+ raw::{
+ build_rel_path, build_rooted_abs_path, new_json_deserialize_error,
+ oio::{self},
+ percent_encode_path, IncomingAsyncBody,
+ },
+ EntryMode, Metadata,
+};
+
+use super::{
+ backend::OnedriveBackend,
+ error::parse_error,
+ graph_model::{GraphApiOnedriveListResponse, ItemType},
+};
+use crate::Result;
+use async_trait::async_trait;
+use http::Response;
+
+/// Pages through the children of a OneDrive folder, one Graph request per page.
+pub struct OnedrivePager {
+    /// Backend root that `path` is resolved against.
+    root: String,
+    /// Path being listed.
+    path: String,
+    /// Backend used to send the requests.
+    backend: OnedriveBackend,
+    /// `@odata.nextLink` of the previous page that has not been requested yet.
+    next_link: Option<String>,
+    /// Set once a page without `@odata.nextLink` has been consumed.
+    done: bool,
+}
+
+impl OnedrivePager {
+    /// Prefix of `parentReference.path` for items below the drive root.
+    const DRIVE_ROOT_PREFIX: &'static str = "/drive/root:";
+
+    /// Create a pager for `path` under `root`; nothing is requested until `next()`.
+    pub(crate) fn new(root: String, path: String, backend: OnedriveBackend) -> Self {
+        Self {
+            next_link: None,
+            done: false,
+            root,
+            path,
+            backend,
+        }
+    }
+}
+
+#[async_trait]
+impl oio::Page for OnedrivePager {
+    /// Fetch and decode the next page of entries.
+    ///
+    /// Returns `Ok(None)` once the listing is exhausted, or when the listed
+    /// folder does not exist (404). Folder entries get a trailing `/` and
+    /// `EntryMode::DIR`; file entries get `EntryMode::FILE`. Paths are made
+    /// relative to the backend root.
+    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
+        if self.done {
+            return Ok(None);
+        }
+
+        let response = self.onedrive_get().await?;
+
+        let status_code = response.status();
+        if !status_code.is_success() {
+            if status_code == http::StatusCode::NOT_FOUND {
+                // A missing folder lists as empty. Remember that, so a further
+                // call does not send the same request again.
+                self.done = true;
+                return Ok(None);
+            }
+            return Err(parse_error(response).await?);
+        }
+
+        let bytes = response.into_body().bytes().await?;
+        let decoded_response = serde_json::from_slice::<GraphApiOnedriveListResponse>(&bytes)
+            .map_err(new_json_deserialize_error)?;
+
+        // `onedrive_get` has already taken the previous link, so this stores the
+        // link of the following page; no link means this was the last page.
+        self.done = decoded_response.next_link.is_none();
+        self.next_link = decoded_response.next_link;
+
+        let entries: Vec<oio::Entry> = decoded_response
+            .value
+            .into_iter()
+            .map(|drive_item| {
+                let parent_path = drive_item
+                    .parent_reference
+                    .path
+                    .strip_prefix(Self::DRIVE_ROOT_PREFIX)
+                    .unwrap_or("");
+                let path = format!("{}/{}", parent_path, drive_item.name);
+                let normalized_path = build_rel_path(&self.root, &path);
+
+                match drive_item.item_type {
+                    ItemType::Folder { .. } => oio::Entry::new(
+                        &format!("{}/", normalized_path),
+                        Metadata::new(EntryMode::DIR),
+                    ),
+                    ItemType::File { .. } => {
+                        oio::Entry::new(&normalized_path, Metadata::new(EntryMode::FILE))
+                    }
+                }
+            })
+            .collect();
+
+        Ok(Some(entries))
+    }
+}
+
+impl OnedrivePager {
+    /// Request the next page of children.
+    ///
+    /// A pending `@odata.nextLink` is taken (leaving `None`) and requested as-is.
+    /// Otherwise this builds the first-page URL for `self.path` under `self.root`.
+    async fn onedrive_get(&mut self) -> Result<Response<IncomingAsyncBody>> {
+        let request_url = match self.next_link.take() {
+            Some(next_link) => next_link,
+            None => {
+                let path = build_rooted_abs_path(&self.root, &self.path);
+                if path == "." || path == "/" {
+                    "https://graph.microsoft.com/v1.0/me/drive/root/children".to_string()
+                } else {
+                    // According to OneDrive API examples, the path should not end with a slash.
+                    // Reference: <https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_list_children?view=odsp-graph-online>
+                    //
+                    // Strip the slash only if present. Falling back to "" (as before)
+                    // would drop the whole path when it has no trailing slash.
+                    let path = path.strip_suffix('/').unwrap_or(&path);
+                    format!(
+                        "https://graph.microsoft.com/v1.0/me/drive/root:{}:/children",
+                        percent_encode_path(path),
+                    )
+                }
+            }
+        };
+
+        self.backend.onedrive_get_next_list_page(&request_url).await
+    }
+}
diff --git a/core/src/services/onedrive/writer.rs
b/core/src/services/onedrive/writer.rs
index 4ac6062a..5779b4b6 100644
--- a/core/src/services/onedrive/writer.rs
+++ b/core/src/services/onedrive/writer.rs
@@ -16,11 +16,14 @@
// under the License.
use async_trait::async_trait;
-use bytes::Bytes;
+use bytes::{Buf, Bytes};
use http::StatusCode;
use super::backend::OnedriveBackend;
use super::error::parse_error;
+use super::graph_model::{
+ OneDriveUploadSessionCreationRequestBody,
OneDriveUploadSessionCreationResponseBody,
+};
use crate::ops::OpWrite;
use crate::raw::*;
use crate::*;
@@ -33,6 +36,10 @@ pub struct OneDriveWriter {
}
impl OneDriveWriter {
+    /// Payloads of at most this many bytes (4 MiB) are sent with one simple-upload
+    /// request; larger payloads go through an upload session (see `write`).
+    const MAX_SIMPLE_SIZE: usize = 4 * 1024 * 1024;
+    // If your app splits a file into multiple byte ranges, the size of each byte range
+    // MUST be a multiple of 320 KiB (327,680 bytes). Using a fragment size that does
+    // not divide evenly by 320 KiB will result in errors committing some files.
+    // https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_createuploadsession?view=odsp-graph-online#upload-bytes-to-the-upload-session
+    //
+    // `write_chunked` sends ranges of exactly this size (only the last may be shorter).
+    const CHUNK_SIZE_FACTOR: usize = 327_680;
+    /// Create a writer that uploads to `path` through `backend`; `op` supplies the
+    /// content type used for simple uploads.
pub fn new(backend: OnedriveBackend, op: OpWrite, path: String) -> Self {
OneDriveWriter { backend, op, path }
}
@@ -41,9 +48,29 @@ impl OneDriveWriter {
#[async_trait]
impl oio::Write for OneDriveWriter {
+    /// Upload `bs` as the whole content of `self.path`.
+    ///
+    /// Payloads no larger than `MAX_SIMPLE_SIZE` use a single simple upload;
+    /// bigger ones are sent range by range through an upload session.
async fn write(&mut self, bs: Bytes) -> Result<()> {
+        match bs.len() {
+            len if len > Self::MAX_SIMPLE_SIZE => self.write_chunked(bs).await,
+            _ => self.write_simple(bs).await,
+        }
+    }
+
+    /// The writer keeps no local buffer, so there is nothing to discard.
+    async fn abort(&mut self) -> Result<()> {
+        Ok(())
+    }
+
+    /// Each `write` finishes its upload before returning, so closing is a no-op.
+    async fn close(&mut self) -> Result<()> {
+        Ok(())
+    }
+}
+
+impl OneDriveWriter {
+ async fn write_simple(&mut self, bs: Bytes) -> Result<()> {
let resp = self
.backend
- .onedrive_put(
+ .onedrive_upload_simple(
&self.path,
Some(bs.len()),
self.op.content_type(),
@@ -56,7 +83,7 @@ impl oio::Write for OneDriveWriter {
match status {
// Typical response code: 201 Created
// Reference:
https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_put_content?view=odsp-graph-online#response
- StatusCode::CREATED => {
+ StatusCode::CREATED | StatusCode::OK => {
resp.into_body().consume().await?;
Ok(())
}
@@ -64,11 +91,85 @@ impl oio::Write for OneDriveWriter {
}
}
- async fn abort(&mut self) -> Result<()> {
+    /// Upload `total_bytes` through an upload session, one byte range at a time.
+    ///
+    /// Steps (<https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_createuploadsession?view=odsp-graph-online#upload-bytes-to-the-upload-session>):
+    /// 1. Create an upload session.
+    /// 2. PUT consecutive ranges of at most `CHUNK_SIZE_FACTOR` bytes to its `uploadUrl`.
+    ///
+    /// No separate commit request is sent; the loop ends after the last range is accepted.
+    ///
+    /// # Errors
+    /// Fails if the session cannot be created. Otherwise returns the parsed service
+    /// error for the first range answered with anything other than
+    /// `202 Accepted`, `201 Created` or `200 OK`.
+    pub(crate) async fn write_chunked(&self, total_bytes: Bytes) -> Result<()> {
+        let session_response = self.create_upload_session().await?;
+        let total_len = total_bytes.len();
+
+        let mut offset = 0;
+        while offset < total_len {
+            // Exclusive end of this range.
+            let end = usize::min(offset + OneDriveWriter::CHUNK_SIZE_FACTOR, total_len);
+
+            let resp = self
+                .backend
+                .onedrive_chunked_upload(
+                    &session_response.upload_url,
+                    None,
+                    offset,
+                    end - 1,
+                    total_len,
+                    // `Bytes::slice` shares the buffer instead of copying the range.
+                    AsyncBody::Bytes(total_bytes.slice(offset..end)),
+                )
+                .await?;
+
+            match resp.status() {
+                // 202 Accepted is the typical answer for a range; 201/200 are also
+                // accepted (presumably for the final range — see the
+                // createUploadSession docs linked above).
+                StatusCode::ACCEPTED | StatusCode::CREATED | StatusCode::OK => {
+                    resp.into_body().consume().await?;
+                }
+                _ => return Err(parse_error(resp).await?),
+            }
+
+            offset = end;
+        }
+
Ok(())
}
- async fn close(&mut self) -> Result<()> {
- Ok(())
+    /// Create an upload session for `self.path` and return the decoded response,
+    /// whose `upload_url` receives the byte ranges.
+    ///
+    /// # Errors
+    /// - `ErrorKind::Unexpected` if `self.path` has no final file-name segment
+    ///   (for example, it ends with `/`).
+    /// - The parsed service error for any status other than `200 OK`.
+    async fn create_upload_session(&self) -> Result<OneDriveUploadSessionCreationResponseBody> {
+        // `split`/`rsplit` always yield at least one segment. The only real
+        // failure is an empty last segment, which the old check never caught.
+        let file_name_from_path = self
+            .path
+            .rsplit('/')
+            .next()
+            .filter(|name| !name.is_empty())
+            .ok_or_else(|| {
+                Error::new(
+                    ErrorKind::Unexpected,
+                    "path must end with a file name to create an upload session",
+                )
+            })?;
+
+        // NOTE(review): this URL uses `self.path` as-is (no backend root, no
+        // guaranteed leading `/`), unlike the backend helpers that call
+        // `build_rooted_abs_path` — confirm it matches the simple-upload path.
+        let url = format!(
+            "{}/drive/root:{}:/createUploadSession",
+            OnedriveBackend::BASE_URL,
+            percent_encode_path(&self.path)
+        );
+        let body = OneDriveUploadSessionCreationRequestBody::new(file_name_from_path.to_string());
+
+        let resp = self
+            .backend
+            .onedrive_create_upload_session(&url, body)
+            .await?;
+
+        match resp.status() {
+            // Reference: https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_createuploadsession?view=odsp-graph-online#response
+            StatusCode::OK => {
+                let bs = resp.into_body().bytes().await?;
+                let result: OneDriveUploadSessionCreationResponseBody =
+                    serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
+                Ok(result)
+            }
+            _ => Err(parse_error(resp).await?),
+        }
}
}
diff --git a/core/tests/behavior/main.rs b/core/tests/behavior/main.rs
index 692ad5c7..fd477f05 100644
--- a/core/tests/behavior/main.rs
+++ b/core/tests/behavior/main.rs
@@ -141,3 +141,5 @@ behavior_tests!(VercelArtifacts);
behavior_tests!(Webdav);
#[cfg(feature = "services-webhdfs")]
behavior_tests!(Webhdfs);
+// OneDrive behavior tests; compiled only with the `services-onedrive` feature.
+#[cfg(feature = "services-onedrive")]
+behavior_tests!(Onedrive);