This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new af273fa7 services/gdrive: port code to GdriveCore & add path_2_id cache (#2203)
af273fa7 is described below

commit af273fa745ef677cc3d3cccc0819681c7b9b311e
Author: Flash <[email protected]>
AuthorDate: Tue May 9 17:58:41 2023 +0800

    services/gdrive: port code to GdriveCore & add path_2_id cache (#2203)
    
    * port code to GdriveCore & add path_2_id cache
    
    * make clippy happy
    
    * handle errors
    
    * fix typos
    
    * extract auth to core.sign
    
    * address code review feedback
    
    * parse responses according to their StatusCode
---
 core/src/services/gdrive/backend.rs | 174 +++--------------------------
 core/src/services/gdrive/core.rs    | 217 ++++++++++++++++++++++++++++++++++++
 core/src/services/gdrive/mod.rs     |   1 +
 core/src/services/gdrive/writer.rs  |  13 ++-
 4 files changed, 243 insertions(+), 162 deletions(-)

diff --git a/core/src/services/gdrive/backend.rs b/core/src/services/gdrive/backend.rs
index babda427..03d8c83e 100644
--- a/core/src/services/gdrive/backend.rs
+++ b/core/src/services/gdrive/backend.rs
@@ -16,47 +16,40 @@
 // under the License.
 
 use async_trait::async_trait;
-use http::{header, Request, Response, StatusCode};
-use serde::Deserialize;
-use std::fmt::Debug;
+use http::StatusCode;
+
+use std::{fmt::Debug, sync::Arc};
 
 use crate::{
     ops::{OpDelete, OpRead, OpWrite},
     raw::{
-        build_rooted_abs_path, new_request_build_error, parse_into_metadata, 
Accessor,
-        AccessorInfo, AsyncBody, HttpClient, IncomingAsyncBody, RpDelete, 
RpRead, RpWrite,
+        parse_into_metadata, Accessor, AccessorInfo, HttpClient, 
IncomingAsyncBody, RpDelete,
+        RpRead, RpWrite,
     },
     types::Result,
     Capability, Error, ErrorKind,
 };
 
-use super::{error::parse_error, writer::GdriveWriter};
+use super::{core::GdriveCore, error::parse_error, writer::GdriveWriter};
 
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub struct GdriveBackend {
-    root: String,
-    access_token: String,
-    client: HttpClient,
+    core: Arc<GdriveCore>,
 }
 
 impl GdriveBackend {
     pub(crate) fn new(root: String, access_token: String, http_client: 
HttpClient) -> Self {
         GdriveBackend {
-            root,
-            access_token,
-            client: http_client,
+            core: Arc::new(GdriveCore {
+                root,
+                access_token,
+                client: http_client,
+                path_cache: Arc::default(),
+            }),
         }
     }
 }
 
-impl Debug for GdriveBackend {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        let mut de = f.debug_struct("GoolgeDriveBackend");
-        de.field("root", &self.root);
-        de.finish()
-    }
-}
-
 #[async_trait]
 impl Accessor for GdriveBackend {
     type Reader = IncomingAsyncBody;
@@ -69,7 +62,7 @@ impl Accessor for GdriveBackend {
     fn info(&self) -> AccessorInfo {
         let mut ma = AccessorInfo::default();
         ma.set_scheme(crate::Scheme::Gdrive)
-            .set_root(&self.root)
+            .set_root(&self.core.root)
             .set_capability(Capability {
                 read: true,
                 write: true,
@@ -81,7 +74,7 @@ impl Accessor for GdriveBackend {
     }
 
     async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
-        let resp = self.gdrive_get(path).await?;
+        let resp = self.core.gdrive_get(path).await?;
 
         let status = resp.status();
 
@@ -104,12 +97,12 @@ impl Accessor for GdriveBackend {
 
         Ok((
             RpWrite::default(),
-            GdriveWriter::new(self.clone(), args, String::from(path)),
+            GdriveWriter::new(self.core.clone(), args, String::from(path)),
         ))
     }
 
     async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
-        let resp = self.gdrive_delete(path).await?;
+        let resp = self.core.gdrive_delete(path).await?;
 
         let status = resp.status();
 
@@ -119,134 +112,3 @@ impl Accessor for GdriveBackend {
         }
     }
 }
-
-impl GdriveBackend {
-    async fn get_abs_root_id(&self) -> String {
-        let mut req = 
Request::get("https://www.googleapis.com/drive/v3/files/root";);
-        let auth_header_content = format!("Bearer {}", self.access_token);
-        req = req.header(header::AUTHORIZATION, auth_header_content);
-        let req = req
-            .body(AsyncBody::Empty)
-            .map_err(new_request_build_error)
-            .unwrap();
-
-        let resp = self.client.send(req).await.unwrap();
-
-        let body_value: GdriveFile =
-            
serde_json::from_slice(&resp.into_body().bytes().await.unwrap()).unwrap();
-        let root_id = String::from(body_value.id.as_str());
-        root_id
-    }
-
-    async fn get_file_id_by_path(&self, file_path: &str) -> String {
-        let path = build_rooted_abs_path(&self.root, file_path);
-        let auth_header_content = format!("Bearer {}", self.access_token);
-
-        let mut parent_id = self.get_abs_root_id().await;
-        let file_path_items: Vec<&str> = path.split('/').filter(|&x| 
!x.is_empty()).collect();
-
-        for (i, item) in file_path_items.iter().enumerate() {
-            let mut query = format!(
-                "name = '{}' and parents = '{}' and trashed = false",
-                item, parent_id
-            );
-            if i != file_path_items.len() - 1 {
-                query += "and mimeType = 'application/vnd.google-apps.folder'";
-            }
-            let query: String = query.chars().filter(|c| 
!c.is_whitespace()).collect();
-
-            let mut req = Request::get(format!(
-                "https://www.googleapis.com/drive/v3/files?q={}";,
-                query
-            ));
-            req = req.header(header::AUTHORIZATION, &auth_header_content);
-            let req = req
-                .body(AsyncBody::default())
-                .map_err(new_request_build_error)
-                .unwrap();
-
-            let resp = self.client.send(req).await.unwrap();
-
-            let body_value: GdriveFileList =
-                
serde_json::from_slice(&resp.into_body().bytes().await.unwrap()).unwrap();
-            parent_id = String::from(body_value.files[0].id.as_str());
-        }
-
-        parent_id
-    }
-
-    async fn gdrive_get(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
-        let url: String = format!(
-            "https://www.googleapis.com/drive/v3/files/{}?alt=media";,
-            self.get_file_id_by_path(path).await
-        );
-
-        let auth_header_content = format!("Bearer {}", self.access_token);
-        let mut req = Request::get(&url);
-        req = req.header(header::AUTHORIZATION, auth_header_content);
-
-        let req = req
-            .body(AsyncBody::Empty)
-            .map_err(new_request_build_error)?;
-
-        self.client.send(req).await
-    }
-
-    pub async fn gdrive_update(
-        &self,
-        path: &str,
-        size: Option<usize>,
-        content_type: Option<&str>,
-        body: AsyncBody,
-    ) -> Result<Response<IncomingAsyncBody>> {
-        let url = format!(
-            "https://www.googleapis.com/upload/drive/v3/files/{}";,
-            self.get_file_id_by_path(path).await
-        );
-
-        let mut req = Request::patch(&url);
-
-        let auth_header_content = format!("Bearer {}", self.access_token);
-        req = req.header(header::AUTHORIZATION, auth_header_content);
-
-        if let Some(size) = size {
-            req = req.header(header::CONTENT_LENGTH, size)
-        }
-
-        if let Some(mime) = content_type {
-            req = req.header(header::CONTENT_TYPE, mime)
-        }
-
-        let req = req.body(body).map_err(new_request_build_error)?;
-
-        self.client.send(req).await
-    }
-
-    async fn gdrive_delete(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
-        let url = format!(
-            "https://www.googleapis.com/drive/v3/files/{}";,
-            self.get_file_id_by_path(path).await
-        );
-
-        let mut req = Request::delete(&url);
-
-        let auth_header_content = format!("Bearer {}", self.access_token);
-        req = req.header(header::AUTHORIZATION, auth_header_content);
-
-        let req = req
-            .body(AsyncBody::Empty)
-            .map_err(new_request_build_error)?;
-
-        self.client.send(req).await
-    }
-}
-
-#[derive(Deserialize)]
-struct GdriveFile {
-    id: String,
-}
-
-#[derive(Deserialize)]
-struct GdriveFileList {
-    files: Vec<GdriveFile>,
-}
diff --git a/core/src/services/gdrive/core.rs b/core/src/services/gdrive/core.rs
new file mode 100644
index 00000000..6f66bf8b
--- /dev/null
+++ b/core/src/services/gdrive/core.rs
@@ -0,0 +1,217 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::sync::Arc;
+
+use crate::raw::new_json_deserialize_error;
+use crate::raw::percent_encode_path;
+use crate::raw::HttpClient;
+use crate::Error;
+use crate::ErrorKind;
+
+use http::request::Builder;
+use http::StatusCode;
+use http::{header, Request, Response};
+use serde::Deserialize;
+use tokio::sync::Mutex;
+
+use crate::{
+    raw::{build_rooted_abs_path, new_request_build_error, AsyncBody, 
IncomingAsyncBody},
+    types::Result,
+};
+
+use super::error::parse_error;
+
+pub struct GdriveCore {
+    pub root: String,
+    pub access_token: String,
+    pub client: HttpClient,
+    pub path_cache: Arc<Mutex<HashMap<String, String>>>,
+}
+
+impl Debug for GdriveCore {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut de = f.debug_struct("GdriveCore");
+        de.field("root", &self.root);
+        de.finish()
+    }
+}
+
+impl GdriveCore {
+    async fn get_abs_root_id(&self) -> Result<String> {
+        let root = "root";
+
+        if let Some(root_id) = self.path_cache.lock().await.get(root) {
+            return Ok(root_id.to_string());
+        }
+
+        let req = self
+            .sign(Request::get(
+                "https://www.googleapis.com/drive/v3/files/root";,
+            ))
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        let resp = self.client.send(req).await?;
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                let resp_body = &resp.into_body().bytes().await?;
+
+                let gdrive_file: GdriveFile =
+                    
serde_json::from_slice(resp_body).map_err(new_json_deserialize_error)?;
+
+                let root_id = gdrive_file.id;
+
+                let mut cache_guard = self.path_cache.lock().await;
+                cache_guard.insert(root.to_owned(), root_id.clone());
+
+                Ok(root_id)
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn get_file_id_by_path(&self, file_path: &str) -> Result<String> {
+        let path = build_rooted_abs_path(&self.root, file_path);
+
+        if let Some(file_id) = self.path_cache.lock().await.get(&path) {
+            return Ok(file_id.to_string());
+        }
+
+        let mut parent_id = self.get_abs_root_id().await?;
+        let file_path_items: Vec<&str> = path.split('/').filter(|&x| 
!x.is_empty()).collect();
+
+        for (i, item) in file_path_items.iter().enumerate() {
+            let mut query = format!(
+                "name = '{}' and parents = '{}' and trashed = false",
+                item, parent_id
+            );
+            if i != file_path_items.len() - 1 {
+                query += "and mimeType = 'application/vnd.google-apps.folder'";
+            }
+
+            let req = self
+                .sign(Request::get(format!(
+                    "https://www.googleapis.com/drive/v3/files?q={}";,
+                    percent_encode_path(&query)
+                )))
+                .body(AsyncBody::default())
+                .map_err(new_request_build_error)?;
+
+            let resp = self.client.send(req).await?;
+            let status = resp.status();
+
+            if status == StatusCode::OK {
+                let resp_body = &resp.into_body().bytes().await?;
+
+                let gdrive_file_list: GdriveFileList =
+                    
serde_json::from_slice(resp_body).map_err(new_json_deserialize_error)?;
+
+                if gdrive_file_list.files.len() != 1 {
+                    return Err(Error::new(ErrorKind::Unexpected, 
&format!("Please ensure that the file corresponding to the path exists and is 
unique. The response body is {}", String::from_utf8_lossy(resp_body))));
+                }
+
+                parent_id = gdrive_file_list.files[0].id.clone();
+            } else {
+                return Err(parse_error(resp).await?);
+            }
+        }
+
+        let mut cache_guard = self.path_cache.lock().await;
+        cache_guard.insert(path, parent_id.clone());
+
+        Ok(parent_id)
+    }
+
+    pub async fn gdrive_get(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
+        let url: String = format!(
+            "https://www.googleapis.com/drive/v3/files/{}?alt=media";,
+            self.get_file_id_by_path(path).await?
+        );
+
+        let req = self
+            .sign(Request::get(&url))
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.client.send(req).await
+    }
+
+    pub async fn gdrive_update(
+        &self,
+        path: &str,
+        size: Option<usize>,
+        content_type: Option<&str>,
+        body: AsyncBody,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let url = format!(
+            "https://www.googleapis.com/upload/drive/v3/files/{}";,
+            self.get_file_id_by_path(path).await?
+        );
+
+        let mut req = Request::patch(&url);
+
+        if let Some(size) = size {
+            req = req.header(header::CONTENT_LENGTH, size)
+        }
+
+        if let Some(mime) = content_type {
+            req = req.header(header::CONTENT_TYPE, mime)
+        }
+
+        let req = self.sign(req).body(body).map_err(new_request_build_error)?;
+
+        self.client.send(req).await
+    }
+
+    pub async fn gdrive_delete(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
+        let url = format!(
+            "https://www.googleapis.com/drive/v3/files/{}";,
+            self.get_file_id_by_path(path).await?
+        );
+
+        let req = self
+            .sign(Request::delete(&url))
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.client.send(req).await
+    }
+
+    fn sign(&self, mut req: Builder) -> Builder {
+        let auth_header_content = format!("Bearer {}", self.access_token);
+        req = req.header(header::AUTHORIZATION, auth_header_content);
+        req
+    }
+}
+
+// refer to 
https://developers.google.com/drive/api/reference/rest/v3/files#File
+#[derive(Deserialize)]
+struct GdriveFile {
+    id: String,
+}
+
+// refer to 
https://developers.google.com/drive/api/reference/rest/v3/files/list
+#[derive(Deserialize)]
+struct GdriveFileList {
+    files: Vec<GdriveFile>,
+}
diff --git a/core/src/services/gdrive/mod.rs b/core/src/services/gdrive/mod.rs
index 7c88f4fe..5f77ecc5 100644
--- a/core/src/services/gdrive/mod.rs
+++ b/core/src/services/gdrive/mod.rs
@@ -17,6 +17,7 @@
 
 mod backend;
 mod builder;
+mod core;
 mod error;
 
 pub use builder::GdriveBuilder as Gdrive;
diff --git a/core/src/services/gdrive/writer.rs b/core/src/services/gdrive/writer.rs
index de9067aa..dcff07d2 100644
--- a/core/src/services/gdrive/writer.rs
+++ b/core/src/services/gdrive/writer.rs
@@ -15,26 +15,27 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::sync::Arc;
+
 use async_trait::async_trait;
 use bytes::Bytes;
 use http::StatusCode;
 
-use super::backend::GdriveBackend;
+use super::core::GdriveCore;
 use super::error::parse_error;
 use crate::ops::OpWrite;
 use crate::raw::*;
 use crate::*;
 
 pub struct GdriveWriter {
-    backend: GdriveBackend,
-
+    core: Arc<GdriveCore>,
     op: OpWrite,
     path: String,
 }
 
 impl GdriveWriter {
-    pub fn new(backend: GdriveBackend, op: OpWrite, path: String) -> Self {
-        GdriveWriter { backend, op, path }
+    pub fn new(core: Arc<GdriveCore>, op: OpWrite, path: String) -> Self {
+        GdriveWriter { core, op, path }
     }
 }
 
@@ -42,7 +43,7 @@ impl GdriveWriter {
 impl oio::Write for GdriveWriter {
     async fn write(&mut self, bs: Bytes) -> Result<()> {
         let resp = self
-            .backend
+            .core
             .gdrive_update(
                 &self.path,
                 Some(bs.len()),

Reply via email to