This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git


The following commit(s) were added to refs/heads/main by this push:
     new 36752c9  feat: add EKS Pod Identity support (#282) (#333)
36752c9 is described below

commit 36752c975d4f29e20b57c91f81a10872dcd48ae7
Author: Andreas Abros <[email protected]>
AuthorDate: Tue May 6 18:41:22 2025 +0100

    feat: add EKS Pod Identity support (#282) (#333)
    
    * feat: add EKS Pod Identity support (#282)
    
    * Spawn filesystem IO
    
    ---------
    
    Co-authored-by: Raphael Taylor-Davies <[email protected]>
---
 src/aws/builder.rs    |  74 +++++++++++++++++++++++++++++++++-
 src/aws/credential.rs | 109 ++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 181 insertions(+), 2 deletions(-)

diff --git a/src/aws/builder.rs b/src/aws/builder.rs
index 5dff94d..3aaad04 100644
--- a/src/aws/builder.rs
+++ b/src/aws/builder.rs
@@ -17,7 +17,8 @@
 
 use crate::aws::client::{S3Client, S3Config};
 use crate::aws::credential::{
-    InstanceCredentialProvider, SessionProvider, TaskCredentialProvider, 
WebIdentityProvider,
+    EKSPodCredentialProvider, InstanceCredentialProvider, SessionProvider, 
TaskCredentialProvider,
+    WebIdentityProvider,
 };
 use crate::aws::{
     AmazonS3, AwsCredential, AwsCredentialProvider, Checksum, 
S3ConditionalPut, S3CopyIfNotExists,
@@ -151,6 +152,10 @@ pub struct AmazonS3Builder {
     metadata_endpoint: Option<String>,
     /// Container credentials URL, see 
<https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html>
     container_credentials_relative_uri: Option<String>,
+    /// Container credentials full URL, see 
<https://docs.aws.amazon.com/sdkref/latest/guide/feature-container-credentials.html>
+    container_credentials_full_uri: Option<String>,
+    /// Container authorization token file, see 
<https://docs.aws.amazon.com/sdkref/latest/guide/feature-container-credentials.html>
+    container_authorization_token_file: Option<String>,
     /// Client options
     client_options: ClientOptions,
     /// Credentials
@@ -299,11 +304,21 @@ pub enum AmazonS3ConfigKey {
     /// - `metadata_endpoint`
     MetadataEndpoint,
 
-    /// Set the container credentials relative URI
+    /// Set the container credentials relative URI when used in ECS
     ///
     /// 
<https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html>
     ContainerCredentialsRelativeUri,
 
+    /// Set the container credentials full URI when used in EKS
+    ///
+    /// 
<https://docs.aws.amazon.com/sdkref/latest/guide/feature-container-credentials.html>
+    ContainerCredentialsFullUri,
+
+    /// Set the authorization token in plain text when used in EKS to 
authenticate with ContainerCredentialsFullUri
+    ///
+    /// 
<https://docs.aws.amazon.com/sdkref/latest/guide/feature-container-credentials.html>
+    ContainerAuthorizationTokenFile,
+
     /// Configure how to provide `copy_if_not_exists`
     ///
     /// See [`S3CopyIfNotExists`]
@@ -364,6 +379,8 @@ impl AsRef<str> for AmazonS3ConfigKey {
             Self::UnsignedPayload => "aws_unsigned_payload",
             Self::Checksum => "aws_checksum_algorithm",
             Self::ContainerCredentialsRelativeUri => 
"aws_container_credentials_relative_uri",
+            Self::ContainerCredentialsFullUri => 
"aws_container_credentials_full_uri",
+            Self::ContainerAuthorizationTokenFile => 
"aws_container_authorization_token_file",
             Self::SkipSignature => "aws_skip_signature",
             Self::CopyIfNotExists => "aws_copy_if_not_exists",
             Self::ConditionalPut => "aws_conditional_put",
@@ -396,6 +413,8 @@ impl FromStr for AmazonS3ConfigKey {
             "aws_unsigned_payload" | "unsigned_payload" => 
Ok(Self::UnsignedPayload),
             "aws_checksum_algorithm" | "checksum_algorithm" => 
Ok(Self::Checksum),
             "aws_container_credentials_relative_uri" => 
Ok(Self::ContainerCredentialsRelativeUri),
+            "aws_container_credentials_full_uri" => 
Ok(Self::ContainerCredentialsFullUri),
+            "aws_container_authorization_token_file" => 
Ok(Self::ContainerAuthorizationTokenFile),
             "aws_skip_signature" | "skip_signature" => Ok(Self::SkipSignature),
             "aws_copy_if_not_exists" | "copy_if_not_exists" => 
Ok(Self::CopyIfNotExists),
             "aws_conditional_put" | "conditional_put" => 
Ok(Self::ConditionalPut),
@@ -440,6 +459,8 @@ impl AmazonS3Builder {
     /// * `AWS_ENDPOINT` -> endpoint
     /// * `AWS_SESSION_TOKEN` -> token
     /// * `AWS_CONTAINER_CREDENTIALS_RELATIVE_URI` -> 
<https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html>
+    /// * `AWS_CONTAINER_CREDENTIALS_FULL_URI` -> 
<https://docs.aws.amazon.com/sdkref/latest/guide/feature-container-credentials.html>
+    /// * `AWS_CONTAINER_AUTHORIZATION_TOKEN_FILE` -> 
<https://docs.aws.amazon.com/sdkref/latest/guide/feature-container-credentials.html>
     /// * `AWS_ALLOW_HTTP` -> set to "true" to permit HTTP connections without 
TLS
     /// * `AWS_REQUEST_PAYER` -> set to "true" to permit operations on 
requester-pays buckets.
     /// # Example
@@ -516,6 +537,12 @@ impl AmazonS3Builder {
             AmazonS3ConfigKey::ContainerCredentialsRelativeUri => {
                 self.container_credentials_relative_uri = Some(value.into())
             }
+            AmazonS3ConfigKey::ContainerCredentialsFullUri => {
+                self.container_credentials_full_uri = Some(value.into());
+            }
+            AmazonS3ConfigKey::ContainerAuthorizationTokenFile => {
+                self.container_authorization_token_file = Some(value.into());
+            }
             AmazonS3ConfigKey::Client(key) => {
                 self.client_options = self.client_options.with_config(key, 
value)
             }
@@ -579,6 +606,12 @@ impl AmazonS3Builder {
             AmazonS3ConfigKey::ContainerCredentialsRelativeUri => {
                 self.container_credentials_relative_uri.clone()
             }
+            AmazonS3ConfigKey::ContainerCredentialsFullUri => {
+                self.container_credentials_full_uri.clone()
+            }
+            AmazonS3ConfigKey::ContainerAuthorizationTokenFile => {
+                self.container_authorization_token_file.clone()
+            }
             AmazonS3ConfigKey::SkipSignature => 
Some(self.skip_signature.to_string()),
             AmazonS3ConfigKey::CopyIfNotExists => {
                 self.copy_if_not_exists.as_ref().map(ToString::to_string)
@@ -961,6 +994,21 @@ impl AmazonS3Builder {
                 client: http.connect(&options)?,
                 cache: Default::default(),
             }) as _
+        } else if let (Some(full_uri), Some(token_file)) = (
+            self.container_credentials_full_uri,
+            self.container_authorization_token_file,
+        ) {
+            info!("Using EKS Pod Identity credential provider");
+
+            let options = self.client_options.clone().with_allow_http(true);
+
+            Arc::new(EKSPodCredentialProvider {
+                url: full_uri,
+                token_file,
+                retry: self.retry_config.clone(),
+                client: http.connect(&options)?,
+                cache: Default::default(),
+            }) as _
         } else {
             info!("Using Instance credential provider");
 
@@ -1541,4 +1589,26 @@ mod tests {
             panic!("{} not propagated as ClientConfigKey", key);
         }
     }
+
+    #[test]
+    fn test_builder_eks_with_config() {
+        let builder = AmazonS3Builder::new()
+            .with_bucket_name("some-bucket")
+            .with_config(
+                AmazonS3ConfigKey::ContainerCredentialsFullUri,
+                "https://127.0.0.1/eks-credentials";,
+            )
+            .with_config(
+                AmazonS3ConfigKey::ContainerAuthorizationTokenFile,
+                "/tmp/fake-bearer-token",
+            );
+
+        let s3 = builder.build().expect("should build successfully");
+        let creds = &s3.client.config.credentials;
+        let debug_str = format!("{:?}", creds);
+        assert!(
+            debug_str.contains("EKSPodCredentialProvider"),
+            "expected EKS provider but got: {debug_str}"
+        );
+    }
 }
diff --git a/src/aws/credential.rs b/src/aws/credential.rs
index 1b62842..56aa952 100644
--- a/src/aws/credential.rs
+++ b/src/aws/credential.rs
@@ -719,6 +719,73 @@ async fn task_credential(
     })
 }
 
+/// EKS Pod Identity credential provider.
+///
+/// Uses the endpoint in `AWS_CONTAINER_CREDENTIALS_FULL_URI`
+/// and the bearer token in `AWS_CONTAINER_AUTHORIZATION_TOKEN_FILE`
+/// to fetch ephemeral AWS credentials from an EKS pod.
+#[derive(Debug)]
+pub(crate) struct EKSPodCredentialProvider {
+    pub url: String,
+    pub token_file: String,
+    pub retry: RetryConfig,
+    pub client: HttpClient,
+    pub cache: TokenCache<Arc<AwsCredential>>,
+}
+
+#[async_trait]
+impl CredentialProvider for EKSPodCredentialProvider {
+    type Credential = AwsCredential;
+
+    async fn get_credential(&self) -> Result<Arc<AwsCredential>> {
+        self.cache
+            .get_or_insert_with(|| {
+                eks_credential(&self.client, &self.retry, &self.url, 
&self.token_file)
+            })
+            .await
+            .map_err(|source| crate::Error::Generic {
+                store: STORE,
+                source,
+            })
+    }
+}
+
+/// Performs the actual credential retrieval and parsing for 
`EKSPodCredentialProvider`.
+///
+/// 
<https://mozillazg.com/2023/12/security-deep-dive-into-aws-eks-pod-identity-feature-en.html>
+async fn eks_credential(
+    client: &HttpClient,
+    retry: &RetryConfig,
+    url: &str,
+    token_file: &str,
+) -> Result<TemporaryToken<Arc<AwsCredential>>, StdError> {
+    // Spawn IO to blocking tokio pool if running in tokio context
+    let token = match tokio::runtime::Handle::try_current() {
+        Ok(runtime) => {
+            let path = token_file.to_string();
+            runtime
+                .spawn_blocking(move || std::fs::read_to_string(&path))
+                .await?
+        }
+        Err(_) => std::fs::read_to_string(token_file),
+    }
+    .map_err(|e| format!("Failed to read EKS token file '{token_file}': 
{e}"))?;
+
+    let mut req = client.request(Method::GET, url);
+    req = req.header("Authorization", token);
+
+    // The JSON from the EKS credential endpoint has the same shape as ECS 
task credentials
+    let creds: InstanceCredentials = 
req.send_retry(retry).await?.into_body().json().await?;
+
+    let now = Utc::now();
+    let ttl = (creds.expiration - now).to_std().unwrap_or_default();
+
+    Ok(TemporaryToken {
+        token: Arc::new(creds.into()),
+        expiry: Some(Instant::now() + ttl),
+    })
+}
+
 /// A session provider as used by S3 Express One Zone
 ///
 /// <https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateSession.html>
@@ -773,6 +840,7 @@ struct CreateSessionOutput {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::aws::{AmazonS3Builder, AmazonS3ConfigKey};
     use crate::client::mock_server::MockServer;
     use crate::client::HttpClient;
     use http::Response;
@@ -1156,4 +1224,45 @@ mod tests {
             .await
             .unwrap_err();
     }
+
+    #[tokio::test]
+    async fn test_eks_pod_credential_provider() {
+        use crate::client::mock_server::MockServer;
+        use http::Response;
+        use std::fs::File;
+        use std::io::Write;
+
+        let mock_server = MockServer::new().await;
+
+        mock_server.push(Response::new(
+            r#"{
+            "AccessKeyId": "TEST_KEY",
+            "SecretAccessKey": "TEST_SECRET",
+            "Token": "TEST_SESSION_TOKEN",
+            "Expiration": "2100-01-01T00:00:00Z"
+        }"#
+            .to_string(),
+        ));
+
+        let token_file = tempfile::NamedTempFile::new().expect("cannot create 
temp file");
+        let path = token_file.path().to_string_lossy().into_owned();
+        let mut f = File::create(token_file.path()).unwrap();
+        write!(f, "TEST_BEARER_TOKEN").unwrap();
+
+        let builder = AmazonS3Builder::new()
+            .with_bucket_name("some-bucket")
+            .with_config(
+                AmazonS3ConfigKey::ContainerCredentialsFullUri,
+                mock_server.url(),
+            )
+            .with_config(AmazonS3ConfigKey::ContainerAuthorizationTokenFile, 
&path);
+
+        let s3 = builder.build().unwrap();
+
+        let cred = 
s3.client.config.credentials.get_credential().await.unwrap();
+
+        assert_eq!(cred.key_id, "TEST_KEY");
+        assert_eq!(cred.secret_key, "TEST_SECRET");
+        assert_eq!(cred.token.as_deref(), Some("TEST_SESSION_TOKEN"));
+    }
 }

Reply via email to