This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 5a0baf1aeee Add GCS signed URL support (#5300)
5a0baf1aeee is described below
commit 5a0baf1aeeead4c461f1c8d14bfc65c85cfff78e
Author: Yu Zeng <[email protected]>
AuthorDate: Thu Apr 4 23:23:18 2024 +0800
Add GCS signed URL support (#5300)
* add util function for gcp sign url
* add string to sign and other sign functions
* add GoogleCloudStorageConfig::new and config and move functions to client
* add more code and rearrange struct
* add client_email for credential and return the signed url
* clean some code
* add client email for AuthorizedUserCredentials
* tidy some code
* format doc
* Add GcpSigningCredentialProvider for getting email
* add test
* Move some functions which shared by aws and gcp to utils.
* fix some bugs so that it returns the proper result
* remove useless code
* tidy some code
* do not export host
* add sign_by_key
* Cleanup
* Add ServiceAccountKey
* Further tweaks
* add more scope for signing.
* tidy
* Tweak and add test
* Retry and handle errors for signBlob
---------
Co-authored-by: Raphael Taylor-Davies <[email protected]>
---
object_store/src/aws/credential.rs | 19 +-
object_store/src/aws/mod.rs | 11 +-
object_store/src/gcp/builder.rs | 55 ++++-
object_store/src/gcp/client.rs | 105 ++++++++-
object_store/src/gcp/credential.rs | 465 +++++++++++++++++++++++++++++++++----
object_store/src/gcp/mod.rs | 74 +++++-
object_store/src/util.rs | 29 +++
7 files changed, 677 insertions(+), 81 deletions(-)
diff --git a/object_store/src/aws/credential.rs
b/object_store/src/aws/credential.rs
index dd7fa5b41da..478e56dd09c 100644
--- a/object_store/src/aws/credential.rs
+++ b/object_store/src/aws/credential.rs
@@ -19,7 +19,7 @@ use crate::aws::{AwsCredentialProvider, STORE,
STRICT_ENCODE_SET, STRICT_PATH_EN
use crate::client::retry::RetryExt;
use crate::client::token::{TemporaryToken, TokenCache};
use crate::client::TokenProvider;
-use crate::util::hmac_sha256;
+use crate::util::{hex_digest, hex_encode, hmac_sha256};
use crate::{CredentialProvider, Result, RetryConfig};
use async_trait::async_trait;
use bytes::Buf;
@@ -342,23 +342,6 @@ impl CredentialExt for RequestBuilder {
}
}
-/// Computes the SHA256 digest of `body` returned as a hex encoded string
-fn hex_digest(bytes: &[u8]) -> String {
- let digest = ring::digest::digest(&ring::digest::SHA256, bytes);
- hex_encode(digest.as_ref())
-}
-
-/// Returns `bytes` as a lower-case hex encoded string
-fn hex_encode(bytes: &[u8]) -> String {
- use std::fmt::Write;
- let mut out = String::with_capacity(bytes.len() * 2);
- for byte in bytes {
- // String writing is infallible
- let _ = write!(out, "{byte:02x}");
- }
- out
-}
-
/// Canonicalizes query parameters into the AWS canonical form
///
///
<https://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html>
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index b33771de9a8..76d01d59704 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -43,6 +43,7 @@ use crate::client::list::ListClientExt;
use crate::client::CredentialProvider;
use crate::multipart::{MultipartStore, PartId};
use crate::signer::Signer;
+use crate::util::STRICT_ENCODE_SET;
use crate::{
Error, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload,
ObjectMeta,
ObjectStore, Path, PutMode, PutOptions, PutResult, Result, UploadPart,
@@ -64,16 +65,6 @@ pub use dynamo::DynamoCommit;
pub use precondition::{S3ConditionalPut, S3CopyIfNotExists};
pub use resolve::resolve_bucket_region;
-//
http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html
-//
-// Do not URI-encode any of the unreserved characters that RFC 3986 defines:
-// A-Z, a-z, 0-9, hyphen ( - ), underscore ( _ ), period ( . ), and tilde ( ~
).
-pub(crate) const STRICT_ENCODE_SET: percent_encoding::AsciiSet =
percent_encoding::NON_ALPHANUMERIC
- .remove(b'-')
- .remove(b'.')
- .remove(b'_')
- .remove(b'~');
-
/// This struct is used to maintain the URI path encoding
const STRICT_PATH_ENCODE_SET: percent_encoding::AsciiSet =
STRICT_ENCODE_SET.remove(b'/');
diff --git a/object_store/src/gcp/builder.rs b/object_store/src/gcp/builder.rs
index 2cf75040b85..4fa91677aa7 100644
--- a/object_store/src/gcp/builder.rs
+++ b/object_store/src/gcp/builder.rs
@@ -21,7 +21,10 @@ use crate::gcp::credential::{
ApplicationDefaultCredentials, InstanceCredentialProvider,
ServiceAccountCredentials,
DEFAULT_GCS_BASE_URL,
};
-use crate::gcp::{credential, GcpCredential, GcpCredentialProvider,
GoogleCloudStorage, STORE};
+use crate::gcp::{
+ credential, GcpCredential, GcpCredentialProvider, GcpSigningCredential,
+ GcpSigningCredentialProvider, GoogleCloudStorage, STORE,
+};
use crate::{ClientConfigKey, ClientOptions, Result, RetryConfig,
StaticCredentialProvider};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, Snafu};
@@ -29,6 +32,8 @@ use std::str::FromStr;
use std::sync::Arc;
use url::Url;
+use super::credential::{AuthorizedUserSigningCredentials,
InstanceSigningCredentialProvider};
+
#[derive(Debug, Snafu)]
enum Error {
#[snafu(display("Missing bucket name"))]
@@ -107,6 +112,8 @@ pub struct GoogleCloudStorageBuilder {
client_options: ClientOptions,
/// Credentials
credentials: Option<GcpCredentialProvider>,
+ /// Credentials used for signing URLs
+ signing_cedentials: Option<GcpSigningCredentialProvider>,
}
/// Configuration keys for [`GoogleCloudStorageBuilder`]
@@ -202,6 +209,7 @@ impl Default for GoogleCloudStorageBuilder {
client_options: ClientOptions::new().with_allow_http(true),
url: None,
credentials: None,
+ signing_cedentials: None,
}
}
}
@@ -452,13 +460,13 @@ impl GoogleCloudStorageBuilder {
Arc::new(StaticCredentialProvider::new(GcpCredential {
bearer: "".to_string(),
})) as _
- } else if let Some(credentials) = service_account_credentials {
+ } else if let Some(credentials) = service_account_credentials.clone() {
Arc::new(TokenCredentialProvider::new(
credentials.token_provider()?,
self.client_options.client()?,
self.retry_config.clone(),
)) as _
- } else if let Some(credentials) = application_default_credentials {
+ } else if let Some(credentials) =
application_default_credentials.clone() {
match credentials {
ApplicationDefaultCredentials::AuthorizedUser(token) => {
Arc::new(TokenCredentialProvider::new(
@@ -483,13 +491,44 @@ impl GoogleCloudStorageBuilder {
)) as _
};
- let config = GoogleCloudStorageConfig {
- base_url: gcs_base_url,
+ let signing_credentials = if let Some(signing_credentials) =
self.signing_cedentials {
+ signing_credentials
+ } else if disable_oauth {
+ Arc::new(StaticCredentialProvider::new(GcpSigningCredential {
+ email: "".to_string(),
+ private_key: None,
+ })) as _
+ } else if let Some(credentials) = service_account_credentials.clone() {
+ credentials.signing_credentials()?
+ } else if let Some(credentials) =
application_default_credentials.clone() {
+ match credentials {
+ ApplicationDefaultCredentials::AuthorizedUser(token) => {
+ Arc::new(TokenCredentialProvider::new(
+ AuthorizedUserSigningCredentials::from(token)?,
+ self.client_options.client()?,
+ self.retry_config.clone(),
+ )) as _
+ }
+ ApplicationDefaultCredentials::ServiceAccount(token) => {
+ token.signing_credentials()?
+ }
+ }
+ } else {
+ Arc::new(TokenCredentialProvider::new(
+ InstanceSigningCredentialProvider::default(),
+ self.client_options.metadata_client()?,
+ self.retry_config.clone(),
+ )) as _
+ };
+
+ let config = GoogleCloudStorageConfig::new(
+ gcs_base_url,
credentials,
+ signing_credentials,
bucket_name,
- retry_config: self.retry_config,
- client_options: self.client_options,
- };
+ self.retry_config,
+ self.client_options,
+ );
Ok(GoogleCloudStorage {
client: Arc::new(GoogleCloudStorageClient::new(config)?),
diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs
index def53beefe7..901257f917c 100644
--- a/object_store/src/gcp/client.rs
+++ b/object_store/src/gcp/client.rs
@@ -24,19 +24,22 @@ use crate::client::s3::{
ListResponse,
};
use crate::client::GetOptionsExt;
-use crate::gcp::{GcpCredential, GcpCredentialProvider, STORE};
+use crate::gcp::{GcpCredential, GcpCredentialProvider,
GcpSigningCredentialProvider, STORE};
use crate::multipart::PartId;
use crate::path::{Path, DELIMITER};
+use crate::util::hex_encode;
use crate::{
ClientOptions, GetOptions, ListResult, MultipartId, PutMode, PutOptions,
PutResult, Result,
RetryConfig,
};
use async_trait::async_trait;
+use base64::prelude::BASE64_STANDARD;
+use base64::Engine;
use bytes::{Buf, Bytes};
use percent_encoding::{percent_encode, utf8_percent_encode, NON_ALPHANUMERIC};
use reqwest::header::HeaderName;
use reqwest::{header, Client, Method, RequestBuilder, Response, StatusCode};
-use serde::Serialize;
+use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, Snafu};
use std::sync::Arc;
@@ -101,6 +104,15 @@ enum Error {
#[snafu(display("Got invalid multipart response: {}", source))]
InvalidMultipartResponse { source: quick_xml::de::DeError },
+
+ #[snafu(display("Error signing blob: {}", source))]
+ SignBlobRequest { source: crate::client::retry::Error },
+
+ #[snafu(display("Got invalid signing blob repsonse: {}", source))]
+ InvalidSignBlobResponse { source: reqwest::Error },
+
+ #[snafu(display("Got invalid signing blob signature: {}", source))]
+ InvalidSignBlobSignature { source: base64::DecodeError },
}
impl From<Error> for crate::Error {
@@ -123,6 +135,8 @@ pub struct GoogleCloudStorageConfig {
pub credentials: GcpCredentialProvider,
+ pub signing_credentials: GcpSigningCredentialProvider,
+
pub bucket_name: String,
pub retry_config: RetryConfig,
@@ -130,6 +144,30 @@ pub struct GoogleCloudStorageConfig {
pub client_options: ClientOptions,
}
+impl GoogleCloudStorageConfig {
+ pub fn new(
+ base_url: String,
+ credentials: GcpCredentialProvider,
+ signing_credentials: GcpSigningCredentialProvider,
+ bucket_name: String,
+ retry_config: RetryConfig,
+ client_options: ClientOptions,
+ ) -> Self {
+ Self {
+ base_url,
+ credentials,
+ signing_credentials,
+ bucket_name,
+ retry_config,
+ client_options,
+ }
+ }
+
+ pub fn path_url(&self, path: &Path) -> String {
+ format!("{}/{}/{}", self.base_url, self.bucket_name, path)
+ }
+}
+
/// A builder for a put request allowing customisation of the headers and
query string
pub struct PutRequest<'a> {
path: &'a Path,
@@ -163,6 +201,21 @@ impl<'a> PutRequest<'a> {
}
}
+/// Sign Blob Request Body
+#[derive(Debug, Serialize)]
+struct SignBlobBody {
+ /// The payload to sign
+ payload: String,
+}
+
+/// Sign Blob Response
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct SignBlobResponse {
+ /// The signature for the payload
+ signed_blob: String,
+}
+
#[derive(Debug)]
pub struct GoogleCloudStorageClient {
config: GoogleCloudStorageConfig,
@@ -197,6 +250,54 @@ impl GoogleCloudStorageClient {
self.config.credentials.get_credential().await
}
+ /// Create a signature from a string-to-sign using the Google Cloud signBlob
method.
+ /// The equivalent request has the form:
+ /// ```plaintext
+ /// curl -X POST --data-binary @JSON_FILE_NAME \
+ /// -H "Authorization: Bearer OAUTH2_TOKEN" \
+ /// -H "Content-Type: application/json" \
+ ///
"https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/SERVICE_ACCOUNT_EMAIL:signBlob"
+ /// ```
+ ///
+ /// 'JSON_FILE_NAME' is a file containing the following JSON object:
+ /// ```plaintext
+ /// {
+ /// "payload": "REQUEST_INFORMATION"
+ /// }
+ /// ```
+ pub async fn sign_blob(&self, string_to_sign: &str, client_email: &str) ->
Result<String> {
+ let credential = self.get_credential().await?;
+ let body = SignBlobBody {
+ payload: BASE64_STANDARD.encode(string_to_sign),
+ };
+
+ let url = format!(
+
"https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{}:signBlob",
+ client_email
+ );
+
+ let response = self
+ .client
+ .post(&url)
+ .bearer_auth(&credential.bearer)
+ .json(&body)
+ .send_retry(&self.config.retry_config)
+ .await
+ .context(SignBlobRequestSnafu)?;
+
+ //If successful, the signature is returned in the signedBlob field in
the response.
+ let response = response
+ .json::<SignBlobResponse>()
+ .await
+ .context(InvalidSignBlobResponseSnafu)?;
+
+ let signed_blob = BASE64_STANDARD
+ .decode(response.signed_blob)
+ .context(InvalidSignBlobSignatureSnafu)?;
+
+ Ok(hex_encode(&signed_blob))
+ }
+
pub fn object_url(&self, path: &Path) -> String {
let encoded = utf8_percent_encode(path.as_ref(), NON_ALPHANUMERIC);
format!(
diff --git a/object_store/src/gcp/credential.rs
b/object_store/src/gcp/credential.rs
index 34cd6eeb6ea..fcd516a1bf1 100644
--- a/object_store/src/gcp/credential.rs
+++ b/object_store/src/gcp/credential.rs
@@ -15,19 +15,26 @@
// specific language governing permissions and limitations
// under the License.
+use super::client::GoogleCloudStorageClient;
use crate::client::retry::RetryExt;
use crate::client::token::TemporaryToken;
use crate::client::TokenProvider;
-use crate::gcp::STORE;
-use crate::RetryConfig;
+use crate::gcp::{GcpSigningCredentialProvider, STORE};
+use crate::util::{hex_digest, hex_encode, STRICT_ENCODE_SET};
+use crate::{RetryConfig, StaticCredentialProvider};
use async_trait::async_trait;
use base64::prelude::BASE64_URL_SAFE_NO_PAD;
use base64::Engine;
+use chrono::{DateTime, Utc};
use futures::TryFutureExt;
+use hyper::HeaderMap;
+use itertools::Itertools;
+use percent_encoding::utf8_percent_encode;
use reqwest::{Client, Method};
use ring::signature::RsaKeyPair;
use serde::Deserialize;
use snafu::{ResultExt, Snafu};
+use std::collections::BTreeMap;
use std::env;
use std::fs::File;
use std::io::BufReader;
@@ -35,11 +42,15 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tracing::info;
+use url::Url;
-pub const DEFAULT_SCOPE: &str =
"https://www.googleapis.com/auth/devstorage.full_control";
+pub const DEFAULT_SCOPE: &str =
"https://www.googleapis.com/auth/cloud-platform";
pub const DEFAULT_GCS_BASE_URL: &str = "https://storage.googleapis.com";
+const DEFAULT_GCS_PLAYLOAD_STRING: &str = "UNSIGNED-PAYLOAD";
+const DEFAULT_GCS_SIGN_BLOB_HOST: &str = "storage.googleapis.com";
+
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Unable to open service account file from {}: {}",
path.display(), source))]
@@ -57,7 +68,7 @@ pub enum Error {
#[snafu(display("Invalid RSA key: {}", source), context(false))]
InvalidKey { source: ring::error::KeyRejected },
- #[snafu(display("Error signing jwt: {}", source))]
+ #[snafu(display("Error signing: {}", source))]
Sign { source: ring::error::Unspecified },
#[snafu(display("Error encoding jwt payload: {}", source))]
@@ -82,6 +93,69 @@ impl From<Error> for crate::Error {
}
}
+/// A Google Cloud Storage Credential for signing
+#[derive(Debug)]
+pub struct GcpSigningCredential {
+ /// The email of the service account
+ pub email: String,
+
+ /// An optional RSA private key
+ ///
+ /// If provided this will be used to sign the URL, otherwise a call will
be made to
+ /// [`iam.serviceAccounts.signBlob`]. This allows supporting credential
sources
+ /// that don't expose the service account private key, e.g. [IMDS].
+ ///
+ /// [IMDS]:
https://cloud.google.com/docs/authentication/get-id-token#metadata-server
+ /// [`iam.serviceAccounts.signBlob`]:
https://cloud.google.com/storage/docs/authentication/creating-signatures
+ pub private_key: Option<ServiceAccountKey>,
+}
+
+/// A private RSA key for a service account
+#[derive(Debug)]
+pub struct ServiceAccountKey(RsaKeyPair);
+
+impl ServiceAccountKey {
+ /// Parses a pem-encoded RSA key
+ pub fn from_pem(encoded: &[u8]) -> Result<Self> {
+ use rustls_pemfile::Item;
+ use std::io::Cursor;
+
+ let mut cursor = Cursor::new(encoded);
+ let mut reader = BufReader::new(&mut cursor);
+
+ // Reading from string is infallible
+ match rustls_pemfile::read_one(&mut reader).unwrap() {
+ Some(Item::Pkcs8Key(key)) =>
Self::from_pkcs8(key.secret_pkcs8_der()),
+ Some(Item::Pkcs1Key(key)) =>
Self::from_der(key.secret_pkcs1_der()),
+ _ => Err(Error::MissingKey),
+ }
+ }
+
+ /// Parses an unencrypted PKCS#8-encoded RSA private key.
+ pub fn from_pkcs8(key: &[u8]) -> Result<Self> {
+ Ok(Self(RsaKeyPair::from_pkcs8(key)?))
+ }
+
+ /// Parses an unencrypted PKCS#1 DER-encoded RSA private key.
+ pub fn from_der(key: &[u8]) -> Result<Self> {
+ Ok(Self(RsaKeyPair::from_der(key)?))
+ }
+
+ fn sign(&self, string_to_sign: &str) -> Result<String> {
+ let mut signature = vec![0; self.0.public().modulus_len()];
+ self.0
+ .sign(
+ &ring::signature::RSA_PKCS1_SHA256,
+ &ring::rand::SystemRandom::new(),
+ string_to_sign.as_bytes(),
+ &mut signature,
+ )
+ .context(SignSnafu)?;
+
+ Ok(hex_encode(&signature))
+ }
+}
+
/// A Google Cloud Storage Credential
#[derive(Debug, Eq, PartialEq)]
pub struct GcpCredential {
@@ -152,9 +226,8 @@ struct TokenResponse {
pub struct SelfSignedJwt {
issuer: String,
scope: String,
- key_pair: RsaKeyPair,
- jwt_header: String,
- random: ring::rand::SystemRandom,
+ private_key: ServiceAccountKey,
+ key_id: String,
}
impl SelfSignedJwt {
@@ -162,23 +235,14 @@ impl SelfSignedJwt {
pub fn new(
key_id: String,
issuer: String,
- private_key_pem: String,
+ private_key: ServiceAccountKey,
scope: String,
) -> Result<Self> {
- let key_pair = decode_first_rsa_key(private_key_pem)?;
- let jwt_header = b64_encode_obj(&JwtHeader {
- alg: "RS256",
- typ: Some("JWT"),
- kid: Some(&key_id),
- ..Default::default()
- })?;
-
Ok(Self {
issuer,
- key_pair,
scope,
- jwt_header,
- random: ring::rand::SystemRandom::new(),
+ private_key,
+ key_id,
})
}
}
@@ -204,13 +268,21 @@ impl TokenProvider for SelfSignedJwt {
exp,
};
+ let jwt_header = b64_encode_obj(&JwtHeader {
+ alg: "RS256",
+ typ: Some("JWT"),
+ kid: Some(&self.key_id),
+ ..Default::default()
+ })?;
+
let claim_str = b64_encode_obj(&claims)?;
- let message = [self.jwt_header.as_ref(), claim_str.as_ref()].join(".");
- let mut sig_bytes = vec![0; self.key_pair.public().modulus_len()];
- self.key_pair
+ let message = [jwt_header.as_ref(), claim_str.as_ref()].join(".");
+ let mut sig_bytes = vec![0; self.private_key.0.public().modulus_len()];
+ self.private_key
+ .0
.sign(
&ring::signature::RSA_PKCS1_SHA256,
- &self.random,
+ &ring::rand::SystemRandom::new(),
message.as_bytes(),
&mut sig_bytes,
)
@@ -238,7 +310,7 @@ where
}
/// A deserialized `service-account-********.json`-file.
-#[derive(serde::Deserialize, Debug)]
+#[derive(serde::Deserialize, Debug, Clone)]
pub struct ServiceAccountCredentials {
/// The private key in RSA format.
pub private_key: String,
@@ -281,10 +353,19 @@ impl ServiceAccountCredentials {
Ok(SelfSignedJwt::new(
self.private_key_id,
self.client_email,
- self.private_key,
+ ServiceAccountKey::from_pem(self.private_key.as_bytes())?,
DEFAULT_SCOPE.to_string(),
)?)
}
+
+ pub fn signing_credentials(self) ->
crate::Result<GcpSigningCredentialProvider> {
+ Ok(Arc::new(StaticCredentialProvider::new(
+ GcpSigningCredential {
+ email: self.client_email,
+ private_key:
Some(ServiceAccountKey::from_pem(self.private_key.as_bytes())?),
+ },
+ )))
+ }
}
/// Returns the number of seconds since unix epoch
@@ -295,21 +376,6 @@ fn seconds_since_epoch() -> u64 {
.as_secs()
}
-fn decode_first_rsa_key(private_key_pem: String) -> Result<RsaKeyPair> {
- use rustls_pemfile::Item;
- use std::io::Cursor;
-
- let mut cursor = Cursor::new(private_key_pem);
- let mut reader = BufReader::new(&mut cursor);
-
- // Reading from string is infallible
- match rustls_pemfile::read_one(&mut reader).unwrap() {
- Some(Item::Pkcs8Key(key)) =>
Ok(RsaKeyPair::from_pkcs8(key.secret_pkcs8_der())?),
- Some(Item::Pkcs1Key(key)) =>
Ok(RsaKeyPair::from_der(key.secret_pkcs1_der())?),
- _ => Err(Error::MissingKey),
- }
-}
-
fn b64_encode_obj<T: serde::Serialize>(obj: &T) -> Result<String> {
let string = serde_json::to_string(obj).context(EncodeSnafu)?;
Ok(BASE64_URL_SAFE_NO_PAD.encode(string))
@@ -360,6 +426,7 @@ impl TokenProvider for InstanceCredentialProvider {
let response = make_metadata_request(client, METADATA_HOST, retry)
.or_else(|_| make_metadata_request(client, METADATA_IP, retry))
.await?;
+
let token = TemporaryToken {
token: Arc::new(GcpCredential {
bearer: response.access_token,
@@ -370,12 +437,69 @@ impl TokenProvider for InstanceCredentialProvider {
}
}
+/// Make a request to the metadata server to fetch the client email, using a
given hostname.
+async fn make_metadata_request_for_email(
+ client: &Client,
+ hostname: &str,
+ retry: &RetryConfig,
+) -> crate::Result<String> {
+ let url =
+
format!("http://{hostname}/computeMetadata/v1/instance/service-accounts/default/email",);
+ let response = client
+ .request(Method::GET, url)
+ .header("Metadata-Flavor", "Google")
+ .send_retry(retry)
+ .await
+ .context(TokenRequestSnafu)?
+ .text()
+ .await
+ .context(TokenResponseBodySnafu)?;
+ Ok(response)
+}
+
+/// A provider that uses the Google Cloud Platform metadata server to fetch an
email for signing.
+///
+///
<https://cloud.google.com/appengine/docs/legacy/standard/java/accessing-instance-metadata>
+#[derive(Debug, Default)]
+pub struct InstanceSigningCredentialProvider {}
+
+#[async_trait]
+impl TokenProvider for InstanceSigningCredentialProvider {
+ type Credential = GcpSigningCredential;
+
+ /// Fetch the service account email from the metadata server.
+ /// The metadata server is reached over plain http, using the client object
passed in.
+ async fn fetch_token(
+ &self,
+ client: &Client,
+ retry: &RetryConfig,
+ ) -> crate::Result<TemporaryToken<Arc<GcpSigningCredential>>> {
+ const METADATA_IP: &str = "169.254.169.254";
+ const METADATA_HOST: &str = "metadata";
+
+ info!("fetching token from metadata server");
+
+ let email = make_metadata_request_for_email(client, METADATA_HOST,
retry)
+ .or_else(|_| make_metadata_request_for_email(client, METADATA_IP,
retry))
+ .await?;
+
+ let token = TemporaryToken {
+ token: Arc::new(GcpSigningCredential {
+ email,
+ private_key: None,
+ }),
+ expiry: None,
+ };
+ Ok(token)
+ }
+}
+
/// A deserialized `application_default_credentials.json`-file.
///
/// # References
/// -
<https://cloud.google.com/docs/authentication/application-default-credentials#personal>
/// - <https://google.aip.dev/auth/4110>
-#[derive(serde::Deserialize)]
+#[derive(serde::Deserialize, Clone)]
#[serde(tag = "type")]
pub enum ApplicationDefaultCredentials {
/// Service Account.
@@ -423,13 +547,65 @@ impl ApplicationDefaultCredentials {
const DEFAULT_TOKEN_GCP_URI: &str =
"https://accounts.google.com/o/oauth2/token";
/// <https://google.aip.dev/auth/4113>
-#[derive(Debug, Deserialize)]
+#[derive(Debug, Deserialize, Clone)]
pub struct AuthorizedUserCredentials {
client_id: String,
client_secret: String,
refresh_token: String,
}
+#[derive(Debug, Deserialize)]
+pub struct AuthorizedUserSigningCredentials {
+ credential: AuthorizedUserCredentials,
+}
+
+///<https://oauth2.googleapis.com/tokeninfo?access_token=ACCESS_TOKEN>
+#[derive(Debug, Deserialize)]
+struct EmailResponse {
+ email: String,
+}
+
+impl AuthorizedUserSigningCredentials {
+ pub fn from(credential: AuthorizedUserCredentials) -> crate::Result<Self> {
+ Ok(Self { credential })
+ }
+
+ async fn client_email(&self, client: &Client, retry: &RetryConfig) ->
crate::Result<String> {
+ let response = client
+ .request(Method::GET, "https://oauth2.googleapis.com/tokeninfo")
+ .query(&[("access_token", &self.credential.refresh_token)])
+ .send_retry(retry)
+ .await
+ .context(TokenRequestSnafu)?
+ .json::<EmailResponse>()
+ .await
+ .context(TokenResponseBodySnafu)?;
+
+ Ok(response.email)
+ }
+}
+
+#[async_trait]
+impl TokenProvider for AuthorizedUserSigningCredentials {
+ type Credential = GcpSigningCredential;
+
+ async fn fetch_token(
+ &self,
+ client: &Client,
+ retry: &RetryConfig,
+ ) -> crate::Result<TemporaryToken<Arc<GcpSigningCredential>>> {
+ let email = self.client_email(client, retry).await?;
+
+ Ok(TemporaryToken {
+ token: Arc::new(GcpSigningCredential {
+ email,
+ private_key: None,
+ }),
+ expiry: None,
+ })
+ }
+}
+
#[async_trait]
impl TokenProvider for AuthorizedUserCredentials {
type Credential = GcpCredential;
@@ -462,3 +638,208 @@ impl TokenProvider for AuthorizedUserCredentials {
})
}
}
+
+/// Strip every whitespace character from a header value
+fn trim_header_value(value: &str) -> String {
+ let mut ret = value.to_string();
+ ret.retain(|c| !c.is_whitespace());
+ ret
+}
+
+/// A Google Cloud Storage Authorizer for generating signed URLs using [Google
SigV4]
+///
+/// [Google SigV4]:
https://cloud.google.com/storage/docs/access-control/signed-urls
+#[derive(Debug)]
+pub struct GCSAuthorizer {
+ date: Option<DateTime<Utc>>,
+ credential: Arc<GcpSigningCredential>,
+}
+
+impl GCSAuthorizer {
+ /// Create a new [`GCSAuthorizer`]
+ pub fn new(credential: Arc<GcpSigningCredential>) -> Self {
+ Self {
+ date: None,
+ credential,
+ }
+ }
+
+ pub(crate) async fn sign(
+ &self,
+ method: Method,
+ url: &mut Url,
+ expires_in: Duration,
+ client: &GoogleCloudStorageClient,
+ ) -> crate::Result<()> {
+ let email = &self.credential.email;
+ let date = self.date.unwrap_or_else(Utc::now);
+ let scope = self.scope(date);
+ let credential_with_scope = format!("{}/{}", email, scope);
+
+ let mut headers = HeaderMap::new();
+ headers.insert("host", DEFAULT_GCS_SIGN_BLOB_HOST.parse().unwrap());
+
+ let (_, signed_headers) = Self::canonicalize_headers(&headers);
+
+ url.query_pairs_mut()
+ .append_pair("X-Goog-Algorithm", "GOOG4-RSA-SHA256")
+ .append_pair("X-Goog-Credential", &credential_with_scope)
+ .append_pair("X-Goog-Date",
&date.format("%Y%m%dT%H%M%SZ").to_string())
+ .append_pair("X-Goog-Expires", &expires_in.as_secs().to_string())
+ .append_pair("X-Goog-SignedHeaders", &signed_headers);
+
+ let string_to_sign = self.string_to_sign(date, &method, url, &headers);
+ let signature = match &self.credential.private_key {
+ Some(key) => key.sign(&string_to_sign)?,
+ None => client.sign_blob(&string_to_sign, email).await?,
+ };
+
+ url.query_pairs_mut()
+ .append_pair("X-Goog-Signature", &signature);
+ Ok(())
+ }
+
+ /// Get scope for the request
+ ///
+ ///
<https://cloud.google.com/storage/docs/authentication/signatures#credential-scope>
+ fn scope(&self, date: DateTime<Utc>) -> String {
+ format!("{}/auto/storage/goog4_request", date.format("%Y%m%d"),)
+ }
+
+ /// Canonicalizes the request into the GCP canonical form,
+ /// which looks like:
+ ///```plaintext
+ ///HTTP_VERB
+ ///PATH_TO_RESOURCE
+ ///CANONICAL_QUERY_STRING
+ ///CANONICAL_HEADERS
+ ///
+ ///SIGNED_HEADERS
+ ///PAYLOAD
+ ///```
+ ///
+ ///
<https://cloud.google.com/storage/docs/authentication/canonical-requests>
+ fn canonicalize_request(url: &Url, methond: &Method, headers: &HeaderMap)
-> String {
+ let verb = methond.as_str();
+ let path = url.path();
+ let query = Self::canonicalize_query(url);
+ let (canaonical_headers, signed_headers) =
Self::canonicalize_headers(headers);
+
+ format!(
+ "{}\n{}\n{}\n{}\n\n{}\n{}",
+ verb, path, query, canaonical_headers, signed_headers,
DEFAULT_GCS_PLAYLOAD_STRING
+ )
+ }
+
+ /// Canonicalizes query parameters into the GCP canonical form,
+ /// e.g. `max-keys=2&prefix=object`
+ ///
+ ///
<https://cloud.google.com/storage/docs/authentication/canonical-requests#about-query-strings>
+ fn canonicalize_query(url: &Url) -> String {
+ url.query_pairs()
+ .sorted_unstable_by(|a, b| a.0.cmp(&b.0))
+ .map(|(k, v)| {
+ format!(
+ "{}={}",
+ utf8_percent_encode(k.as_ref(), &STRICT_ENCODE_SET),
+ utf8_percent_encode(v.as_ref(), &STRICT_ENCODE_SET)
+ )
+ })
+ .join("&")
+ }
+
+ /// Canonicalizes headers into the GCP canonical form
+ ///
+ ///
<https://cloud.google.com/storage/docs/authentication/canonical-requests#about-headers>
+ fn canonicalize_headers(header_map: &HeaderMap) -> (String, String) {
+ //FIXME add error handling for invalid header values
+ let mut headers = BTreeMap::<String, Vec<&str>>::new();
+ for (k, v) in header_map {
+ headers
+ .entry(k.as_str().to_lowercase())
+ .or_default()
+ .push(std::str::from_utf8(v.as_bytes()).unwrap());
+ }
+
+ let canonicalize_headers = headers
+ .iter()
+ .map(|(k, v)| {
+ format!(
+ "{}:{}",
+ k.trim(),
+ v.iter().map(|v| trim_header_value(v)).join(",")
+ )
+ })
+ .join("\n");
+
+ let signed_headers = headers.keys().join(";");
+
+ (canonicalize_headers, signed_headers)
+ }
+
+ ///Constructs the string to sign,
+ ///which has the form:
+ ///```plaintext
+ ///SIGNING_ALGORITHM
+ ///ACTIVE_DATETIME
+ ///CREDENTIAL_SCOPE
+ ///HASHED_CANONICAL_REQUEST
+ ///```
+ ///`ACTIVE_DATETIME` format:`YYYYMMDD'T'HHMMSS'Z'`
+ ///
<https://cloud.google.com/storage/docs/authentication/signatures#string-to-sign>
+ pub fn string_to_sign(
+ &self,
+ date: DateTime<Utc>,
+ request_method: &Method,
+ url: &Url,
+ headers: &HeaderMap,
+ ) -> String {
+ let caninical_request = Self::canonicalize_request(url,
request_method, headers);
+ let hashed_canonical_req = hex_digest(caninical_request.as_bytes());
+ let scope = self.scope(date);
+
+ format!(
+ "{}\n{}\n{}\n{}",
+ "GOOG4-RSA-SHA256",
+ date.format("%Y%m%dT%H%M%SZ"),
+ scope,
+ hashed_canonical_req
+ )
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_canonicalize_headers() {
+ let mut input_header = HeaderMap::new();
+ input_header.insert("content-type", "text/plain".parse().unwrap());
+ input_header.insert("host", "storage.googleapis.com".parse().unwrap());
+ input_header.insert("x-goog-meta-reviewer", "jane".parse().unwrap());
+ input_header.append("x-goog-meta-reviewer", "john".parse().unwrap());
+ assert_eq!(
+ GCSAuthorizer::canonicalize_headers(&input_header),
+ (
+ "content-type:text/plain
+host:storage.googleapis.com
+x-goog-meta-reviewer:jane,john"
+ .into(),
+ "content-type;host;x-goog-meta-reviewer".to_string()
+ )
+ );
+ }
+
+ #[test]
+ fn test_canonicalize_query() {
+ let mut url =
Url::parse("https://storage.googleapis.com/bucket/object").unwrap();
+ url.query_pairs_mut()
+ .append_pair("max-keys", "2")
+ .append_pair("prefix", "object");
+ assert_eq!(
+ GCSAuthorizer::canonicalize_query(&url),
+ "max-keys=2&prefix=object".to_string()
+ );
+ }
+}
diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs
index 2058d1f8055..96afa45f2b6 100644
--- a/object_store/src/gcp/mod.rs
+++ b/object_store/src/gcp/mod.rs
@@ -35,8 +35,11 @@
//!
//! [lifecycle rule]: https://cloud.google.com/storage/docs/lifecycle#abort-mpu
use std::sync::Arc;
+use std::time::Duration;
use crate::client::CredentialProvider;
+use crate::gcp::credential::GCSAuthorizer;
+use crate::signer::Signer;
use crate::{
multipart::PartId, path::Path, GetOptions, GetResult, ListResult,
MultipartId, MultipartUpload,
ObjectMeta, ObjectStore, PutOptions, PutResult, Result, UploadPart,
@@ -45,13 +48,15 @@ use async_trait::async_trait;
use bytes::Bytes;
use client::GoogleCloudStorageClient;
use futures::stream::BoxStream;
+use hyper::Method;
+use url::Url;
use crate::client::get::GetClientExt;
use crate::client::list::ListClientExt;
use crate::client::parts::Parts;
use crate::multipart::MultipartStore;
pub use builder::{GoogleCloudStorageBuilder, GoogleConfigKey};
-pub use credential::GcpCredential;
+pub use credential::{GcpCredential, GcpSigningCredential, ServiceAccountKey};
mod builder;
mod client;
@@ -62,6 +67,10 @@ const STORE: &str = "GCS";
/// [`CredentialProvider`] for [`GoogleCloudStorage`]
pub type GcpCredentialProvider = Arc<dyn CredentialProvider<Credential =
GcpCredential>>;
+/// [`CredentialProvider`] of [`GcpSigningCredential`] for [`GoogleCloudStorage`]
+pub type GcpSigningCredentialProvider =
+ Arc<dyn CredentialProvider<Credential = GcpSigningCredential>>;
+
/// Interface for [Google Cloud Storage](https://cloud.google.com/storage/).
#[derive(Debug)]
pub struct GoogleCloudStorage {
@@ -83,6 +92,11 @@ impl GoogleCloudStorage {
pub fn credentials(&self) -> &GcpCredentialProvider {
&self.client.config().credentials
}
+
+ /// Returns the [`GcpSigningCredentialProvider`] used by
[`GoogleCloudStorage`]
+ pub fn signing_credentials(&self) -> &GcpSigningCredentialProvider {
+ &self.client.config().signing_credentials
+ }
}
#[derive(Debug)]
@@ -215,6 +229,34 @@ impl MultipartStore for GoogleCloudStorage {
}
}
+#[async_trait]
+impl Signer for GoogleCloudStorage {
+ async fn signed_url(&self, method: Method, path: &Path, expires_in:
Duration) -> Result<Url> {
+ if expires_in.as_secs() > 604800 {
+ return Err(crate::Error::Generic {
+ store: STORE,
+ source: "Expiration Time can't be longer than 604800 seconds
(7 days).".into(),
+ });
+ }
+
+ let config = self.client.config();
+ let path_url = config.path_url(path);
+ let mut url = Url::parse(&path_url).map_err(|e| crate::Error::Generic {
+ store: STORE,
+ source: format!("Unable to parse url {path_url}: {e}").into(),
+ })?;
+
+ let signing_credentials =
self.signing_credentials().get_credential().await?;
+ let authorizer = GCSAuthorizer::new(signing_credentials);
+
+ authorizer
+ .sign(method, &mut url, expires_in, &self.client)
+ .await?;
+
+ Ok(url)
+ }
+}
+
#[cfg(test)]
mod test {
@@ -250,6 +292,36 @@ mod test {
}
}
+ #[tokio::test]
+ #[ignore]
+ async fn gcs_test_sign() {
+ crate::test_util::maybe_skip_integration!();
+ let integration =
GoogleCloudStorageBuilder::from_env().build().unwrap();
+
+ let client = reqwest::Client::new();
+
+ let path = Path::from("test_sign");
+ let url = integration
+ .signed_url(Method::PUT, &path, Duration::from_secs(3600))
+ .await
+ .unwrap();
+ println!("PUT {url}");
+
+ let resp = client.put(url).body("data").send().await.unwrap();
+ resp.error_for_status().unwrap();
+
+ let url = integration
+ .signed_url(Method::GET, &path, Duration::from_secs(3600))
+ .await
+ .unwrap();
+ println!("GET {url}");
+
+ let resp = client.get(url).send().await.unwrap();
+ let resp = resp.error_for_status().unwrap();
+ let data = resp.bytes().await.unwrap();
+ assert_eq!(data.as_ref(), b"data");
+ }
+
#[tokio::test]
async fn gcs_test_get_nonexistent_location() {
crate::test_util::maybe_skip_integration!();
diff --git a/object_store/src/util.rs b/object_store/src/util.rs
index a19d5aab4b5..161d2d138e0 100644
--- a/object_store/src/util.rs
+++ b/object_store/src/util.rs
@@ -285,6 +285,35 @@ impl<T: RangeBounds<usize>> From<T> for GetRange {
}
}
}
+//
http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html
+//
+// Do not URI-encode any of the unreserved characters that RFC 3986 defines:
+// A-Z, a-z, 0-9, hyphen ( - ), underscore ( _ ), period ( . ), and tilde ( ~
).
+#[cfg(any(feature = "aws", feature = "gcp"))]
+pub(crate) const STRICT_ENCODE_SET: percent_encoding::AsciiSet =
percent_encoding::NON_ALPHANUMERIC
+ .remove(b'-')
+ .remove(b'.')
+ .remove(b'_')
+ .remove(b'~');
+
+/// Computes the SHA256 digest of `bytes` returned as a hex encoded string
+#[cfg(any(feature = "aws", feature = "gcp"))]
+pub(crate) fn hex_digest(bytes: &[u8]) -> String {
+ let digest = ring::digest::digest(&ring::digest::SHA256, bytes);
+ hex_encode(digest.as_ref())
+}
+
+/// Returns `bytes` as a lower-case hex encoded string
+#[cfg(any(feature = "aws", feature = "gcp"))]
+pub(crate) fn hex_encode(bytes: &[u8]) -> String {
+ use std::fmt::Write;
+ let mut out = String::with_capacity(bytes.len() * 2);
+ for byte in bytes {
+ // String writing is infallible
+ let _ = write!(out, "{byte:02x}");
+ }
+ out
+}
#[cfg(test)]
mod tests {