This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 69c937565f Support service_account in ApplicationDefaultCredentials and Use SelfSignedJwt (#4926)
69c937565f is described below

commit 69c937565f7404dc1576bc22d153ce79bf107cfb
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon Oct 16 14:18:53 2023 +0100

    Support service_account in ApplicationDefaultCredentials and Use SelfSignedJwt (#4926)
    
    * Support service_account in ApplicationDefaultCredentials
    
    * Use SelfSignedJwt for Service Accounts
    
    * Update CI
    
    * Apply suggestions from code review
    
    Co-authored-by: Marco Neumann <[email protected]>
    
    ---------
    
    Co-authored-by: Marco Neumann <[email protected]>
---
 .github/workflows/object_store.yml |   2 +-
 object_store/src/gcp/credential.rs | 219 ++++++++++++++-----------------------
 object_store/src/gcp/mod.rs        |  45 ++++----
 3 files changed, 108 insertions(+), 158 deletions(-)

diff --git a/.github/workflows/object_store.yml 
b/.github/workflows/object_store.yml
index c28f8037a3..1b991e33c0 100644
--- a/.github/workflows/object_store.yml
+++ b/.github/workflows/object_store.yml
@@ -126,7 +126,7 @@ jobs:
           # Give the container a moment to start up prior to configuring it
           sleep 1
           curl -v -X POST --data-binary '{"name":"test-bucket"}' -H 
"Content-Type: application/json" "http://localhost:4443/storage/v1/b"
-          echo '{"gcs_base_url": "http://localhost:4443", "disable_oauth": 
true, "client_email": "", "private_key": ""}' > "$GOOGLE_SERVICE_ACCOUNT"
+          echo '{"gcs_base_url": "http://localhost:4443", "disable_oauth": 
true, "client_email": "", "private_key": "", "private_key_id": ""}' > 
"$GOOGLE_SERVICE_ACCOUNT"
 
       - name: Setup WebDav
         run: docker run -d -p 8080:80 rclone/rclone serve webdav /data --addr 
:80
diff --git a/object_store/src/gcp/credential.rs 
b/object_store/src/gcp/credential.rs
index ad21c33b8b..87f8e244f2 100644
--- a/object_store/src/gcp/credential.rs
+++ b/object_store/src/gcp/credential.rs
@@ -17,10 +17,8 @@
 
 use crate::client::retry::RetryExt;
 use crate::client::token::TemporaryToken;
-use crate::client::{TokenCredentialProvider, TokenProvider};
-use crate::gcp::credential::Error::UnsupportedCredentialsType;
-use crate::gcp::{GcpCredentialProvider, STORE};
-use crate::ClientOptions;
+use crate::client::TokenProvider;
+use crate::gcp::STORE;
 use crate::RetryConfig;
 use async_trait::async_trait;
 use base64::prelude::BASE64_URL_SAFE_NO_PAD;
@@ -28,6 +26,7 @@ use base64::Engine;
 use futures::TryFutureExt;
 use reqwest::{Client, Method};
 use ring::signature::RsaKeyPair;
+use serde::Deserialize;
 use snafu::{ResultExt, Snafu};
 use std::env;
 use std::fs::File;
@@ -37,6 +36,10 @@ use std::sync::Arc;
 use std::time::{Duration, Instant};
 use tracing::info;
 
