This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 57581b90a feat(service/gdrive): add gdrive list support (#3025)
57581b90a is described below

commit 57581b90a8a3c7220a9ca09e006f8fb997c90bcd
Author: Flash <[email protected]>
AuthorDate: Mon Sep 11 22:00:42 2023 +0800

    feat(service/gdrive): add gdrive list support (#3025)
    
    * add gdrive list support
    
    * update
    
    * update
---
 core/src/services/gdrive/backend.rs |  14 ++++-
 core/src/services/gdrive/core.rs    |  65 ++++++++++++++++++++++-
 core/src/services/gdrive/error.rs   |  32 ++++++++++-
 core/src/services/gdrive/mod.rs     |   1 +
 core/src/services/gdrive/pager.rs   | 103 ++++++++++++++++++++++++++++++++++++
 5 files changed, 211 insertions(+), 4 deletions(-)

diff --git a/core/src/services/gdrive/backend.rs b/core/src/services/gdrive/backend.rs
index f50713ac7..ea67ed200 100644
--- a/core/src/services/gdrive/backend.rs
+++ b/core/src/services/gdrive/backend.rs
@@ -24,6 +24,7 @@ use http::StatusCode;
 
 use super::core::GdriveCore;
 use super::error::parse_error;
+use super::pager::GdrivePager;
 use super::writer::GdriveWriter;
 use crate::raw::*;
 use crate::services::gdrive::core::GdriveFile;
@@ -42,7 +43,7 @@ impl Accessor for GdriveBackend {
     type BlockingReader = ();
     type Writer = oio::OneShotWriter<GdriveWriter>;
     type BlockingWriter = ();
-    type Pager = ();
+    type Pager = GdrivePager;
     type BlockingPager = ();
 
     fn info(&self) -> AccessorInfo {
@@ -54,6 +55,10 @@ impl Accessor for GdriveBackend {
 
                 read: true,
 
+                list: true,
+
+                list_with_delimiter_slash: true,
+
                 write: true,
 
                 create_dir: true,
@@ -248,6 +253,13 @@ impl Accessor for GdriveBackend {
             Err(e)
         }
     }
+
+    async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Pager)> {
+        Ok((
+            RpList::default(),
+            GdrivePager::new(path.into(), self.core.clone()),
+        ))
+    }
 }
 
 impl GdriveBackend {
diff --git a/core/src/services/gdrive/core.rs b/core/src/services/gdrive/core.rs
index 6e08f4d31..4e7216184 100644
--- a/core/src/services/gdrive/core.rs
+++ b/core/src/services/gdrive/core.rs
@@ -24,6 +24,7 @@ use bytes;
 use bytes::Bytes;
 use chrono::DateTime;
 use chrono::Utc;
+use futures::stream;
 use http::header;
 use http::Request;
 use http::Response;
@@ -99,7 +100,7 @@ impl GdriveCore {
                 "name = \"{}\" and \"{}\" in parents and trashed = false",
                 item, parent_id
             );
-            if i != file_path_items.len() - 1 {
+            if i != file_path_items.len() - 1 || path.ends_with('/') {
                 query += " and mimeType = 'application/vnd.google-apps.folder'";
             }
 
@@ -325,6 +326,67 @@ impl GdriveCore {
         self.client.send(req).await
     }
 
+    pub async fn gdrive_list(
+        &self,
+        path: &str,
+        page_size: i32,
+        next_page_token: Option<String>,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let file_id = self.get_file_id_by_path(path).await;
+
+        // when list over a no exist dir, `get_file_id_by_path` will return a NotFound Error, we should return a empty list in this case.
+        let q = match file_id {
+            Ok(file_id) => {
+                format!("'{}' in parents and trashed = false", file_id)
+            }
+            Err(e) => match e.kind() {
+                ErrorKind::NotFound => {
+                    return Response::builder()
+                        .status(StatusCode::OK)
+                        .body(IncomingAsyncBody::new(
+                            Box::new(oio::into_stream(stream::empty())),
+                            Some(0),
+                        ))
+                        .map_err(|e| {
+                            Error::new(
+                                ErrorKind::Unexpected,
+                                &format!("failed to create a empty response for list: {}", e),
+                            )
+                            .set_source(e)
+                        });
+                }
+                _ => {
+                    return Err(e);
+                }
+            },
+        };
+
+        let url = match next_page_token {
+            Some(page_token) => {
+                format!(
+                    "https://www.googleapis.com/drive/v3/files?pageSize={}&pageToken={}&q={}",
+                    page_size,
+                    page_token,
+                    percent_encode_path(q.as_str())
+                )
+            }
+            None => {
+                format!(
+                    "https://www.googleapis.com/drive/v3/files?pageSize={}&q={}",
+                    page_size,
+                    percent_encode_path(q.as_str())
+                )
+            }
+        };
+
+        let mut req = Request::get(&url)
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+        self.sign(&mut req).await?;
+
+        self.client.send(req).await
+    }
+
     // Update with content and metadata
     pub async fn gdrive_patch_metadata_request(
         &self,
@@ -556,4 +618,5 @@ pub struct GdriveFile {
 #[serde(rename_all = "camelCase")]
 pub(crate) struct GdriveFileList {
     pub(crate) files: Vec<GdriveFile>,
+    pub(crate) next_page_token: Option<String>,
 }
diff --git a/core/src/services/gdrive/error.rs b/core/src/services/gdrive/error.rs
index 60de6ae0d..9a1360fa3 100644
--- a/core/src/services/gdrive/error.rs
+++ b/core/src/services/gdrive/error.rs
@@ -17,18 +17,29 @@
 
 use http::Response;
 use http::StatusCode;
+use serde::Deserialize;
 
 use crate::raw::*;
 use crate::Error;
 use crate::ErrorKind;
 use crate::Result;
 
+#[derive(Default, Debug, Deserialize)]
+struct GdriveError {
+    error: GdriveInnerError,
+}
+
+#[derive(Default, Debug, Deserialize)]
+struct GdriveInnerError {
+    message: String,
+}
+
 /// Parse error response into Error.
 pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
     let (parts, body) = resp.into_parts();
     let bs = body.bytes().await?;
 
-    let (kind, retryable) = match parts.status {
+    let (mut kind, mut retryable) = match parts.status {
         StatusCode::NOT_FOUND => (ErrorKind::NotFound, false),
         StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false),
         StatusCode::INTERNAL_SERVER_ERROR
@@ -38,7 +49,14 @@ pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
         _ => (ErrorKind::Unexpected, false),
     };
 
-    let message = String::from_utf8_lossy(&bs);
+    let (message, gdrive_err) = serde_json::from_slice::<GdriveError>(bs.as_ref())
+        .map(|gdrive_err| (format!("{gdrive_err:?}"), Some(gdrive_err)))
+        .unwrap_or_else(|_| (String::from_utf8_lossy(&bs).into_owned(), None));
+
+    if let Some(gdrive_err) = gdrive_err {
+        (kind, retryable) =
+            parse_gdrive_error_code(gdrive_err.error.message.as_str()).unwrap_or((kind, retryable));
+    }
 
     let mut err = Error::new(kind, &message);
 
@@ -50,3 +68,13 @@ pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
 
     Ok(err)
 }
+
+pub fn parse_gdrive_error_code(message: &str) -> Option<(ErrorKind, bool)> {
+    match message {
+        // > Please reduce your request rate.
+        //
+        // It's Ok to retry since later on the request rate may get reduced.
+        "User rate limit exceeded." => Some((ErrorKind::RateLimited, true)),
+        _ => None,
+    }
+}
diff --git a/core/src/services/gdrive/mod.rs b/core/src/services/gdrive/mod.rs
index 5f77ecc50..2ef0caaec 100644
--- a/core/src/services/gdrive/mod.rs
+++ b/core/src/services/gdrive/mod.rs
@@ -21,4 +21,5 @@ mod core;
 mod error;
 
 pub use builder::GdriveBuilder as Gdrive;
+mod pager;
 mod writer;
diff --git a/core/src/services/gdrive/pager.rs b/core/src/services/gdrive/pager.rs
new file mode 100644
index 000000000..ec4ad90d1
--- /dev/null
+++ b/core/src/services/gdrive/pager.rs
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use crate::{
+    raw::{
+        build_rel_path, build_rooted_abs_path, new_json_deserialize_error,
+        oio::{self},
+    },
+    EntryMode, Metadata, Result,
+};
+use async_trait::async_trait;
+use http::StatusCode;
+
+use super::{
+    core::{GdriveCore, GdriveFileList},
+    error::parse_error,
+};
+pub struct GdrivePager {
+    path: String,
+    core: Arc<GdriveCore>,
+    next_page_token: Option<String>,
+    done: bool,
+}
+
+impl GdrivePager {
+    pub fn new(path: String, core: Arc<GdriveCore>) -> Self {
+        Self {
+            path,
+            core,
+            next_page_token: None,
+            done: false,
+        }
+    }
+}
+
+#[async_trait]
+impl oio::Page for GdrivePager {
+    async fn next(&mut self) -> Result<Option<Vec<oio::Entry>>> {
+        if self.done {
+            return Ok(None);
+        }
+
+        let resp = self
+            .core
+            .gdrive_list(&self.path, 100, self.next_page_token.clone())
+            .await?;
+
+        let bytes = match resp.status() {
+            StatusCode::OK => resp.into_body().bytes().await?,
+            _ => return Err(parse_error(resp).await?),
+        };
+
+        if bytes.is_empty() {
+            return Ok(None);
+        }
+
+        let decoded_response =
+            serde_json::from_slice::<GdriveFileList>(&bytes).map_err(new_json_deserialize_error)?;
+
+        if let Some(next_page_token) = decoded_response.next_page_token {
+            self.next_page_token = Some(next_page_token);
+        } else {
+            self.done = true;
+        }
+
+        let entries: Vec<oio::Entry> = decoded_response
+            .files
+            .into_iter()
+            .map(|mut file| {
+                let file_type = if file.mime_type.as_str() == "application/vnd.google-apps.folder" {
+                    file.name = format!("{}/", file.name);
+                    EntryMode::DIR
+                } else {
+                    EntryMode::FILE
+                };
+
+                let root = &self.core.root;
+                let path = format!("{}{}", build_rooted_abs_path(root, &self.path), file.name);
+                let normalized_path = build_rel_path(root, &path);
+
+                oio::Entry::new(&normalized_path, Metadata::new(file_type))
+            })
+            .collect();
+
+        Ok(Some(entries))
+    }
+}

Reply via email to