This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 5721bbd0 refactor(services/gcs): Migrate to async reqsign (#1906)
5721bbd0 is described below
commit 5721bbd02b5d5cea85ddbd35c1fbd8fb6d1ab2fe
Author: Xuanwo <[email protected]>
AuthorDate: Tue Apr 11 17:22:01 2023 +0800
refactor(services/gcs): Migrate to async reqsign (#1906)
* refactor(services/gcs): Migrate to async reqsign
Signed-off-by: Xuanwo <[email protected]>
* Fix test
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
bindings/java/src/lib.rs | 3 +-
core/src/services/gcs/backend.rs | 295 ++++++++-------------------------------
core/src/services/gcs/core.rs | 262 ++++++++++++++++++++++++++++++++++
core/src/services/gcs/mod.rs | 1 +
core/src/services/gcs/pager.rs | 24 ++--
core/src/services/gcs/writer.rs | 21 ++-
6 files changed, 339 insertions(+), 267 deletions(-)
diff --git a/bindings/java/src/lib.rs b/bindings/java/src/lib.rs
index 88d10132..487384f7 100644
--- a/bindings/java/src/lib.rs
+++ b/bindings/java/src/lib.rs
@@ -22,7 +22,8 @@ use jni::objects::JClass;
use jni::objects::JMap;
use jni::objects::JObject;
use jni::objects::JString;
-use jni::sys::{jboolean, jlong};
+use jni::sys::jboolean;
+use jni::sys::jlong;
use jni::JNIEnv;
use opendal::BlockingOperator;
use opendal::Operator;
diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs
index eda03ae6..8a2e8fc2 100644
--- a/core/src/services/gcs/backend.rs
+++ b/core/src/services/gcs/backend.rs
@@ -18,25 +18,22 @@
use std::collections::HashMap;
use std::fmt::Debug;
use std::fmt::Formatter;
-use std::fmt::Write;
use std::sync::Arc;
use async_trait::async_trait;
-use http::header::CONTENT_LENGTH;
-use http::header::CONTENT_TYPE;
-use http::Request;
-use http::Response;
use http::StatusCode;
use log::debug;
-use reqsign::GoogleSigner;
+use reqsign_0_9::GoogleCredentialLoader;
+use reqsign_0_9::GoogleSigner;
+use reqsign_0_9::GoogleTokenLoader;
use serde::Deserialize;
use serde_json;
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;
+use super::core::GcsCore;
use super::error::parse_error;
use super::pager::GcsPager;
-use super::uri::percent_encode_path;
use super::writer::GcsWriter;
use crate::ops::*;
use crate::raw::*;
@@ -275,45 +272,42 @@ impl Builder for GcsBuilder {
.endpoint
.clone()
.unwrap_or_else(|| DEFAULT_GCS_ENDPOINT.to_string());
-
debug!("backend use endpoint: {endpoint}");
- let signer = if let Some(signer) = &self.signer {
- signer.clone()
+ let mut cred_loader = GoogleCredentialLoader::default();
+ if let Some(cred) = &self.credential {
+ cred_loader = cred_loader.with_content(cred);
+ }
+ if let Some(cred) = &self.credential_path {
+ cred_loader = cred_loader.with_path(cred);
+ }
+
+ let scope = if let Some(scope) = &self.scope {
+ scope
} else {
- // build signer
- let mut signer_builder = GoogleSigner::builder();
- if let Some(scope) = &self.scope {
- signer_builder.scope(scope);
- } else {
- signer_builder.scope(DEFAULT_GCS_SCOPE);
- }
- if let Some(account) = &self.service_account {
- signer_builder.service_account(account);
- }
- if let Some(cred) = &self.credential {
- signer_builder.credential_content(cred);
- }
- if let Some(cred) = &self.credential_path {
- signer_builder.credential_path(cred);
- }
- let signer = signer_builder.build().map_err(|e| {
- Error::new(ErrorKind::ConfigInvalid, "build GoogleSigner")
- .with_operation("Builder::build")
- .with_context("service", Scheme::Gcs)
- .with_context("bucket", bucket)
- .with_context("endpoint", &endpoint)
- .set_source(e)
- })?;
- Arc::new(signer)
+ DEFAULT_GCS_SCOPE
};
+ let mut token_loader = GoogleTokenLoader::new(scope, client.client());
+ if let Some(account) = &self.service_account {
+ token_loader = token_loader.with_service_account(account);
+ }
+ if let Ok(Some(cred)) = cred_loader.load() {
+ token_loader = token_loader.with_credentials(cred)
+ }
+
+ let signer = GoogleSigner::new("storage");
+
let backend = GcsBackend {
- root,
- endpoint,
- bucket: bucket.clone(),
- signer,
- client,
+ core: Arc::new(GcsCore {
+ endpoint,
+ bucket: bucket.to_string(),
+ root,
+ client,
+ signer,
+ token_loader,
+ credential_loader: cred_loader,
+ }),
};
Ok(backend)
@@ -321,27 +315,9 @@ impl Builder for GcsBuilder {
}
/// GCS storage backend
-#[derive(Clone)]
+#[derive(Clone, Debug)]
pub struct GcsBackend {
- endpoint: String,
- bucket: String,
- // root should end with "/"
- root: String,
-
- pub client: HttpClient,
- pub signer: Arc<GoogleSigner>,
-}
-
-impl Debug for GcsBackend {
- fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- let mut de = f.debug_struct("Backend");
- de.field("endpoint", &self.endpoint)
- .field("bucket", &self.bucket)
- .field("root", &self.root)
- .field("client", &self.client)
- .field("signer", &"<redacted>")
- .finish()
- }
+ core: Arc<GcsCore>,
}
#[async_trait]
@@ -359,19 +335,21 @@ impl Accessor for GcsBackend {
let mut am = AccessorInfo::default();
am.set_scheme(Scheme::Gcs)
- .set_root(&self.root)
- .set_name(&self.bucket)
+ .set_root(&self.core.root)
+ .set_name(&self.core.bucket)
.set_capabilities(Read | Write | List | Scan | Copy)
.set_hints(ReadStreamable);
am
}
async fn create(&self, path: &str, _: OpCreate) -> Result<RpCreate> {
- let mut req = self.gcs_insert_object_request(path, Some(0), None,
AsyncBody::Empty)?;
+ let mut req = self
+ .core
+ .gcs_insert_object_request(path, Some(0), None, AsyncBody::Empty)?;
- self.signer.sign(&mut req).map_err(new_request_sign_error)?;
+ self.core.sign(&mut req).await?;
- let resp = self.client.send(req).await?;
+ let resp = self.core.send(req).await?;
if resp.status().is_success() {
resp.into_body().consume().await?;
@@ -382,7 +360,7 @@ impl Accessor for GcsBackend {
}
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead,
Self::Reader)> {
- let resp = self.gcs_get_object(path, args.range()).await?;
+ let resp = self.core.gcs_get_object(path, args.range()).await?;
if resp.status().is_success() {
let meta = parse_into_metadata(path, resp.headers())?;
@@ -402,32 +380,19 @@ impl Accessor for GcsBackend {
Ok((
RpWrite::default(),
- GcsWriter::new(self.clone(), args, path.to_string()),
+ GcsWriter::new(self.core.clone(), args, path.to_string()),
))
}
async fn copy(&self, from: &str, to: &str, _: OpCopy) -> Result<RpCopy> {
- let source = percent_encode_path(&build_abs_path(&self.root,
from.trim_end_matches('/')));
- let dest = percent_encode_path(&build_abs_path(&self.root,
to.trim_end_matches('/')));
- let req_uri = format!(
- "{}/storage/v1/b/{}/o/{}/copyTo/b/{}/o/{}",
- self.endpoint, self.bucket, source, self.bucket, dest
- );
+ let resp = self.core.gcs_copy_object(from, to).await?;
- let mut req = Request::post(req_uri)
- .body(AsyncBody::Empty)
- .map_err(new_request_build_error)?;
-
- self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-
- let resp = self.client.send(req).await?;
-
- if !resp.status().is_success() {
- return Err(parse_error(resp).await?);
+ if resp.status().is_success() {
+ resp.into_body().consume().await?;
+ Ok(RpCopy::default())
+ } else {
+ Err(parse_error(resp).await?)
}
- resp.into_body().consume().await?;
-
- Ok(RpCopy::default())
}
async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
@@ -436,7 +401,7 @@ impl Accessor for GcsBackend {
return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
}
- let resp = self.gcs_get_object_metadata(path).await?;
+ let resp = self.core.gcs_get_object_metadata(path).await?;
if resp.status().is_success() {
// read http response body
@@ -477,7 +442,7 @@ impl Accessor for GcsBackend {
}
async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
- let resp = self.gcs_delete_object(path).await?;
+ let resp = self.core.gcs_delete_object(path).await?;
// deleting not existing objects is ok
if resp.status().is_success() || resp.status() ==
StatusCode::NOT_FOUND {
@@ -490,168 +455,18 @@ impl Accessor for GcsBackend {
async fn list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::Pager)> {
Ok((
RpList::default(),
- GcsPager::new(Arc::new(self.clone()), &self.root, path, "/",
args.limit()),
+ GcsPager::new(self.core.clone(), path, "/", args.limit()),
))
}
async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan,
Self::Pager)> {
Ok((
RpScan::default(),
- GcsPager::new(Arc::new(self.clone()), &self.root, path, "",
args.limit()),
+ GcsPager::new(self.core.clone(), path, "", args.limit()),
))
}
}
-impl GcsBackend {
- fn gcs_get_object_request(&self, path: &str, range: BytesRange) ->
Result<Request<AsyncBody>> {
- let p = build_abs_path(&self.root, path);
-
- let url = format!(
- "{}/storage/v1/b/{}/o/{}?alt=media",
- self.endpoint,
- self.bucket,
- percent_encode_path(&p)
- );
-
- let mut req = Request::get(&url);
-
- if !range.is_full() {
- req = req.header(http::header::RANGE, range.to_header());
- }
-
- let req = req
- .body(AsyncBody::Empty)
- .map_err(new_request_build_error)?;
-
- Ok(req)
- }
-
- async fn gcs_get_object(
- &self,
- path: &str,
- range: BytesRange,
- ) -> Result<Response<IncomingAsyncBody>> {
- let mut req = self.gcs_get_object_request(path, range)?;
-
- self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-
- self.client.send(req).await
- }
-
- pub fn gcs_insert_object_request(
- &self,
- path: &str,
- size: Option<usize>,
- content_type: Option<&str>,
- body: AsyncBody,
- ) -> Result<Request<AsyncBody>> {
- let p = build_abs_path(&self.root, path);
-
- let url = format!(
- "{}/upload/storage/v1/b/{}/o?uploadType=media&name={}",
- self.endpoint,
- self.bucket,
- percent_encode_path(&p)
- );
-
- let mut req = Request::post(&url);
-
- if let Some(size) = size {
- req = req.header(CONTENT_LENGTH, size)
- }
-
- if let Some(mime) = content_type {
- req = req.header(CONTENT_TYPE, mime)
- }
-
- // Set body
- let req = req.body(body).map_err(new_request_build_error)?;
-
- Ok(req)
- }
-
- async fn gcs_get_object_metadata(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
- let p = build_abs_path(&self.root, path);
-
- let url = format!(
- "{}/storage/v1/b/{}/o/{}",
- self.endpoint,
- self.bucket,
- percent_encode_path(&p)
- );
-
- let req = Request::get(&url);
-
- let mut req = req
- .body(AsyncBody::Empty)
- .map_err(new_request_build_error)?;
-
- self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-
- self.client.send(req).await
- }
-
- async fn gcs_delete_object(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
- let p = build_abs_path(&self.root, path);
-
- let url = format!(
- "{}/storage/v1/b/{}/o/{}",
- self.endpoint,
- self.bucket,
- percent_encode_path(&p)
- );
-
- let mut req = Request::delete(&url)
- .body(AsyncBody::Empty)
- .map_err(new_request_build_error)?;
-
- self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-
- self.client.send(req).await
- }
-
- pub(crate) async fn gcs_list_objects(
- &self,
- path: &str,
- page_token: &str,
- delimiter: &str,
- limit: Option<usize>,
- ) -> Result<Response<IncomingAsyncBody>> {
- let p = build_abs_path(&self.root, path);
-
- let mut url = format!(
- "{}/storage/v1/b/{}/o?prefix={}",
- self.endpoint,
- self.bucket,
- percent_encode_path(&p)
- );
- if !delimiter.is_empty() {
- write!(url, "&delimiter={delimiter}").expect("write into string
must succeed");
- }
- if let Some(limit) = limit {
- write!(url, "&maxResults={limit}").expect("write into string must
succeed");
- }
- if !page_token.is_empty() {
- // NOTE:
- //
- // GCS uses pageToken in request and nextPageToken in response
- //
- // Don't know how will those tokens be like so this part are copied
- // directly from AWS S3 service.
- write!(url, "&pageToken={}", percent_encode_path(page_token))
- .expect("write into string must succeed");
- }
-
- let mut req = Request::get(&url)
- .body(AsyncBody::Empty)
- .map_err(new_request_build_error)?;
-
- self.signer.sign(&mut req).map_err(new_request_sign_error)?;
-
- self.client.send(req).await
- }
-}
-
/// The raw json response returned by
[`get`](https://cloud.google.com/storage/docs/json_api/v1/objects/get)
#[derive(Debug, Default, Deserialize)]
#[serde(default, rename_all = "camelCase")]
diff --git a/core/src/services/gcs/core.rs b/core/src/services/gcs/core.rs
new file mode 100644
index 00000000..84526983
--- /dev/null
+++ b/core/src/services/gcs/core.rs
@@ -0,0 +1,262 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::fmt::Write;
+
+use http::header::CONTENT_LENGTH;
+use http::header::CONTENT_TYPE;
+use http::Request;
+use http::Response;
+use reqsign_0_9::GoogleCredentialLoader;
+use reqsign_0_9::GoogleSigner;
+use reqsign_0_9::GoogleToken;
+use reqsign_0_9::GoogleTokenLoader;
+
+use super::uri::percent_encode_path;
+use crate::raw::*;
+use crate::*;
+
+pub struct GcsCore {
+ pub endpoint: String,
+ pub bucket: String,
+ pub root: String,
+
+ pub client: HttpClient,
+ pub signer: GoogleSigner,
+ pub token_loader: GoogleTokenLoader,
+ pub credential_loader: GoogleCredentialLoader,
+}
+
+impl Debug for GcsCore {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ let mut de = f.debug_struct("Backend");
+ de.field("endpoint", &self.endpoint)
+ .field("bucket", &self.bucket)
+ .field("root", &self.root)
+ .finish_non_exhaustive()
+ }
+}
+
+impl GcsCore {
+ async fn load_token(&self) -> Result<GoogleToken> {
+ let cred = self
+ .token_loader
+ .load()
+ .await
+ .map_err(new_request_credential_error)?;
+
+ if let Some(cred) = cred {
+ Ok(cred)
+ } else {
+ Err(Error::new(
+ ErrorKind::ConfigInvalid,
+ "no valid credential found",
+ ))
+ }
+ }
+
+ pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
+ let cred = self.load_token().await?;
+
+ self.signer.sign(req, &cred).map_err(new_request_sign_error)
+ }
+
+ #[inline]
+ pub async fn send(&self, req: Request<AsyncBody>) ->
Result<Response<IncomingAsyncBody>> {
+ self.client.send(req).await
+ }
+}
+
+impl GcsCore {
+ pub fn gcs_get_object_request(
+ &self,
+ path: &str,
+ range: BytesRange,
+ ) -> Result<Request<AsyncBody>> {
+ let p = build_abs_path(&self.root, path);
+
+ let url = format!(
+ "{}/storage/v1/b/{}/o/{}?alt=media",
+ self.endpoint,
+ self.bucket,
+ percent_encode_path(&p)
+ );
+
+ let mut req = Request::get(&url);
+
+ if !range.is_full() {
+ req = req.header(http::header::RANGE, range.to_header());
+ }
+
+ let req = req
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ Ok(req)
+ }
+
+ pub async fn gcs_get_object(
+ &self,
+ path: &str,
+ range: BytesRange,
+ ) -> Result<Response<IncomingAsyncBody>> {
+ let mut req = self.gcs_get_object_request(path, range)?;
+
+ self.sign(&mut req).await?;
+ self.send(req).await
+ }
+
+ pub fn gcs_insert_object_request(
+ &self,
+ path: &str,
+ size: Option<usize>,
+ content_type: Option<&str>,
+ body: AsyncBody,
+ ) -> Result<Request<AsyncBody>> {
+ let p = build_abs_path(&self.root, path);
+
+ let url = format!(
+ "{}/upload/storage/v1/b/{}/o?uploadType=media&name={}",
+ self.endpoint,
+ self.bucket,
+ percent_encode_path(&p)
+ );
+
+ let mut req = Request::post(&url);
+
+ if let Some(size) = size {
+ req = req.header(CONTENT_LENGTH, size)
+ }
+
+ if let Some(mime) = content_type {
+ req = req.header(CONTENT_TYPE, mime)
+ }
+
+ // Set body
+ let req = req.body(body).map_err(new_request_build_error)?;
+
+ Ok(req)
+ }
+
+ pub async fn gcs_get_object_metadata(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
+ let p = build_abs_path(&self.root, path);
+
+ let url = format!(
+ "{}/storage/v1/b/{}/o/{}",
+ self.endpoint,
+ self.bucket,
+ percent_encode_path(&p)
+ );
+
+ let req = Request::get(&url);
+
+ let mut req = req
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ self.sign(&mut req).await?;
+
+ self.send(req).await
+ }
+
+ pub async fn gcs_delete_object(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
+ let p = build_abs_path(&self.root, path);
+
+ let url = format!(
+ "{}/storage/v1/b/{}/o/{}",
+ self.endpoint,
+ self.bucket,
+ percent_encode_path(&p)
+ );
+
+ let mut req = Request::delete(&url)
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ self.sign(&mut req).await?;
+
+ self.send(req).await
+ }
+
+ pub async fn gcs_copy_object(
+ &self,
+ from: &str,
+ to: &str,
+ ) -> Result<Response<IncomingAsyncBody>> {
+ let source = build_abs_path(&self.root, from);
+ let dest = build_abs_path(&self.root, to);
+
+ let req_uri = format!(
+ "{}/storage/v1/b/{}/o/{}/copyTo/b/{}/o/{}",
+ self.endpoint,
+ self.bucket,
+ percent_encode_path(&source),
+ self.bucket,
+ percent_encode_path(&dest)
+ );
+
+ let mut req = Request::post(req_uri)
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ self.sign(&mut req).await?;
+ self.send(req).await
+ }
+
+ pub async fn gcs_list_objects(
+ &self,
+ path: &str,
+ page_token: &str,
+ delimiter: &str,
+ limit: Option<usize>,
+ ) -> Result<Response<IncomingAsyncBody>> {
+ let p = build_abs_path(&self.root, path);
+
+ let mut url = format!(
+ "{}/storage/v1/b/{}/o?prefix={}",
+ self.endpoint,
+ self.bucket,
+ percent_encode_path(&p)
+ );
+ if !delimiter.is_empty() {
+ write!(url, "&delimiter={delimiter}").expect("write into string
must succeed");
+ }
+ if let Some(limit) = limit {
+ write!(url, "&maxResults={limit}").expect("write into string must
succeed");
+ }
+ if !page_token.is_empty() {
+ // NOTE:
+ //
+ // GCS uses pageToken in request and nextPageToken in response
+ //
+ // Don't know how will those tokens be like so this part are copied
+ // directly from AWS S3 service.
+ write!(url, "&pageToken={}", percent_encode_path(page_token))
+ .expect("write into string must succeed");
+ }
+
+ let mut req = Request::get(&url)
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ self.sign(&mut req).await?;
+
+ self.send(req).await
+ }
+}
diff --git a/core/src/services/gcs/mod.rs b/core/src/services/gcs/mod.rs
index 5aedb812..6f95267c 100644
--- a/core/src/services/gcs/mod.rs
+++ b/core/src/services/gcs/mod.rs
@@ -18,6 +18,7 @@
mod backend;
pub use backend::GcsBuilder as Gcs;
+mod core;
mod error;
mod pager;
mod uri;
diff --git a/core/src/services/gcs/pager.rs b/core/src/services/gcs/pager.rs
index 7977935f..6cb66aac 100644
--- a/core/src/services/gcs/pager.rs
+++ b/core/src/services/gcs/pager.rs
@@ -23,7 +23,7 @@ use serde_json;
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;
-use super::backend::GcsBackend;
+use super::core::GcsCore;
use super::error::parse_error;
use crate::raw::*;
use crate::*;
@@ -31,8 +31,8 @@ use crate::*;
/// GcsPager takes over task of listing objects and
/// helps walking directory
pub struct GcsPager {
- backend: Arc<GcsBackend>,
- root: String,
+ core: Arc<GcsCore>,
+
path: String,
delimiter: String,
limit: Option<usize>,
@@ -43,16 +43,10 @@ pub struct GcsPager {
impl GcsPager {
/// Generate a new directory walker
- pub fn new(
- backend: Arc<GcsBackend>,
- root: &str,
- path: &str,
- delimiter: &str,
- limit: Option<usize>,
- ) -> Self {
+ pub fn new(core: Arc<GcsCore>, path: &str, delimiter: &str, limit:
Option<usize>) -> Self {
Self {
- backend,
- root: root.to_string(),
+ core,
+
path: path.to_string(),
delimiter: delimiter.to_string(),
limit,
@@ -71,7 +65,7 @@ impl oio::Page for GcsPager {
}
let resp = self
- .backend
+ .core
.gcs_list_objects(&self.path, &self.page_token, &self.delimiter,
self.limit)
.await?;
@@ -93,7 +87,7 @@ impl oio::Page for GcsPager {
for prefix in output.prefixes {
let de = oio::Entry::new(
- &build_rel_path(&self.root, &prefix),
+ &build_rel_path(&self.core.root, &prefix),
Metadata::new(EntryMode::DIR),
);
@@ -124,7 +118,7 @@ impl oio::Page for GcsPager {
})?;
meta.set_last_modified(dt);
- let de = oio::Entry::new(&build_rel_path(&self.root,
&object.name), meta);
+ let de = oio::Entry::new(&build_rel_path(&self.core.root,
&object.name), meta);
entries.push(de);
}
diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs
index 1dda8d9b..563152dc 100644
--- a/core/src/services/gcs/writer.rs
+++ b/core/src/services/gcs/writer.rs
@@ -15,45 +15,44 @@
// specific language governing permissions and limitations
// under the License.
+use std::sync::Arc;
+
use async_trait::async_trait;
use bytes::Bytes;
use http::StatusCode;
-use super::backend::GcsBackend;
+use super::core::GcsCore;
use super::error::parse_error;
use crate::ops::OpWrite;
use crate::raw::*;
use crate::*;
pub struct GcsWriter {
- backend: GcsBackend,
+ core: Arc<GcsCore>,
op: OpWrite,
path: String,
}
impl GcsWriter {
- pub fn new(backend: GcsBackend, op: OpWrite, path: String) -> Self {
- GcsWriter { backend, op, path }
+ pub fn new(core: Arc<GcsCore>, op: OpWrite, path: String) -> Self {
+ GcsWriter { core, op, path }
}
}
#[async_trait]
impl oio::Write for GcsWriter {
async fn write(&mut self, bs: Bytes) -> Result<()> {
- let mut req = self.backend.gcs_insert_object_request(
- &self.path,
+ let mut req = self.core.gcs_insert_object_request(
+ &percent_encode_path(&self.path),
Some(bs.len()),
self.op.content_type(),
AsyncBody::Bytes(bs),
)?;
- self.backend
- .signer
- .sign(&mut req)
- .map_err(new_request_sign_error)?;
+ self.core.sign(&mut req).await?;
- let resp = self.backend.client.send(req).await?;
+ let resp = self.core.send(req).await?;
let status = resp.status();