This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 4e1c14cb1 refactor(services/gdrive): prepare for CI (#2892)
4e1c14cb1 is described below
commit 4e1c14cb19f75ebf35848ca40ae03cda5f14ae32
Author: Suyan <[email protected]>
AuthorDate: Tue Aug 22 00:26:38 2023 +0800
refactor(services/gdrive): prepare for CI (#2892)
---
.env.example | 9 +-
core/src/services/gdrive/backend.rs | 163 ++++++++++++++--
core/src/services/gdrive/core.rs | 377 ++++++++++++++++++++++++++++--------
core/src/services/gdrive/writer.rs | 72 +++++--
4 files changed, 510 insertions(+), 111 deletions(-)
diff --git a/.env.example b/.env.example
index 62a3f9473..ef73d622b 100644
--- a/.env.example
+++ b/.env.example
@@ -148,4 +148,11 @@ OPENDAL_ETCD_USERNAME=<username>
OPENDAL_ETCD_PASSWORD=<password>
OPENDAL_ETCD_CA_PATH=<ca_path>
OPENDAL_ETCD_CERT_PATH=<cert_path>
-OPENDAL_ETCD_KEY_PATH=<key_path>
\ No newline at end of file
+OPENDAL_ETCD_KEY_PATH=<key_path>
+# google drive
+OPENDAL_GDRIVE_TEST=false
+OPENDAL_GDRIVE_ROOT=/tmp/opendal/
+OPENDAL_GDRIVE_ACCESS_TOKEN=<access_token>
+OPENDAL_GDRIVE_REFRESH_TOKEN=<refresh_token>
+OPENDAL_GDRIVE_CLIENT_ID=<client_id>
+OPENDAL_GDRIVE_CLIENT_SECRET=<client_secret>
diff --git a/core/src/services/gdrive/backend.rs
b/core/src/services/gdrive/backend.rs
index 72d5740e8..49ce23314 100644
--- a/core/src/services/gdrive/backend.rs
+++ b/core/src/services/gdrive/backend.rs
@@ -19,16 +19,17 @@ use std::fmt::Debug;
use std::sync::Arc;
use async_trait::async_trait;
+use chrono::Utc;
use http::StatusCode;
use super::core::GdriveCore;
use super::error::parse_error;
use super::writer::GdriveWriter;
use crate::raw::*;
+use crate::services::gdrive::core::GdriveFile;
+use crate::services::gdrive::core::GdriveFileList;
use crate::types::Result;
-use crate::Capability;
-use crate::Error;
-use crate::ErrorKind;
+use crate::*;
#[derive(Clone, Debug)]
pub struct GdriveBackend {
@@ -59,27 +60,96 @@ impl Accessor for GdriveBackend {
fn info(&self) -> AccessorInfo {
let mut ma = AccessorInfo::default();
- ma.set_scheme(crate::Scheme::Gdrive)
+ ma.set_scheme(Scheme::Gdrive)
.set_root(&self.core.root)
.set_full_capability(Capability {
+ stat: true,
+
read: true,
+
write: true,
+
+ create_dir: true,
+
delete: true,
+
..Default::default()
});
ma
}
+ async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
+ // Stat root always returns a DIR.
+ if path == "/" {
+ return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
+ }
+
+ let resp = self.core.gdrive_stat(path).await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::OK => {
+ let meta =
self.parse_metadata(resp.into_body().bytes().await?)?;
+ Ok(RpStat::new(meta))
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ async fn create_dir(&self, path: &str, _args: OpCreateDir) ->
Result<RpCreateDir> {
+ let parent = self.core.ensure_parent_path(path).await?;
+
+ let path = path.split('/').filter(|&x| !x.is_empty()).last().unwrap();
+
+ // As Google Drive allows files have the same name, we need to check
if the folder exists.
+ let resp = self.core.gdrive_search_folder(path, &parent).await?;
+ let status = resp.status();
+
+ match status {
+ StatusCode::OK => {
+ let body = resp.into_body().bytes().await?;
+ let meta = serde_json::from_slice::<GdriveFileList>(&body)
+ .map_err(new_json_deserialize_error)?;
+
+ if !meta.files.is_empty() {
+ return Ok(RpCreateDir::default());
+ }
+ }
+ _ => return Err(parse_error(resp).await?),
+ }
+
+ let resp = self.core.gdrive_create_folder(path, Some(parent)).await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::OK => Ok(RpCreateDir::default()),
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead,
Self::Reader)> {
- let resp = self.core.gdrive_get(path).await?;
+ // We need to request for metadata and body separately here.
+ // Request for metadata first to check if the file exists.
+ let resp = self.core.gdrive_stat(path).await?;
let status = resp.status();
match status {
StatusCode::OK => {
- let meta = parse_into_metadata(path, resp.headers())?;
- Ok((RpRead::with_metadata(meta), resp.into_body()))
+ let body = resp.into_body().bytes().await?;
+ let meta = self.parse_metadata(body)?;
+
+ let resp = self.core.gdrive_get(path).await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::OK => Ok((RpRead::with_metadata(meta),
resp.into_body())),
+ _ => Err(parse_error(resp).await?),
+ }
}
_ => Err(parse_error(resp).await?),
}
@@ -93,20 +163,87 @@ impl Accessor for GdriveBackend {
));
}
+ // As Google Drive allows files have the same name, we need to check
if the file exists.
+ // If the file exists, we will keep its ID and update it.
+ let mut file_id: Option<String> = None;
+
+ let resp = self.core.gdrive_stat(path).await;
+ // We don't care about the error here.
+ // As long as the file doesn't exist, we will create a new one.
+ if let Ok(resp) = resp {
+ let status = resp.status();
+
+ if status == StatusCode::OK {
+ let body = resp.into_body().bytes().await?;
+ let meta = serde_json::from_slice::<GdriveFile>(&body)
+ .map_err(new_json_deserialize_error)?;
+
+ file_id = if meta.id.is_empty() {
+ None
+ } else {
+ Some(meta.id)
+ };
+ }
+ }
+
Ok((
RpWrite::default(),
- GdriveWriter::new(self.core.clone(), args, String::from(path)),
+ GdriveWriter::new(self.core.clone(), String::from(path), file_id),
))
}
async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
- let resp = self.core.gdrive_delete(path).await?;
+ let resp = self.core.gdrive_delete(path).await;
+ if let Ok(resp) = resp {
+ let status = resp.status();
- let status = resp.status();
+ match status {
+ StatusCode::NO_CONTENT => return Ok(RpDelete::default()),
+ _ => return Err(parse_error(resp).await?),
+ }
+ };
- match status {
- StatusCode::NO_CONTENT => Ok(RpDelete::default()),
- _ => Err(parse_error(resp).await?),
+ let e = resp.err().unwrap();
+ if e.kind() == ErrorKind::NotFound {
+ Ok(RpDelete::default())
+ } else {
+ Err(e)
}
}
}
+
+impl GdriveBackend {
+ pub(crate) fn parse_metadata(&self, body: bytes::Bytes) ->
Result<Metadata> {
+ let metadata =
+
serde_json::from_slice::<GdriveFile>(&body).map_err(new_json_deserialize_error)?;
+
+ let mut meta = Metadata::new(match metadata.mime_type.as_str() {
+ "application/vnd.google-apps.folder" => EntryMode::DIR,
+ _ => EntryMode::FILE,
+ });
+
+ let size = if meta.mode() == EntryMode::DIR {
+ // Google Drive does not return the size for folders.
+ 0
+ } else {
+ metadata
+ .size
+ .expect("file size must exist")
+ .parse::<u64>()
+ .map_err(|e| {
+ Error::new(ErrorKind::Unexpected, "parse content
length").set_source(e)
+ })?
+ };
+ meta = meta.with_content_length(size);
+ meta = meta.with_last_modified(
+ metadata
+ .modified_time
+ .expect("modified time must exist. please check your query
param - fields")
+ .parse::<chrono::DateTime<Utc>>()
+ .map_err(|e| {
+ Error::new(ErrorKind::Unexpected, "parse last modified
time").set_source(e)
+ })?,
+ );
+ Ok(meta)
+ }
+}
diff --git a/core/src/services/gdrive/core.rs b/core/src/services/gdrive/core.rs
index b54504bca..acf8b0747 100644
--- a/core/src/services/gdrive/core.rs
+++ b/core/src/services/gdrive/core.rs
@@ -20,22 +20,17 @@ use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;
+use bytes;
use http::header;
-use http::request::Builder;
use http::Request;
use http::Response;
use http::StatusCode;
use serde::Deserialize;
+use serde_json::json;
use tokio::sync::Mutex;
use super::error::parse_error;
-use crate::raw::build_rooted_abs_path;
-use crate::raw::new_json_deserialize_error;
-use crate::raw::new_request_build_error;
-use crate::raw::percent_encode_path;
-use crate::raw::AsyncBody;
-use crate::raw::HttpClient;
-use crate::raw::IncomingAsyncBody;
+use crate::raw::*;
use crate::types::Result;
use crate::Error;
use crate::ErrorKind;
@@ -43,7 +38,13 @@ use crate::ErrorKind;
pub struct GdriveCore {
pub root: String,
pub access_token: String,
+
pub client: HttpClient,
+
+ /// Cache the mapping from path to file id
+ ///
+ /// Google Drive uses file id to identify a file.
+ /// As the path is immutable, we can cache the mapping from path to file
id.
pub path_cache: Arc<Mutex<HashMap<String, String>>>,
}
@@ -56,91 +57,223 @@ impl Debug for GdriveCore {
}
impl GdriveCore {
- async fn get_abs_root_id(&self) -> Result<String> {
- let root = "root";
+ /// Get the file id by path.
+ /// Including file and folder.
+ ///
+ /// The path is rooted at the root of the Google Drive.
+ ///
+ /// # Notes
+ ///
+ /// - A path is a sequence of file names separated by slashes.
+ /// - A file only knows its parent id, but not its name.
+ /// - To find the file id of a file, we need to traverse the path from the
root to the file.
+ pub(crate) async fn get_file_id_by_path(&self, file_path: &str) ->
Result<String> {
+ let path = build_rooted_abs_path(&self.root, file_path);
- if let Some(root_id) = self.path_cache.lock().await.get(root) {
- return Ok(root_id.to_string());
- }
+ let mut parent_id = "root".to_owned();
+ let file_path_items: Vec<&str> = path.split('/').filter(|&x|
!x.is_empty()).collect();
+
+ for (i, item) in file_path_items.iter().enumerate() {
+ let mut query = format!(
+ "name = \"{}\" and \"{}\" in parents and trashed = false",
+ item, parent_id
+ );
+ if i != file_path_items.len() - 1 {
+ query += " and mimeType =
'application/vnd.google-apps.folder'";
+ }
- let req = self
- .sign(Request::get(
- "https://www.googleapis.com/drive/v3/files/root",
+ let mut req = Request::get(format!(
+ "https://www.googleapis.com/drive/v3/files?q={}",
+ percent_encode_path(&query)
))
- .body(AsyncBody::Empty)
+ .body(AsyncBody::default())
.map_err(new_request_build_error)?;
- let resp = self.client.send(req).await?;
- let status = resp.status();
+ let _ = self.sign(&mut req);
+
+ let resp = self.client.send(req).await?;
+ let status = resp.status();
- match status {
- StatusCode::OK => {
- let resp_body = &resp.into_body().bytes().await?;
+ match status {
+ StatusCode::OK => {
+ let resp_body = &resp.into_body().bytes().await?;
- let gdrive_file: GdriveFile =
-
serde_json::from_slice(resp_body).map_err(new_json_deserialize_error)?;
+ let gdrive_file_list: GdriveFileList =
+
serde_json::from_slice(resp_body).map_err(new_json_deserialize_error)?;
- let root_id = gdrive_file.id;
+ if gdrive_file_list.files.is_empty() {
+ return Err(Error::new(
+ ErrorKind::NotFound,
+ &format!("path not found: {}", item),
+ ));
+ }
- let mut cache_guard = self.path_cache.lock().await;
- cache_guard.insert(root.to_owned(), root_id.clone());
+ if gdrive_file_list.files.len() > 1 {
+ return Err(Error::new(ErrorKind::Unexpected,
&format!("please ensure that the file corresponding to the path exists and is
unique. the response body is {}", String::from_utf8_lossy(resp_body))));
+ }
- Ok(root_id)
+ parent_id = gdrive_file_list.files[0].id.clone();
+ }
+ _ => {
+ return Err(parse_error(resp).await?);
+ }
}
- _ => Err(parse_error(resp).await?),
}
- }
- async fn get_file_id_by_path(&self, file_path: &str) -> Result<String> {
- let path = build_rooted_abs_path(&self.root, file_path);
+ Ok(parent_id)
+ }
- if let Some(file_id) = self.path_cache.lock().await.get(&path) {
- return Ok(file_id.to_string());
- }
+ /// Ensure the parent path exists.
+ /// If the parent path does not exist, create it.
+ ///
+ /// # Notes
+ ///
+ /// - The path is rooted at the root of the Google Drive.
+ /// - Will create the parent path recursively.
+ pub(crate) async fn ensure_parent_path(&self, path: &str) ->
Result<String> {
+ let path = build_rooted_abs_path(&self.root, path);
- let mut parent_id = self.get_abs_root_id().await?;
- let file_path_items: Vec<&str> = path.split('/').filter(|&x|
!x.is_empty()).collect();
+ let mut parent: String = "root".to_owned();
+ let mut file_path_items: Vec<&str> = path.split('/').filter(|&x|
!x.is_empty()).collect();
+ file_path_items.pop();
for (i, item) in file_path_items.iter().enumerate() {
- let mut query = format!(
- "name = '{}' and parents = '{}' and trashed = false",
- item, parent_id
+ let query = format!(
+ "name = \"{}\" and \"{}\" in parents and trashed = false and
mimeType = 'application/vnd.google-apps.folder'",
+ item, parent
);
- if i != file_path_items.len() - 1 {
- query += "and mimeType = 'application/vnd.google-apps.folder'";
- }
- let req = self
- .sign(Request::get(format!(
- "https://www.googleapis.com/drive/v3/files?q={}",
- percent_encode_path(&query)
- )))
- .body(AsyncBody::default())
- .map_err(new_request_build_error)?;
+ let mut req = Request::get(format!(
+ "https://www.googleapis.com/drive/v3/files?q={}",
+ percent_encode_path(&query)
+ ))
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+ let _ = self.sign(&mut req);
let resp = self.client.send(req).await?;
let status = resp.status();
- if status == StatusCode::OK {
- let resp_body = &resp.into_body().bytes().await?;
-
- let gdrive_file_list: GdriveFileList =
-
serde_json::from_slice(resp_body).map_err(new_json_deserialize_error)?;
-
- if gdrive_file_list.files.len() != 1 {
- return Err(Error::new(ErrorKind::Unexpected,
&format!("Please ensure that the file corresponding to the path exists and is
unique. The response body is {}", String::from_utf8_lossy(resp_body))));
+ match status {
+ StatusCode::OK => {
+ let resp_body = &resp.into_body().bytes().await?;
+
+ let gdrive_file_list: GdriveFileList =
+
serde_json::from_slice(resp_body).map_err(new_json_deserialize_error)?;
+
+ if gdrive_file_list.files.len() != 1 {
+ let parent_name = file_path_items[i];
+ let resp_body = self
+ .gdrive_create_folder(parent_name,
Some(parent.to_owned()))
+ .await?
+ .into_body()
+ .bytes()
+ .await?;
+ let parent_meta: GdriveFile =
serde_json::from_slice(&resp_body)
+ .map_err(new_json_deserialize_error)?;
+
+ parent = parent_meta.id;
+ } else {
+ parent = gdrive_file_list.files[0].id.clone();
+ }
+ }
+ StatusCode::NOT_FOUND => {
+ let parent_name = file_path_items[i];
+ let res = self
+ .gdrive_create_folder(parent_name,
Some(parent.to_owned()))
+ .await?;
+
+ let status = res.status();
+
+ match status {
+ StatusCode::OK => {
+ let parent_id = res.into_body().bytes().await?;
+ parent =
String::from_utf8_lossy(&parent_id).to_string();
+ }
+ _ => {
+ return Err(parse_error(res).await?);
+ }
+ }
+ }
+ _ => {
+ return Err(parse_error(resp).await?);
}
-
- parent_id = gdrive_file_list.files[0].id.clone();
- } else {
- return Err(parse_error(resp).await?);
}
}
- let mut cache_guard = self.path_cache.lock().await;
- cache_guard.insert(path, parent_id.clone());
+ Ok(parent.to_owned())
+ }
- Ok(parent_id)
+ pub async fn gdrive_search_folder(
+ &self,
+ target: &str,
+ parent: &str,
+ ) -> Result<Response<IncomingAsyncBody>> {
+ let query = format!(
+ "name = '{}' and '{}' in parents and trashed = false and mimeType
= 'application/vnd.google-apps.folder'",
+ target, parent
+ );
+ let url = format!(
+ "https://www.googleapis.com/drive/v3/files?q={}",
+ percent_encode_path(query.as_str())
+ );
+
+ let mut req = Request::get(&url)
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ self.sign(&mut req)?;
+
+ self.client.send(req).await
+ }
+
+ /// Create a folder.
+ /// Should provide the parent folder id.
+ /// Or will create the folder in the root folder.
+ pub async fn gdrive_create_folder(
+ &self,
+ name: &str,
+ parent: Option<String>,
+ ) -> Result<Response<IncomingAsyncBody>> {
+ let url = "https://www.googleapis.com/drive/v3/files";
+
+ let mut req = Request::post(url)
+ .header(header::CONTENT_TYPE, "application/json")
+ .body(AsyncBody::Bytes(bytes::Bytes::from(
+ serde_json::to_vec(&json!({
+ "name": name,
+ "mimeType": "application/vnd.google-apps.folder",
+ // If the parent is not provided, the folder will be
created in the root folder.
+ "parents": [parent.unwrap_or("root".to_owned())],
+ }))
+ .map_err(|e| {
+ Error::new(
+ ErrorKind::Unexpected,
+ &format!("failed to serialize json(create folder
result): {}", e),
+ )
+ })?,
+ )))
+ .map_err(new_request_build_error)?;
+
+ let _ = self.sign(&mut req);
+
+ self.client.send(req).await
+ }
+
+ pub async fn gdrive_stat(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
+ let path_id = self.get_file_id_by_path(path).await?;
+
+ // The file metadata in the Google Drive API is very complex.
+ // For now, we only need the file id, name, mime type and modified
time.
+ let mut req = Request::get(&format!(
+
"https://www.googleapis.com/drive/v3/files/{}?fields=id,name,mimeType,size,modifiedTime",
+ path_id.as_str()
+ ))
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+ let _ = self.sign(&mut req);
+
+ self.client.send(req).await
}
pub async fn gdrive_get(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
@@ -149,10 +282,10 @@ impl GdriveCore {
self.get_file_id_by_path(path).await?
);
- let req = self
- .sign(Request::get(&url))
+ let mut req = Request::get(&url)
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;
+ let _ = self.sign(&mut req);
self.client.send(req).await
}
@@ -165,11 +298,11 @@ impl GdriveCore {
body: AsyncBody,
) -> Result<Response<IncomingAsyncBody>> {
let url = format!(
- "https://www.googleapis.com/upload/drive/v3/files/{}",
+
"https://www.googleapis.com/upload/drive/v3/files/{}?uploadType=media",
self.get_file_id_by_path(path).await?
);
- let mut req = Request::patch(&url);
+ let mut req = Request::put(&url);
if let Some(size) = size {
req = req.header(header::CONTENT_LENGTH, size)
@@ -179,7 +312,8 @@ impl GdriveCore {
req = req.header(header::CONTENT_TYPE, mime)
}
- let req = self.sign(req).body(body).map_err(new_request_build_error)?;
+ let mut req = req.body(body).map_err(new_request_build_error)?;
+ let _ = self.sign(&mut req);
self.client.send(req).await
}
@@ -190,29 +324,112 @@ impl GdriveCore {
self.get_file_id_by_path(path).await?
);
- let req = self
- .sign(Request::delete(&url))
+ let mut req = Request::delete(&url)
.body(AsyncBody::Empty)
.map_err(new_request_build_error)?;
+ let _ = self.sign(&mut req);
+
+ self.client.send(req).await
+ }
+
+ pub async fn gdrive_upload_simple_request(
+ &self,
+ path: &str,
+ size: u64,
+ body: bytes::Bytes,
+ ) -> Result<Response<IncomingAsyncBody>> {
+ let parent = self.ensure_parent_path(path).await?;
+
+ let url =
"https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart";
+
+ let file_name = path.split('/').filter(|&x|
!x.is_empty()).last().unwrap();
+
+ let metadata = &json!({
+ "name": file_name,
+ "parents": [parent],
+ });
+
+ let req = Request::post(url).header("X-Upload-Content-Length", size);
+
+ let multipart = Multipart::new()
+ .part(
+ FormDataPart::new("metadata")
+ .header(
+ header::CONTENT_TYPE,
+ "application/json; charset=UTF-8".parse().unwrap(),
+ )
+ .content(metadata.to_string()),
+ )
+ .part(
+ FormDataPart::new("file")
+ .header(
+ header::CONTENT_TYPE,
+ "application/octet-stream".parse().unwrap(),
+ )
+ .content(body),
+ );
+
+ let mut req = multipart.apply(req)?;
+
+ let _ = self.sign(&mut req);
+
+ self.client.send(req).await
+ }
+
+ pub async fn gdrive_upload_overwrite_simple_request(
+ &self,
+ file_id: &str,
+ size: u64,
+ body: bytes::Bytes,
+ ) -> Result<Response<IncomingAsyncBody>> {
+ let url = format!(
+
"https://www.googleapis.com/upload/drive/v3/files/{}?uploadType=media",
+ file_id
+ );
+
+ let mut req = Request::patch(url)
+ .header(header::CONTENT_TYPE, "application/octet-stream")
+ .header(header::CONTENT_LENGTH, size)
+ .header("X-Upload-Content-Length", size)
+ .body(AsyncBody::Bytes(body))
+ .map_err(new_request_build_error)?;
+
+ let _ = self.sign(&mut req);
+
self.client.send(req).await
}
- fn sign(&self, mut req: Builder) -> Builder {
+ fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
let auth_header_content = format!("Bearer {}", self.access_token);
- req = req.header(header::AUTHORIZATION, auth_header_content);
- req
+ req.headers_mut()
+ .insert(header::AUTHORIZATION,
auth_header_content.parse().unwrap());
+
+ Ok(())
}
}
+// This is the file struct returned by the Google Drive API.
+// This is a complex struct, but we only add the fields we need.
// refer to
https://developers.google.com/drive/api/reference/rest/v3/files#File
-#[derive(Deserialize)]
-struct GdriveFile {
- id: String,
+#[derive(Deserialize, Debug)]
+#[serde(rename_all = "camelCase")]
+pub struct GdriveFile {
+ pub mime_type: String,
+ pub id: String,
+ pub name: String,
+ pub size: Option<String>,
+ // The modified time is not returned unless the `fields`
+ // query parameter contains `modifiedTime`.
+ // As we only need the modified time when we do `stat` operation,
+ // if other operations(such as search) do not specify the `fields` query
parameter,
+ // try to access this field, it will be `None`.
+ pub modified_time: Option<String>,
}
// refer to
https://developers.google.com/drive/api/reference/rest/v3/files/list
#[derive(Deserialize)]
-struct GdriveFileList {
- files: Vec<GdriveFile>,
+#[serde(rename_all = "camelCase")]
+pub(crate) struct GdriveFileList {
+ pub(crate) files: Vec<GdriveFile>,
}
diff --git a/core/src/services/gdrive/writer.rs
b/core/src/services/gdrive/writer.rs
index ba8f20b8f..48d88f3ec 100644
--- a/core/src/services/gdrive/writer.rs
+++ b/core/src/services/gdrive/writer.rs
@@ -24,31 +24,61 @@ use http::StatusCode;
use super::core::GdriveCore;
use super::error::parse_error;
use crate::raw::*;
+use crate::services::gdrive::core::GdriveFile;
use crate::*;
pub struct GdriveWriter {
core: Arc<GdriveCore>,
- op: OpWrite,
+
path: String,
+
+ file_id: Option<String>,
}
impl GdriveWriter {
- pub fn new(core: Arc<GdriveCore>, op: OpWrite, path: String) -> Self {
- GdriveWriter { core, op, path }
+ pub fn new(core: Arc<GdriveCore>, path: String, file_id: Option<String>)
-> Self {
+ GdriveWriter {
+ core,
+ path,
+
+ file_id,
+ }
}
-}
-#[async_trait]
-impl oio::Write for GdriveWriter {
- async fn write(&mut self, bs: Bytes) -> Result<()> {
+ /// Write a single chunk of data to the object.
+ ///
+ /// This is used for small objects.
+ /// And should overwrite the object if it already exists.
+ pub async fn write_create(&mut self, size: u64, body: Bytes) -> Result<()>
{
let resp = self
.core
- .gdrive_update(
- &self.path,
- Some(bs.len()),
- self.op.content_type(),
- AsyncBody::Bytes(bs),
- )
+ .gdrive_upload_simple_request(&self.path, size, body)
+ .await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::OK | StatusCode::CREATED => {
+ let bs = resp.into_body().bytes().await?;
+
+ let file = serde_json::from_slice::<GdriveFile>(&bs)
+ .map_err(new_json_deserialize_error)?;
+
+ self.file_id = Some(file.id);
+
+ Ok(())
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ pub async fn write_overwrite(&self, size: u64, body: Bytes) -> Result<()> {
+ let file_id = self.file_id.as_ref().ok_or_else(|| {
+ Error::new(ErrorKind::Unexpected, "file_id is required for
overwrite")
+ })?;
+ let resp = self
+ .core
+ .gdrive_upload_overwrite_simple_request(file_id, size, body)
.await?;
let status = resp.status();
@@ -61,12 +91,20 @@ impl oio::Write for GdriveWriter {
_ => Err(parse_error(resp).await?),
}
}
+}
+
+#[async_trait]
+impl oio::Write for GdriveWriter {
+ async fn write(&mut self, bs: Bytes) -> Result<()> {
+ if self.file_id.is_none() {
+ self.write_create(bs.len() as u64, bs).await
+ } else {
+ self.write_overwrite(bs.len() as u64, bs).await
+ }
+ }
async fn sink(&mut self, _size: u64, _s: oio::Streamer) -> Result<()> {
- Err(Error::new(
- ErrorKind::Unsupported,
- "Write::sink is not supported",
- ))
+ Err(Error::new(ErrorKind::Unsupported, "sink is not supported"))
}
async fn abort(&mut self) -> Result<()> {