+pub const DEFAULT_SCOPE: &str = 
"https://www.googleapis.com/auth/devstorage.full_control";
+
+pub const DEFAULT_GCS_BASE_URL: &str = "https://storage.googleapis.com";
+
 #[derive(Debug, Snafu)]
 pub enum Error {
     #[snafu(display("Unable to open service account file from {}: {}", 
path.display(), source))]
@@ -68,9 +71,6 @@ pub enum Error {
 
     #[snafu(display("Error getting token response body: {}", source))]
     TokenResponseBody { source: reqwest::Error },
-
-    #[snafu(display("Unsupported ApplicationCredentials type: {}", type_))]
-    UnsupportedCredentialsType { type_: String },
 }
 
 impl From<Error> for crate::Error {
@@ -92,48 +92,48 @@ pub struct GcpCredential {
 pub type Result<T, E = Error> = std::result::Result<T, E>;
 
 #[derive(Debug, Default, serde::Serialize)]
-pub struct JwtHeader {
+pub struct JwtHeader<'a> {
     /// The type of JWS: it can only be "JWT" here
     ///
     /// Defined in 
[RFC7515#4.1.9](https://tools.ietf.org/html/rfc7515#section-4.1.9).
     #[serde(skip_serializing_if = "Option::is_none")]
-    pub typ: Option<String>,
+    pub typ: Option<&'a str>,
     /// The algorithm used
     ///
     /// Defined in 
[RFC7515#4.1.1](https://tools.ietf.org/html/rfc7515#section-4.1.1).
-    pub alg: String,
+    pub alg: &'a str,
     /// Content type
     ///
     /// Defined in 
[RFC7519#5.2](https://tools.ietf.org/html/rfc7519#section-5.2).
     #[serde(skip_serializing_if = "Option::is_none")]
-    pub cty: Option<String>,
+    pub cty: Option<&'a str>,
     /// JSON Key URL
     ///
     /// Defined in 
[RFC7515#4.1.2](https://tools.ietf.org/html/rfc7515#section-4.1.2).
     #[serde(skip_serializing_if = "Option::is_none")]
-    pub jku: Option<String>,
+    pub jku: Option<&'a str>,
     /// Key ID
     ///
     /// Defined in 
[RFC7515#4.1.4](https://tools.ietf.org/html/rfc7515#section-4.1.4).
     #[serde(skip_serializing_if = "Option::is_none")]
-    pub kid: Option<String>,
+    pub kid: Option<&'a str>,
     /// X.509 URL
     ///
     /// Defined in 
[RFC7515#4.1.5](https://tools.ietf.org/html/rfc7515#section-4.1.5).
     #[serde(skip_serializing_if = "Option::is_none")]
-    pub x5u: Option<String>,
+    pub x5u: Option<&'a str>,
     /// X.509 certificate thumbprint
     ///
     /// Defined in 
[RFC7515#4.1.7](https://tools.ietf.org/html/rfc7515#section-4.1.7).
     #[serde(skip_serializing_if = "Option::is_none")]
-    pub x5t: Option<String>,
+    pub x5t: Option<&'a str>,
 }
 
 #[derive(serde::Serialize)]
 struct TokenClaims<'a> {
     iss: &'a str,
+    sub: &'a str,
     scope: &'a str,
-    aud: &'a str,
     exp: u64,
     iat: u64,
 }
@@ -144,28 +144,32 @@ struct TokenResponse {
     expires_in: u64,
 }
 
-/// Encapsulates the logic to perform an OAuth token challenge
+/// Self-signed JWT (JSON Web Token).
+///
+/// # References
+/// - <https://google.aip.dev/auth/4111>
 #[derive(Debug)]
-pub struct OAuthProvider {
+pub struct SelfSignedJwt {
     issuer: String,
     scope: String,
-    audience: String,
     key_pair: RsaKeyPair,
     jwt_header: String,
     random: ring::rand::SystemRandom,
 }
 
-impl OAuthProvider {
-    /// Create a new [`OAuthProvider`]
+impl SelfSignedJwt {
+    /// Create a new [`SelfSignedJwt`]
     pub fn new(
+        key_id: String,
         issuer: String,
         private_key_pem: String,
         scope: String,
-        audience: String,
     ) -> Result<Self> {
         let key_pair = decode_first_rsa_key(private_key_pem)?;
         let jwt_header = b64_encode_obj(&JwtHeader {
-            alg: "RS256".to_string(),
+            alg: "RS256",
+            typ: Some("JWT"),
+            kid: Some(&key_id),
             ..Default::default()
         })?;
 
@@ -173,7 +177,6 @@ impl OAuthProvider {
             issuer,
             key_pair,
             scope,
-            audience,
             jwt_header,
             random: ring::rand::SystemRandom::new(),
         })
@@ -181,24 +184,24 @@ impl OAuthProvider {
 }
 
 #[async_trait]
-impl TokenProvider for OAuthProvider {
+impl TokenProvider for SelfSignedJwt {
     type Credential = GcpCredential;
 
     /// Fetch a fresh token
     async fn fetch_token(
         &self,
-        client: &Client,
-        retry: &RetryConfig,
+        _client: &Client,
+        _retry: &RetryConfig,
     ) -> crate::Result<TemporaryToken<Arc<GcpCredential>>> {
         let now = seconds_since_epoch();
         let exp = now + 3600;
 
         let claims = TokenClaims {
             iss: &self.issuer,
+            sub: &self.issuer,
             scope: &self.scope,
-            aud: &self.audience,
-            exp,
             iat: now,
+            exp,
         };
 
         let claim_str = b64_encode_obj(&claims)?;
@@ -214,28 +217,11 @@ impl TokenProvider for OAuthProvider {
             .context(SignSnafu)?;
 
         let signature = BASE64_URL_SAFE_NO_PAD.encode(sig_bytes);
-        let jwt = [message, signature].join(".");
-
-        let body = [
-            ("grant_type", "urn:ietf:params:oauth:grant-type:jwt-bearer"),
-            ("assertion", &jwt),
-        ];
-
-        let response: TokenResponse = client
-            .request(Method::POST, &self.audience)
-            .form(&body)
-            .send_retry(retry)
-            .await
-            .context(TokenRequestSnafu)?
-            .json()
-            .await
-            .context(TokenResponseBodySnafu)?;
+        let bearer = [message, signature].join(".");
 
         Ok(TemporaryToken {
-            token: Arc::new(GcpCredential {
-                bearer: response.access_token,
-            }),
-            expiry: Some(Instant::now() + 
Duration::from_secs(response.expires_in)),
+            token: Arc::new(GcpCredential { bearer }),
+            expiry: Some(Instant::now() + Duration::from_secs(3600)),
         })
     }
 }
@@ -259,29 +245,24 @@ pub struct ServiceAccountCredentials {
     /// The private key in RSA format.
     pub private_key: String,
 
+    /// The private key ID
+    pub private_key_id: String,
+
     /// The email address associated with the service account.
     pub client_email: String,
 
     /// Base URL for GCS
-    #[serde(default = "default_gcs_base_url")]
-    pub gcs_base_url: String,
+    #[serde(default)]
+    pub gcs_base_url: Option<String>,
 
     /// Disable oauth and use empty tokens.
-    #[serde(default = "default_disable_oauth")]
+    #[serde(default)]
     pub disable_oauth: bool,
 }
 
-pub fn default_gcs_base_url() -> String {
-    "https://storage.googleapis.com".to_owned()
-}
-
-pub fn default_disable_oauth() -> bool {
-    false
-}
-
 impl ServiceAccountCredentials {
     /// Create a new [`ServiceAccountCredentials`] from a file.
-    pub fn from_file<P: AsRef<std::path::Path>>(path: P) -> Result<Self> {
+    pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
         read_credentials_file(path)
     }
 
@@ -290,17 +271,20 @@ impl ServiceAccountCredentials {
         serde_json::from_str(key).context(DecodeCredentialsSnafu)
     }
 
-    /// Create an [`OAuthProvider`] from this credentials struct.
-    pub fn oauth_provider(
-        self,
-        scope: &str,
-        audience: &str,
-    ) -> crate::Result<OAuthProvider> {
-        Ok(OAuthProvider::new(
+    /// Create a [`SelfSignedJwt`] from this credentials struct.
+    ///
+    /// We use a scope of [`DEFAULT_SCOPE`] as opposed to an audience
+    /// as GCS appears to not support audience
+    ///
+    /// # References
+    /// - 
<https://stackoverflow.com/questions/63222450/service-account-authorization-without-oauth-can-we-get-file-from-google-cloud/71834557#71834557>
+    /// - 
<https://www.codejam.info/2022/05/google-cloud-service-account-authorization-without-oauth.html>
+    pub fn token_provider(self) -> crate::Result<SelfSignedJwt> {
+        Ok(SelfSignedJwt::new(
+            self.private_key_id,
             self.client_email,
             self.private_key,
-            scope.to_string(),
-            audience.to_string(),
+            DEFAULT_SCOPE.to_string(),
         )?)
     }
 }
@@ -337,25 +321,13 @@ fn b64_encode_obj<T: serde::Serialize>(obj: &T) -> 
Result<String> {
 ///
 /// <https://cloud.google.com/docs/authentication/get-id-token#metadata-server>
 #[derive(Debug, Default)]
-pub struct InstanceCredentialProvider {
-    audience: String,
-}
-
-impl InstanceCredentialProvider {
-    /// Create a new [`InstanceCredentialProvider`], we need to control the 
client in order to enable http access so save the options.
-    pub fn new<T: Into<String>>(audience: T) -> Self {
-        Self {
-            audience: audience.into(),
-        }
-    }
-}
+pub struct InstanceCredentialProvider {}
 
 /// Make a request to the metadata server to fetch a token, using a a given 
hostname.
 async fn make_metadata_request(
     client: &Client,
     hostname: &str,
     retry: &RetryConfig,
-    audience: &str,
 ) -> crate::Result<TokenResponse> {
     let url = format!(
         
"http://{hostname}/computeMetadata/v1/instance/service-accounts/default/token"
@@ -363,7 +335,7 @@ async fn make_metadata_request(
     let response: TokenResponse = client
         .request(Method::GET, url)
         .header("Metadata-Flavor", "Google")
-        .query(&[("audience", audience)])
+        .query(&[("audience", "https://www.googleapis.com/oauth2/v4/token")])
         .send_retry(retry)
         .await
         .context(TokenRequestSnafu)?
@@ -388,12 +360,9 @@ impl TokenProvider for InstanceCredentialProvider {
         const METADATA_HOST: &str = "metadata";
 
         info!("fetching token from metadata server");
-        let response =
-            make_metadata_request(client, METADATA_HOST, retry, &self.audience)
-                .or_else(|_| {
-                    make_metadata_request(client, METADATA_IP, retry, 
&self.audience)
-                })
-                .await?;
+        let response = make_metadata_request(client, METADATA_HOST, retry)
+            .or_else(|_| make_metadata_request(client, METADATA_IP, retry))
+            .await?;
         let token = TemporaryToken {
             token: Arc::new(GcpCredential {
                 bearer: response.access_token,
@@ -404,62 +373,36 @@ impl TokenProvider for InstanceCredentialProvider {
     }
 }
 
-/// ApplicationDefaultCredentials
-/// <https://google.aip.dev/auth/4110>
-pub fn application_default_credentials(
-    path: Option<&str>,
-    client: &ClientOptions,
-    retry: &RetryConfig,
-) -> crate::Result<Option<GcpCredentialProvider>> {
-    let file = match ApplicationDefaultCredentialsFile::read(path)? {
-        Some(x) => x,
-        None => return Ok(None),
-    };
-
-    match file.type_.as_str() {
-        // <https://google.aip.dev/auth/4113>
-        "authorized_user" => {
-            let token = AuthorizedUserCredentials {
-                client_id: file.client_id,
-                client_secret: file.client_secret,
-                refresh_token: file.refresh_token,
-            };
-
-            Ok(Some(Arc::new(TokenCredentialProvider::new(
-                token,
-                client.client()?,
-                retry.clone(),
-            ))))
-        }
-        type_ => Err(UnsupportedCredentialsType {
-            type_: type_.to_string(),
-        }
-        .into()),
-    }
-}
-
 /// A deserialized `application_default_credentials.json`-file.
-/// 
<https://cloud.google.com/docs/authentication/application-default-credentials#personal>
+///
+/// # References
+/// - 
<https://cloud.google.com/docs/authentication/application-default-credentials#personal>
+/// - <https://google.aip.dev/auth/4110>
 #[derive(serde::Deserialize)]
-struct ApplicationDefaultCredentialsFile {
-    #[serde(default)]
-    client_id: String,
-    #[serde(default)]
-    client_secret: String,
-    #[serde(default)]
-    refresh_token: String,
-    #[serde(rename = "type")]
-    type_: String,
+#[serde(tag = "type")]
+pub enum ApplicationDefaultCredentials {
+    /// Service Account.
+    ///
+    /// # References
+    /// - <https://google.aip.dev/auth/4112>
+    #[serde(rename = "service_account")]
+    ServiceAccount(ServiceAccountCredentials),
+    /// Authorized user via "gcloud CLI Integration".
+    ///
+    /// # References
+    /// - <https://google.aip.dev/auth/4113>
+    #[serde(rename = "authorized_user")]
+    AuthorizedUser(AuthorizedUserCredentials),
 }
 
-impl ApplicationDefaultCredentialsFile {
+impl ApplicationDefaultCredentials {
     const CREDENTIALS_PATH: &'static str =
         ".config/gcloud/application_default_credentials.json";
 
     // Create a new application default credential in the following situations:
     //  1. a file is passed in and the type matches.
     //  2. without argument if the well-known configuration file is present.
-    fn read(path: Option<&str>) -> Result<Option<Self>, Error> {
+    pub fn read(path: Option<&str>) -> Result<Option<Self>, Error> {
         if let Some(path) = path {
             return read_credentials_file::<Self>(path).map(Some);
         }
@@ -478,8 +421,8 @@ impl ApplicationDefaultCredentialsFile {
 const DEFAULT_TOKEN_GCP_URI: &str = 
"https://accounts.google.com/o/oauth2/token";
 
 /// <https://google.aip.dev/auth/4113>
-#[derive(Debug)]
-struct AuthorizedUserCredentials {
+#[derive(Debug, Deserialize)]
+pub struct AuthorizedUserCredentials {
     client_id: String,
     client_secret: String,
     refresh_token: String,
diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs
index f8a16310dd..a75527fe7b 100644
--- a/object_store/src/gcp/mod.rs
+++ b/object_store/src/gcp/mod.rs
@@ -57,10 +57,7 @@ use crate::{
     ObjectStore, Result, RetryConfig,
 };
 
-use credential::{
-    application_default_credentials, default_gcs_base_url, 
InstanceCredentialProvider,
-    ServiceAccountCredentials,
-};
+use credential::{InstanceCredentialProvider, ServiceAccountCredentials};
 
 mod credential;
 
@@ -68,6 +65,7 @@ const STORE: &str = "GCS";
 
 /// [`CredentialProvider`] for [`GoogleCloudStorage`]
 pub type GcpCredentialProvider = Arc<dyn CredentialProvider<Credential = 
GcpCredential>>;
+use crate::gcp::credential::{ApplicationDefaultCredentials, 
DEFAULT_GCS_BASE_URL};
 pub use credential::GcpCredential;
 
 #[derive(Debug, Snafu)]
@@ -1034,10 +1032,8 @@ impl GoogleCloudStorageBuilder {
             };
 
         // Then try to initialize from the application credentials file, or 
the environment.
-        let application_default_credentials = application_default_credentials(
+        let application_default_credentials = 
ApplicationDefaultCredentials::read(
             self.application_credentials_path.as_deref(),
-            &self.client_options,
-            &self.retry_config,
         )?;
 
         let disable_oauth = service_account_credentials
@@ -1045,14 +1041,10 @@ impl GoogleCloudStorageBuilder {
             .map(|c| c.disable_oauth)
             .unwrap_or(false);
 
-        let gcs_base_url = service_account_credentials
+        let gcs_base_url: String = service_account_credentials
             .as_ref()
-            .map(|c| c.gcs_base_url.clone())
-            .unwrap_or_else(default_gcs_base_url);
-
-        // TODO: 
https://cloud.google.com/storage/docs/authentication#oauth-scopes
-        let scope = "https://www.googleapis.com/auth/devstorage.full_control";
-        let audience = "https://www.googleapis.com/oauth2/v4/token";
+            .and_then(|c| c.gcs_base_url.clone())
+            .unwrap_or_else(|| DEFAULT_GCS_BASE_URL.to_string());
 
         let credentials = if let Some(credentials) = self.credentials {
             credentials
@@ -1062,15 +1054,30 @@ impl GoogleCloudStorageBuilder {
             })) as _
         } else if let Some(credentials) = service_account_credentials {
             Arc::new(TokenCredentialProvider::new(
-                credentials.oauth_provider(scope, audience)?,
+                credentials.token_provider()?,
                 self.client_options.client()?,
                 self.retry_config.clone(),
             )) as _
         } else if let Some(credentials) = application_default_credentials {
-            credentials
+            match credentials {
+                ApplicationDefaultCredentials::AuthorizedUser(token) => {
+                    Arc::new(TokenCredentialProvider::new(
+                        token,
+                        self.client_options.client()?,
+                        self.retry_config.clone(),
+                    )) as _
+                }
+                ApplicationDefaultCredentials::ServiceAccount(token) => {
+                    Arc::new(TokenCredentialProvider::new(
+                        token.token_provider()?,
+                        self.client_options.client()?,
+                        self.retry_config.clone(),
+                    )) as _
+                }
+            }
         } else {
             Arc::new(TokenCredentialProvider::new(
-                InstanceCredentialProvider::new(audience),
+                InstanceCredentialProvider::default(),
                 self.client_options.metadata_client()?,
                 self.retry_config.clone(),
             )) as _
@@ -1105,7 +1112,7 @@ mod test {
 
     use super::*;
 
-    const FAKE_KEY: &str = r#"{"private_key": "private_key", 
"client_email":"client_email", "disable_oauth":true}"#;
+    const FAKE_KEY: &str = r#"{"private_key": "private_key", "private_key_id": 
"private_key_id", "client_email":"client_email", "disable_oauth":true}"#;
     const NON_EXISTENT_NAME: &str = "nonexistentname";
 
     #[tokio::test]
@@ -1117,7 +1124,7 @@ mod test {
         list_uses_directories_correctly(&integration).await;
         list_with_delimiter(&integration).await;
         rename_and_copy(&integration).await;
-        if integration.client.base_url == default_gcs_base_url() {
+        if integration.client.base_url == DEFAULT_GCS_BASE_URL {
             // Fake GCS server doesn't currently honor ifGenerationMatch
             // https://github.com/fsouza/fake-gcs-server/issues/994
             copy_if_not_exists(&integration).await;

Reply via email to