This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 5721bbd0 refactor(services/gcs): Migrate to async reqsign (#1906)
5721bbd0 is described below

commit 5721bbd02b5d5cea85ddbd35c1fbd8fb6d1ab2fe
Author: Xuanwo <[email protected]>
AuthorDate: Tue Apr 11 17:22:01 2023 +0800

    refactor(services/gcs): Migrate to async reqsign (#1906)
    
    * refactor(services/gcs): Migrate to async reqsign
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix test
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
---
 bindings/java/src/lib.rs         |   3 +-
 core/src/services/gcs/backend.rs | 295 ++++++++-------------------------------
 core/src/services/gcs/core.rs    | 262 ++++++++++++++++++++++++++++++++++
 core/src/services/gcs/mod.rs     |   1 +
 core/src/services/gcs/pager.rs   |  24 ++--
 core/src/services/gcs/writer.rs  |  21 ++-
 6 files changed, 339 insertions(+), 267 deletions(-)

diff --git a/bindings/java/src/lib.rs b/bindings/java/src/lib.rs
index 88d10132..487384f7 100644
--- a/bindings/java/src/lib.rs
+++ b/bindings/java/src/lib.rs
@@ -22,7 +22,8 @@ use jni::objects::JClass;
 use jni::objects::JMap;
 use jni::objects::JObject;
 use jni::objects::JString;
-use jni::sys::{jboolean, jlong};
+use jni::sys::jboolean;
+use jni::sys::jlong;
 use jni::JNIEnv;
 use opendal::BlockingOperator;
 use opendal::Operator;
diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index eda03ae6..8a2e8fc2 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -18,25 +18,22 @@
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::fmt::Formatter;
-use std::fmt::Write;
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use http::header::CONTENT_LENGTH;
-use http::header::CONTENT_TYPE;
-use http::Request;
-use http::Response;
 use http::StatusCode;
 use log::debug;
-use reqsign::GoogleSigner;
+use reqsign_0_9::GoogleCredentialLoader;
+use reqsign_0_9::GoogleSigner;
+use reqsign_0_9::GoogleTokenLoader;
 use serde::Deserialize;
 use serde_json;
 use time::format_description::well_known::Rfc3339;
 use time::OffsetDateTime;
 
+use super::core::GcsCore;
 use super::error::parse_error;
 use super::pager::GcsPager;
-use super::uri::percent_encode_path;
 use super::writer::GcsWriter;
 use crate::ops::*;
 use crate::raw::*;
@@ -275,45 +272,42 @@ impl Builder for GcsBuilder {
             .endpoint
             .clone()
             .unwrap_or_else(|| DEFAULT_GCS_ENDPOINT.to_string());
-
         debug!("backend use endpoint: {endpoint}");
 
-        let signer = if let Some(signer) = &self.signer {
-            signer.clone()
+        let mut cred_loader = GoogleCredentialLoader::default();
+        if let Some(cred) = &self.credential {
+            cred_loader = cred_loader.with_content(cred);
+        }
+        if let Some(cred) = &self.credential_path {
+            cred_loader = cred_loader.with_path(cred);
+        }
+
+        let scope = if let Some(scope) = &self.scope {
+            scope
         } else {
-            // build signer
-            let mut signer_builder = GoogleSigner::builder();
-            if let Some(scope) = &self.scope {
-                signer_builder.scope(scope);
-            } else {
-                signer_builder.scope(DEFAULT_GCS_SCOPE);
-            }
-            if let Some(account) = &self.service_account {
-                signer_builder.service_account(account);
-            }
-            if let Some(cred) = &self.credential {
-                signer_builder.credential_content(cred);
-            }
-            if let Some(cred) = &self.credential_path {
-                signer_builder.credential_path(cred);
-            }
-            let signer = signer_builder.build().map_err(|e| {
-                Error::new(ErrorKind::ConfigInvalid, "build GoogleSigner")
-                    .with_operation("Builder::build")
-                    .with_context("service", Scheme::Gcs)
-                    .with_context("bucket", bucket)
-                    .with_context("endpoint", &endpoint)
-                    .set_source(e)
-            })?;
-            Arc::new(signer)
+            DEFAULT_GCS_SCOPE
         };
 
+        let mut token_loader = GoogleTokenLoader::new(scope, client.client());
+        if let Some(account) = &self.service_account {
+            token_loader = token_loader.with_service_account(account);
+        }
+        if let Ok(Some(cred)) = cred_loader.load() {
+            token_loader = token_loader.with_credentials(cred)
+        }
+
+        let signer = GoogleSigner::new("storage");
+
         let backend = GcsBackend {
-            root,
-            endpoint,
-            bucket: bucket.clone(),
-            signer,
-            client,
+            core: Arc::new(GcsCore {
+                endpoint,
+                bucket: bucket.to_string(),
+                root,
+                client,
+                signer,
+                token_loader,
+                credential_loader: cred_loader,
+            }),
         };
 
         Ok(backend)
@@ -321,27 +315,9 @@ impl Builder for GcsBuilder {
 }
 
 /// GCS storage backend
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub struct GcsBackend {
-    endpoint: String,
-    bucket: String,
-    // root should end with "/"
-    root: String,
-
-    pub client: HttpClient,
-    pub signer: Arc<GoogleSigner>,
-}
-
-impl Debug for GcsBackend {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        let mut de = f.debug_struct("Backend");
-        de.field("endpoint", &self.endpoint)
-            .field("bucket", &self.bucket)
-            .field("root", &self.root)
-            .field("client", &self.client)
-            .field("signer", &"<redacted>")
-            .finish()
-    }
+    core: Arc<GcsCore>,
 }
 
 #[async_trait]
@@ -359,19 +335,21 @@ impl Accessor for GcsBackend {
 
         let mut am = AccessorInfo::default();
         am.set_scheme(Scheme::Gcs)
-            .set_root(&self.root)
-            .set_name(&self.bucket)
+            .set_root(&self.core.root)
+            .set_name(&self.core.bucket)
             .set_capabilities(Read | Write | List | Scan | Copy)
             .set_hints(ReadStreamable);
         am
     }
 
     async fn create(&self, path: &str, _: OpCreate) -> Result<RpCreate> {
-        let mut req = self.gcs_insert_object_request(path, Some(0), None, AsyncBody::Empty)?;
+        let mut req = self
+            .core
+            .gcs_insert_object_request(path, Some(0), None, AsyncBody::Empty)?;
 
-        self.signer.sign(&mut req).map_err(new_request_sign_error)?;
+        self.core.sign(&mut req).await?;
 
-        let resp = self.client.send(req).await?;
+        let resp = self.core.send(req).await?;
 
         if resp.status().is_success() {
             resp.into_body().consume().await?;
@@ -382,7 +360,7 @@ impl Accessor for GcsBackend {
     }
 
     async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
-        let resp = self.gcs_get_object(path, args.range()).await?;
+        let resp = self.core.gcs_get_object(path, args.range()).await?;
 
         if resp.status().is_success() {
             let meta = parse_into_metadata(path, resp.headers())?;
@@ -402,32 +380,19 @@ impl Accessor for GcsBackend {
 
         Ok((
             RpWrite::default(),
-            GcsWriter::new(self.clone(), args, path.to_string()),
+            GcsWriter::new(self.core.clone(), args, path.to_string()),
         ))
     }
 
     async fn copy(&self, from: &str, to: &str, _: OpCopy) -> Result<RpCopy> {
-        let source = percent_encode_path(&build_abs_path(&self.root, from.trim_end_matches('/')));
-        let dest = percent_encode_path(&build_abs_path(&self.root, to.trim_end_matches('/')));
-        let req_uri = format!(
-            "{}/storage/v1/b/{}/o/{}/copyTo/b/{}/o/{}",
-            self.endpoint, self.bucket, source, self.bucket, dest
-        );
+        let resp = self.core.gcs_copy_object(from, to).await?;
 
-        let mut req = Request::post(req_uri)
-            .body(AsyncBody::Empty)
-            .map_err(new_request_build_error)?;
-
-        self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-
-        let resp = self.client.send(req).await?;
-
-        if !resp.status().is_success() {
-            return Err(parse_error(resp).await?);
+        if resp.status().is_success() {
+            resp.into_body().consume().await?;
+            Ok(RpCopy::default())
+        } else {
+            Err(parse_error(resp).await?)
         }
-        resp.into_body().consume().await?;
-
-        Ok(RpCopy::default())
     }
 
     async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
@@ -436,7 +401,7 @@ impl Accessor for GcsBackend {
             return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
         }
 
-        let resp = self.gcs_get_object_metadata(path).await?;
+        let resp = self.core.gcs_get_object_metadata(path).await?;
 
         if resp.status().is_success() {
             // read http response body
@@ -477,7 +442,7 @@ impl Accessor for GcsBackend {
     }
 
     async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
-        let resp = self.gcs_delete_object(path).await?;
+        let resp = self.core.gcs_delete_object(path).await?;
 
         // deleting not existing objects is ok
         if resp.status().is_success() || resp.status() == StatusCode::NOT_FOUND {
@@ -490,168 +455,18 @@ impl Accessor for GcsBackend {
     async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> {
         Ok((
             RpList::default(),
-            GcsPager::new(Arc::new(self.clone()), &self.root, path, "/", args.limit()),
+            GcsPager::new(self.core.clone(), path, "/", args.limit()),
         ))
     }
 
     async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> {
         Ok((
             RpScan::default(),
-            GcsPager::new(Arc::new(self.clone()), &self.root, path, "", args.limit()),
+            GcsPager::new(self.core.clone(), path, "", args.limit()),
         ))
     }
 }
 
-impl GcsBackend {
-    fn gcs_get_object_request(&self, path: &str, range: BytesRange) -> Result<Request<AsyncBody>> {
-        let p = build_abs_path(&self.root, path);
-
-        let url = format!(
-            "{}/storage/v1/b/{}/o/{}?alt=media",
-            self.endpoint,
-            self.bucket,
-            percent_encode_path(&p)
-        );
-
-        let mut req = Request::get(&url);
-
-        if !range.is_full() {
-            req = req.header(http::header::RANGE, range.to_header());
-        }
-
-        let req = req
-            .body(AsyncBody::Empty)
-            .map_err(new_request_build_error)?;
-
-        Ok(req)
-    }
-
-    async fn gcs_get_object(
-        &self,
-        path: &str,
-        range: BytesRange,
-    ) -> Result<Response<IncomingAsyncBody>> {
-        let mut req = self.gcs_get_object_request(path, range)?;
-
-        self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-
-        self.client.send(req).await
-    }
-
-    pub fn gcs_insert_object_request(
-        &self,
-        path: &str,
-        size: Option<usize>,
-        content_type: Option<&str>,
-        body: AsyncBody,
-    ) -> Result<Request<AsyncBody>> {
-        let p = build_abs_path(&self.root, path);
-
-        let url = format!(
-            "{}/upload/storage/v1/b/{}/o?uploadType=media&name={}",
-            self.endpoint,
-            self.bucket,
-            percent_encode_path(&p)
-        );
-
-        let mut req = Request::post(&url);
-
-        if let Some(size) = size {
-            req = req.header(CONTENT_LENGTH, size)
-        }
-
-        if let Some(mime) = content_type {
-            req = req.header(CONTENT_TYPE, mime)
-        }
-
-        // Set body
-        let req = req.body(body).map_err(new_request_build_error)?;
-
-        Ok(req)
-    }
-
-    async fn gcs_get_object_metadata(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
-        let p = build_abs_path(&self.root, path);
-
-        let url = format!(
-            "{}/storage/v1/b/{}/o/{}",
-            self.endpoint,
-            self.bucket,
-            percent_encode_path(&p)
-        );
-
-        let req = Request::get(&url);
-
-        let mut req = req
-            .body(AsyncBody::Empty)
-            .map_err(new_request_build_error)?;
-
-        self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-
-        self.client.send(req).await
-    }
-
-    async fn gcs_delete_object(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
-        let p = build_abs_path(&self.root, path);
-
-        let url = format!(
-            "{}/storage/v1/b/{}/o/{}",
-            self.endpoint,
-            self.bucket,
-            percent_encode_path(&p)
-        );
-
-        let mut req = Request::delete(&url)
-            .body(AsyncBody::Empty)
-            .map_err(new_request_build_error)?;
-
-        self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-
-        self.client.send(req).await
-    }
-
-    pub(crate) async fn gcs_list_objects(
-        &self,
-        path: &str,
-        page_token: &str,
-        delimiter: &str,
-        limit: Option<usize>,
-    ) -> Result<Response<IncomingAsyncBody>> {
-        let p = build_abs_path(&self.root, path);
-
-        let mut url = format!(
-            "{}/storage/v1/b/{}/o?prefix={}",
-            self.endpoint,
-            self.bucket,
-            percent_encode_path(&p)
-        );
-        if !delimiter.is_empty() {
-            write!(url, "&delimiter={delimiter}").expect("write into string must succeed");
-        }
-        if let Some(limit) = limit {
-            write!(url, "&maxResults={limit}").expect("write into string must succeed");
-        }
-        if !page_token.is_empty() {
-            // NOTE:
-            //
-            // GCS uses pageToken in request and nextPageToken in response
-            //
-            // Don't know how will those tokens be like so this part are copied
-            // directly from AWS S3 service.
-            write!(url, "&pageToken={}", percent_encode_path(page_token))
-                .expect("write into string must succeed");
-        }
-
-        let mut req = Request::get(&url)
-            .body(AsyncBody::Empty)
-            .map_err(new_request_build_error)?;
-
-        self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-
-        self.client.send(req).await
-    }
-}
-
 /// The raw json response returned by [`get`](https://cloud.google.com/storage/docs/json_api/v1/objects/get)
 #[derive(Debug, Default, Deserialize)]
 #[serde(default, rename_all = "camelCase")]
diff --git a/core/src/services/gcs/core.rs b/core/src/services/gcs/core.rs
new file mode 100644
index 00000000..84526983
--- /dev/null
+++ b/core/src/services/gcs/core.rs
@@ -0,0 +1,262 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::fmt::Write;
+
+use http::header::CONTENT_LENGTH;
+use http::header::CONTENT_TYPE;
+use http::Request;
+use http::Response;
+use reqsign_0_9::GoogleCredentialLoader;
+use reqsign_0_9::GoogleSigner;
+use reqsign_0_9::GoogleToken;
+use reqsign_0_9::GoogleTokenLoader;
+
+use super::uri::percent_encode_path;
+use crate::raw::*;
+use crate::*;
+
+pub struct GcsCore {
+    pub endpoint: String,
+    pub bucket: String,
+    pub root: String,
+
+    pub client: HttpClient,
+    pub signer: GoogleSigner,
+    pub token_loader: GoogleTokenLoader,
+    pub credential_loader: GoogleCredentialLoader,
+}
+
+impl Debug for GcsCore {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut de = f.debug_struct("Backend");
+        de.field("endpoint", &self.endpoint)
+            .field("bucket", &self.bucket)
+            .field("root", &self.root)
+            .finish_non_exhaustive()
+    }
+}
+
+impl GcsCore {
+    async fn load_token(&self) -> Result<GoogleToken> {
+        let cred = self
+            .token_loader
+            .load()
+            .await
+            .map_err(new_request_credential_error)?;
+
+        if let Some(cred) = cred {
+            Ok(cred)
+        } else {
+            Err(Error::new(
+                ErrorKind::ConfigInvalid,
+                "no valid credential found",
+            ))
+        }
+    }
+
+    pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
+        let cred = self.load_token().await?;
+
+        self.signer.sign(req, &cred).map_err(new_request_sign_error)
+    }
+
+    #[inline]
+    pub async fn send(&self, req: Request<AsyncBody>) -> Result<Response<IncomingAsyncBody>> {
+        self.client.send(req).await
+    }
+}
+
+impl GcsCore {
+    pub fn gcs_get_object_request(
+        &self,
+        path: &str,
+        range: BytesRange,
+    ) -> Result<Request<AsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+
+        let url = format!(
+            "{}/storage/v1/b/{}/o/{}?alt=media",
+            self.endpoint,
+            self.bucket,
+            percent_encode_path(&p)
+        );
+
+        let mut req = Request::get(&url);
+
+        if !range.is_full() {
+            req = req.header(http::header::RANGE, range.to_header());
+        }
+
+        let req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        Ok(req)
+    }
+
+    pub async fn gcs_get_object(
+        &self,
+        path: &str,
+        range: BytesRange,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let mut req = self.gcs_get_object_request(path, range)?;
+
+        self.sign(&mut req).await?;
+        self.send(req).await
+    }
+
+    pub fn gcs_insert_object_request(
+        &self,
+        path: &str,
+        size: Option<usize>,
+        content_type: Option<&str>,
+        body: AsyncBody,
+    ) -> Result<Request<AsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+
+        let url = format!(
+            "{}/upload/storage/v1/b/{}/o?uploadType=media&name={}",
+            self.endpoint,
+            self.bucket,
+            percent_encode_path(&p)
+        );
+
+        let mut req = Request::post(&url);
+
+        if let Some(size) = size {
+            req = req.header(CONTENT_LENGTH, size)
+        }
+
+        if let Some(mime) = content_type {
+            req = req.header(CONTENT_TYPE, mime)
+        }
+
+        // Set body
+        let req = req.body(body).map_err(new_request_build_error)?;
+
+        Ok(req)
+    }
+
+    pub async fn gcs_get_object_metadata(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+
+        let url = format!(
+            "{}/storage/v1/b/{}/o/{}",
+            self.endpoint,
+            self.bucket,
+            percent_encode_path(&p)
+        );
+
+        let req = Request::get(&url);
+
+        let mut req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+
+        self.send(req).await
+    }
+
+    pub async fn gcs_delete_object(&self, path: &str) -> Result<Response<IncomingAsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+
+        let url = format!(
+            "{}/storage/v1/b/{}/o/{}",
+            self.endpoint,
+            self.bucket,
+            percent_encode_path(&p)
+        );
+
+        let mut req = Request::delete(&url)
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+
+        self.send(req).await
+    }
+
+    pub async fn gcs_copy_object(
+        &self,
+        from: &str,
+        to: &str,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let source = build_abs_path(&self.root, from);
+        let dest = build_abs_path(&self.root, to);
+
+        let req_uri = format!(
+            "{}/storage/v1/b/{}/o/{}/copyTo/b/{}/o/{}",
+            self.endpoint,
+            self.bucket,
+            percent_encode_path(&source),
+            self.bucket,
+            percent_encode_path(&dest)
+        );
+
+        let mut req = Request::post(req_uri)
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+        self.send(req).await
+    }
+
+    pub async fn gcs_list_objects(
+        &self,
+        path: &str,
+        page_token: &str,
+        delimiter: &str,
+        limit: Option<usize>,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+
+        let mut url = format!(
+            "{}/storage/v1/b/{}/o?prefix={}",
+            self.endpoint,
+            self.bucket,
+            percent_encode_path(&p)
+        );
+        if !delimiter.is_empty() {
+            write!(url, "&delimiter={delimiter}").expect("write into string must succeed");
+        }
+        if let Some(limit) = limit {
+            write!(url, "&maxResults={limit}").expect("write into string must succeed");
+        }
+        if !page_token.is_empty() {
+            // NOTE:
+            //
+            // GCS uses pageToken in request and nextPageToken in response
+            //
+            // Don't know how will those tokens be like so this part are copied
+            // directly from AWS S3 service.
+            write!(url, "&pageToken={}", percent_encode_path(page_token))
+                .expect("write into string must succeed");
+        }
+
+        let mut req = Request::get(&url)
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.sign(&mut req).await?;
+
+        self.send(req).await
+    }
+}
diff --git a/core/src/services/gcs/mod.rs b/core/src/services/gcs/mod.rs
index 5aedb812..6f95267c 100644
--- a/core/src/services/gcs/mod.rs
+++ b/core/src/services/gcs/mod.rs
@@ -18,6 +18,7 @@
 mod backend;
 pub use backend::GcsBuilder as Gcs;
 
+mod core;
 mod error;
 mod pager;
 mod uri;
diff --git a/core/src/services/gcs/pager.rs b/core/src/services/gcs/pager.rs
index 7977935f..6cb66aac 100644
--- a/core/src/services/gcs/pager.rs
+++ b/core/src/services/gcs/pager.rs
@@ -23,7 +23,7 @@ use serde_json;
 use time::format_description::well_known::Rfc3339;
 use time::OffsetDateTime;
 
-use super::backend::GcsBackend;
+use super::core::GcsCore;
 use super::error::parse_error;
 use crate::raw::*;
 use crate::*;
@@ -31,8 +31,8 @@ use crate::*;
 /// GcsPager takes over task of listing objects and
 /// helps walking directory
 pub struct GcsPager {
-    backend: Arc<GcsBackend>,
-    root: String,
+    core: Arc<GcsCore>,
+
     path: String,
     delimiter: String,
     limit: Option<usize>,
@@ -43,16 +43,10 @@ pub struct GcsPager {
 
 impl GcsPager {
     /// Generate a new directory walker
-    pub fn new(
-        backend: Arc<GcsBackend>,
-        root: &str,
-        path: &str,
-        delimiter: &str,
-        limit: Option<usize>,
-    ) -> Self {
     pub fn new(core: Arc<GcsCore>, path: &str, delimiter: &str, limit: Option<usize>) -> Self {
         Self {
-            backend,
-            root: root.to_string(),
+            core,
+
             path: path.to_string(),
             delimiter: delimiter.to_string(),
             limit,
@@ -71,7 +65,7 @@ impl oio::Page for GcsPager {
         }
 
         let resp = self
-            .backend
+            .core
             .gcs_list_objects(&self.path, &self.page_token, &self.delimiter, self.limit)
             .await?;
 
@@ -93,7 +87,7 @@ impl oio::Page for GcsPager {
 
         for prefix in output.prefixes {
             let de = oio::Entry::new(
-                &build_rel_path(&self.root, &prefix),
+                &build_rel_path(&self.core.root, &prefix),
                 Metadata::new(EntryMode::DIR),
             );
 
@@ -124,7 +118,7 @@ impl oio::Page for GcsPager {
             })?;
             meta.set_last_modified(dt);
 
-            let de = oio::Entry::new(&build_rel_path(&self.root, &object.name), meta);
+            let de = oio::Entry::new(&build_rel_path(&self.core.root, &object.name), meta);
 
             entries.push(de);
         }
diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs
index 1dda8d9b..563152dc 100644
--- a/core/src/services/gcs/writer.rs
+++ b/core/src/services/gcs/writer.rs
@@ -15,45 +15,44 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::sync::Arc;
+
 use async_trait::async_trait;
 use bytes::Bytes;
 use http::StatusCode;
 
-use super::backend::GcsBackend;
+use super::core::GcsCore;
 use super::error::parse_error;
 use crate::ops::OpWrite;
 use crate::raw::*;
 use crate::*;
 
 pub struct GcsWriter {
-    backend: GcsBackend,
+    core: Arc<GcsCore>,
 
     op: OpWrite,
     path: String,
 }
 
 impl GcsWriter {
-    pub fn new(backend: GcsBackend, op: OpWrite, path: String) -> Self {
-        GcsWriter { backend, op, path }
+    pub fn new(core: Arc<GcsCore>, op: OpWrite, path: String) -> Self {
+        GcsWriter { core, op, path }
     }
 }
 
 #[async_trait]
 impl oio::Write for GcsWriter {
     async fn write(&mut self, bs: Bytes) -> Result<()> {
-        let mut req = self.backend.gcs_insert_object_request(
-            &self.path,
+        let mut req = self.core.gcs_insert_object_request(
+            &percent_encode_path(&self.path),
             Some(bs.len()),
             self.op.content_type(),
             AsyncBody::Bytes(bs),
         )?;
 
-        self.backend
-            .signer
-            .sign(&mut req)
-            .map_err(new_request_sign_error)?;
+        self.core.sign(&mut req).await?;
 
-        let resp = self.backend.client.send(req).await?;
+        let resp = self.core.send(req).await?;
 
         let status = resp.status();
 

Reply via email to