This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a0baf1aeee Add GCS signed URL support (#5300)
5a0baf1aeee is described below

commit 5a0baf1aeeead4c461f1c8d14bfc65c85cfff78e
Author: Yu Zeng <[email protected]>
AuthorDate: Thu Apr 4 23:23:18 2024 +0800

    Add GCS signed URL support (#5300)
    
    * add util function for gcp sign url
    
    * add string to sign and other sign functions
    
    * add GoogleCloudStorageConfig::new and config and move functions to client
    
    * add more code and rearrange struct
    
    * add client_email for credential and return the signed url
    
    * clean some code
    
    * add client email for AuthorizedUserCredentials
    
    * tidy some code
    
    * format doc
    
    * Add GcpSigningCredentialProvider for getting email
    
    * add test
    
    * Move some functions shared by aws and gcp to utils.
    
    * fix some bugs so that it produces the correct result
    
    * remove useless code
    
    * tidy some code
    
    * do not export host
    
    * add sign_by_key
    
    * Cleanup
    
    * Add ServiceAccountKey
    
    * Further tweaks
    
    * add more scope for signing.
    
    * tidy
    
    * Tweak and add test
    
    * Retry and handle errors for signBlob
    
    ---------
    
    Co-authored-by: Raphael Taylor-Davies <[email protected]>
---
 object_store/src/aws/credential.rs |  19 +-
 object_store/src/aws/mod.rs        |  11 +-
 object_store/src/gcp/builder.rs    |  55 ++++-
 object_store/src/gcp/client.rs     | 105 ++++++++-
 object_store/src/gcp/credential.rs | 465 +++++++++++++++++++++++++++++++++----
 object_store/src/gcp/mod.rs        |  74 +++++-
 object_store/src/util.rs           |  29 +++
 7 files changed, 677 insertions(+), 81 deletions(-)

diff --git a/object_store/src/aws/credential.rs 
b/object_store/src/aws/credential.rs
index dd7fa5b41da..478e56dd09c 100644
--- a/object_store/src/aws/credential.rs
+++ b/object_store/src/aws/credential.rs
@@ -19,7 +19,7 @@ use crate::aws::{AwsCredentialProvider, STORE, 
STRICT_ENCODE_SET, STRICT_PATH_EN
 use crate::client::retry::RetryExt;
 use crate::client::token::{TemporaryToken, TokenCache};
 use crate::client::TokenProvider;
-use crate::util::hmac_sha256;
+use crate::util::{hex_digest, hex_encode, hmac_sha256};
 use crate::{CredentialProvider, Result, RetryConfig};
 use async_trait::async_trait;
 use bytes::Buf;
@@ -342,23 +342,6 @@ impl CredentialExt for RequestBuilder {
     }
 }
 
-/// Computes the SHA256 digest of `body` returned as a hex encoded string
-fn hex_digest(bytes: &[u8]) -> String {
-    let digest = ring::digest::digest(&ring::digest::SHA256, bytes);
-    hex_encode(digest.as_ref())
-}
-
-/// Returns `bytes` as a lower-case hex encoded string
-fn hex_encode(bytes: &[u8]) -> String {
-    use std::fmt::Write;
-    let mut out = String::with_capacity(bytes.len() * 2);
-    for byte in bytes {
-        // String writing is infallible
-        let _ = write!(out, "{byte:02x}");
-    }
-    out
-}
-
 /// Canonicalizes query parameters into the AWS canonical form
 ///
 /// 
<https://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html>
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index b33771de9a8..76d01d59704 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -43,6 +43,7 @@ use crate::client::list::ListClientExt;
 use crate::client::CredentialProvider;
 use crate::multipart::{MultipartStore, PartId};
 use crate::signer::Signer;
+use crate::util::STRICT_ENCODE_SET;
 use crate::{
     Error, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, 
ObjectMeta,
     ObjectStore, Path, PutMode, PutOptions, PutResult, Result, UploadPart,
@@ -64,16 +65,6 @@ pub use dynamo::DynamoCommit;
 pub use precondition::{S3ConditionalPut, S3CopyIfNotExists};
 pub use resolve::resolve_bucket_region;
 
-// 
http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html
-//
-// Do not URI-encode any of the unreserved characters that RFC 3986 defines:
-// A-Z, a-z, 0-9, hyphen ( - ), underscore ( _ ), period ( . ), and tilde ( ~ 
).
-pub(crate) const STRICT_ENCODE_SET: percent_encoding::AsciiSet = 
percent_encoding::NON_ALPHANUMERIC
-    .remove(b'-')
-    .remove(b'.')
-    .remove(b'_')
-    .remove(b'~');
-
 /// This struct is used to maintain the URI path encoding
 const STRICT_PATH_ENCODE_SET: percent_encoding::AsciiSet = 
STRICT_ENCODE_SET.remove(b'/');
 
diff --git a/object_store/src/gcp/builder.rs b/object_store/src/gcp/builder.rs
index 2cf75040b85..4fa91677aa7 100644
--- a/object_store/src/gcp/builder.rs
+++ b/object_store/src/gcp/builder.rs
@@ -21,7 +21,10 @@ use crate::gcp::credential::{
     ApplicationDefaultCredentials, InstanceCredentialProvider, 
ServiceAccountCredentials,
     DEFAULT_GCS_BASE_URL,
 };
-use crate::gcp::{credential, GcpCredential, GcpCredentialProvider, 
GoogleCloudStorage, STORE};
+use crate::gcp::{
+    credential, GcpCredential, GcpCredentialProvider, GcpSigningCredential,
+    GcpSigningCredentialProvider, GoogleCloudStorage, STORE,
+};
 use crate::{ClientConfigKey, ClientOptions, Result, RetryConfig, 
StaticCredentialProvider};
 use serde::{Deserialize, Serialize};
 use snafu::{OptionExt, ResultExt, Snafu};
@@ -29,6 +32,8 @@ use std::str::FromStr;
 use std::sync::Arc;
 use url::Url;
 
+use super::credential::{AuthorizedUserSigningCredentials, 
InstanceSigningCredentialProvider};
+
 #[derive(Debug, Snafu)]
 enum Error {
     #[snafu(display("Missing bucket name"))]
@@ -107,6 +112,8 @@ pub struct GoogleCloudStorageBuilder {
     client_options: ClientOptions,
     /// Credentials
     credentials: Option<GcpCredentialProvider>,
+    /// Credentials used to sign URLs
+    signing_credentials: Option<GcpSigningCredentialProvider>,
 }
 
 /// Configuration keys for [`GoogleCloudStorageBuilder`]
@@ -202,6 +209,7 @@ impl Default for GoogleCloudStorageBuilder {
             client_options: ClientOptions::new().with_allow_http(true),
             url: None,
             credentials: None,
+            signing_credentials: None,
         }
     }
 }
@@ -452,13 +460,13 @@ impl GoogleCloudStorageBuilder {
             Arc::new(StaticCredentialProvider::new(GcpCredential {
                 bearer: "".to_string(),
             })) as _
-        } else if let Some(credentials) = service_account_credentials {
+        } else if let Some(credentials) = service_account_credentials.clone() {
             Arc::new(TokenCredentialProvider::new(
                 credentials.token_provider()?,
                 self.client_options.client()?,
                 self.retry_config.clone(),
             )) as _
-        } else if let Some(credentials) = application_default_credentials {
+        } else if let Some(credentials) = application_default_credentials.clone() {
             match credentials {
                 ApplicationDefaultCredentials::AuthorizedUser(token) => {
                     Arc::new(TokenCredentialProvider::new(
@@ -483,13 +491,44 @@ impl GoogleCloudStorageBuilder {
             )) as _
         };
 
-        let config = GoogleCloudStorageConfig {
-            base_url: gcs_base_url,
+        let signing_credentials = if let Some(signing_credentials) = self.signing_credentials {
+            signing_credentials
+        } else if disable_oauth {
+            Arc::new(StaticCredentialProvider::new(GcpSigningCredential {
+                email: "".to_string(),
+                private_key: None,
+            })) as _
+        } else if let Some(credentials) = service_account_credentials.clone() {
+            credentials.signing_credentials()?
+        } else if let Some(credentials) = application_default_credentials.clone() {
+            match credentials {
+                ApplicationDefaultCredentials::AuthorizedUser(token) => {
+                    Arc::new(TokenCredentialProvider::new(
+                        AuthorizedUserSigningCredentials::from(token)?,
+                        self.client_options.client()?,
+                        self.retry_config.clone(),
+                    )) as _
+                }
+                ApplicationDefaultCredentials::ServiceAccount(token) => {
+                    token.signing_credentials()?
+                }
+            }
+        } else {
+            Arc::new(TokenCredentialProvider::new(
+                InstanceSigningCredentialProvider::default(),
+                self.client_options.metadata_client()?,
+                self.retry_config.clone(),
+            )) as _
+        };
+
+        let config = GoogleCloudStorageConfig::new(
+            gcs_base_url,
             credentials,
+            signing_credentials,
             bucket_name,
-            retry_config: self.retry_config,
-            client_options: self.client_options,
-        };
+            self.retry_config,
+            self.client_options,
+        );
 
         Ok(GoogleCloudStorage {
             client: Arc::new(GoogleCloudStorageClient::new(config)?),
diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs
index def53beefe7..901257f917c 100644
--- a/object_store/src/gcp/client.rs
+++ b/object_store/src/gcp/client.rs
@@ -24,19 +24,22 @@ use crate::client::s3::{
     ListResponse,
 };
 use crate::client::GetOptionsExt;
-use crate::gcp::{GcpCredential, GcpCredentialProvider, STORE};
+use crate::gcp::{GcpCredential, GcpCredentialProvider, 
GcpSigningCredentialProvider, STORE};
 use crate::multipart::PartId;
 use crate::path::{Path, DELIMITER};
+use crate::util::hex_encode;
 use crate::{
     ClientOptions, GetOptions, ListResult, MultipartId, PutMode, PutOptions, 
PutResult, Result,
     RetryConfig,
 };
 use async_trait::async_trait;
+use base64::prelude::BASE64_STANDARD;
+use base64::Engine;
 use bytes::{Buf, Bytes};
 use percent_encoding::{percent_encode, utf8_percent_encode, NON_ALPHANUMERIC};
 use reqwest::header::HeaderName;
 use reqwest::{header, Client, Method, RequestBuilder, Response, StatusCode};
-use serde::Serialize;
+use serde::{Deserialize, Serialize};
 use snafu::{OptionExt, ResultExt, Snafu};
 use std::sync::Arc;
 
@@ -101,6 +104,15 @@ enum Error {
 
     #[snafu(display("Got invalid multipart response: {}", source))]
     InvalidMultipartResponse { source: quick_xml::de::DeError },
+
+    #[snafu(display("Error signing blob: {}", source))]
+    SignBlobRequest { source: crate::client::retry::Error },
+
+    #[snafu(display("Got invalid signing blob response: {}", source))]
+    InvalidSignBlobResponse { source: reqwest::Error },
+
+    #[snafu(display("Got invalid signing blob signature: {}", source))]
+    InvalidSignBlobSignature { source: base64::DecodeError },
 }
 
 impl From<Error> for crate::Error {
@@ -123,6 +135,8 @@ pub struct GoogleCloudStorageConfig {
 
     pub credentials: GcpCredentialProvider,
 
+    pub signing_credentials: GcpSigningCredentialProvider,
+
     pub bucket_name: String,
 
     pub retry_config: RetryConfig,
@@ -130,6 +144,30 @@ pub struct GoogleCloudStorageConfig {
     pub client_options: ClientOptions,
 }
 
+impl GoogleCloudStorageConfig {
+    pub fn new(
+        base_url: String,
+        credentials: GcpCredentialProvider,
+        signing_credentials: GcpSigningCredentialProvider,
+        bucket_name: String,
+        retry_config: RetryConfig,
+        client_options: ClientOptions,
+    ) -> Self {
+        Self {
+            base_url,
+            credentials,
+            signing_credentials,
+            bucket_name,
+            retry_config,
+            client_options,
+        }
+    }
+
+    pub fn path_url(&self, path: &Path) -> String {
+        format!("{}/{}/{}", self.base_url, self.bucket_name, path)
+    }
+}
+
 /// A builder for a put request allowing customisation of the headers and 
query string
 pub struct PutRequest<'a> {
     path: &'a Path,
@@ -163,6 +201,21 @@ impl<'a> PutRequest<'a> {
     }
 }
 
+/// Sign Blob Request Body
+#[derive(Debug, Serialize)]
+struct SignBlobBody {
+    /// The payload to sign
+    payload: String,
+}
+
+/// Sign Blob Response
+#[derive(Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct SignBlobResponse {
+    /// The signature for the payload
+    signed_blob: String,
+}
+
 #[derive(Debug)]
 pub struct GoogleCloudStorageClient {
     config: GoogleCloudStorageConfig,
@@ -197,6 +250,54 @@ impl GoogleCloudStorageClient {
         self.config.credentials.get_credential().await
     }
 
+    /// Signs `string_to_sign` as the service account `client_email` using the IAM Credentials
+    /// `signBlob` method, returning the signature as a lowercase hex string. The request is:
+    /// ```plaintext
+    /// curl -X POST --data-binary @JSON_FILE_NAME \
+    /// -H "Authorization: Bearer OAUTH2_TOKEN" \
+    /// -H "Content-Type: application/json" \
+    /// "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/SERVICE_ACCOUNT_EMAIL:signBlob"
+    /// ```
+    ///
+    /// where `JSON_FILE_NAME` contains the following JSON object (payload base64-encoded):
+    /// ```plaintext
+    /// {
+    ///  "payload": "REQUEST_INFORMATION"
+    /// }
+    /// ```
+    pub async fn sign_blob(&self, string_to_sign: &str, client_email: &str) -> Result<String> {
+        let credential = self.get_credential().await?;
+        let body = SignBlobBody {
+            payload: BASE64_STANDARD.encode(string_to_sign),
+        };
+
+        let url = format!(
+            "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{}:signBlob",
+            client_email
+        );
+
+        let response = self
+            .client
+            .post(&url)
+            .bearer_auth(&credential.bearer)
+            .json(&body)
+            .send_retry(&self.config.retry_config)
+            .await
+            .context(SignBlobRequestSnafu)?;
+
+        // On success the base64-encoded signature is returned in the `signedBlob` field
+        let response = response
+            .json::<SignBlobResponse>()
+            .await
+            .context(InvalidSignBlobResponseSnafu)?;
+
+        let signed_blob = BASE64_STANDARD
+            .decode(response.signed_blob)
+            .context(InvalidSignBlobSignatureSnafu)?;
+
+        Ok(hex_encode(&signed_blob))
+    }
+
     pub fn object_url(&self, path: &Path) -> String {
         let encoded = utf8_percent_encode(path.as_ref(), NON_ALPHANUMERIC);
         format!(
diff --git a/object_store/src/gcp/credential.rs 
b/object_store/src/gcp/credential.rs
index 34cd6eeb6ea..fcd516a1bf1 100644
--- a/object_store/src/gcp/credential.rs
+++ b/object_store/src/gcp/credential.rs
@@ -15,19 +15,26 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use super::client::GoogleCloudStorageClient;
 use crate::client::retry::RetryExt;
 use crate::client::token::TemporaryToken;
 use crate::client::TokenProvider;
-use crate::gcp::STORE;
-use crate::RetryConfig;
+use crate::gcp::{GcpSigningCredentialProvider, STORE};
+use crate::util::{hex_digest, hex_encode, STRICT_ENCODE_SET};
+use crate::{RetryConfig, StaticCredentialProvider};
 use async_trait::async_trait;
 use base64::prelude::BASE64_URL_SAFE_NO_PAD;
 use base64::Engine;
+use chrono::{DateTime, Utc};
 use futures::TryFutureExt;
+use hyper::HeaderMap;
+use itertools::Itertools;
+use percent_encoding::utf8_percent_encode;
 use reqwest::{Client, Method};
 use ring::signature::RsaKeyPair;
 use serde::Deserialize;
 use snafu::{ResultExt, Snafu};
+use std::collections::BTreeMap;
 use std::env;
 use std::fs::File;
 use std::io::BufReader;
@@ -35,11 +42,15 @@ use std::path::{Path, PathBuf};
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 use tracing::info;
+use url::Url;
 
-pub const DEFAULT_SCOPE: &str = "https://www.googleapis.com/auth/devstorage.full_control";
+pub const DEFAULT_SCOPE: &str = "https://www.googleapis.com/auth/cloud-platform";
 
 pub const DEFAULT_GCS_BASE_URL: &str = "https://storage.googleapis.com";
 
+const DEFAULT_GCS_PLAYLOAD_STRING: &str = "UNSIGNED-PAYLOAD";
+const DEFAULT_GCS_SIGN_BLOB_HOST: &str = "storage.googleapis.com";
+
 #[derive(Debug, Snafu)]
 pub enum Error {
     #[snafu(display("Unable to open service account file from {}: {}", 
path.display(), source))]
@@ -57,7 +68,7 @@ pub enum Error {
     #[snafu(display("Invalid RSA key: {}", source), context(false))]
     InvalidKey { source: ring::error::KeyRejected },
 
-    #[snafu(display("Error signing jwt: {}", source))]
+    #[snafu(display("Error signing: {}", source))]
     Sign { source: ring::error::Unspecified },
 
     #[snafu(display("Error encoding jwt payload: {}", source))]
@@ -82,6 +93,69 @@ impl From<Error> for crate::Error {
     }
 }
 
+/// A Google Cloud Storage Credential for signing
+#[derive(Debug)]
+pub struct GcpSigningCredential {
+    /// The email of the service account
+    pub email: String,
+
+    /// An optional RSA private key
+    ///
+    /// If provided this will be used to sign the URL, otherwise a call will 
be made to
+    /// [`iam.serviceAccounts.signBlob`]. This allows supporting credential 
sources
+    /// that don't expose the service account private key, e.g. [IMDS].
+    ///
+    /// [IMDS]: 
https://cloud.google.com/docs/authentication/get-id-token#metadata-server
+    /// [`iam.serviceAccounts.signBlob`]: 
https://cloud.google.com/storage/docs/authentication/creating-signatures
+    pub private_key: Option<ServiceAccountKey>,
+}
+
+/// A private RSA key for a service account
+#[derive(Debug)]
+pub struct ServiceAccountKey(RsaKeyPair);
+
+impl ServiceAccountKey {
+    /// Parses the first PEM item in `encoded`, which must be a PKCS#8 or PKCS#1 RSA private key
+    pub fn from_pem(encoded: &[u8]) -> Result<Self> {
+        use rustls_pemfile::Item;
+
+        let mut cursor = std::io::Cursor::new(encoded);
+        let mut reader = BufReader::new(&mut cursor);
+
+        // A read error (e.g. malformed PEM from user input) means no usable key: report, don't panic
+        match rustls_pemfile::read_one(&mut reader) {
+            Ok(Some(Item::Pkcs8Key(key))) => Self::from_pkcs8(key.secret_pkcs8_der()),
+            Ok(Some(Item::Pkcs1Key(key))) => Self::from_der(key.secret_pkcs1_der()),
+            _ => Err(Error::MissingKey),
+        }
+    }
+
+    /// Parses an unencrypted PKCS#8-encoded RSA private key.
+    pub fn from_pkcs8(key: &[u8]) -> Result<Self> {
+        Ok(Self(RsaKeyPair::from_pkcs8(key)?))
+    }
+
+    /// Parses an unencrypted PKCS#1 (DER-encoded `RSAPrivateKey`) RSA private key.
+    pub fn from_der(key: &[u8]) -> Result<Self> {
+        Ok(Self(RsaKeyPair::from_der(key)?))
+    }
+
+    /// Signs `string_to_sign` with RSA PKCS#1 v1.5 / SHA-256, returning the lowercase hex signature
+    fn sign(&self, string_to_sign: &str) -> Result<String> {
+        let mut signature = vec![0; self.0.public().modulus_len()];
+        self.0
+            .sign(
+                &ring::signature::RSA_PKCS1_SHA256,
+                &ring::rand::SystemRandom::new(),
+                string_to_sign.as_bytes(),
+                &mut signature,
+            )
+            .context(SignSnafu)?;
+
+        Ok(hex_encode(&signature))
+    }
+}
+
 /// A Google Cloud Storage Credential
 #[derive(Debug, Eq, PartialEq)]
 pub struct GcpCredential {
@@ -152,9 +226,8 @@ struct TokenResponse {
 pub struct SelfSignedJwt {
     issuer: String,
     scope: String,
-    key_pair: RsaKeyPair,
-    jwt_header: String,
-    random: ring::rand::SystemRandom,
+    private_key: ServiceAccountKey,
+    key_id: String,
 }
 
 impl SelfSignedJwt {
@@ -162,23 +235,14 @@ impl SelfSignedJwt {
     pub fn new(
         key_id: String,
         issuer: String,
-        private_key_pem: String,
+        private_key: ServiceAccountKey,
         scope: String,
     ) -> Result<Self> {
-        let key_pair = decode_first_rsa_key(private_key_pem)?;
-        let jwt_header = b64_encode_obj(&JwtHeader {
-            alg: "RS256",
-            typ: Some("JWT"),
-            kid: Some(&key_id),
-            ..Default::default()
-        })?;
-
         Ok(Self {
             issuer,
-            key_pair,
             scope,
-            jwt_header,
-            random: ring::rand::SystemRandom::new(),
+            private_key,
+            key_id,
         })
     }
 }
@@ -204,13 +268,21 @@ impl TokenProvider for SelfSignedJwt {
             exp,
         };
 
+        let jwt_header = b64_encode_obj(&JwtHeader {
+            alg: "RS256",
+            typ: Some("JWT"),
+            kid: Some(&self.key_id),
+            ..Default::default()
+        })?;
+
         let claim_str = b64_encode_obj(&claims)?;
-        let message = [self.jwt_header.as_ref(), claim_str.as_ref()].join(".");
-        let mut sig_bytes = vec![0; self.key_pair.public().modulus_len()];
-        self.key_pair
+        let message = [jwt_header.as_ref(), claim_str.as_ref()].join(".");
+        let mut sig_bytes = vec![0; self.private_key.0.public().modulus_len()];
+        self.private_key
+            .0
             .sign(
                 &ring::signature::RSA_PKCS1_SHA256,
-                &self.random,
+                &ring::rand::SystemRandom::new(),
                 message.as_bytes(),
                 &mut sig_bytes,
             )
@@ -238,7 +310,7 @@ where
 }
 
 /// A deserialized `service-account-********.json`-file.
-#[derive(serde::Deserialize, Debug)]
+#[derive(serde::Deserialize, Debug, Clone)]
 pub struct ServiceAccountCredentials {
     /// The private key in RSA format.
     pub private_key: String,
@@ -281,10 +353,19 @@ impl ServiceAccountCredentials {
         Ok(SelfSignedJwt::new(
             self.private_key_id,
             self.client_email,
-            self.private_key,
+            ServiceAccountKey::from_pem(self.private_key.as_bytes())?,
             DEFAULT_SCOPE.to_string(),
         )?)
     }
+
+    pub fn signing_credentials(self) -> 
crate::Result<GcpSigningCredentialProvider> {
+        Ok(Arc::new(StaticCredentialProvider::new(
+            GcpSigningCredential {
+                email: self.client_email,
+                private_key: 
Some(ServiceAccountKey::from_pem(self.private_key.as_bytes())?),
+            },
+        )))
+    }
 }
 
 /// Returns the number of seconds since unix epoch
@@ -295,21 +376,6 @@ fn seconds_since_epoch() -> u64 {
         .as_secs()
 }
 
-fn decode_first_rsa_key(private_key_pem: String) -> Result<RsaKeyPair> {
-    use rustls_pemfile::Item;
-    use std::io::Cursor;
-
-    let mut cursor = Cursor::new(private_key_pem);
-    let mut reader = BufReader::new(&mut cursor);
-
-    // Reading from string is infallible
-    match rustls_pemfile::read_one(&mut reader).unwrap() {
-        Some(Item::Pkcs8Key(key)) => 
Ok(RsaKeyPair::from_pkcs8(key.secret_pkcs8_der())?),
-        Some(Item::Pkcs1Key(key)) => 
Ok(RsaKeyPair::from_der(key.secret_pkcs1_der())?),
-        _ => Err(Error::MissingKey),
-    }
-}
-
 fn b64_encode_obj<T: serde::Serialize>(obj: &T) -> Result<String> {
     let string = serde_json::to_string(obj).context(EncodeSnafu)?;
     Ok(BASE64_URL_SAFE_NO_PAD.encode(string))
@@ -360,6 +426,7 @@ impl TokenProvider for InstanceCredentialProvider {
         let response = make_metadata_request(client, METADATA_HOST, retry)
             .or_else(|_| make_metadata_request(client, METADATA_IP, retry))
             .await?;
+
         let token = TemporaryToken {
             token: Arc::new(GcpCredential {
                 bearer: response.access_token,
@@ -370,12 +437,69 @@ impl TokenProvider for InstanceCredentialProvider {
     }
 }
 
+/// Fetches the default service account email from the metadata server at `hostname`.
+async fn make_metadata_request_for_email(
+    client: &Client,
+    hostname: &str,
+    retry: &RetryConfig,
+) -> crate::Result<String> {
+    let url =
+        format!("http://{hostname}/computeMetadata/v1/instance/service-accounts/default/email");
+    let response = client
+        .request(Method::GET, url)
+        .header("Metadata-Flavor", "Google")
+        .send_retry(retry)
+        .await
+        .context(TokenRequestSnafu)?
+        .text()
+        .await
+        .context(TokenResponseBodySnafu)?;
+    Ok(response)
+}
+
+/// A provider that uses the Google Cloud Platform metadata server to fetch an email for signing.
+///
+/// <https://cloud.google.com/appengine/docs/legacy/standard/java/accessing-instance-metadata>
+#[derive(Debug, Default)]
+pub struct InstanceSigningCredentialProvider {}
+
+#[async_trait]
+impl TokenProvider for InstanceSigningCredentialProvider {
+    type Credential = GcpSigningCredential;
+
+    /// Fetches the default service account email from the metadata server using `client`, trying
+    /// the `metadata` hostname first and then its IP. The credential carries no private key.
+    async fn fetch_token(
+        &self,
+        client: &Client,
+        retry: &RetryConfig,
+    ) -> crate::Result<TemporaryToken<Arc<GcpSigningCredential>>> {
+        const METADATA_IP: &str = "169.254.169.254";
+        const METADATA_HOST: &str = "metadata";
+
+        info!("fetching signing email from metadata server");
+
+        let email = make_metadata_request_for_email(client, METADATA_HOST, retry)
+            .or_else(|_| make_metadata_request_for_email(client, METADATA_IP, retry))
+            .await?;
+
+        let token = TemporaryToken {
+            token: Arc::new(GcpSigningCredential {
+                email,
+                private_key: None,
+            }),
+            expiry: None,
+        };
+        Ok(token)
+    }
+}
+
 /// A deserialized `application_default_credentials.json`-file.
 ///
 /// # References
 /// - 
<https://cloud.google.com/docs/authentication/application-default-credentials#personal>
 /// - <https://google.aip.dev/auth/4110>
-#[derive(serde::Deserialize)]
+#[derive(serde::Deserialize, Clone)]
 #[serde(tag = "type")]
 pub enum ApplicationDefaultCredentials {
     /// Service Account.
@@ -423,13 +547,70 @@ impl ApplicationDefaultCredentials {
 const DEFAULT_TOKEN_GCP_URI: &str = "https://accounts.google.com/o/oauth2/token";
 
 /// <https://google.aip.dev/auth/4113>
-#[derive(Debug, Deserialize)]
+#[derive(Debug, Deserialize, Clone)]
 pub struct AuthorizedUserCredentials {
     client_id: String,
     client_secret: String,
     refresh_token: String,
 }
 
+/// Resolves the email of an authorized user for URL signing (no private key is available)
+#[derive(Debug, Deserialize)]
+pub struct AuthorizedUserSigningCredentials {
+    credential: AuthorizedUserCredentials,
+}
+
+/// <https://oauth2.googleapis.com/tokeninfo?access_token=ACCESS_TOKEN>
+#[derive(Debug, Deserialize)]
+struct EmailResponse {
+    email: String,
+}
+
+impl AuthorizedUserSigningCredentials {
+    pub fn from(credential: AuthorizedUserCredentials) -> crate::Result<Self> {
+        Ok(Self { credential })
+    }
+
+    /// Returns the email reported by the tokeninfo endpoint for this user's credentials
+    async fn client_email(&self, client: &Client, retry: &RetryConfig) -> crate::Result<String> {
+        // tokeninfo's `access_token` parameter takes an OAuth2 access token, not the refresh
+        // token, so first exchange the refresh token for an access token
+        let token = self.credential.fetch_token(client, retry).await?.token;
+        let response = client
+            .request(Method::GET, "https://oauth2.googleapis.com/tokeninfo")
+            .query(&[("access_token", &token.bearer)])
+            .send_retry(retry)
+            .await
+            .context(TokenRequestSnafu)?
+            .json::<EmailResponse>()
+            .await
+            .context(TokenResponseBodySnafu)?;
+
+        Ok(response.email)
+    }
+}
+
+#[async_trait]
+impl TokenProvider for AuthorizedUserSigningCredentials {
+    type Credential = GcpSigningCredential;
+
+    async fn fetch_token(
+        &self,
+        client: &Client,
+        retry: &RetryConfig,
+    ) -> crate::Result<TemporaryToken<Arc<GcpSigningCredential>>> {
+        let email = self.client_email(client, retry).await?;
+
+        Ok(TemporaryToken {
+            token: Arc::new(GcpSigningCredential {
+                email,
+                private_key: None,
+            }),
+            expiry: None,
+        })
+    }
+}
+
 #[async_trait]
 impl TokenProvider for AuthorizedUserCredentials {
     type Credential = GcpCredential;
@@ -462,3 +643,208 @@ impl TokenProvider for AuthorizedUserCredentials {
         })
     }
 }
+
+/// Normalizes a header value for the canonical request: trims leading and trailing
+/// whitespace and collapses each inner run of whitespace to a single space
+fn trim_header_value(value: &str) -> String {
+    // Stripping *all* whitespace would turn e.g. `jane doe` into `janedoe`
+    value.split_whitespace().join(" ")
+}
+
+/// A Google Cloud Storage Authorizer for generating signed URLs using [Google SigV4]
+///
+/// [Google SigV4]: https://cloud.google.com/storage/docs/access-control/signed-urls
+#[derive(Debug)]
+pub struct GCSAuthorizer {
+    date: Option<DateTime<Utc>>,
+    credential: Arc<GcpSigningCredential>,
+}
+
+impl GCSAuthorizer {
+    /// Create a new [`GCSAuthorizer`]
+    pub fn new(credential: Arc<GcpSigningCredential>) -> Self {
+        Self {
+            date: None,
+            credential,
+        }
+    }
+
+    /// Presigns `url` for `method` by appending the `X-Goog-*` query parameters and signature
+    pub(crate) async fn sign(
+        &self,
+        method: Method,
+        url: &mut Url,
+        expires_in: Duration,
+        client: &GoogleCloudStorageClient,
+    ) -> crate::Result<()> {
+        let email = &self.credential.email;
+        let date = self.date.unwrap_or_else(Utc::now);
+        let scope = self.scope(date);
+        let credential_with_scope = format!("{}/{}", email, scope);
+
+        let mut headers = HeaderMap::new();
+        headers.insert("host", DEFAULT_GCS_SIGN_BLOB_HOST.parse().unwrap());
+
+        let (_, signed_headers) = Self::canonicalize_headers(&headers);
+
+        url.query_pairs_mut()
+            .append_pair("X-Goog-Algorithm", "GOOG4-RSA-SHA256")
+            .append_pair("X-Goog-Credential", &credential_with_scope)
+            .append_pair("X-Goog-Date", &date.format("%Y%m%dT%H%M%SZ").to_string())
+            .append_pair("X-Goog-Expires", &expires_in.as_secs().to_string())
+            .append_pair("X-Goog-SignedHeaders", &signed_headers);
+
+        let string_to_sign = self.string_to_sign(date, &method, url, &headers);
+        let signature = match &self.credential.private_key {
+            Some(key) => key.sign(&string_to_sign)?,
+            None => client.sign_blob(&string_to_sign, email).await?,
+        };
+
+        url.query_pairs_mut().append_pair("X-Goog-Signature", &signature);
+        Ok(())
+    }
+
+    /// Returns the credential scope (`YYYYMMDD/auto/storage/goog4_request`) for `date`
+    ///
+    /// <https://cloud.google.com/storage/docs/authentication/signatures#credential-scope>
+    fn scope(&self, date: DateTime<Utc>) -> String {
+        format!("{}/auto/storage/goog4_request", date.format("%Y%m%d"))
+    }
+
+    /// Builds the canonical request that is hashed into the string to sign:
+    ///
+    /// ```plaintext
+    /// HTTP_VERB
+    /// PATH_TO_RESOURCE
+    /// CANONICAL_QUERY_STRING
+    /// CANONICAL_HEADERS
+    ///
+    /// SIGNED_HEADERS
+    /// PAYLOAD
+    /// ```
+    ///
+    /// <https://cloud.google.com/storage/docs/authentication/canonical-requests>
+    fn canonicalize_request(url: &Url, method: &Method, headers: &HeaderMap) -> String {
+        let verb = method.as_str();
+        let path = url.path();
+        let query = Self::canonicalize_query(url);
+        let (canonical_headers, signed_headers) = Self::canonicalize_headers(headers);
+
+        format!(
+            "{}\n{}\n{}\n{}\n\n{}\n{}",
+            verb, path, query, canonical_headers, signed_headers, DEFAULT_GCS_PLAYLOAD_STRING
+        )
+    }
+
+    /// Canonicalizes query parameters into the GCP canonical form, sorted by name,
+    /// e.g. `max-keys=2&prefix=object`
+    ///
+    /// <https://cloud.google.com/storage/docs/authentication/canonical-requests#about-query-strings>
+    fn canonicalize_query(url: &Url) -> String {
+        url.query_pairs()
+            .sorted_unstable_by(|a, b| a.0.cmp(&b.0))
+            .map(|(k, v)| {
+                format!(
+                    "{}={}",
+                    utf8_percent_encode(k.as_ref(), &STRICT_ENCODE_SET),
+                    utf8_percent_encode(v.as_ref(), &STRICT_ENCODE_SET)
+                )
+            })
+            .join("&")
+    }
+
+    /// Canonicalizes headers, returning `(canonical_headers, signed_headers)`
+    ///
+    /// <https://cloud.google.com/storage/docs/authentication/canonical-requests#about-headers>
+    fn canonicalize_headers(header_map: &HeaderMap) -> (String, String) {
+        // FIXME: add error handling for header values that are not valid UTF-8
+        let mut headers = BTreeMap::<String, Vec<&str>>::new();
+        for (k, v) in header_map {
+            headers
+                .entry(k.as_str().to_lowercase())
+                .or_default()
+                .push(std::str::from_utf8(v.as_bytes()).unwrap());
+        }
+
+        let canonicalize_headers = headers
+            .iter()
+            .map(|(k, v)| {
+                format!(
+                    "{}:{}",
+                    k.trim(),
+                    v.iter().map(|v| trim_header_value(v)).join(",")
+                )
+            })
+            .join("\n");
+
+        let signed_headers = headers.keys().join(";");
+
+        (canonicalize_headers, signed_headers)
+    }
+
+    /// Constructs the string to sign:
+    /// ```plaintext
+    /// SIGNING_ALGORITHM
+    /// ACTIVE_DATETIME
+    /// CREDENTIAL_SCOPE
+    /// HASHED_CANONICAL_REQUEST
+    /// ```
+    ///
+    /// where `ACTIVE_DATETIME` has the format `YYYYMMDD'T'HHMMSS'Z'`.
+    /// <https://cloud.google.com/storage/docs/authentication/signatures#string-to-sign>
+    pub fn string_to_sign(
+        &self,
+        date: DateTime<Utc>,
+        request_method: &Method,
+        url: &Url,
+        headers: &HeaderMap,
+    ) -> String {
+        let canonical_request = Self::canonicalize_request(url, request_method, headers);
+        let hashed_canonical_req = hex_digest(canonical_request.as_bytes());
+        let scope = self.scope(date);
+
+        format!(
+            "{}\n{}\n{}\n{}",
+            "GOOG4-RSA-SHA256",
+            date.format("%Y%m%dT%H%M%SZ"),
+            scope,
+            hashed_canonical_req
+        )
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_canonicalize_headers() {
+        let mut input_header = HeaderMap::new();
+        input_header.insert("content-type", "text/plain".parse().unwrap());
+        input_header.insert("host", "storage.googleapis.com".parse().unwrap());
+        input_header.insert("x-goog-meta-reviewer", "jane".parse().unwrap());
+        input_header.append("x-goog-meta-reviewer", "john".parse().unwrap());
+        assert_eq!(
+            GCSAuthorizer::canonicalize_headers(&input_header),
+            (
+                "content-type:text/plain
+host:storage.googleapis.com
+x-goog-meta-reviewer:jane,john"
+                    .into(),
+                "content-type;host;x-goog-meta-reviewer".to_string()
+            )
+        );
+    }
+
+    #[test]
+    fn test_canonicalize_query() {
+        let mut url = Url::parse("https://storage.googleapis.com/bucket/object").unwrap();
+        url.query_pairs_mut()
+            .append_pair("prefix", "my object")
+            .append_pair("max-keys", "2");
+        assert_eq!(
+            GCSAuthorizer::canonicalize_query(&url),
+            "max-keys=2&prefix=my%20object".to_string()
+        );
+    }
+}
diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs
index 2058d1f8055..96afa45f2b6 100644
--- a/object_store/src/gcp/mod.rs
+++ b/object_store/src/gcp/mod.rs
@@ -35,8 +35,11 @@
 //!
 //! [lifecycle rule]: https://cloud.google.com/storage/docs/lifecycle#abort-mpu
 use std::sync::Arc;
+use std::time::Duration;
 
 use crate::client::CredentialProvider;
+use crate::gcp::credential::GCSAuthorizer;
+use crate::signer::Signer;
 use crate::{
     multipart::PartId, path::Path, GetOptions, GetResult, ListResult, 
MultipartId, MultipartUpload,
     ObjectMeta, ObjectStore, PutOptions, PutResult, Result, UploadPart,
@@ -45,13 +48,15 @@ use async_trait::async_trait;
 use bytes::Bytes;
 use client::GoogleCloudStorageClient;
 use futures::stream::BoxStream;
+use hyper::Method;
+use url::Url;
 
 use crate::client::get::GetClientExt;
 use crate::client::list::ListClientExt;
 use crate::client::parts::Parts;
 use crate::multipart::MultipartStore;
 pub use builder::{GoogleCloudStorageBuilder, GoogleConfigKey};
-pub use credential::GcpCredential;
+pub use credential::{GcpCredential, GcpSigningCredential, ServiceAccountKey};
 
 mod builder;
 mod client;
@@ -62,6 +67,10 @@ const STORE: &str = "GCS";
 /// [`CredentialProvider`] for [`GoogleCloudStorage`]
 pub type GcpCredentialProvider = Arc<dyn CredentialProvider<Credential = 
GcpCredential>>;
 
+/// [`CredentialProvider`] of [`GcpSigningCredential`] for [`GoogleCloudStorage`]
+///
+/// Supplies the credentials used by [`GoogleCloudStorage`] when creating signed URLs
+pub type GcpSigningCredentialProvider =
+    Arc<dyn CredentialProvider<Credential = GcpSigningCredential>>;
+
 /// Interface for [Google Cloud Storage](https://cloud.google.com/storage/).
 #[derive(Debug)]
 pub struct GoogleCloudStorage {
@@ -83,6 +92,11 @@ impl GoogleCloudStorage {
     pub fn credentials(&self) -> &GcpCredentialProvider {
         &self.client.config().credentials
     }
+
+    /// Returns the [`GcpSigningCredentialProvider`] used by 
[`GoogleCloudStorage`]
+    pub fn signing_credentials(&self) -> &GcpSigningCredentialProvider {
+        &self.client.config().signing_credentials
+    }
 }
 
 #[derive(Debug)]
@@ -215,6 +229,34 @@ impl MultipartStore for GoogleCloudStorage {
     }
 }
 
+#[async_trait]
+impl Signer for GoogleCloudStorage {
+    /// Creates a signed URL granting `method` access to the object at `path`
+    /// for `expires_in`.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`crate::Error::Generic`] if `expires_in` is longer than
+    /// 604800 seconds (7 days) or if the object URL cannot be parsed. Errors
+    /// from fetching the signing credentials or from signing are propagated.
+    async fn signed_url(&self, method: Method, path: &Path, expires_in: Duration) -> Result<Url> {
+        // Longest validity allowed for a signed URL: 7 days, in seconds
+        const MAX_EXPIRES_IN_SECS: u64 = 7 * 24 * 60 * 60;
+
+        if expires_in.as_secs() > MAX_EXPIRES_IN_SECS {
+            return Err(crate::Error::Generic {
+                store: STORE,
+                source: "Expiration Time can't be longer than 604800 seconds (7 days).".into(),
+            });
+        }
+
+        let config = self.client.config();
+        let path_url = config.path_url(path);
+        let mut url = Url::parse(&path_url).map_err(|e| crate::Error::Generic {
+            store: STORE,
+            source: format!("Unable to parse url {path_url}: {e}").into(),
+        })?;
+
+        let signing_credentials = self.signing_credentials().get_credential().await?;
+        let authorizer = GCSAuthorizer::new(signing_credentials);
+
+        // `sign` takes the URL mutably and is expected to add the signature to it;
+        // the mutated URL is what gets returned
+        authorizer
+            .sign(method, &mut url, expires_in, &self.client)
+            .await?;
+
+        Ok(url)
+    }
+}
+
 #[cfg(test)]
 mod test {
 
@@ -250,6 +292,36 @@ mod test {
         }
     }
 
+    /// End-to-end signing check: upload an object through a signed `PUT` URL, then
+    /// read it back through a signed `GET` URL using a plain HTTP client
+    ///
+    /// NOTE(review): marked `#[ignore]`, presumably because it needs credentials
+    /// that are able to sign - confirm before enabling by default
+    #[tokio::test]
+    #[ignore]
+    async fn gcs_test_sign() {
+        crate::test_util::maybe_skip_integration!();
+        let integration = GoogleCloudStorageBuilder::from_env().build().unwrap();
+        let client = reqwest::Client::new();
+        let path = Path::from("test_sign");
+        let expires_in = Duration::from_secs(3600);
+
+        // Upload via a signed PUT URL
+        let url = integration
+            .signed_url(Method::PUT, &path, expires_in)
+            .await
+            .unwrap();
+        println!("PUT {url}");
+
+        let resp = client.put(url).body("data").send().await.unwrap();
+        resp.error_for_status().unwrap();
+
+        // Read it back via a signed GET URL and check the payload round-trips
+        let url = integration
+            .signed_url(Method::GET, &path, expires_in)
+            .await
+            .unwrap();
+        println!("GET {url}");
+
+        let resp = client.get(url).send().await.unwrap();
+        let resp = resp.error_for_status().unwrap();
+        let data = resp.bytes().await.unwrap();
+        assert_eq!(data.as_ref(), b"data");
+    }
+
     #[tokio::test]
     async fn gcs_test_get_nonexistent_location() {
         crate::test_util::maybe_skip_integration!();
diff --git a/object_store/src/util.rs b/object_store/src/util.rs
index a19d5aab4b5..161d2d138e0 100644
--- a/object_store/src/util.rs
+++ b/object_store/src/util.rs
@@ -285,6 +285,35 @@ impl<T: RangeBounds<usize>> From<T> for GetRange {
         }
     }
 }
+// 
http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html
+//
+// Do not URI-encode any of the unreserved characters that RFC 3986 defines:
+// A-Z, a-z, 0-9, hyphen ( - ), underscore ( _ ), period ( . ), and tilde ( ~ 
).
+#[cfg(any(feature = "aws", feature = "gcp"))]
+pub(crate) const STRICT_ENCODE_SET: percent_encoding::AsciiSet = 
percent_encoding::NON_ALPHANUMERIC
+    .remove(b'-')
+    .remove(b'.')
+    .remove(b'_')
+    .remove(b'~');
+
+/// Computes the SHA256 digest of `bytes`, returned as a lower-case hex encoded string
+#[cfg(any(feature = "aws", feature = "gcp"))]
+pub(crate) fn hex_digest(bytes: &[u8]) -> String {
+    let digest = ring::digest::digest(&ring::digest::SHA256, bytes);
+    hex_encode(digest.as_ref())
+}
+
+/// Encodes `bytes` as a lower-case hexadecimal string, two characters per byte
+#[cfg(any(feature = "aws", feature = "gcp"))]
+pub(crate) fn hex_encode(bytes: &[u8]) -> String {
+    use std::fmt::Write;
+    bytes
+        .iter()
+        .fold(String::with_capacity(bytes.len() * 2), |mut acc, b| {
+            // Formatting into a String cannot fail, so the Result is discarded
+            let _ = write!(acc, "{b:02x}");
+            acc
+        })
+}
 
 #[cfg(test)]
 mod tests {


Reply via email to