This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 57581b90a feat(service/gdrive): add gdrive list support (#3025)
57581b90a is described below
commit 57581b90a8a3c7220a9ca09e006f8fb997c90bcd
Author: Flash <[email protected]>
AuthorDate: Mon Sep 11 22:00:42 2023 +0800
feat(service/gdrive): add gdrive list support (#3025)
* add gdrive list support
* update
* update
---
core/src/services/gdrive/backend.rs | 14 ++++-
core/src/services/gdrive/core.rs | 65 ++++++++++++++++++++++-
core/src/services/gdrive/error.rs | 32 ++++++++++-
core/src/services/gdrive/mod.rs | 1 +
core/src/services/gdrive/pager.rs | 103 ++++++++++++++++++++++++++++++++++++
5 files changed, 211 insertions(+), 4 deletions(-)
diff --git a/core/src/services/gdrive/backend.rs b/core/src/services/gdrive/backend.rs
index f50713ac7..ea67ed200 100644
--- a/core/src/services/gdrive/backend.rs
+++ b/core/src/services/gdrive/backend.rs
@@ -24,6 +24,7 @@ use http::StatusCode;
use super::core::GdriveCore;
use super::error::parse_error;
+use super::pager::GdrivePager;
use super::writer::GdriveWriter;
use crate::raw::*;
use crate::services::gdrive::core::GdriveFile;
@@ -42,7 +43,7 @@ impl Accessor for GdriveBackend {
type BlockingReader = ();
type Writer = oio::OneShotWriter<GdriveWriter>;
type BlockingWriter = ();
- type Pager = ();
+ type Pager = GdrivePager;
type BlockingPager = ();
fn info(&self) -> AccessorInfo {
@@ -54,6 +55,10 @@ impl Accessor for GdriveBackend {
read: true,
+ list: true,
+
+ list_with_delimiter_slash: true,
+
write: true,
create_dir: true,
@@ -248,6 +253,13 @@ impl Accessor for GdriveBackend {
Err(e)
}
}
+
+    /// List the entries directly under `path`.
+    ///
+    /// No request is issued here: the returned `GdrivePager` calls the Drive API
+    /// each time a page is pulled. The `OpList` options are currently ignored.
+    async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Pager)> {
+        Ok((
+            RpList::default(),
+            GdrivePager::new(path.into(), self.core.clone()),
+        ))
+    }
}
impl GdriveBackend {
diff --git a/core/src/services/gdrive/core.rs b/core/src/services/gdrive/core.rs
index 6e08f4d31..4e7216184 100644
--- a/core/src/services/gdrive/core.rs
+++ b/core/src/services/gdrive/core.rs
@@ -24,6 +24,7 @@ use bytes;
use bytes::Bytes;
use chrono::DateTime;
use chrono::Utc;
+use futures::stream;
use http::header;
use http::Request;
use http::Response;
@@ -99,7 +100,7 @@ impl GdriveCore {
"name = \"{}\" and \"{}\" in parents and trashed = false",
item, parent_id
);
- if i != file_path_items.len() - 1 {
+ if i != file_path_items.len() - 1 || path.ends_with('/') {
query += " and mimeType =
'application/vnd.google-apps.folder'";
}
@@ -325,6 +326,67 @@ impl GdriveCore {
self.client.send(req).await
}
+    /// List the non-trashed direct children of the folder at `path` with one
+    /// Drive `files.list` request.
+    ///
+    /// `path` is resolved to a file ID through `get_file_id_by_path`.
+    /// `page_size` is sent as `pageSize`.
+    /// `next_page_token` is the token from the previous page, or `None` for the first page.
+    ///
+    /// If the folder does not exist, a synthetic `200 OK` response with an empty
+    /// body is returned instead of a NotFound error.
+    pub async fn gdrive_list(
+        &self,
+        path: &str,
+        page_size: i32,
+        next_page_token: Option<String>,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        // Listing a non-existent dir makes `get_file_id_by_path` fail with NotFound;
+        // report that as an empty listing rather than an error.
+        let file_id = match self.get_file_id_by_path(path).await {
+            Ok(file_id) => file_id,
+            Err(e) if matches!(e.kind(), ErrorKind::NotFound) => {
+                return Response::builder()
+                    .status(StatusCode::OK)
+                    .body(IncomingAsyncBody::new(
+                        Box::new(oio::into_stream(stream::empty())),
+                        Some(0),
+                    ))
+                    .map_err(|e| {
+                        Error::new(
+                            ErrorKind::Unexpected,
+                            &format!("failed to create a empty response for list: {}", e),
+                        )
+                        .set_source(e)
+                    });
+            }
+            Err(e) => return Err(e),
+        };
+
+        let q = format!("'{}' in parents and trashed = false", file_id);
+        let mut url = format!(
+            "https://www.googleapis.com/drive/v3/files?pageSize={}&q={}",
+            page_size,
+            percent_encode_path(q.as_str())
+        );
+        if let Some(page_token) = next_page_token {
+            // The token is opaque; encode it like `q` so reserved characters in it
+            // (e.g. `+`, `=`, `&`) cannot corrupt the query string.
+            url.push_str(&format!(
+                "&pageToken={}",
+                percent_encode_path(page_token.as_str())
+            ));
+        }
+
+        let mut req = Request::get(&url)
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+        self.sign(&mut req).await?;
+
+        self.client.send(req).await
+    }
+
// Update with content and metadata
pub async fn gdrive_patch_metadata_request(
&self,
@@ -556,4 +618,5 @@ pub struct GdriveFile {
#[serde(rename_all = "camelCase")]
pub(crate) struct GdriveFileList {
pub(crate) files: Vec<GdriveFile>,
+    /// Continuation token for the next page of results; `None` means there are no more pages.
+    pub(crate) next_page_token: Option<String>,
}
diff --git a/core/src/services/gdrive/error.rs b/core/src/services/gdrive/error.rs
index 60de6ae0d..9a1360fa3 100644
--- a/core/src/services/gdrive/error.rs
+++ b/core/src/services/gdrive/error.rs
@@ -17,18 +17,29 @@
use http::Response;
use http::StatusCode;
+use serde::Deserialize;
use crate::raw::*;
use crate::Error;
use crate::ErrorKind;
use crate::Result;
+/// JSON body of a failed Drive API response, as deserialized by `parse_error`,
+/// i.e. `{"error": {"message": "..."}}`. Only the fields read in this module are
+/// declared; serde ignores any others.
+#[derive(Default, Debug, Deserialize)]
+struct GdriveError {
+    error: GdriveInnerError,
+}
+
+/// The inner `error` object of a Drive error body.
+#[derive(Default, Debug, Deserialize)]
+struct GdriveInnerError {
+    /// Human-readable error message; fed to `parse_gdrive_error_code`.
+    message: String,
+}
+
/// Parse error response into Error.
pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
let (parts, body) = resp.into_parts();
let bs = body.bytes().await?;
- let (kind, retryable) = match parts.status {
+ let (mut kind, mut retryable) = match parts.status {
StatusCode::NOT_FOUND => (ErrorKind::NotFound, false),
StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false),
StatusCode::INTERNAL_SERVER_ERROR
@@ -38,7 +49,14 @@ pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
_ => (ErrorKind::Unexpected, false),
};
- let message = String::from_utf8_lossy(&bs);
+ let (message, gdrive_err) =
serde_json::from_slice::<GdriveError>(bs.as_ref())
+ .map(|gdrive_err| (format!("{gdrive_err:?}"), Some(gdrive_err)))
+ .unwrap_or_else(|_| (String::from_utf8_lossy(&bs).into_owned(), None));
+
+ if let Some(gdrive_err) = gdrive_err {
+ (kind, retryable) =
+
parse_gdrive_error_code(gdrive_err.error.message.as_str()).unwrap_or((kind,
retryable));
+ }
let mut err = Error::new(kind, &message);
@@ -50,3 +68,13 @@ pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
Ok(err)
}
+
+pub fn parse_gdrive_error_code(message: &str) -> Option<(ErrorKind, bool)> {
+ match message {
+ // > Please reduce your request rate.
+ //
+ // It's Ok to retry since later on the request rate may get reduced.
+ "User rate limit exceeded." => Some((ErrorKind::RateLimited, true)),
+ _ => None,
+ }
+}
diff --git a/core/src/services/gdrive/mod.rs b/core/src/services/gdrive/mod.rs
index 5f77ecc50..2ef0caaec 100644
--- a/core/src/services/gdrive/mod.rs
+++ b/core/src/services/gdrive/mod.rs
@@ -21,4 +21,5 @@ mod core;
mod error;
pub use builder::GdriveBuilder as Gdrive;
+mod pager;
mod writer;
diff --git a/core/src/services/gdrive/pager.rs b/core/src/services/gdrive/pager.rs
new file mode 100644
index 000000000..ec4ad90d1
--- /dev/null
+++ b/core/src/services/gdrive/pager.rs
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use crate::{
+ raw::{
+ build_rel_path, build_rooted_abs_path, new_json_deserialize_error,
+ oio::{self},
+ },
+ EntryMode, Metadata, Result,
+};
+use async_trait::async_trait;
+use http::StatusCode;
+
+use super::{
+ core::{GdriveCore, GdriveFileList},
+ error::parse_error,
+};
+
+/// Pager that lists the non-trashed direct children of a Google Drive folder,
+/// one `files.list` page per `next` call.
+pub struct GdrivePager {
+    /// Path of the folder being listed; it is joined onto `core.root` when building entry paths.
+    path: String,
+    core: Arc<GdriveCore>,
+    /// Token returned with the previous page; `None` before the first request.
+    next_page_token: Option<String>,
+    /// Set once the last page has been returned; `next` then yields `Ok(None)`.
+    done: bool,
+}
+
+impl GdrivePager {
+    /// Build a pager for the folder at `path`. Nothing is fetched until the first `next` call.
+    pub fn new(path: String, core: Arc<GdriveCore>) -> Self {
+        GdrivePager {
+            core,
+            path,
+            done: false,
+            next_page_token: None,
+        }
+    }
+}
+
+#[async_trait]
+impl oio::Page for GdrivePager {
+    /// Fetch the next page of entries under `self.path`.
+    ///
+    /// Returns `Ok(None)` once the listing is exhausted. That happens when the
+    /// previous page carried no `next_page_token`, or when the core returned an
+    /// empty body (the folder does not exist). Folders are reported with a
+    /// trailing `/` and `EntryMode::DIR`; everything else is `EntryMode::FILE`.
+    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
+        // Number of files requested per `files.list` call.
+        const PAGE_SIZE: i32 = 100;
+
+        if self.done {
+            return Ok(None);
+        }
+
+        let resp = self
+            .core
+            .gdrive_list(&self.path, PAGE_SIZE, self.next_page_token.clone())
+            .await?;
+
+        let bytes = match resp.status() {
+            StatusCode::OK => resp.into_body().bytes().await?,
+            _ => return Err(parse_error(resp).await?),
+        };
+
+        // Empty body: nothing to list. Mark the pager done so later calls do not
+        // re-issue the same request.
+        if bytes.is_empty() {
+            self.done = true;
+            return Ok(None);
+        }
+
+        let decoded_response = serde_json::from_slice::<GdriveFileList>(&bytes)
+            .map_err(new_json_deserialize_error)?;
+
+        // A missing continuation token means this was the last page.
+        self.done = decoded_response.next_page_token.is_none();
+        self.next_page_token = decoded_response.next_page_token;
+
+        // The root and the parent's absolute path are the same for every entry.
+        let root = &self.core.root;
+        let parent = build_rooted_abs_path(root, &self.path);
+
+        let entries: Vec<oio::Entry> = decoded_response
+            .files
+            .into_iter()
+            .map(|file| {
+                let (name, mode) = if file.mime_type == "application/vnd.google-apps.folder" {
+                    (format!("{}/", file.name), EntryMode::DIR)
+                } else {
+                    (file.name, EntryMode::FILE)
+                };
+
+                let path = format!("{}{}", parent, name);
+                let normalized_path = build_rel_path(root, &path);
+
+                oio::Entry::new(&normalized_path, Metadata::new(mode))
+            })
+            .collect();
+
+        Ok(Some(entries))
+    }
+}