This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch refactor-gcs in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 6d868ba717a72bbf5cbc2e8fe41fe92611f4c0eb Author: Xuanwo <[email protected]> AuthorDate: Tue Apr 11 17:05:12 2023 +0800 refactor(services/gcs): Migrate to async reqsign Signed-off-by: Xuanwo <[email protected]> --- bindings/java/src/lib.rs | 3 +- core/src/services/gcs/backend.rs | 295 ++++++++------------------------------- core/src/services/gcs/core.rs | 262 ++++++++++++++++++++++++++++++++++ core/src/services/gcs/mod.rs | 1 + core/src/services/gcs/pager.rs | 24 ++-- core/src/services/gcs/writer.rs | 19 ++- 6 files changed, 338 insertions(+), 266 deletions(-) diff --git a/bindings/java/src/lib.rs b/bindings/java/src/lib.rs index 88d10132..487384f7 100644 --- a/bindings/java/src/lib.rs +++ b/bindings/java/src/lib.rs @@ -22,7 +22,8 @@ use jni::objects::JClass; use jni::objects::JMap; use jni::objects::JObject; use jni::objects::JString; -use jni::sys::{jboolean, jlong}; +use jni::sys::jboolean; +use jni::sys::jlong; use jni::JNIEnv; use opendal::BlockingOperator; use opendal::Operator; diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs index eda03ae6..8a2e8fc2 100644 --- a/core/src/services/gcs/backend.rs +++ b/core/src/services/gcs/backend.rs @@ -18,25 +18,22 @@ use std::collections::HashMap; use std::fmt::Debug; use std::fmt::Formatter; -use std::fmt::Write; use std::sync::Arc; use async_trait::async_trait; -use http::header::CONTENT_LENGTH; -use http::header::CONTENT_TYPE; -use http::Request; -use http::Response; use http::StatusCode; use log::debug; -use reqsign::GoogleSigner; +use reqsign_0_9::GoogleCredentialLoader; +use reqsign_0_9::GoogleSigner; +use reqsign_0_9::GoogleTokenLoader; use serde::Deserialize; use serde_json; use time::format_description::well_known::Rfc3339; use time::OffsetDateTime; +use super::core::GcsCore; use super::error::parse_error; use super::pager::GcsPager; -use super::uri::percent_encode_path; use super::writer::GcsWriter; use crate::ops::*; use crate::raw::*; @@ -275,45 +272,42 @@ impl Builder for 
GcsBuilder { .endpoint .clone() .unwrap_or_else(|| DEFAULT_GCS_ENDPOINT.to_string()); - debug!("backend use endpoint: {endpoint}"); - let signer = if let Some(signer) = &self.signer { - signer.clone() + let mut cred_loader = GoogleCredentialLoader::default(); + if let Some(cred) = &self.credential { + cred_loader = cred_loader.with_content(cred); + } + if let Some(cred) = &self.credential_path { + cred_loader = cred_loader.with_path(cred); + } + + let scope = if let Some(scope) = &self.scope { + scope } else { - // build signer - let mut signer_builder = GoogleSigner::builder(); - if let Some(scope) = &self.scope { - signer_builder.scope(scope); - } else { - signer_builder.scope(DEFAULT_GCS_SCOPE); - } - if let Some(account) = &self.service_account { - signer_builder.service_account(account); - } - if let Some(cred) = &self.credential { - signer_builder.credential_content(cred); - } - if let Some(cred) = &self.credential_path { - signer_builder.credential_path(cred); - } - let signer = signer_builder.build().map_err(|e| { - Error::new(ErrorKind::ConfigInvalid, "build GoogleSigner") - .with_operation("Builder::build") - .with_context("service", Scheme::Gcs) - .with_context("bucket", bucket) - .with_context("endpoint", &endpoint) - .set_source(e) - })?; - Arc::new(signer) + DEFAULT_GCS_SCOPE }; + let mut token_loader = GoogleTokenLoader::new(scope, client.client()); + if let Some(account) = &self.service_account { + token_loader = token_loader.with_service_account(account); + } + if let Ok(Some(cred)) = cred_loader.load() { + token_loader = token_loader.with_credentials(cred) + } + + let signer = GoogleSigner::new("storage"); + let backend = GcsBackend { - root, - endpoint, - bucket: bucket.clone(), - signer, - client, + core: Arc::new(GcsCore { + endpoint, + bucket: bucket.to_string(), + root, + client, + signer, + token_loader, + credential_loader: cred_loader, + }), }; Ok(backend) @@ -321,27 +315,9 @@ impl Builder for GcsBuilder { } /// GCS storage backend 
-#[derive(Clone)] +#[derive(Clone, Debug)] pub struct GcsBackend { - endpoint: String, - bucket: String, - // root should end with "/" - root: String, - - pub client: HttpClient, - pub signer: Arc<GoogleSigner>, -} - -impl Debug for GcsBackend { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - let mut de = f.debug_struct("Backend"); - de.field("endpoint", &self.endpoint) - .field("bucket", &self.bucket) - .field("root", &self.root) - .field("client", &self.client) - .field("signer", &"<redacted>") - .finish() - } + core: Arc<GcsCore>, } #[async_trait] @@ -359,19 +335,21 @@ impl Accessor for GcsBackend { let mut am = AccessorInfo::default(); am.set_scheme(Scheme::Gcs) - .set_root(&self.root) - .set_name(&self.bucket) + .set_root(&self.core.root) + .set_name(&self.core.bucket) .set_capabilities(Read | Write | List | Scan | Copy) .set_hints(ReadStreamable); am } async fn create(&self, path: &str, _: OpCreate) -> Result<RpCreate> { - let mut req = self.gcs_insert_object_request(path, Some(0), None, AsyncBody::Empty)?; + let mut req = self + .core + .gcs_insert_object_request(path, Some(0), None, AsyncBody::Empty)?; - self.signer.sign(&mut req).map_err(new_request_sign_error)?; + self.core.sign(&mut req).await?; - let resp = self.client.send(req).await?; + let resp = self.core.send(req).await?; if resp.status().is_success() { resp.into_body().consume().await?; @@ -382,7 +360,7 @@ impl Accessor for GcsBackend { } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - let resp = self.gcs_get_object(path, args.range()).await?; + let resp = self.core.gcs_get_object(path, args.range()).await?; if resp.status().is_success() { let meta = parse_into_metadata(path, resp.headers())?; @@ -402,32 +380,19 @@ impl Accessor for GcsBackend { Ok(( RpWrite::default(), - GcsWriter::new(self.clone(), args, path.to_string()), + GcsWriter::new(self.core.clone(), args, path.to_string()), )) } async fn copy(&self, from: &str, to: &str, _: OpCopy) -> 
Result<RpCopy> { - let source = percent_encode_path(&build_abs_path(&self.root, from.trim_end_matches('/'))); - let dest = percent_encode_path(&build_abs_path(&self.root, to.trim_end_matches('/'))); - let req_uri = format!( - "{}/storage/v1/b/{}/o/{}/copyTo/b/{}/o/{}", - self.endpoint, self.bucket, source, self.bucket, dest - ); + let resp = self.core.gcs_copy_object(from, to).await?; - let mut req = Request::post(req_uri) - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.signer.sign(&mut req).map_err(new_request_sign_error)?; - - let resp = self.client.send(req).await?; - - if !resp.status().is_success() { - return Err(parse_error(resp).await?); + if resp.status().is_success() { + resp.into_body().consume().await?; + Ok(RpCopy::default()) + } else { + Err(parse_error(resp).await?) } - resp.into_body().consume().await?; - - Ok(RpCopy::default()) } async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> { @@ -436,7 +401,7 @@ impl Accessor for GcsBackend { return Ok(RpStat::new(Metadata::new(EntryMode::DIR))); } - let resp = self.gcs_get_object_metadata(path).await?; + let resp = self.core.gcs_get_object_metadata(path).await?; if resp.status().is_success() { // read http response body @@ -477,7 +442,7 @@ impl Accessor for GcsBackend { } async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> { - let resp = self.gcs_delete_object(path).await?; + let resp = self.core.gcs_delete_object(path).await?; // deleting not existing objects is ok if resp.status().is_success() || resp.status() == StatusCode::NOT_FOUND { @@ -490,168 +455,18 @@ impl Accessor for GcsBackend { async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { Ok(( RpList::default(), - GcsPager::new(Arc::new(self.clone()), &self.root, path, "/", args.limit()), + GcsPager::new(self.core.clone(), path, "/", args.limit()), )) } async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> { Ok(( RpScan::default(), - 
GcsPager::new(Arc::new(self.clone()), &self.root, path, "", args.limit()), + GcsPager::new(self.core.clone(), path, "", args.limit()), )) } } -impl GcsBackend { - fn gcs_get_object_request(&self, path: &str, range: BytesRange) -> Result<Request<AsyncBody>> { - let p = build_abs_path(&self.root, path); - - let url = format!( - "{}/storage/v1/b/{}/o/{}?alt=media", - self.endpoint, - self.bucket, - percent_encode_path(&p) - ); - - let mut req = Request::get(&url); - - if !range.is_full() { - req = req.header(http::header::RANGE, range.to_header()); - } - - let req = req - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - Ok(req) - } - - async fn gcs_get_object( - &self, - path: &str, - range: BytesRange, - ) -> Result<Response<IncomingAsyncBody>> { - let mut req = self.gcs_get_object_request(path, range)?; - - self.signer.sign(&mut req).map_err(new_request_sign_error)?; - - self.client.send(req).await - } - - pub fn gcs_insert_object_request( - &self, - path: &str, - size: Option<usize>, - content_type: Option<&str>, - body: AsyncBody, - ) -> Result<Request<AsyncBody>> { - let p = build_abs_path(&self.root, path); - - let url = format!( - "{}/upload/storage/v1/b/{}/o?uploadType=media&name={}", - self.endpoint, - self.bucket, - percent_encode_path(&p) - ); - - let mut req = Request::post(&url); - - if let Some(size) = size { - req = req.header(CONTENT_LENGTH, size) - } - - if let Some(mime) = content_type { - req = req.header(CONTENT_TYPE, mime) - } - - // Set body - let req = req.body(body).map_err(new_request_build_error)?; - - Ok(req) - } - - async fn gcs_get_object_metadata(&self, path: &str) -> Result<Response<IncomingAsyncBody>> { - let p = build_abs_path(&self.root, path); - - let url = format!( - "{}/storage/v1/b/{}/o/{}", - self.endpoint, - self.bucket, - percent_encode_path(&p) - ); - - let req = Request::get(&url); - - let mut req = req - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.signer.sign(&mut 
req).map_err(new_request_sign_error)?; - - self.client.send(req).await - } - - async fn gcs_delete_object(&self, path: &str) -> Result<Response<IncomingAsyncBody>> { - let p = build_abs_path(&self.root, path); - - let url = format!( - "{}/storage/v1/b/{}/o/{}", - self.endpoint, - self.bucket, - percent_encode_path(&p) - ); - - let mut req = Request::delete(&url) - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.signer.sign(&mut req).map_err(new_request_sign_error)?; - - self.client.send(req).await - } - - pub(crate) async fn gcs_list_objects( - &self, - path: &str, - page_token: &str, - delimiter: &str, - limit: Option<usize>, - ) -> Result<Response<IncomingAsyncBody>> { - let p = build_abs_path(&self.root, path); - - let mut url = format!( - "{}/storage/v1/b/{}/o?prefix={}", - self.endpoint, - self.bucket, - percent_encode_path(&p) - ); - if !delimiter.is_empty() { - write!(url, "&delimiter={delimiter}").expect("write into string must succeed"); - } - if let Some(limit) = limit { - write!(url, "&maxResults={limit}").expect("write into string must succeed"); - } - if !page_token.is_empty() { - // NOTE: - // - // GCS uses pageToken in request and nextPageToken in response - // - // Don't know how will those tokens be like so this part are copied - // directly from AWS S3 service. 
- write!(url, "&pageToken={}", percent_encode_path(page_token)) - .expect("write into string must succeed"); - } - - let mut req = Request::get(&url) - .body(AsyncBody::Empty) - .map_err(new_request_build_error)?; - - self.signer.sign(&mut req).map_err(new_request_sign_error)?; - - self.client.send(req).await - } -} - /// The raw json response returned by [`get`](https://cloud.google.com/storage/docs/json_api/v1/objects/get) #[derive(Debug, Default, Deserialize)] #[serde(default, rename_all = "camelCase")] diff --git a/core/src/services/gcs/core.rs b/core/src/services/gcs/core.rs new file mode 100644 index 00000000..84526983 --- /dev/null +++ b/core/src/services/gcs/core.rs @@ -0,0 +1,262 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::fmt::Debug; +use std::fmt::Formatter; +use std::fmt::Write; + +use http::header::CONTENT_LENGTH; +use http::header::CONTENT_TYPE; +use http::Request; +use http::Response; +use reqsign_0_9::GoogleCredentialLoader; +use reqsign_0_9::GoogleSigner; +use reqsign_0_9::GoogleToken; +use reqsign_0_9::GoogleTokenLoader; + +use super::uri::percent_encode_path; +use crate::raw::*; +use crate::*; + +pub struct GcsCore { + pub endpoint: String, + pub bucket: String, + pub root: String, + + pub client: HttpClient, + pub signer: GoogleSigner, + pub token_loader: GoogleTokenLoader, + pub credential_loader: GoogleCredentialLoader, +} + +impl Debug for GcsCore { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut de = f.debug_struct("Backend"); + de.field("endpoint", &self.endpoint) + .field("bucket", &self.bucket) + .field("root", &self.root) + .finish_non_exhaustive() + } +} + +impl GcsCore { + async fn load_token(&self) -> Result<GoogleToken> { + let cred = self + .token_loader + .load() + .await + .map_err(new_request_credential_error)?; + + if let Some(cred) = cred { + Ok(cred) + } else { + Err(Error::new( + ErrorKind::ConfigInvalid, + "no valid credential found", + )) + } + } + + pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> { + let cred = self.load_token().await?; + + self.signer.sign(req, &cred).map_err(new_request_sign_error) + } + + #[inline] + pub async fn send(&self, req: Request<AsyncBody>) -> Result<Response<IncomingAsyncBody>> { + self.client.send(req).await + } +} + +impl GcsCore { + pub fn gcs_get_object_request( + &self, + path: &str, + range: BytesRange, + ) -> Result<Request<AsyncBody>> { + let p = build_abs_path(&self.root, path); + + let url = format!( + "{}/storage/v1/b/{}/o/{}?alt=media", + self.endpoint, + self.bucket, + percent_encode_path(&p) + ); + + let mut req = Request::get(&url); + + if !range.is_full() { + req = req.header(http::header::RANGE, range.to_header()); + } + + let req = req + 
.body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + Ok(req) + } + + pub async fn gcs_get_object( + &self, + path: &str, + range: BytesRange, + ) -> Result<Response<IncomingAsyncBody>> { + let mut req = self.gcs_get_object_request(path, range)?; + + self.sign(&mut req).await?; + self.send(req).await + } + + pub fn gcs_insert_object_request( + &self, + path: &str, + size: Option<usize>, + content_type: Option<&str>, + body: AsyncBody, + ) -> Result<Request<AsyncBody>> { + let p = build_abs_path(&self.root, path); + + let url = format!( + "{}/upload/storage/v1/b/{}/o?uploadType=media&name={}", + self.endpoint, + self.bucket, + percent_encode_path(&p) + ); + + let mut req = Request::post(&url); + + if let Some(size) = size { + req = req.header(CONTENT_LENGTH, size) + } + + if let Some(mime) = content_type { + req = req.header(CONTENT_TYPE, mime) + } + + // Set body + let req = req.body(body).map_err(new_request_build_error)?; + + Ok(req) + } + + pub async fn gcs_get_object_metadata(&self, path: &str) -> Result<Response<IncomingAsyncBody>> { + let p = build_abs_path(&self.root, path); + + let url = format!( + "{}/storage/v1/b/{}/o/{}", + self.endpoint, + self.bucket, + percent_encode_path(&p) + ); + + let req = Request::get(&url); + + let mut req = req + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } + + pub async fn gcs_delete_object(&self, path: &str) -> Result<Response<IncomingAsyncBody>> { + let p = build_abs_path(&self.root, path); + + let url = format!( + "{}/storage/v1/b/{}/o/{}", + self.endpoint, + self.bucket, + percent_encode_path(&p) + ); + + let mut req = Request::delete(&url) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } + + pub async fn gcs_copy_object( + &self, + from: &str, + to: &str, + ) -> Result<Response<IncomingAsyncBody>> { + let source = build_abs_path(&self.root, from); + let dest 
= build_abs_path(&self.root, to); + + let req_uri = format!( + "{}/storage/v1/b/{}/o/{}/copyTo/b/{}/o/{}", + self.endpoint, + self.bucket, + percent_encode_path(&source), + self.bucket, + percent_encode_path(&dest) + ); + + let mut req = Request::post(req_uri) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + self.send(req).await + } + + pub async fn gcs_list_objects( + &self, + path: &str, + page_token: &str, + delimiter: &str, + limit: Option<usize>, + ) -> Result<Response<IncomingAsyncBody>> { + let p = build_abs_path(&self.root, path); + + let mut url = format!( + "{}/storage/v1/b/{}/o?prefix={}", + self.endpoint, + self.bucket, + percent_encode_path(&p) + ); + if !delimiter.is_empty() { + write!(url, "&delimiter={delimiter}").expect("write into string must succeed"); + } + if let Some(limit) = limit { + write!(url, "&maxResults={limit}").expect("write into string must succeed"); + } + if !page_token.is_empty() { + // NOTE: + // + // GCS uses pageToken in request and nextPageToken in response + // + // Don't know what those tokens will look like, so this part is copied + // directly from the AWS S3 service. 
+ write!(url, "&pageToken={}", percent_encode_path(page_token)) + .expect("write into string must succeed"); + } + + let mut req = Request::get(&url) + .body(AsyncBody::Empty) + .map_err(new_request_build_error)?; + + self.sign(&mut req).await?; + + self.send(req).await + } +} diff --git a/core/src/services/gcs/mod.rs b/core/src/services/gcs/mod.rs index 5aedb812..6f95267c 100644 --- a/core/src/services/gcs/mod.rs +++ b/core/src/services/gcs/mod.rs @@ -18,6 +18,7 @@ mod backend; pub use backend::GcsBuilder as Gcs; +mod core; mod error; mod pager; mod uri; diff --git a/core/src/services/gcs/pager.rs b/core/src/services/gcs/pager.rs index 7977935f..6cb66aac 100644 --- a/core/src/services/gcs/pager.rs +++ b/core/src/services/gcs/pager.rs @@ -23,7 +23,7 @@ use serde_json; use time::format_description::well_known::Rfc3339; use time::OffsetDateTime; -use super::backend::GcsBackend; +use super::core::GcsCore; use super::error::parse_error; use crate::raw::*; use crate::*; @@ -31,8 +31,8 @@ use crate::*; /// GcsPager takes over task of listing objects and /// helps walking directory pub struct GcsPager { - backend: Arc<GcsBackend>, - root: String, + core: Arc<GcsCore>, + path: String, delimiter: String, limit: Option<usize>, @@ -43,16 +43,10 @@ pub struct GcsPager { impl GcsPager { /// Generate a new directory walker - pub fn new( - backend: Arc<GcsBackend>, - root: &str, - path: &str, - delimiter: &str, - limit: Option<usize>, - ) -> Self { + pub fn new(core: Arc<GcsCore>, path: &str, delimiter: &str, limit: Option<usize>) -> Self { Self { - backend, - root: root.to_string(), + core, + path: path.to_string(), delimiter: delimiter.to_string(), limit, @@ -71,7 +65,7 @@ impl oio::Page for GcsPager { } let resp = self - .backend + .core .gcs_list_objects(&self.path, &self.page_token, &self.delimiter, self.limit) .await?; @@ -93,7 +87,7 @@ impl oio::Page for GcsPager { for prefix in output.prefixes { let de = oio::Entry::new( - &build_rel_path(&self.root, &prefix), + 
&build_rel_path(&self.core.root, &prefix), Metadata::new(EntryMode::DIR), ); @@ -124,7 +118,7 @@ impl oio::Page for GcsPager { })?; meta.set_last_modified(dt); - let de = oio::Entry::new(&build_rel_path(&self.root, &object.name), meta); + let de = oio::Entry::new(&build_rel_path(&self.core.root, &object.name), meta); entries.push(de); } diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs index 1dda8d9b..256f7512 100644 --- a/core/src/services/gcs/writer.rs +++ b/core/src/services/gcs/writer.rs @@ -15,45 +15,44 @@ // specific language governing permissions and limitations // under the License. +use std::sync::Arc; + use async_trait::async_trait; use bytes::Bytes; use http::StatusCode; -use super::backend::GcsBackend; +use super::core::GcsCore; use super::error::parse_error; use crate::ops::OpWrite; use crate::raw::*; use crate::*; pub struct GcsWriter { - backend: GcsBackend, + core: Arc<GcsCore>, op: OpWrite, path: String, } impl GcsWriter { - pub fn new(backend: GcsBackend, op: OpWrite, path: String) -> Self { - GcsWriter { backend, op, path } + pub fn new(core: Arc<GcsCore>, op: OpWrite, path: String) -> Self { + GcsWriter { core, op, path } } } #[async_trait] impl oio::Write for GcsWriter { async fn write(&mut self, bs: Bytes) -> Result<()> { - let mut req = self.backend.gcs_insert_object_request( + let mut req = self.core.gcs_insert_object_request( &self.path, Some(bs.len()), self.op.content_type(), AsyncBody::Bytes(bs), )?; - self.backend - .signer - .sign(&mut req) - .map_err(new_request_sign_error)?; + self.core.sign(&mut req).await?; - let resp = self.backend.client.send(req).await?; + let resp = self.core.send(req).await?; let status = resp.status();
