This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 69c937565f Support service_account in ApplicationDefaultCredentials
and Use SelfSignedJwt (#4926)
69c937565f is described below
commit 69c937565f7404dc1576bc22d153ce79bf107cfb
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon Oct 16 14:18:53 2023 +0100
Support service_account in ApplicationDefaultCredentials and Use
SelfSignedJwt (#4926)
* Support service_account in ApplicationDefaultCredentials
* Use SelfSignedJwt for Service Accounts
* Update CI
* Apply suggestions from code review
Co-authored-by: Marco Neumann <[email protected]>
---------
Co-authored-by: Marco Neumann <[email protected]>
---
.github/workflows/object_store.yml | 2 +-
object_store/src/gcp/credential.rs | 219 ++++++++++++++-----------------------
object_store/src/gcp/mod.rs | 45 ++++----
3 files changed, 108 insertions(+), 158 deletions(-)
diff --git a/.github/workflows/object_store.yml
b/.github/workflows/object_store.yml
index c28f8037a3..1b991e33c0 100644
--- a/.github/workflows/object_store.yml
+++ b/.github/workflows/object_store.yml
@@ -126,7 +126,7 @@ jobs:
# Give the container a moment to start up prior to configuring it
sleep 1
curl -v -X POST --data-binary '{"name":"test-bucket"}' -H
"Content-Type: application/json" "http://localhost:4443/storage/v1/b"
- echo '{"gcs_base_url": "http://localhost:4443", "disable_oauth":
true, "client_email": "", "private_key": ""}' > "$GOOGLE_SERVICE_ACCOUNT"
+ echo '{"gcs_base_url": "http://localhost:4443", "disable_oauth":
true, "client_email": "", "private_key": "", "private_key_id": ""}' >
"$GOOGLE_SERVICE_ACCOUNT"
- name: Setup WebDav
run: docker run -d -p 8080:80 rclone/rclone serve webdav /data --addr
:80
diff --git a/object_store/src/gcp/credential.rs
b/object_store/src/gcp/credential.rs
index ad21c33b8b..87f8e244f2 100644
--- a/object_store/src/gcp/credential.rs
+++ b/object_store/src/gcp/credential.rs
@@ -17,10 +17,8 @@
use crate::client::retry::RetryExt;
use crate::client::token::TemporaryToken;
-use crate::client::{TokenCredentialProvider, TokenProvider};
-use crate::gcp::credential::Error::UnsupportedCredentialsType;
-use crate::gcp::{GcpCredentialProvider, STORE};
-use crate::ClientOptions;
+use crate::client::TokenProvider;
+use crate::gcp::STORE;
use crate::RetryConfig;
use async_trait::async_trait;
use base64::prelude::BASE64_URL_SAFE_NO_PAD;
@@ -28,6 +26,7 @@ use base64::Engine;
use futures::TryFutureExt;
use reqwest::{Client, Method};
use ring::signature::RsaKeyPair;
+use serde::Deserialize;
use snafu::{ResultExt, Snafu};
use std::env;
use std::fs::File;
@@ -37,6 +36,10 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use tracing::info;
+pub const DEFAULT_SCOPE: &str =
"https://www.googleapis.com/auth/devstorage.full_control";
+
+pub const DEFAULT_GCS_BASE_URL: &str = "https://storage.googleapis.com";
+
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Unable to open service account file from {}: {}",
path.display(), source))]
@@ -68,9 +71,6 @@ pub enum Error {
#[snafu(display("Error getting token response body: {}", source))]
TokenResponseBody { source: reqwest::Error },
-
- #[snafu(display("Unsupported ApplicationCredentials type: {}", type_))]
- UnsupportedCredentialsType { type_: String },
}
impl From<Error> for crate::Error {
@@ -92,48 +92,48 @@ pub struct GcpCredential {
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, Default, serde::Serialize)]
-pub struct JwtHeader {
+pub struct JwtHeader<'a> {
/// The type of JWS: it can only be "JWT" here
///
/// Defined in
[RFC7515#4.1.9](https://tools.ietf.org/html/rfc7515#section-4.1.9).
#[serde(skip_serializing_if = "Option::is_none")]
- pub typ: Option<String>,
+ pub typ: Option<&'a str>,
/// The algorithm used
///
/// Defined in
[RFC7515#4.1.1](https://tools.ietf.org/html/rfc7515#section-4.1.1).
- pub alg: String,
+ pub alg: &'a str,
/// Content type
///
/// Defined in
[RFC7519#5.2](https://tools.ietf.org/html/rfc7519#section-5.2).
#[serde(skip_serializing_if = "Option::is_none")]
- pub cty: Option<String>,
+ pub cty: Option<&'a str>,
/// JSON Key URL
///
/// Defined in
[RFC7515#4.1.2](https://tools.ietf.org/html/rfc7515#section-4.1.2).
#[serde(skip_serializing_if = "Option::is_none")]
- pub jku: Option<String>,
+ pub jku: Option<&'a str>,
/// Key ID
///
/// Defined in
[RFC7515#4.1.4](https://tools.ietf.org/html/rfc7515#section-4.1.4).
#[serde(skip_serializing_if = "Option::is_none")]
- pub kid: Option<String>,
+ pub kid: Option<&'a str>,
/// X.509 URL
///
/// Defined in
[RFC7515#4.1.5](https://tools.ietf.org/html/rfc7515#section-4.1.5).
#[serde(skip_serializing_if = "Option::is_none")]
- pub x5u: Option<String>,
+ pub x5u: Option<&'a str>,
/// X.509 certificate thumbprint
///
/// Defined in
[RFC7515#4.1.7](https://tools.ietf.org/html/rfc7515#section-4.1.7).
#[serde(skip_serializing_if = "Option::is_none")]
- pub x5t: Option<String>,
+ pub x5t: Option<&'a str>,
}
#[derive(serde::Serialize)]
struct TokenClaims<'a> {
iss: &'a str,
+ sub: &'a str,
scope: &'a str,
- aud: &'a str,
exp: u64,
iat: u64,
}
@@ -144,28 +144,32 @@ struct TokenResponse {
expires_in: u64,
}
-/// Encapsulates the logic to perform an OAuth token challenge
+/// Self-signed JWT (JSON Web Token).
+///
+/// # References
+/// - <https://google.aip.dev/auth/4111>
#[derive(Debug)]
-pub struct OAuthProvider {
+pub struct SelfSignedJwt {
issuer: String,
scope: String,
- audience: String,
key_pair: RsaKeyPair,
jwt_header: String,
random: ring::rand::SystemRandom,
}
-impl OAuthProvider {
- /// Create a new [`OAuthProvider`]
+impl SelfSignedJwt {
+ /// Create a new [`SelfSignedJwt`]
pub fn new(
+ key_id: String,
issuer: String,
private_key_pem: String,
scope: String,
- audience: String,
) -> Result<Self> {
let key_pair = decode_first_rsa_key(private_key_pem)?;
let jwt_header = b64_encode_obj(&JwtHeader {
- alg: "RS256".to_string(),
+ alg: "RS256",
+ typ: Some("JWT"),
+ kid: Some(&key_id),
..Default::default()
})?;
@@ -173,7 +177,6 @@ impl OAuthProvider {
issuer,
key_pair,
scope,
- audience,
jwt_header,
random: ring::rand::SystemRandom::new(),
})
@@ -181,24 +184,24 @@ impl OAuthProvider {
}
#[async_trait]
-impl TokenProvider for OAuthProvider {
+impl TokenProvider for SelfSignedJwt {
type Credential = GcpCredential;
/// Fetch a fresh token
async fn fetch_token(
&self,
- client: &Client,
- retry: &RetryConfig,
+ _client: &Client,
+ _retry: &RetryConfig,
) -> crate::Result<TemporaryToken<Arc<GcpCredential>>> {
let now = seconds_since_epoch();
let exp = now + 3600;
let claims = TokenClaims {
iss: &self.issuer,
+ sub: &self.issuer,
scope: &self.scope,
- aud: &self.audience,
- exp,
iat: now,
+ exp,
};
let claim_str = b64_encode_obj(&claims)?;
@@ -214,28 +217,11 @@ impl TokenProvider for OAuthProvider {
.context(SignSnafu)?;
let signature = BASE64_URL_SAFE_NO_PAD.encode(sig_bytes);
- let jwt = [message, signature].join(".");
-
- let body = [
- ("grant_type", "urn:ietf:params:oauth:grant-type:jwt-bearer"),
- ("assertion", &jwt),
- ];
-
- let response: TokenResponse = client
- .request(Method::POST, &self.audience)
- .form(&body)
- .send_retry(retry)
- .await
- .context(TokenRequestSnafu)?
- .json()
- .await
- .context(TokenResponseBodySnafu)?;
+ let bearer = [message, signature].join(".");
Ok(TemporaryToken {
- token: Arc::new(GcpCredential {
- bearer: response.access_token,
- }),
- expiry: Some(Instant::now() +
Duration::from_secs(response.expires_in)),
+ token: Arc::new(GcpCredential { bearer }),
+ expiry: Some(Instant::now() + Duration::from_secs(3600)),
})
}
}
@@ -259,29 +245,24 @@ pub struct ServiceAccountCredentials {
/// The private key in RSA format.
pub private_key: String,
+ /// The private key ID
+ pub private_key_id: String,
+
/// The email address associated with the service account.
pub client_email: String,
/// Base URL for GCS
- #[serde(default = "default_gcs_base_url")]
- pub gcs_base_url: String,
+ #[serde(default)]
+ pub gcs_base_url: Option<String>,
/// Disable oauth and use empty tokens.
- #[serde(default = "default_disable_oauth")]
+ #[serde(default)]
pub disable_oauth: bool,
}
-pub fn default_gcs_base_url() -> String {
- "https://storage.googleapis.com".to_owned()
-}
-
-pub fn default_disable_oauth() -> bool {
- false
-}
-
impl ServiceAccountCredentials {
/// Create a new [`ServiceAccountCredentials`] from a file.
- pub fn from_file<P: AsRef<std::path::Path>>(path: P) -> Result<Self> {
+ pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
read_credentials_file(path)
}
@@ -290,17 +271,20 @@ impl ServiceAccountCredentials {
serde_json::from_str(key).context(DecodeCredentialsSnafu)
}
- /// Create an [`OAuthProvider`] from this credentials struct.
- pub fn oauth_provider(
- self,
- scope: &str,
- audience: &str,
- ) -> crate::Result<OAuthProvider> {
- Ok(OAuthProvider::new(
+ /// Create a [`SelfSignedJwt`] from this credentials struct.
+ ///
+ /// We use a scope of [`DEFAULT_SCOPE`] as opposed to an audience
+ /// as GCS appears to not support audience
+ ///
+ /// # References
+ /// -
<https://stackoverflow.com/questions/63222450/service-account-authorization-without-oauth-can-we-get-file-from-google-cloud/71834557#71834557>
+ /// -
<https://www.codejam.info/2022/05/google-cloud-service-account-authorization-without-oauth.html>
+ pub fn token_provider(self) -> crate::Result<SelfSignedJwt> {
+ Ok(SelfSignedJwt::new(
+ self.private_key_id,
self.client_email,
self.private_key,
- scope.to_string(),
- audience.to_string(),
+ DEFAULT_SCOPE.to_string(),
)?)
}
}
@@ -337,25 +321,13 @@ fn b64_encode_obj<T: serde::Serialize>(obj: &T) ->
Result<String> {
///
/// <https://cloud.google.com/docs/authentication/get-id-token#metadata-server>
#[derive(Debug, Default)]
-pub struct InstanceCredentialProvider {
- audience: String,
-}
-
-impl InstanceCredentialProvider {
- /// Create a new [`InstanceCredentialProvider`], we need to control the
client in order to enable http access so save the options.
- pub fn new<T: Into<String>>(audience: T) -> Self {
- Self {
- audience: audience.into(),
- }
- }
-}
+pub struct InstanceCredentialProvider {}
/// Make a request to the metadata server to fetch a token, using a given
hostname.
async fn make_metadata_request(
client: &Client,
hostname: &str,
retry: &RetryConfig,
- audience: &str,
) -> crate::Result<TokenResponse> {
let url = format!(
"http://{hostname}/computeMetadata/v1/instance/service-accounts/default/token"
@@ -363,7 +335,7 @@ async fn make_metadata_request(
let response: TokenResponse = client
.request(Method::GET, url)
.header("Metadata-Flavor", "Google")
- .query(&[("audience", audience)])
+ .query(&[("audience", "https://www.googleapis.com/oauth2/v4/token")])
.send_retry(retry)
.await
.context(TokenRequestSnafu)?
@@ -388,12 +360,9 @@ impl TokenProvider for InstanceCredentialProvider {
const METADATA_HOST: &str = "metadata";
info!("fetching token from metadata server");
- let response =
- make_metadata_request(client, METADATA_HOST, retry, &self.audience)
- .or_else(|_| {
- make_metadata_request(client, METADATA_IP, retry,
&self.audience)
- })
- .await?;
+ let response = make_metadata_request(client, METADATA_HOST, retry)
+ .or_else(|_| make_metadata_request(client, METADATA_IP, retry))
+ .await?;
let token = TemporaryToken {
token: Arc::new(GcpCredential {
bearer: response.access_token,
@@ -404,62 +373,36 @@ impl TokenProvider for InstanceCredentialProvider {
}
}
-/// ApplicationDefaultCredentials
-/// <https://google.aip.dev/auth/4110>
-pub fn application_default_credentials(
- path: Option<&str>,
- client: &ClientOptions,
- retry: &RetryConfig,
-) -> crate::Result<Option<GcpCredentialProvider>> {
- let file = match ApplicationDefaultCredentialsFile::read(path)? {
- Some(x) => x,
- None => return Ok(None),
- };
-
- match file.type_.as_str() {
- // <https://google.aip.dev/auth/4113>
- "authorized_user" => {
- let token = AuthorizedUserCredentials {
- client_id: file.client_id,
- client_secret: file.client_secret,
- refresh_token: file.refresh_token,
- };
-
- Ok(Some(Arc::new(TokenCredentialProvider::new(
- token,
- client.client()?,
- retry.clone(),
- ))))
- }
- type_ => Err(UnsupportedCredentialsType {
- type_: type_.to_string(),
- }
- .into()),
- }
-}
-
/// A deserialized `application_default_credentials.json`-file.
-///
<https://cloud.google.com/docs/authentication/application-default-credentials#personal>
+///
+/// # References
+/// -
<https://cloud.google.com/docs/authentication/application-default-credentials#personal>
+/// - <https://google.aip.dev/auth/4110>
#[derive(serde::Deserialize)]
-struct ApplicationDefaultCredentialsFile {
- #[serde(default)]
- client_id: String,
- #[serde(default)]
- client_secret: String,
- #[serde(default)]
- refresh_token: String,
- #[serde(rename = "type")]
- type_: String,
+#[serde(tag = "type")]
+pub enum ApplicationDefaultCredentials {
+ /// Service Account.
+ ///
+ /// # References
+ /// - <https://google.aip.dev/auth/4112>
+ #[serde(rename = "service_account")]
+ ServiceAccount(ServiceAccountCredentials),
+ /// Authorized user via "gcloud CLI Integration".
+ ///
+ /// # References
+ /// - <https://google.aip.dev/auth/4113>
+ #[serde(rename = "authorized_user")]
+ AuthorizedUser(AuthorizedUserCredentials),
}
-impl ApplicationDefaultCredentialsFile {
+impl ApplicationDefaultCredentials {
const CREDENTIALS_PATH: &'static str =
".config/gcloud/application_default_credentials.json";
// Create a new application default credential in the following situations:
// 1. a file is passed in and the type matches.
// 2. without argument if the well-known configuration file is present.
- fn read(path: Option<&str>) -> Result<Option<Self>, Error> {
+ pub fn read(path: Option<&str>) -> Result<Option<Self>, Error> {
if let Some(path) = path {
return read_credentials_file::<Self>(path).map(Some);
}
@@ -478,8 +421,8 @@ impl ApplicationDefaultCredentialsFile {
const DEFAULT_TOKEN_GCP_URI: &str =
"https://accounts.google.com/o/oauth2/token";
/// <https://google.aip.dev/auth/4113>
-#[derive(Debug)]
-struct AuthorizedUserCredentials {
+#[derive(Debug, Deserialize)]
+pub struct AuthorizedUserCredentials {
client_id: String,
client_secret: String,
refresh_token: String,
diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs
index f8a16310dd..a75527fe7b 100644
--- a/object_store/src/gcp/mod.rs
+++ b/object_store/src/gcp/mod.rs
@@ -57,10 +57,7 @@ use crate::{
ObjectStore, Result, RetryConfig,
};
-use credential::{
- application_default_credentials, default_gcs_base_url,
InstanceCredentialProvider,
- ServiceAccountCredentials,
-};
+use credential::{InstanceCredentialProvider, ServiceAccountCredentials};
mod credential;
@@ -68,6 +65,7 @@ const STORE: &str = "GCS";
/// [`CredentialProvider`] for [`GoogleCloudStorage`]
pub type GcpCredentialProvider = Arc<dyn CredentialProvider<Credential =
GcpCredential>>;
+use crate::gcp::credential::{ApplicationDefaultCredentials,
DEFAULT_GCS_BASE_URL};
pub use credential::GcpCredential;
#[derive(Debug, Snafu)]
@@ -1034,10 +1032,8 @@ impl GoogleCloudStorageBuilder {
};
// Then try to initialize from the application credentials file, or
the environment.
- let application_default_credentials = application_default_credentials(
+ let application_default_credentials =
ApplicationDefaultCredentials::read(
self.application_credentials_path.as_deref(),
- &self.client_options,
- &self.retry_config,
)?;
let disable_oauth = service_account_credentials
@@ -1045,14 +1041,10 @@ impl GoogleCloudStorageBuilder {
.map(|c| c.disable_oauth)
.unwrap_or(false);
- let gcs_base_url = service_account_credentials
+ let gcs_base_url: String = service_account_credentials
.as_ref()
- .map(|c| c.gcs_base_url.clone())
- .unwrap_or_else(default_gcs_base_url);
-
- // TODO:
https://cloud.google.com/storage/docs/authentication#oauth-scopes
- let scope = "https://www.googleapis.com/auth/devstorage.full_control";
- let audience = "https://www.googleapis.com/oauth2/v4/token";
+ .and_then(|c| c.gcs_base_url.clone())
+ .unwrap_or_else(|| DEFAULT_GCS_BASE_URL.to_string());
let credentials = if let Some(credentials) = self.credentials {
credentials
@@ -1062,15 +1054,30 @@ impl GoogleCloudStorageBuilder {
})) as _
} else if let Some(credentials) = service_account_credentials {
Arc::new(TokenCredentialProvider::new(
- credentials.oauth_provider(scope, audience)?,
+ credentials.token_provider()?,
self.client_options.client()?,
self.retry_config.clone(),
)) as _
} else if let Some(credentials) = application_default_credentials {
- credentials
+ match credentials {
+ ApplicationDefaultCredentials::AuthorizedUser(token) => {
+ Arc::new(TokenCredentialProvider::new(
+ token,
+ self.client_options.client()?,
+ self.retry_config.clone(),
+ )) as _
+ }
+ ApplicationDefaultCredentials::ServiceAccount(token) => {
+ Arc::new(TokenCredentialProvider::new(
+ token.token_provider()?,
+ self.client_options.client()?,
+ self.retry_config.clone(),
+ )) as _
+ }
+ }
} else {
Arc::new(TokenCredentialProvider::new(
- InstanceCredentialProvider::new(audience),
+ InstanceCredentialProvider::default(),
self.client_options.metadata_client()?,
self.retry_config.clone(),
)) as _
@@ -1105,7 +1112,7 @@ mod test {
use super::*;
- const FAKE_KEY: &str = r#"{"private_key": "private_key",
"client_email":"client_email", "disable_oauth":true}"#;
+ const FAKE_KEY: &str = r#"{"private_key": "private_key", "private_key_id":
"private_key_id", "client_email":"client_email", "disable_oauth":true}"#;
const NON_EXISTENT_NAME: &str = "nonexistentname";
#[tokio::test]
@@ -1117,7 +1124,7 @@ mod test {
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
rename_and_copy(&integration).await;
- if integration.client.base_url == default_gcs_base_url() {
+ if integration.client.base_url == DEFAULT_GCS_BASE_URL {
// Fake GCS server doesn't currently honor ifGenerationMatch
// https://github.com/fsouza/fake-gcs-server/issues/994
copy_if_not_exists(&integration).await;