tustvold commented on code in PR #3541:
URL: https://github.com/apache/arrow-rs/pull/3541#discussion_r1083955697


##########
object_store/src/gcp/mod.rs:
##########
@@ -185,32 +181,6 @@ impl From<Error> for super::Error {
     }
 }
 
-/// A deserialized `service-account-********.json`-file.

Review Comment:
   This is moved into credentials



##########
object_store/src/gcp/credential.rs:
##########
@@ -104,6 +115,15 @@ struct TokenResponse {
     expires_in: u64,
 }
 
+#[async_trait]
+pub(crate) trait TokenProvider: std::fmt::Debug + Send + Sync {

Review Comment:
   ```suggestion
   pub trait TokenProvider: std::fmt::Debug + Send + Sync {
   ```
   FWIW this module as a whole is crate private so this could just be pub



##########
object_store/src/gcp/credential.rs:
##########
@@ -222,3 +327,118 @@ fn b64_encode_obj<T: serde::Serialize>(obj: &T) -> Result<String> {
     let string = serde_json::to_string(obj).context(EncodeSnafu)?;
     Ok(BASE64_URL_SAFE_NO_PAD.encode(string))
 }
+
+/// A provider that uses the Google Cloud Platform metadata server to fetch a token.
+#[derive(Debug, Default)]

Review Comment:
   ```suggestion
   /// <https://cloud.google.com/docs/authentication/get-id-token#metadata-server>
   #[derive(Debug, Default)]
   ```



##########
object_store/src/gcp/credential.rs:
##########
@@ -138,9 +158,12 @@ impl OAuthProvider {
             random: ring::rand::SystemRandom::new(),
         })
     }
+}
 
+#[async_trait]
+impl TokenProvider for OAuthProvider {

Review Comment:
   :+1: to using a trait here, whilst it does mean the future is boxed, we're 
doing network IO here so it doesn't matter :smile: 



##########
object_store/src/gcp/credential.rs:
##########
@@ -195,6 +218,88 @@ impl OAuthProvider {
     }
 }
 
+fn reader_credentials_file<T>(

Review Comment:
   ```suggestion
   fn read_credentials_file<T>(
   ```
   Appreciate this was copied, but could take the opportunity to fix the name



##########
object_store/src/gcp/credential.rs:
##########
@@ -195,6 +218,88 @@ impl OAuthProvider {
     }
 }
 
