This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 69dd4546 feat(services/onedrive): Add read and write support for
OneDrive (#2129)
69dd4546 is described below
commit 69dd45466363dd530a0bf183423cddaaf8e46938
Author: Daohan Chong <[email protected]>
AuthorDate: Thu Apr 27 21:30:58 2023 -0600
feat(services/onedrive): Add read and write support for OneDrive (#2129)
* basic onedrive
* fix data model
* implement basic
* set hints
* fix fix fix
* fix all
* fix
* Squashed commit
* update res
* improve builder
* update
* finish builder
* fix read
* fix expose
* read impl
* update
* finish write
* Squashed commit of the following:
* clean up
* fix
* update
* update
* onedrive read and write
* update license header
* update
* fix formatting
* revert
* 201
* clean up
* use parse_location
* trigger ci
* update
* fix debug fmt
* remove access token
* update
---
core/Cargo.toml | 2 +
core/src/services/mod.rs | 6 +
core/src/services/onedrive/backend.rs | 200 ++++++++++++++++++++++++++++++++++
core/src/services/onedrive/builder.rs | 149 +++++++++++++++++++++++++
core/src/services/onedrive/error.rs | 49 +++++++++
core/src/services/onedrive/mod.rs | 23 ++++
core/src/services/onedrive/writer.rs | 74 +++++++++++++
core/src/services/webdav/backend.rs | 2 +-
core/src/types/scheme.rs | 3 +
9 files changed, 507 insertions(+), 1 deletion(-)
diff --git a/core/Cargo.toml b/core/Cargo.toml
index b8bbd0e9..a8579cf4 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -49,6 +49,7 @@ default = [
"services-s3",
"services-webdav",
"services-webhdfs",
+ "services-onedrive",
]
# Build docs or not.
@@ -121,6 +122,7 @@ services-obs = [
"reqsign?/services-huaweicloud",
"reqsign?/reqwest_request",
]
+services-onedrive = []
services-oss = [
"dep:reqsign",
"reqsign?/services-aliyun",
diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs
index ab0aec0a..a74de606 100644
--- a/core/src/services/mod.rs
+++ b/core/src/services/mod.rs
@@ -136,5 +136,11 @@ pub use webdav::Webdav;
#[cfg(feature = "services-webhdfs")]
mod webhdfs;
+
+#[cfg(feature = "services-onedrive")]
+mod onedrive;
+#[cfg(feature = "services-onedrive")]
+pub use onedrive::Onedrive;
+
#[cfg(feature = "services-webhdfs")]
pub use webhdfs::Webhdfs;
diff --git a/core/src/services/onedrive/backend.rs
b/core/src/services/onedrive/backend.rs
new file mode 100644
index 00000000..ce15965e
--- /dev/null
+++ b/core/src/services/onedrive/backend.rs
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use async_trait::async_trait;
+use http::{header, Request, Response, StatusCode};
+use std::fmt::Debug;
+
+use crate::{
+ ops::{OpRead, OpWrite},
+ raw::{
+ build_rooted_abs_path, new_request_build_error, parse_into_metadata,
parse_location,
+ percent_encode_path, Accessor, AccessorInfo, AsyncBody, HttpClient,
IncomingAsyncBody,
+ RpRead, RpWrite,
+ },
+ types::Result,
+ Capability, Error, ErrorKind,
+};
+
+use super::{error::parse_error, writer::OneDriveWriter};
+
+#[derive(Clone)]
+pub struct OnedriveBackend {
+ root: String,
+ access_token: String,
+ client: HttpClient,
+}
+
+impl OnedriveBackend {
+ pub(crate) fn new(root: String, access_token: String, http_client:
HttpClient) -> Self {
+ Self {
+ root,
+ access_token,
+ client: http_client,
+ }
+ }
+}
+
+impl Debug for OnedriveBackend {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ let mut de = f.debug_struct("OneDriveBackend");
+ de.field("root", &self.root);
+ de.field("access_token", &self.access_token);
+ de.finish()
+ }
+}
+
+#[async_trait]
+impl Accessor for OnedriveBackend {
+ type Reader = IncomingAsyncBody;
+ type BlockingReader = ();
+ type Writer = OneDriveWriter;
+ type BlockingWriter = ();
+ type Pager = ();
+ type BlockingPager = ();
+
+ fn info(&self) -> AccessorInfo {
+ let mut ma = AccessorInfo::default();
+ ma.set_scheme(crate::Scheme::Onedrive)
+ .set_root(&self.root)
+ .set_capability(Capability {
+ read: true,
+ read_can_next: true,
+ write: true,
+ list: true,
+ copy: true,
+ rename: true,
+ ..Default::default()
+ });
+
+ ma
+ }
+
+ async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead,
Self::Reader)> {
+ let resp = self.onedrive_get(path).await?;
+
+ let status = resp.status();
+
+ if status.is_redirection() {
+ let headers = resp.headers();
+ let location = parse_location(headers)?;
+
+ match location {
+ None => {
+ return Err(Error::new(
+ ErrorKind::ContentIncomplete,
+ "redirect location not found in response",
+ ));
+ }
+ Some(location) => {
+ let resp = self.onedrive_get_redirection(location).await?;
+ let meta = parse_into_metadata(path, resp.headers())?;
+ Ok((RpRead::with_metadata(meta), resp.into_body()))
+ }
+ }
+ } else {
+ match status {
+ StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
+ let meta = parse_into_metadata(path, resp.headers())?;
+ Ok((RpRead::with_metadata(meta), resp.into_body()))
+ }
+
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+ }
+
+ async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
+ if args.content_length().is_none() {
+ return Err(Error::new(
+ ErrorKind::Unsupported,
+ "write without content length is not supported",
+ ));
+ }
+
+ let path = build_rooted_abs_path(&self.root, path);
+
+ Ok((
+ RpWrite::default(),
+ OneDriveWriter::new(self.clone(), args, path),
+ ))
+ }
+}
+
+impl OnedriveBackend {
+ async fn onedrive_get(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
+ let path = build_rooted_abs_path(&self.root, path);
+ let url: String = format!(
+ "https://graph.microsoft.com/v1.0/me/drive/root:{}:/content",
+ percent_encode_path(&path),
+ );
+
+ let mut req = Request::get(&url);
+
+ let auth_header_content = format!("Bearer {}", self.access_token);
+ req = req.header(header::AUTHORIZATION, auth_header_content);
+
+ let req = req
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ self.client.send(req).await
+ }
+
+ async fn onedrive_get_redirection(&self, url: &str) ->
Result<Response<IncomingAsyncBody>> {
+ let mut req = Request::get(url);
+
+ let auth_header_content = format!("Bearer {}", self.access_token);
+ req = req.header(header::AUTHORIZATION, auth_header_content);
+
+ let req = req
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ self.client.send(req).await
+ }
+
+ pub async fn onedrive_put(
+ &self,
+ path: &str,
+ size: Option<usize>,
+ content_type: Option<&str>,
+ body: AsyncBody,
+ ) -> Result<Response<IncomingAsyncBody>> {
+ let url = format!(
+ "https://graph.microsoft.com/v1.0/me/drive/root:{}:/content",
+ percent_encode_path(path)
+ );
+
+ let mut req = Request::put(&url);
+
+ let auth_header_content = format!("Bearer {}", self.access_token);
+ req = req.header(header::AUTHORIZATION, auth_header_content);
+
+ if let Some(size) = size {
+ req = req.header(header::CONTENT_LENGTH, size)
+ }
+
+ if let Some(mime) = content_type {
+ req = req.header(header::CONTENT_TYPE, mime)
+ }
+
+ let req = req.body(body).map_err(new_request_build_error)?;
+
+ self.client.send(req).await
+ }
+}
diff --git a/core/src/services/onedrive/builder.rs
b/core/src/services/onedrive/builder.rs
new file mode 100644
index 00000000..b1d020aa
--- /dev/null
+++ b/core/src/services/onedrive/builder.rs
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::fmt::{Debug, Formatter};
+
+use log::debug;
+
+use super::backend::OnedriveBackend;
+use crate::raw::{normalize_root, HttpClient};
+use crate::Scheme;
+use crate::*;
+
+/// [OneDrive](https://onedrive.com) backend support.
+///
+/// # Capabilities
+///
+/// This service can be used to:
+///
+/// - [x] read
+/// - [x] write
+/// - [ ] copy
+/// - [ ] rename
+/// - [ ] list
+/// - [ ] ~~scan~~
+/// - [ ] ~~presign~~
+/// - [ ] blocking
+///
+/// # Notes
+///
+/// Currently, only OneDrive Personal is supported.
+/// For uploading, only files under 4MB are supported via the Simple Upload
API
(<https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_put_content?view=odsp-graph-online>).
+///
+/// # Configuration
+///
+/// - `access_token`: set the access_token for Graph API
+/// - `root`: Set the work directory for backend
+///
+/// You can refer to [`OneDriveBuilder`]'s docs for more information
+///
+/// # Example
+///
+/// ## Via Builder
+///
+/// ```no_run
+/// use anyhow::Result;
+/// use opendal::services::Onedrive;
+/// use opendal::Operator;
+///
+/// #[tokio::main]
+/// async fn main() -> Result<()> {
+/// // create backend builder
+/// let mut builder = Onedrive::default();
+///
+/// builder
+/// .access_token("xxx")
+/// .root("/path/to/root");
+///
+/// let op: Operator = Operator::new(builder)?.finish();
+/// Ok(())
+/// }
+/// ```
+#[derive(Default)]
+pub struct OnedriveBuilder {
+ access_token: Option<String>,
+ root: Option<String>,
+ http_client: Option<HttpClient>,
+}
+
+impl Debug for OnedriveBuilder {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("Backend").field("root", &self.root).finish()
+ }
+}
+
+impl OnedriveBuilder {
+ /// set the bearer access token for OneDrive
+ ///
+ /// default: no access token, which leads to failure
+ pub fn access_token(&mut self, access_token: &str) -> &mut Self {
+ self.access_token = Some(access_token.to_string());
+ self
+ }
+
+ /// Set root path of OneDrive folder.
+ pub fn root(&mut self, root: &str) -> &mut Self {
+ self.root = Some(root.to_string());
+ self
+ }
+
+ /// Specify the http client that used by this service.
+ ///
+ /// # Notes
+ ///
+ /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
+ /// during minor updates.
+ pub fn http_client(&mut self, http_client: HttpClient) -> &mut Self {
+ self.http_client = Some(http_client);
+ self
+ }
+}
+
+impl Builder for OnedriveBuilder {
+ const SCHEME: Scheme = Scheme::Onedrive;
+
+ type Accessor = OnedriveBackend;
+
+ fn from_map(map: HashMap<String, String>) -> Self {
+ let mut builder = Self::default();
+
+ map.get("root").map(|v| builder.root(v));
+ map.get("access_token").map(|v| builder.access_token(v));
+
+ builder
+ }
+
+ fn build(&mut self) -> Result<Self::Accessor> {
+ let root = normalize_root(&self.root.take().unwrap_or_default());
+ debug!("backend use root {}", root);
+
+ let client = if let Some(client) = self.http_client.take() {
+ client
+ } else {
+ HttpClient::new().map_err(|err| {
+ err.with_operation("Builder::build")
+ .with_context("service", Scheme::Onedrive)
+ })?
+ };
+
+ match self.access_token.clone() {
+ Some(access_token) => Ok(OnedriveBackend::new(root, access_token,
client)),
+ None => Err(Error::new(ErrorKind::ConfigInvalid, "access_token not
set")),
+ }
+ }
+}
diff --git a/core/src/services/onedrive/error.rs
b/core/src/services/onedrive/error.rs
new file mode 100644
index 00000000..6fe6b57d
--- /dev/null
+++ b/core/src/services/onedrive/error.rs
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use http::Response;
+use http::StatusCode;
+
+use crate::raw::*;
+use crate::Error;
+use crate::ErrorKind;
+use crate::Result;
+
+/// Parse error response into Error.
+pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
+ let (parts, body) = resp.into_parts();
+ let bs = body.bytes().await?;
+
+ let (kind, retryable) = match parts.status {
+ StatusCode::NOT_FOUND => (ErrorKind::NotFound, false),
+ StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false),
+ StatusCode::INTERNAL_SERVER_ERROR
+ | StatusCode::BAD_GATEWAY
+ | StatusCode::SERVICE_UNAVAILABLE
+ | StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true),
+ _ => (ErrorKind::Unexpected, false),
+ };
+
+ let mut err = Error::new(kind, &String::from_utf8_lossy(&bs))
+ .with_context("response", format!("{parts:?}"));
+
+ if retryable {
+ err = err.set_temporary();
+ }
+
+ Ok(err)
+}
diff --git a/core/src/services/onedrive/mod.rs
b/core/src/services/onedrive/mod.rs
new file mode 100644
index 00000000..0b58fc8c
--- /dev/null
+++ b/core/src/services/onedrive/mod.rs
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod backend;
+mod builder;
+mod error;
+
+pub use builder::OnedriveBuilder as Onedrive;
+mod writer;
diff --git a/core/src/services/onedrive/writer.rs
b/core/src/services/onedrive/writer.rs
new file mode 100644
index 00000000..4ac6062a
--- /dev/null
+++ b/core/src/services/onedrive/writer.rs
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use http::StatusCode;
+
+use super::backend::OnedriveBackend;
+use super::error::parse_error;
+use crate::ops::OpWrite;
+use crate::raw::*;
+use crate::*;
+
+pub struct OneDriveWriter {
+ backend: OnedriveBackend,
+
+ op: OpWrite,
+ path: String,
+}
+
+impl OneDriveWriter {
+ pub fn new(backend: OnedriveBackend, op: OpWrite, path: String) -> Self {
+ OneDriveWriter { backend, op, path }
+ }
+}
+
+#[async_trait]
+impl oio::Write for OneDriveWriter {
+ async fn write(&mut self, bs: Bytes) -> Result<()> {
+ let resp = self
+ .backend
+ .onedrive_put(
+ &self.path,
+ Some(bs.len()),
+ self.op.content_type(),
+ AsyncBody::Bytes(bs),
+ )
+ .await?;
+
+ let status = resp.status();
+
+ match status {
+ // Typical response code: 201 Created
+ // Reference:
https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_put_content?view=odsp-graph-online#response
+ StatusCode::CREATED => {
+ resp.into_body().consume().await?;
+ Ok(())
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ async fn abort(&mut self) -> Result<()> {
+ Ok(())
+ }
+
+ async fn close(&mut self) -> Result<()> {
+ Ok(())
+ }
+}
diff --git a/core/src/services/webdav/backend.rs
b/core/src/services/webdav/backend.rs
index 4a073d6e..2e9bd7e7 100644
--- a/core/src/services/webdav/backend.rs
+++ b/core/src/services/webdav/backend.rs
@@ -433,7 +433,7 @@ impl WebdavBackend {
) -> Result<Response<IncomingAsyncBody>> {
let p = build_rooted_abs_path(&self.root, path);
- let url = format!("{}{}", self.endpoint, percent_encode_path(&p));
+ let url: String = format!("{}{}", self.endpoint,
percent_encode_path(&p));
let mut req = Request::get(&url);
diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs
index 3673b172..5019d22e 100644
--- a/core/src/types/scheme.rs
+++ b/core/src/types/scheme.rs
@@ -61,6 +61,8 @@ pub enum Scheme {
Moka,
/// [obs][crate::services::Obs]: Huawei Cloud OBS services.
Obs,
+ /// [onedrive][crate::services::Onedrive]: Microsoft OneDrive services.
+ Onedrive,
/// [oss][crate::services::Oss]: Aliyun Object Storage Services
Oss,
/// [redis][crate::services::Redis]: Redis services
@@ -160,6 +162,7 @@ impl From<Scheme> for &'static str {
Scheme::Memory => "memory",
Scheme::Moka => "moka",
Scheme::Obs => "obs",
+ Scheme::Onedrive => "onedrive",
Scheme::Redis => "redis",
Scheme::Rocksdb => "rocksdb",
Scheme::S3 => "s3",