This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new af273fa7 services/gdrive: port code to GdriveCore & add path_2_id
cache (#2203)
af273fa7 is described below
commit af273fa745ef677cc3d3cccc0819681c7b9b311e
Author: Flash <[email protected]>
AuthorDate: Tue May 9 17:58:41 2023 +0800
services/gdrive: port code to GdriveCore & add path_2_id cache (#2203)
* port code to GdriveCore & add path_2_id cache
* make clippy happy
* deal with error
* typos fix
* extract auth to core.sign
* code review
* parse resp according to StatusCode
---
core/src/services/gdrive/backend.rs | 174 +++--------------------------
core/src/services/gdrive/core.rs | 217 ++++++++++++++++++++++++++++++++++++
core/src/services/gdrive/mod.rs | 1 +
core/src/services/gdrive/writer.rs | 13 ++-
4 files changed, 243 insertions(+), 162 deletions(-)
diff --git a/core/src/services/gdrive/backend.rs
b/core/src/services/gdrive/backend.rs
index babda427..03d8c83e 100644
--- a/core/src/services/gdrive/backend.rs
+++ b/core/src/services/gdrive/backend.rs
@@ -16,47 +16,40 @@
// under the License.
use async_trait::async_trait;
-use http::{header, Request, Response, StatusCode};
-use serde::Deserialize;
-use std::fmt::Debug;
+use http::StatusCode;
+
+use std::{fmt::Debug, sync::Arc};
use crate::{
ops::{OpDelete, OpRead, OpWrite},
raw::{
- build_rooted_abs_path, new_request_build_error, parse_into_metadata,
Accessor,
- AccessorInfo, AsyncBody, HttpClient, IncomingAsyncBody, RpDelete,
RpRead, RpWrite,
+ parse_into_metadata, Accessor, AccessorInfo, HttpClient,
IncomingAsyncBody, RpDelete,
+ RpRead, RpWrite,
},
types::Result,
Capability, Error, ErrorKind,
};
-use super::{error::parse_error, writer::GdriveWriter};
+use super::{core::GdriveCore, error::parse_error, writer::GdriveWriter};
-#[derive(Clone)]
+#[derive(Clone, Debug)]
pub struct GdriveBackend {
- root: String,
- access_token: String,
- client: HttpClient,
+ core: Arc<GdriveCore>,
}
impl GdriveBackend {
pub(crate) fn new(root: String, access_token: String, http_client:
HttpClient) -> Self {
GdriveBackend {
- root,
- access_token,
- client: http_client,
+ core: Arc::new(GdriveCore {
+ root,
+ access_token,
+ client: http_client,
+ path_cache: Arc::default(),
+ }),
}
}
}
-impl Debug for GdriveBackend {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- let mut de = f.debug_struct("GoolgeDriveBackend");
- de.field("root", &self.root);
- de.finish()
- }
-}
-
#[async_trait]
impl Accessor for GdriveBackend {
type Reader = IncomingAsyncBody;
@@ -69,7 +62,7 @@ impl Accessor for GdriveBackend {
fn info(&self) -> AccessorInfo {
let mut ma = AccessorInfo::default();
ma.set_scheme(crate::Scheme::Gdrive)
- .set_root(&self.root)
+ .set_root(&self.core.root)
.set_capability(Capability {
read: true,
write: true,
@@ -81,7 +74,7 @@ impl Accessor for GdriveBackend {
}
async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead,
Self::Reader)> {
- let resp = self.gdrive_get(path).await?;
+ let resp = self.core.gdrive_get(path).await?;
let status = resp.status();
@@ -104,12 +97,12 @@ impl Accessor for GdriveBackend {
Ok((
RpWrite::default(),
- GdriveWriter::new(self.clone(), args, String::from(path)),
+ GdriveWriter::new(self.core.clone(), args, String::from(path)),
))
}
async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
- let resp = self.gdrive_delete(path).await?;
+ let resp = self.core.gdrive_delete(path).await?;
let status = resp.status();
@@ -119,134 +112,3 @@ impl Accessor for GdriveBackend {
}
}
}
-
-impl GdriveBackend {
- async fn get_abs_root_id(&self) -> String {
- let mut req =
Request::get("https://www.googleapis.com/drive/v3/files/root");
- let auth_header_content = format!("Bearer {}", self.access_token);
- req = req.header(header::AUTHORIZATION, auth_header_content);
- let req = req
- .body(AsyncBody::Empty)
- .map_err(new_request_build_error)
- .unwrap();
-
- let resp = self.client.send(req).await.unwrap();
-
- let body_value: GdriveFile =
-
serde_json::from_slice(&resp.into_body().bytes().await.unwrap()).unwrap();
- let root_id = String::from(body_value.id.as_str());
- root_id
- }
-
- async fn get_file_id_by_path(&self, file_path: &str) -> String {
- let path = build_rooted_abs_path(&self.root, file_path);
- let auth_header_content = format!("Bearer {}", self.access_token);
-
- let mut parent_id = self.get_abs_root_id().await;
- let file_path_items: Vec<&str> = path.split('/').filter(|&x|
!x.is_empty()).collect();
-
- for (i, item) in file_path_items.iter().enumerate() {
- let mut query = format!(
- "name = '{}' and parents = '{}' and trashed = false",
- item, parent_id
- );
- if i != file_path_items.len() - 1 {
- query += "and mimeType = 'application/vnd.google-apps.folder'";
- }
- let query: String = query.chars().filter(|c|
!c.is_whitespace()).collect();
-
- let mut req = Request::get(format!(
- "https://www.googleapis.com/drive/v3/files?q={}",
- query
- ));
- req = req.header(header::AUTHORIZATION, &auth_header_content);
- let req = req
- .body(AsyncBody::default())
- .map_err(new_request_build_error)
- .unwrap();
-
- let resp = self.client.send(req).await.unwrap();
-
- let body_value: GdriveFileList =
-
serde_json::from_slice(&resp.into_body().bytes().await.unwrap()).unwrap();
- parent_id = String::from(body_value.files[0].id.as_str());
- }
-
- parent_id
- }
-
- async fn gdrive_get(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
- let url: String = format!(
- "https://www.googleapis.com/drive/v3/files/{}?alt=media",
- self.get_file_id_by_path(path).await
- );
-
- let auth_header_content = format!("Bearer {}", self.access_token);
- let mut req = Request::get(&url);
- req = req.header(header::AUTHORIZATION, auth_header_content);
-
- let req = req
- .body(AsyncBody::Empty)
- .map_err(new_request_build_error)?;
-
- self.client.send(req).await
- }
-
- pub async fn gdrive_update(
- &self,
- path: &str,
- size: Option<usize>,
- content_type: Option<&str>,
- body: AsyncBody,
- ) -> Result<Response<IncomingAsyncBody>> {
- let url = format!(
- "https://www.googleapis.com/upload/drive/v3/files/{}",
- self.get_file_id_by_path(path).await
- );
-
- let mut req = Request::patch(&url);
-
- let auth_header_content = format!("Bearer {}", self.access_token);
- req = req.header(header::AUTHORIZATION, auth_header_content);
-
- if let Some(size) = size {
- req = req.header(header::CONTENT_LENGTH, size)
- }
-
- if let Some(mime) = content_type {
- req = req.header(header::CONTENT_TYPE, mime)
- }
-
- let req = req.body(body).map_err(new_request_build_error)?;
-
- self.client.send(req).await
- }
-
- async fn gdrive_delete(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
- let url = format!(
- "https://www.googleapis.com/drive/v3/files/{}",
- self.get_file_id_by_path(path).await
- );
-
- let mut req = Request::delete(&url);
-
- let auth_header_content = format!("Bearer {}", self.access_token);
- req = req.header(header::AUTHORIZATION, auth_header_content);
-
- let req = req
- .body(AsyncBody::Empty)
- .map_err(new_request_build_error)?;
-
- self.client.send(req).await
- }
-}
-
-#[derive(Deserialize)]
-struct GdriveFile {
- id: String,
-}
-
-#[derive(Deserialize)]
-struct GdriveFileList {
- files: Vec<GdriveFile>,
-}
diff --git a/core/src/services/gdrive/core.rs b/core/src/services/gdrive/core.rs
new file mode 100644
index 00000000..6f66bf8b
--- /dev/null
+++ b/core/src/services/gdrive/core.rs
@@ -0,0 +1,217 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::sync::Arc;
+
+use crate::raw::new_json_deserialize_error;
+use crate::raw::percent_encode_path;
+use crate::raw::HttpClient;
+use crate::Error;
+use crate::ErrorKind;
+
+use http::request::Builder;
+use http::StatusCode;
+use http::{header, Request, Response};
+use serde::Deserialize;
+use tokio::sync::Mutex;
+
+use crate::{
+ raw::{build_rooted_abs_path, new_request_build_error, AsyncBody,
IncomingAsyncBody},
+ types::Result,
+};
+
+use super::error::parse_error;
+
+/// Shared state used by the Google Drive request helpers in this module.
+///
+/// Bundles the configured root, the bearer token applied by `sign`, the HTTP
+/// client, and a cache from resolved paths to Drive file ids.
+pub struct GdriveCore {
+ /// Root directory; joined with request paths via `build_rooted_abs_path`.
+ pub root: String,
+ /// Token sent as `Authorization: Bearer <token>` by `sign`.
+ pub access_token: String,
+ /// HTTP client used to send every request built in this module.
+ pub client: HttpClient,
+ /// Maps a rooted absolute path to its Drive file id; the literal key
+ /// "root" holds the id of the Drive root folder. The methods in this
+ /// module only insert into the map; none of them evict entries.
+ pub path_cache: Arc<Mutex<HashMap<String, String>>>,
+}
+
+// Debug output exposes only `root`; the access token, HTTP client and path
+// cache are deliberately not printed.
+impl Debug for GdriveCore {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("GdriveCore")
+            .field("root", &self.root)
+            .finish()
+    }
+}
+
+impl GdriveCore {
+ /// Return the file id of the Drive root folder.
+ ///
+ /// The id is looked up in `path_cache` under the key "root" first. On a
+ /// miss, a signed GET is sent to `/drive/v3/files/root`, the `id` field of
+ /// the JSON response is stored in the cache and returned.
+ ///
+ /// # Errors
+ ///
+ /// Propagates request-build, transport and JSON decoding errors, and
+ /// converts any non-200 response through `parse_error`.
+ async fn get_abs_root_id(&self) -> Result<String> {
+     let cache_key = "root";
+
+     if let Some(cached_id) = self.path_cache.lock().await.get(cache_key) {
+         return Ok(cached_id.to_string());
+     }
+
+     let builder = Request::get("https://www.googleapis.com/drive/v3/files/root");
+     let req = self
+         .sign(builder)
+         .body(AsyncBody::Empty)
+         .map_err(new_request_build_error)?;
+
+     let resp = self.client.send(req).await?;
+
+     // Anything other than 200 OK is turned into an error up front.
+     if resp.status() != StatusCode::OK {
+         return Err(parse_error(resp).await?);
+     }
+
+     let payload = resp.into_body().bytes().await?;
+     let root_file: GdriveFile =
+         serde_json::from_slice(&payload).map_err(new_json_deserialize_error)?;
+     let root_id = root_file.id;
+
+     self.path_cache
+         .lock()
+         .await
+         .insert(cache_key.to_owned(), root_id.clone());
+
+     Ok(root_id)
+ }
+
+ /// Resolve `file_path` (relative to `self.root`) to a Google Drive file id.
+ ///
+ /// The rooted absolute path is looked up in `path_cache` first. On a miss
+ /// the path is walked one segment at a time starting from the Drive root
+ /// folder, sending one signed `files?q=...` query per segment. Every
+ /// segment except the last is additionally required to be a folder. The
+ /// final id is cached under the absolute path before being returned.
+ ///
+ /// # Errors
+ ///
+ /// Returns `ErrorKind::Unexpected` when a segment matches zero or more
+ /// than one file. Propagates request-build, transport and JSON decoding
+ /// errors, and converts any non-200 response through `parse_error`.
+ async fn get_file_id_by_path(&self, file_path: &str) -> Result<String> {
+     let path = build_rooted_abs_path(&self.root, file_path);
+
+     if let Some(file_id) = self.path_cache.lock().await.get(&path) {
+         return Ok(file_id.to_string());
+     }
+
+     let mut parent_id = self.get_abs_root_id().await?;
+     let file_path_items: Vec<&str> = path.split('/').filter(|&x| !x.is_empty()).collect();
+
+     for (i, item) in file_path_items.iter().enumerate() {
+         // String literals in a Drive query are delimited by single quotes,
+         // so a quote inside the name has to be escaped as \' or it would
+         // terminate the literal early.
+         let mut query = format!(
+             "name = '{}' and parents = '{}' and trashed = false",
+             item.replace('\'', "\\'"),
+             parent_id
+         );
+         if i != file_path_items.len() - 1 {
+             // The leading space matters: without it the clause is glued to
+             // the previous term ("falseand") and the query is malformed.
+             query += " and mimeType = 'application/vnd.google-apps.folder'";
+         }
+
+         let req = self
+             .sign(Request::get(format!(
+                 "https://www.googleapis.com/drive/v3/files?q={}",
+                 percent_encode_path(&query)
+             )))
+             .body(AsyncBody::default())
+             .map_err(new_request_build_error)?;
+
+         let resp = self.client.send(req).await?;
+         let status = resp.status();
+
+         if status == StatusCode::OK {
+             let resp_body = &resp.into_body().bytes().await?;
+
+             let gdrive_file_list: GdriveFileList =
+                 serde_json::from_slice(resp_body).map_err(new_json_deserialize_error)?;
+
+             // Exactly one match is required so the walk stays unambiguous.
+             if gdrive_file_list.files.len() != 1 {
+                 return Err(Error::new(ErrorKind::Unexpected, &format!("Please ensure that the file corresponding to the path exists and is unique. The response body is {}", String::from_utf8_lossy(resp_body))));
+             }
+
+             // The matched entry becomes the parent for the next segment.
+             parent_id = gdrive_file_list.files[0].id.clone();
+         } else {
+             return Err(parse_error(resp).await?);
+         }
+     }
+
+     let mut cache_guard = self.path_cache.lock().await;
+     cache_guard.insert(path, parent_id.clone());
+
+     Ok(parent_id)
+ }
+
+ /// Send a signed GET for the content of the file at `path`.
+ ///
+ /// The path is first resolved to a file id; the request then targets
+ /// `/drive/v3/files/{id}?alt=media`. The raw response is returned without
+ /// inspecting its status.
+ ///
+ /// # Errors
+ ///
+ /// Fails if the path cannot be resolved, the request cannot be built, or
+ /// the HTTP client reports an error.
+ pub async fn gdrive_get(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
+     let file_id = self.get_file_id_by_path(path).await?;
+     let endpoint = format!(
+         "https://www.googleapis.com/drive/v3/files/{}?alt=media",
+         file_id
+     );
+
+     let builder = self.sign(Request::get(&endpoint));
+     let req = builder
+         .body(AsyncBody::Empty)
+         .map_err(new_request_build_error)?;
+
+     self.client.send(req).await
+ }
+
+ /// Send a signed PATCH carrying `body` to the upload endpoint of the file
+ /// at `path`.
+ ///
+ /// The path must already resolve to a file id. `size` and `content_type`,
+ /// when present, are sent as `Content-Length` and `Content-Type`. The raw
+ /// response is returned without inspecting its status.
+ ///
+ /// # Errors
+ ///
+ /// Fails if the path cannot be resolved, the request cannot be built, or
+ /// the HTTP client reports an error.
+ pub async fn gdrive_update(
+     &self,
+     path: &str,
+     size: Option<usize>,
+     content_type: Option<&str>,
+     body: AsyncBody,
+ ) -> Result<Response<IncomingAsyncBody>> {
+     let file_id = self.get_file_id_by_path(path).await?;
+     let endpoint = format!(
+         "https://www.googleapis.com/upload/drive/v3/files/{}",
+         file_id
+     );
+
+     let builder = Request::patch(&endpoint);
+     let builder = match size {
+         Some(len) => builder.header(header::CONTENT_LENGTH, len),
+         None => builder,
+     };
+     let builder = match content_type {
+         Some(mime) => builder.header(header::CONTENT_TYPE, mime),
+         None => builder,
+     };
+
+     // Authorization is attached last, after the optional entity headers.
+     let req = self
+         .sign(builder)
+         .body(body)
+         .map_err(new_request_build_error)?;
+
+     self.client.send(req).await
+ }
+
+ /// Send a signed DELETE for the file at `path`.
+ ///
+ /// The path is first resolved to a file id; the request then targets
+ /// `/drive/v3/files/{id}`. The raw response is returned without inspecting
+ /// its status, and the id cached for `path` is left in place.
+ ///
+ /// # Errors
+ ///
+ /// Fails if the path cannot be resolved, the request cannot be built, or
+ /// the HTTP client reports an error.
+ pub async fn gdrive_delete(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
+     let file_id = self.get_file_id_by_path(path).await?;
+     let endpoint = format!("https://www.googleapis.com/drive/v3/files/{}", file_id);
+
+     let builder = self.sign(Request::delete(&endpoint));
+     let req = builder
+         .body(AsyncBody::Empty)
+         .map_err(new_request_build_error)?;
+
+     self.client.send(req).await
+ }
+
+ /// Add the `Authorization: Bearer <access_token>` header to `req` and
+ /// hand the builder back.
+ fn sign(&self, req: Builder) -> Builder {
+     req.header(
+         header::AUTHORIZATION,
+         format!("Bearer {}", self.access_token),
+     )
+ }
+}
+
+// Subset of a Drive file resource: only the `id` field is deserialized.
+// Refer to https://developers.google.com/drive/api/reference/rest/v3/files#File
+#[derive(Deserialize)]
+struct GdriveFile {
+ // File id; used in this module to build file URLs and `parents` queries.
+ id: String,
+}
+
+// Subset of the file-list response: only the `files` array is deserialized.
+// Refer to https://developers.google.com/drive/api/reference/rest/v3/files/list
+#[derive(Deserialize)]
+struct GdriveFileList {
+ // Files matching the query; path resolution requires exactly one entry.
+ files: Vec<GdriveFile>,
+}
diff --git a/core/src/services/gdrive/mod.rs b/core/src/services/gdrive/mod.rs
index 7c88f4fe..5f77ecc5 100644
--- a/core/src/services/gdrive/mod.rs
+++ b/core/src/services/gdrive/mod.rs
@@ -17,6 +17,7 @@
mod backend;
mod builder;
+mod core;
mod error;
pub use builder::GdriveBuilder as Gdrive;
diff --git a/core/src/services/gdrive/writer.rs
b/core/src/services/gdrive/writer.rs
index de9067aa..dcff07d2 100644
--- a/core/src/services/gdrive/writer.rs
+++ b/core/src/services/gdrive/writer.rs
@@ -15,26 +15,27 @@
// specific language governing permissions and limitations
// under the License.
+use std::sync::Arc;
+
use async_trait::async_trait;
use bytes::Bytes;
use http::StatusCode;
-use super::backend::GdriveBackend;
+use super::core::GdriveCore;
use super::error::parse_error;
use crate::ops::OpWrite;
use crate::raw::*;
use crate::*;
pub struct GdriveWriter {
- backend: GdriveBackend,
-
+ core: Arc<GdriveCore>,
op: OpWrite,
path: String,
}
impl GdriveWriter {
- pub fn new(backend: GdriveBackend, op: OpWrite, path: String) -> Self {
- GdriveWriter { backend, op, path }
+ pub fn new(core: Arc<GdriveCore>, op: OpWrite, path: String) -> Self {
+ GdriveWriter { core, op, path }
}
}
@@ -42,7 +43,7 @@ impl GdriveWriter {
impl oio::Write for GdriveWriter {
async fn write(&mut self, bs: Bytes) -> Result<()> {
let resp = self
- .backend
+ .core
.gdrive_update(
&self.path,
Some(bs.len()),