+fn reader_credentials_file<T>(
+    service_account_path: impl AsRef<std::path::Path>,
+) -> Result<T>
+where
+    T: serde::de::DeserializeOwned,
+{
+    let file = File::open(service_account_path).context(OpenCredentialsSnafu)?;
+    let reader = BufReader::new(file);
+    serde_json::from_reader(reader).context(DecodeCredentialsSnafu)
+}
+
+/// A deserialized `service-account-********.json`-file.
+#[derive(serde::Deserialize, Debug)]
+pub(crate) struct ServiceAccountCredentials {
+    /// The private key in RSA format.
+    pub private_key: String,
+
+    /// The email address associated with the service account.
+    pub client_email: String,
+
+    /// Base URL for GCS
+    #[serde(default = "default_gcs_base_url")]
+    pub gcs_base_url: String,
+
+    /// Disable oauth and use empty tokens.
+    #[serde(default = "default_disable_oauth")]
+    pub disable_oauth: bool,
+}
+
+pub(crate) fn default_gcs_base_url() -> String {

Review Comment:
   I appreciate these were just copied but they could be constants instead of 
functions FWIW



##########
object_store/src/gcp/credential.rs:
##########
@@ -222,3 +327,118 @@ fn b64_encode_obj<T: serde::Serialize>(obj: &T) -> Result<String> {
     let string = serde_json::to_string(obj).context(EncodeSnafu)?;
     Ok(BASE64_URL_SAFE_NO_PAD.encode(string))
 }
+
+/// A provider that uses the Google Cloud Platform metadata server to fetch a token.
+#[derive(Debug, Default)]
+pub struct InstanceCredentialProvider {
+    audience: String,
+}
+
+impl InstanceCredentialProvider {
+    pub fn new<T: Into<String>>(audience: T) -> Self {
+        Self {
+            audience: audience.into(),
+        }
+    }
+}
+
+#[async_trait]
+impl TokenProvider for InstanceCredentialProvider {
+    async fn fetch_token(
+        &self,
+        client: &Client,
+        retry: &RetryConfig,
+    ) -> Result<TemporaryToken<String>> {
+        println!("fetching token from metadata server");
+        const TOKEN_URL: &str =
+            "http://metadata/computeMetadata/v1/instance/service-accounts/default/token";
+        let response: TokenResponse = client
+            .request(Method::GET, TOKEN_URL)
+            .header("Metadata-Flavor", "Google")
+            .query(&[("audience", &self.audience)])
+            .send_retry(retry)
+            .await
+            .context(TokenRequestSnafu)?
+            .json()
+            .await
+            .context(TokenResponseBodySnafu)?;
+        let token = TemporaryToken {
+            token: response.access_token,
+            expiry: Instant::now() + Duration::from_secs(response.expires_in),
+        };
+        Ok(token)
+    }
+}
+
+/// A deserialized `application_default_credentials.json`-file.
+#[derive(serde::Deserialize, Debug)]
+pub struct ApplicationDefaultCredentials {
+    client_id: String,
+    client_secret: String,
+    refresh_token: String,
+    #[serde(rename = "type")]
+    type_: String,
+}
+
+impl ApplicationDefaultCredentials {
+    const DEFAULT_TOKEN_GCP_URI: &'static str =
+        "https://accounts.google.com/o/oauth2/token";
+    const CREDENTIALS_PATH: &'static str =
+        ".config/gcloud/application_default_credentials.json";
+    const EXPECTED_TYPE: &str = "authorized_user";
+
+    // Create a new application default credential in the following situations:
+    //  1. a file is passed in and the type matches.
+    //  2. without argument if the well-known configuration file is present.
+    pub fn new(path: Option<&str>) -> Result<Option<Self>, Error> {
+        if let Some(path) = path {
+            if let Ok(credentials) = reader_credentials_file::<Self>(path) {
+                if credentials.type_ == Self::EXPECTED_TYPE {
+                    return Ok(Some(credentials));
+                }
+            }
+            // Other credential mechanisms may be able to use this path.
+            return Ok(None);

Review Comment:
   I think if a path has been specified and it doesn't exist or is not the 
expected type we should return an error



##########
object_store/src/gcp/mod.rs:
##########
@@ -1098,44 +1083,71 @@ impl GoogleCloudStorageBuilder {
 
         let client = self.client_options.client()?;
 
-        let credentials = match (self.service_account_path, self.service_account_key) {
-            (Some(path), None) => reader_credentials_file(path)?,
-            (None, Some(key)) => {
-                serde_json::from_str(&key).context(DecodeCredentialsSnafu)?
-            }
-            (None, None) => return Err(Error::MissingServiceAccountPathOrKey.into()),
-            (Some(_), Some(_)) => {
-                return Err(Error::ServiceAccountPathAndKeyProvided.into())
-            }
-        };
+        // First try to initialize from the service account information.
+        let service_account_credentials =
+            match (self.service_account_path, self.service_account_key) {
+                (Some(path), None) => Some(
+                    ServiceAccountCredentials::from_file(path)
+                        .context(CredentialSnafu)?,
+                ),
+                (None, Some(key)) => Some(
+                    ServiceAccountCredentials::from_key(&key).context(CredentialSnafu)?,
+                ),
+                (None, None) => None,
+                (Some(_), Some(_)) => {
+                    return Err(Error::ServiceAccountPathAndKeyProvided.into())
+                }
+            };
+
+        // Then try to initialize from the application credentials file, or the environment.
+        let application_default_credentials = ApplicationDefaultCredentials::new(
+            self.application_credentials_path.as_deref(),
+        )
+        .context(CredentialSnafu)?;
+
+        let disable_oauth = service_account_credentials
+            .as_ref()
+            .map(|c| c.disable_oauth)
+            .unwrap_or(false);
+
+        let gcs_base_url = service_account_credentials
+            .as_ref()
+            .map(|c| c.gcs_base_url.clone())
+            .unwrap_or_else(default_gcs_base_url);
 
         // TODO: https://cloud.google.com/storage/docs/authentication#oauth-scopes
         let scope = "https://www.googleapis.com/auth/devstorage.full_control";
-        let audience = "https://www.googleapis.com/oauth2/v4/token".to_string();
-
-        let oauth_provider = (!credentials.disable_oauth)
-            .then(|| {
-                OAuthProvider::new(
-                    credentials.client_email,
-                    credentials.private_key,
-                    scope.to_string(),
-                    audience,
-                )
-            })
-            .transpose()
-            .context(CredentialSnafu)?;
+        let audience = "https://www.googleapis.com/oauth2/v4/token";
+
+        let token_provider = if disable_oauth {
+            None
+        } else {
+            let best_provider = service_account_credentials
+                .map(|credentials| credentials.token_provider(scope, audience))
+                .transpose()
+                .context(CredentialSnafu)?
+                .or_else(|| {
+                    application_default_credentials
+                        .map(|a| Box::new(a) as Box<dyn TokenProvider>)
+                })
+                .or_else(|| Some(Box::new(InstanceCredentialProvider::new(audience))));
+
+            // A provider is required at this point, bail out if we don't have one.
+            if best_provider.is_some() {
+                best_provider
+            } else {
+                return Err(Error::MissingCredentials.into());
+            }

Review Comment:
   ```suggestion
               best_provider.ok_or(Error::MissingCredentials)?
   ```



##########
object_store/src/gcp/credential.rs:
##########
@@ -222,3 +327,118 @@ fn b64_encode_obj<T: serde::Serialize>(obj: &T) -> Result<String> {
     let string = serde_json::to_string(obj).context(EncodeSnafu)?;
     Ok(BASE64_URL_SAFE_NO_PAD.encode(string))
 }
+
+/// A provider that uses the Google Cloud Platform metadata server to fetch a token.
+#[derive(Debug, Default)]
+pub struct InstanceCredentialProvider {
+    audience: String,
+}
+
+impl InstanceCredentialProvider {
+    pub fn new<T: Into<String>>(audience: T) -> Self {
+        Self {
+            audience: audience.into(),
+        }
+    }
+}
+
+#[async_trait]
+impl TokenProvider for InstanceCredentialProvider {
+    async fn fetch_token(
+        &self,
+        client: &Client,
+        retry: &RetryConfig,
+    ) -> Result<TemporaryToken<String>> {
+        println!("fetching token from metadata server");
+        const TOKEN_URL: &str =
+            "http://metadata/computeMetadata/v1/instance/service-accounts/default/token";
+        let response: TokenResponse = client
+            .request(Method::GET, TOKEN_URL)
+            .header("Metadata-Flavor", "Google")
+            .query(&[("audience", &self.audience)])
+            .send_retry(retry)
+            .await
+            .context(TokenRequestSnafu)?
+            .json()
+            .await
+            .context(TokenResponseBodySnafu)?;
+        let token = TemporaryToken {
+            token: response.access_token,
+            expiry: Instant::now() + Duration::from_secs(response.expires_in),
+        };
+        Ok(token)
+    }
+}
+
+/// A deserialized `application_default_credentials.json`-file.
+#[derive(serde::Deserialize, Debug)]
+pub struct ApplicationDefaultCredentials {
+    client_id: String,
+    client_secret: String,
+    refresh_token: String,
+    #[serde(rename = "type")]
+    type_: String,
+}
+
+impl ApplicationDefaultCredentials {
+    const DEFAULT_TOKEN_GCP_URI: &'static str =
+        "https://accounts.google.com/o/oauth2/token";
+    const CREDENTIALS_PATH: &'static str =
+        ".config/gcloud/application_default_credentials.json";
+    const EXPECTED_TYPE: &str = "authorized_user";
+
+    // Create a new application default credential in the following situations:
+    //  1. a file is passed in and the type matches.
+    //  2. without argument if the well-known configuration file is present.
+    pub fn new(path: Option<&str>) -> Result<Option<Self>, Error> {
+        if let Some(path) = path {
+            if let Ok(credentials) = reader_credentials_file::<Self>(path) {
+                if credentials.type_ == Self::EXPECTED_TYPE {
+                    return Ok(Some(credentials));
+                }
+            }
+            // Other credential mechanisms may be able to use this path.
+            return Ok(None);
+        }
+        if let Some(home) = env::var_os("HOME") {

Review Comment:
   This is potentially fragile, but adding an additional dependency is probably 
not worth it so LGTM



##########
object_store/src/gcp/credential.rs:
##########
@@ -222,3 +327,118 @@ fn b64_encode_obj<T: serde::Serialize>(obj: &T) -> Result<String> {
     let string = serde_json::to_string(obj).context(EncodeSnafu)?;
     Ok(BASE64_URL_SAFE_NO_PAD.encode(string))
 }
+
+/// A provider that uses the Google Cloud Platform metadata server to fetch a token.
+#[derive(Debug, Default)]
+pub struct InstanceCredentialProvider {
+    audience: String,
+}
+
+impl InstanceCredentialProvider {
+    pub fn new<T: Into<String>>(audience: T) -> Self {
+        Self {
+            audience: audience.into(),
+        }
+    }
+}
+
+#[async_trait]
+impl TokenProvider for InstanceCredentialProvider {
+    async fn fetch_token(
+        &self,
+        client: &Client,
+        retry: &RetryConfig,
+    ) -> Result<TemporaryToken<String>> {
+        println!("fetching token from metadata server");
+        const TOKEN_URL: &str =
+            "http://metadata/computeMetadata/v1/instance/service-accounts/default/token";
+        let response: TokenResponse = client
+            .request(Method::GET, TOKEN_URL)
+            .header("Metadata-Flavor", "Google")
+            .query(&[("audience", &self.audience)])
+            .send_retry(retry)
+            .await
+            .context(TokenRequestSnafu)?
+            .json()
+            .await
+            .context(TokenResponseBodySnafu)?;
+        let token = TemporaryToken {
+            token: response.access_token,
+            expiry: Instant::now() + Duration::from_secs(response.expires_in),
+        };
+        Ok(token)
+    }
+}
+
+/// A deserialized `application_default_credentials.json`-file.
+#[derive(serde::Deserialize, Debug)]

Review Comment:
   ```suggestion
   /// <https://cloud.google.com/docs/authentication/application-default-credentials#personal>
   #[derive(serde::Deserialize, Debug)]
   ```



##########
object_store/src/gcp/credential.rs:
##########
@@ -222,3 +327,118 @@ fn b64_encode_obj<T: serde::Serialize>(obj: &T) -> Result<String> {
     let string = serde_json::to_string(obj).context(EncodeSnafu)?;
     Ok(BASE64_URL_SAFE_NO_PAD.encode(string))
 }
+
+/// A provider that uses the Google Cloud Platform metadata server to fetch a token.
+#[derive(Debug, Default)]
+pub struct InstanceCredentialProvider {
+    audience: String,
+}
+
+impl InstanceCredentialProvider {
+    pub fn new<T: Into<String>>(audience: T) -> Self {
+        Self {
+            audience: audience.into(),
+        }
+    }
+}
+
+#[async_trait]
+impl TokenProvider for InstanceCredentialProvider {
+    async fn fetch_token(
+        &self,
+        client: &Client,
+        retry: &RetryConfig,
+    ) -> Result<TemporaryToken<String>> {
+        println!("fetching token from metadata server");

Review Comment:
   ```suggestion
           info!("fetching token from metadata server");
   ```



##########
object_store/src/gcp/credential.rs:
##########
@@ -222,3 +327,118 @@ fn b64_encode_obj<T: serde::Serialize>(obj: &T) -> Result<String> {
     let string = serde_json::to_string(obj).context(EncodeSnafu)?;
     Ok(BASE64_URL_SAFE_NO_PAD.encode(string))
 }
+
+/// A provider that uses the Google Cloud Platform metadata server to fetch a token.
+#[derive(Debug, Default)]
+pub struct InstanceCredentialProvider {
+    audience: String,
+}
+
+impl InstanceCredentialProvider {
+    pub fn new<T: Into<String>>(audience: T) -> Self {
+        Self {
+            audience: audience.into(),
+        }
+    }
+}
+
+#[async_trait]
+impl TokenProvider for InstanceCredentialProvider {
+    async fn fetch_token(
+        &self,
+        client: &Client,
+        retry: &RetryConfig,
+    ) -> Result<TemporaryToken<String>> {
+        println!("fetching token from metadata server");
+        const TOKEN_URL: &str =
+            "http://metadata/computeMetadata/v1/instance/service-accounts/default/token";
+        let response: TokenResponse = client

Review Comment:
   ```suggestion
           // TODO: Add IP fallback
           // See https://github.com/googleapis/google-cloud-go/blob/main/compute/metadata/metadata.go#L123
           let response: TokenResponse = client
   ```
   Not something we necessarily need in this PR



##########
object_store/src/gcp/credential.rs:
##########
@@ -195,6 +218,88 @@ impl OAuthProvider {
     }
 }
 
+fn reader_credentials_file<T>(
+    service_account_path: impl AsRef<std::path::Path>,
+) -> Result<T>
+where
+    T: serde::de::DeserializeOwned,
+{
+    let file = File::open(service_account_path).context(OpenCredentialsSnafu)?;
+    let reader = BufReader::new(file);
+    serde_json::from_reader(reader).context(DecodeCredentialsSnafu)
+}
+
+/// A deserialized `service-account-********.json`-file.
+#[derive(serde::Deserialize, Debug)]
+pub(crate) struct ServiceAccountCredentials {
+    /// The private key in RSA format.
+    pub private_key: String,
+
+    /// The email address associated with the service account.
+    pub client_email: String,
+
+    /// Base URL for GCS
+    #[serde(default = "default_gcs_base_url")]
+    pub gcs_base_url: String,
+
+    /// Disable oauth and use empty tokens.
+    #[serde(default = "default_disable_oauth")]
+    pub disable_oauth: bool,
+}
+
+pub(crate) fn default_gcs_base_url() -> String {
+    "https://storage.googleapis.com".to_owned()
+}
+
+pub(crate) fn default_disable_oauth() -> bool {
+    false
+}
+
+impl ServiceAccountCredentials {
+    /// Create a new [`ServiceAccountCredentials`] from a file.
+    pub fn from_file<P: AsRef<std::path::Path>>(path: P) -> Result<Self> {
+        reader_credentials_file(path)
+    }
+
+    /// Create a new [`ServiceAccountCredentials`] from a string.
+    pub fn from_key(key: &str) -> Result<Self> {
+        serde_json::from_str(key).context(DecodeCredentialsSnafu)
+    }
+
+    /// Create an [`OAuthProvider`] from this credentials struct.
+    pub fn token_provider(
+        self,
+        scope: &str,
+        audience: &str,
+    ) -> Result<Box<dyn TokenProvider>> {
+        Ok(Box::new(OAuthProvider::new(
+            self.client_email,
+            self.private_key,
+            scope.to_string(),
+            audience.to_string(),
+        )?) as Box<dyn TokenProvider>)
+    }
+}
+
+/// A no-op provider that returns empty tokens
+#[derive(Debug)]
+pub struct NoOpProvider;
+
+#[async_trait]
+impl TokenProvider for NoOpProvider {
+    /// Fetch a fresh token
+    async fn fetch_token(
+        &self,
+        _client: &Client,
+        _retry: &RetryConfig,
+    ) -> Result<TemporaryToken<String>> {
+        Ok(TemporaryToken {
+            token: "".to_string(),
+            expiry: Instant::now(),
+        })
+    }
+}
+

Review Comment:
   ```suggestion
   ```
   This doesn't appear to be used



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to