This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new bf264084 feat(services/gdrive): Add read & write & delete support for 
GoogleDrive (#2184)
bf264084 is described below

commit bf2640841f5244579709cbd41a4fe503a8986404
Author: Flash <[email protected]>
AuthorDate: Mon May 1 22:18:47 2023 +0800

    feat(services/gdrive): Add read & write & delete support for GoogleDrive 
(#2184)
    
    add gdrive service read & write & delete support
---
 core/Cargo.toml                     |   2 +
 core/src/services/gdrive/backend.rs | 252 ++++++++++++++++++++++++++++++++++++
 core/src/services/gdrive/builder.rs | 148 +++++++++++++++++++++
 core/src/services/gdrive/error.rs   |  49 +++++++
 core/src/services/gdrive/mod.rs     |  23 ++++
 core/src/services/gdrive/writer.rs  |  72 +++++++++++
 core/src/services/mod.rs            |   6 +
 core/src/types/scheme.rs            |   3 +
 8 files changed, 555 insertions(+)

diff --git a/core/Cargo.toml b/core/Cargo.toml
index 5dc6170c..6cc1341d 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -50,6 +50,7 @@ default = [
   "services-webdav",
   "services-webhdfs",
   "services-onedrive",
+  "services-gdrive",
 ]
 
 # Build docs or not.
@@ -123,6 +124,7 @@ services-obs = [
   "reqsign?/reqwest_request",
 ]
 services-onedrive = []
+services-gdrive = []
 services-oss = [
   "dep:reqsign",
   "reqsign?/services-aliyun",
diff --git a/core/src/services/gdrive/backend.rs 
b/core/src/services/gdrive/backend.rs
new file mode 100644
index 00000000..babda427
--- /dev/null
+++ b/core/src/services/gdrive/backend.rs
@@ -0,0 +1,252 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use async_trait::async_trait;
+use http::{header, Request, Response, StatusCode};
+use serde::Deserialize;
+use std::fmt::Debug;
+
+use crate::{
+    ops::{OpDelete, OpRead, OpWrite},
+    raw::{
+        build_rooted_abs_path, new_request_build_error, parse_into_metadata, 
Accessor,
+        AccessorInfo, AsyncBody, HttpClient, IncomingAsyncBody, RpDelete, 
RpRead, RpWrite,
+    },
+    types::Result,
+    Capability, Error, ErrorKind,
+};
+
+use super::{error::parse_error, writer::GdriveWriter};
+
+/// Backend that serves read, write and delete requests through the
+/// Google Drive v3 HTTP API.
+#[derive(Clone)]
+pub struct GdriveBackend {
+    /// Root path that every operation path is resolved against.
+    root: String,
+    /// Access token sent as a `Bearer` credential in the `Authorization` header.
+    access_token: String,
+    /// HTTP client used to send every request.
+    client: HttpClient,
+}
+
+impl GdriveBackend {
+    /// Create a backend that resolves paths against `root`, authenticates
+    /// with `access_token` and sends its requests through `http_client`.
+    pub(crate) fn new(root: String, access_token: String, http_client: HttpClient) -> Self {
+        let client = http_client;
+
+        Self {
+            root,
+            access_token,
+            client,
+        }
+    }
+}
+
+impl Debug for GdriveBackend {
+    /// Format the backend for diagnostics.
+    ///
+    /// Only `root` is printed; `access_token` and `client` are not included
+    /// in the output.
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        // The struct name used to be misspelled as "GoolgeDriveBackend".
+        f.debug_struct("GoogleDriveBackend")
+            .field("root", &self.root)
+            .finish()
+    }
+}
+
+#[async_trait]
+impl Accessor for GdriveBackend {
+    type Reader = IncomingAsyncBody;
+    type BlockingReader = ();
+    type Writer = GdriveWriter;
+    type BlockingWriter = ();
+    type Pager = ();
+    type BlockingPager = ();
+
+    /// Describe this accessor: its scheme, its root and the operations it
+    /// supports (read, write and delete).
+    fn info(&self) -> AccessorInfo {
+        let capability = Capability {
+            read: true,
+            write: true,
+            delete: true,
+            ..Default::default()
+        };
+
+        let mut info = AccessorInfo::default();
+        info.set_scheme(crate::Scheme::Gdrive)
+            .set_root(&self.root)
+            .set_capability(capability);
+        info
+    }
+
+    /// Download the content of `path`.
+    ///
+    /// Any response other than `200 OK` is turned into an error.
+    async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead, Self::Reader)> {
+        let resp = self.gdrive_get(path).await?;
+
+        if resp.status() != StatusCode::OK {
+            return Err(parse_error(resp).await?);
+        }
+
+        let meta = parse_into_metadata(path, resp.headers())?;
+        Ok((RpRead::with_metadata(meta), resp.into_body()))
+    }
+
+    /// Prepare a writer for `path`.
+    ///
+    /// The content length must be known up front; otherwise the call is
+    /// rejected with `ErrorKind::Unsupported`.
+    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
+        let has_length = args.content_length().is_some();
+        if !has_length {
+            return Err(Error::new(
+                ErrorKind::Unsupported,
+                "write without content length is not supported",
+            ));
+        }
+
+        let writer = GdriveWriter::new(self.clone(), args, path.to_string());
+        Ok((RpWrite::default(), writer))
+    }
+
+    /// Delete the file at `path`.
+    ///
+    /// Only `204 No Content` is treated as success.
+    async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
+        let resp = self.gdrive_delete(path).await?;
+
+        if resp.status() == StatusCode::NO_CONTENT {
+            Ok(RpDelete::default())
+        } else {
+            Err(parse_error(resp).await?)
+        }
+    }
+}
+
+impl GdriveBackend {
+    /// Build the `Authorization` header value for the configured access token.
+    fn bearer_token(&self) -> String {
+        format!("Bearer {}", self.access_token)
+    }
+
+    /// Send `req` and deserialize the JSON body of a `200 OK` response into `T`.
+    ///
+    /// # Errors
+    ///
+    /// Returns the transport error if sending fails, the error built by
+    /// `parse_error` for any status other than `200 OK`, and an
+    /// `ErrorKind::Unexpected` error if the body is not valid JSON for `T`.
+    async fn send_for_json<T>(&self, req: Request<AsyncBody>) -> Result<T>
+    where
+        T: for<'de> Deserialize<'de>,
+    {
+        let resp = self.client.send(req).await?;
+
+        // Error responses carry a different JSON shape, so they must not be
+        // fed to the deserializer.
+        if resp.status() != StatusCode::OK {
+            return Err(parse_error(resp).await?);
+        }
+
+        let bs = resp.into_body().bytes().await?;
+        serde_json::from_slice(&bs).map_err(|err| {
+            Error::new(
+                ErrorKind::Unexpected,
+                "failed to deserialize gdrive response as json",
+            )
+            .set_source(err)
+        })
+    }
+
+    /// Look up the id of the drive's root folder (the `root` alias).
+    ///
+    /// # Errors
+    ///
+    /// Fails if the request cannot be built or sent, or if the response is
+    /// not a `200 OK` carrying a file resource.
+    async fn get_abs_root_id(&self) -> Result<String> {
+        let req = Request::get("https://www.googleapis.com/drive/v3/files/root")
+            .header(header::AUTHORIZATION, self.bearer_token())
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        let root: GdriveFile = self.send_for_json(req).await?;
+        Ok(root.id)
+    }
+
+    /// Resolve `file_path` (relative to the backend root) to a file id.
+    ///
+    /// The path is walked one component at a time starting from the drive
+    /// root; each component costs one list request.
+    ///
+    /// # Errors
+    ///
+    /// Returns `ErrorKind::NotFound` when a component has no match, and
+    /// propagates any request or response error from the lookups.
+    async fn get_file_id_by_path(&self, file_path: &str) -> Result<String> {
+        let path = build_rooted_abs_path(&self.root, file_path);
+        let auth_header_content = self.bearer_token();
+
+        let mut parent_id = self.get_abs_root_id().await?;
+        let file_path_items: Vec<&str> = path.split('/').filter(|&x| !x.is_empty()).collect();
+
+        for (i, item) in file_path_items.iter().enumerate() {
+            let mut query = format!(
+                "name = '{}' and parents = '{}' and trashed = false",
+                item, parent_id
+            );
+            // Every component except the last one must be a folder.
+            if i + 1 != file_path_items.len() {
+                query += "and mimeType = 'application/vnd.google-apps.folder'";
+            }
+            // NOTE(review): all whitespace is removed from the query, including
+            // whitespace inside `item`, so names containing spaces cannot match.
+            // Presumably this avoids URL-encoding the query; confirm against
+            // the Drive search syntax and consider percent-encoding instead.
+            let query: String = query.chars().filter(|c| !c.is_whitespace()).collect();
+
+            let req = Request::get(format!(
+                "https://www.googleapis.com/drive/v3/files?q={}",
+                query
+            ))
+            .header(header::AUTHORIZATION, &auth_header_content)
+            .body(AsyncBody::default())
+            .map_err(new_request_build_error)?;
+
+            let list: GdriveFileList = self.send_for_json(req).await?;
+
+            // An empty result means this component does not exist; report it
+            // instead of indexing into an empty list.
+            parent_id = match list.files.into_iter().next() {
+                Some(file) => file.id,
+                None => {
+                    return Err(Error::new(
+                        ErrorKind::NotFound,
+                        &format!("path component not found in gdrive: {}", item),
+                    ))
+                }
+            };
+        }
+
+        Ok(parent_id)
+    }
+
+    /// Request the content (`alt=media`) of the file at `path`.
+    ///
+    /// The response is returned as is; the caller checks the status code.
+    async fn gdrive_get(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
+        let url = format!(
+            "https://www.googleapis.com/drive/v3/files/{}?alt=media",
+            self.get_file_id_by_path(path).await?
+        );
+
+        let req = Request::get(&url)
+            .header(header::AUTHORIZATION, self.bearer_token())
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.client.send(req).await
+    }
+
+    /// Upload `body` as the new content of the file at `path` via `PATCH`.
+    ///
+    /// The file id is resolved first, so a path that does not resolve yields
+    /// `ErrorKind::NotFound`. `size` and `content_type`, when given, are sent
+    /// as `Content-Length` and `Content-Type`. The response is returned as
+    /// is; the caller checks the status code.
+    pub async fn gdrive_update(
+        &self,
+        path: &str,
+        size: Option<usize>,
+        content_type: Option<&str>,
+        body: AsyncBody,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let url = format!(
+            "https://www.googleapis.com/upload/drive/v3/files/{}",
+            self.get_file_id_by_path(path).await?
+        );
+
+        let mut req = Request::patch(&url).header(header::AUTHORIZATION, self.bearer_token());
+
+        if let Some(size) = size {
+            req = req.header(header::CONTENT_LENGTH, size);
+        }
+
+        if let Some(mime) = content_type {
+            req = req.header(header::CONTENT_TYPE, mime);
+        }
+
+        let req = req.body(body).map_err(new_request_build_error)?;
+
+        self.client.send(req).await
+    }
+
+    /// Send a `DELETE` for the file at `path`.
+    ///
+    /// The response is returned as is; the caller checks the status code.
+    async fn gdrive_delete(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
+        let url = format!(
+            "https://www.googleapis.com/drive/v3/files/{}",
+            self.get_file_id_by_path(path).await?
+        );
+
+        let req = Request::delete(&url)
+            .header(header::AUTHORIZATION, self.bearer_token())
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.client.send(req).await
+    }
+}
+
+/// A file entry in a Drive API JSON response; only the `id` field is
+/// deserialized.
+#[derive(Deserialize)]
+struct GdriveFile {
+    /// Identifier of the file, used to build per-file request URLs.
+    id: String,
+}
+
+/// JSON body of a Drive API file-list response; only the `files` array is
+/// deserialized.
+#[derive(Deserialize)]
+struct GdriveFileList {
+    /// Files matching the query.
+    files: Vec<GdriveFile>,
+}
diff --git a/core/src/services/gdrive/builder.rs 
b/core/src/services/gdrive/builder.rs
new file mode 100644
index 00000000..d7ed0c84
--- /dev/null
+++ b/core/src/services/gdrive/builder.rs
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::fmt::{Debug, Formatter};
+
+use log::debug;
+
+use super::backend::GdriveBackend;
+use crate::raw::{normalize_root, HttpClient};
+use crate::Scheme;
+use crate::*;
+
+/// [GoogleDrive](https://drive.google.com/) backend support.
+///
+/// # Capabilities
+///
+/// This service can be used to:
+///
+/// - [x] read
+/// - [x] write
+/// - [x] delete
+/// - [ ] copy
+/// - [ ] create
+/// - [ ] list
+/// - [ ] rename
+///
+/// # Notes
+///
+/// An `access_token` is required: building the backend without one fails
+/// with a configuration error.
+///
+/// # Configuration
+///
+/// - `access_token`: Set the access token for the Google Drive API
+/// - `root`: Set the work directory for backend
+///
+/// You can refer to [`GdriveBuilder`]'s docs for more information
+///
+/// # Example
+///
+/// ## Via Builder
+///
+/// ```no_run
+/// use anyhow::Result;
+/// use opendal::services::Gdrive;
+/// use opendal::Operator;
+///
+/// #[tokio::main]
+/// async fn main() -> Result<()> {
+///     // create backend builder
+///     let mut builder = Gdrive::default();
+///
+///     builder.access_token("xxx").root("/path/to/root");
+///
+///     let op: Operator = Operator::new(builder)?.finish();
+///
+///     let write = op.write("abc.txt", "who are you").await?;
+///     let read = op.read("abc.txt").await?;
+///     let s = String::from_utf8(read).unwrap();
+///     println!("{}", s);
+///     let delete = op.delete("abc.txt").await?;
+///     Ok(())
+/// }
+/// ```
+#[derive(Default)]
+pub struct GdriveBuilder {
+    /// Access token for the Google Drive API; `None` until set.
+    access_token: Option<String>,
+    /// Work directory of the backend; `None` until set.
+    root: Option<String>,
+    /// Caller-supplied HTTP client; when `None`, `build` creates a new one.
+    http_client: Option<HttpClient>,
+}
+
+impl Debug for GdriveBuilder {
+    /// Format the builder for diagnostics, showing only the configured `root`.
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut out = f.debug_struct("Backend");
+        out.field("root", &self.root);
+        out.finish()
+    }
+}
+
+impl GdriveBuilder {
+    /// Set the access token used to authenticate against the Google Drive API.
+    ///
+    /// No token is configured by default, and building without one fails.
+    pub fn access_token(&mut self, access_token: &str) -> &mut Self {
+        let token = String::from(access_token);
+        self.access_token = Some(token);
+        self
+    }
+
+    /// Set the root path of the GoogleDrive folder to work in.
+    pub fn root(&mut self, root: &str) -> &mut Self {
+        let root = String::from(root);
+        self.root = Some(root);
+        self
+    }
+
+    /// Specify the http client used by this service.
+    ///
+    /// # Notes
+    ///
+    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
+    /// during minor updates.
+    pub fn http_client(&mut self, http_client: HttpClient) -> &mut Self {
+        self.http_client.replace(http_client);
+        self
+    }
+}
+
+impl Builder for GdriveBuilder {
+    const SCHEME: Scheme = Scheme::Gdrive;
+
+    type Accessor = GdriveBackend;
+
+    /// Create a builder from string options.
+    ///
+    /// The recognised keys are `root` and `access_token`; other keys are
+    /// ignored.
+    fn from_map(map: HashMap<String, String>) -> Self {
+        let mut builder = Self::default();
+
+        if let Some(root) = map.get("root") {
+            builder.root(root);
+        }
+        if let Some(token) = map.get("access_token") {
+            builder.access_token(token);
+        }
+
+        builder
+    }
+
+    /// Build the backend from the collected settings.
+    ///
+    /// The root is normalized, a new `HttpClient` is created when none was
+    /// supplied, and a missing access token is reported as
+    /// `ErrorKind::ConfigInvalid`.
+    fn build(&mut self) -> Result<Self::Accessor> {
+        let root = normalize_root(&self.root.take().unwrap_or_default());
+        debug!("backend use root {}", root);
+
+        let client = match self.http_client.take() {
+            Some(client) => client,
+            None => HttpClient::new().map_err(|err| {
+                err.with_operation("Builder::build")
+                    .with_context("service", Scheme::Gdrive)
+            })?,
+        };
+
+        // The token is checked after the client is ready, so a client
+        // construction error is reported first.
+        let access_token = self
+            .access_token
+            .clone()
+            .ok_or_else(|| Error::new(ErrorKind::ConfigInvalid, "access_token not set"))?;
+
+        Ok(GdriveBackend::new(root, access_token, client))
+    }
+}
diff --git a/core/src/services/gdrive/error.rs 
b/core/src/services/gdrive/error.rs
new file mode 100644
index 00000000..6fe6b57d
--- /dev/null
+++ b/core/src/services/gdrive/error.rs
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use http::Response;
+use http::StatusCode;
+
+use crate::raw::*;
+use crate::Error;
+use crate::ErrorKind;
+use crate::Result;
+
+/// Parse error response into Error.
+pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
+    let (parts, body) = resp.into_parts();
+    let bs = body.bytes().await?;
+
+    let (kind, retryable) = match parts.status {
+        StatusCode::NOT_FOUND => (ErrorKind::NotFound, false),
+        StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false),
+        StatusCode::INTERNAL_SERVER_ERROR
+        | StatusCode::BAD_GATEWAY
+        | StatusCode::SERVICE_UNAVAILABLE
+        | StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true),
+        _ => (ErrorKind::Unexpected, false),
+    };
+
+    let mut err = Error::new(kind, &String::from_utf8_lossy(&bs))
+        .with_context("response", format!("{parts:?}"));
+
+    if retryable {
+        err = err.set_temporary();
+    }
+
+    Ok(err)
+}
diff --git a/core/src/services/gdrive/mod.rs b/core/src/services/gdrive/mod.rs
new file mode 100644
index 00000000..7c88f4fe
--- /dev/null
+++ b/core/src/services/gdrive/mod.rs
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod backend;
+mod builder;
+mod error;
+
+pub use builder::GdriveBuilder as Gdrive;
+mod writer;
diff --git a/core/src/services/gdrive/writer.rs 
b/core/src/services/gdrive/writer.rs
new file mode 100644
index 00000000..de9067aa
--- /dev/null
+++ b/core/src/services/gdrive/writer.rs
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use http::StatusCode;
+
+use super::backend::GdriveBackend;
+use super::error::parse_error;
+use crate::ops::OpWrite;
+use crate::raw::*;
+use crate::*;
+
+/// Writer that uploads data to a single Google Drive path through a
+/// [`GdriveBackend`].
+pub struct GdriveWriter {
+    /// Backend used to send the upload request.
+    backend: GdriveBackend,
+
+    /// Write options; the content type is read from here on upload.
+    op: OpWrite,
+    /// Path of the file being written.
+    path: String,
+}
+
+impl GdriveWriter {
+    /// Create a writer that uploads to `path` through `backend` using the
+    /// options in `op`.
+    pub fn new(backend: GdriveBackend, op: OpWrite, path: String) -> Self {
+        Self { backend, op, path }
+    }
+}
+
+#[async_trait]
+impl oio::Write for GdriveWriter {
+    /// Upload `bs` as the content of the target path.
+    ///
+    /// Succeeds only on `200 OK`; any other status is turned into an error.
+    async fn write(&mut self, bs: Bytes) -> Result<()> {
+        let size = bs.len();
+        let content_type = self.op.content_type();
+
+        let resp = self
+            .backend
+            .gdrive_update(&self.path, Some(size), content_type, AsyncBody::Bytes(bs))
+            .await?;
+
+        if resp.status() != StatusCode::OK {
+            return Err(parse_error(resp).await?);
+        }
+
+        // Drain the body of the successful response before returning.
+        resp.into_body().consume().await?;
+        Ok(())
+    }
+
+    /// Nothing to abort: no state is kept between calls.
+    async fn abort(&mut self) -> Result<()> {
+        Ok(())
+    }
+
+    /// Nothing to finalize on close.
+    async fn close(&mut self) -> Result<()> {
+        Ok(())
+    }
+}
diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs
index d08135b0..a2371940 100644
--- a/core/src/services/mod.rs
+++ b/core/src/services/mod.rs
@@ -146,5 +146,11 @@ mod webhdfs;
 mod onedrive;
 #[cfg(feature = "services-onedrive")]
 pub use onedrive::Onedrive;
+
+#[cfg(feature = "services-gdrive")]
+mod gdrive;
+#[cfg(feature = "services-gdrive")]
+pub use gdrive::Gdrive;
+
 #[cfg(feature = "services-webhdfs")]
 pub use webhdfs::Webhdfs;
diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs
index 7e52ab84..4f7c2020 100644
--- a/core/src/types/scheme.rs
+++ b/core/src/types/scheme.rs
@@ -63,6 +63,8 @@ pub enum Scheme {
     Obs,
     /// [onedrive][crate::services::Onedrive]: Microsoft OneDrive services.
     Onedrive,
+    /// [gdrive][crate::services::Gdrive]: GoogleDrive services.
+    Gdrive,
     /// [oss][crate::services::Oss]: Aliyun Object Storage Services
     Oss,
     /// [redis][crate::services::Redis]: Redis services
@@ -165,6 +167,7 @@ impl From<Scheme> for &'static str {
             Scheme::Moka => "moka",
             Scheme::Obs => "obs",
             Scheme::Onedrive => "onedrive",
+            Scheme::Gdrive => "gdrive",
             Scheme::Redis => "redis",
             Scheme::Rocksdb => "rocksdb",
             Scheme::S3 => "s3",

Reply via email to