This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new def94a839 object_store: Add support for requester pays buckets (#6768)
def94a839 is described below

commit def94a839236f3b04727a07c378668c9ada807f0
Author: Kyle Barron <[email protected]>
AuthorDate: Fri Nov 22 14:55:12 2024 +0000

    object_store: Add support for requester pays buckets (#6768)
    
    * Add support for requester pays buckets
    
    * Add tests
    
    * fix rustdoc
---
 object_store/src/aws/builder.rs    |  24 ++++++++
 object_store/src/aws/client.rs     |   4 +-
 object_store/src/aws/credential.rs | 114 +++++++++++++++++++++++++++++++++++++
 object_store/src/aws/mod.rs        |   3 +-
 4 files changed, 143 insertions(+), 2 deletions(-)

diff --git a/object_store/src/aws/builder.rs b/object_store/src/aws/builder.rs
index eb79f5e6d..840245a7b 100644
--- a/object_store/src/aws/builder.rs
+++ b/object_store/src/aws/builder.rs
@@ -170,6 +170,8 @@ pub struct AmazonS3Builder {
     encryption_bucket_key_enabled: Option<ConfigValue<bool>>,
     /// base64-encoded 256-bit customer encryption key for SSE-C.
     encryption_customer_key_base64: Option<String>,
+    /// When set to true, charge requester for bucket operations
+    request_payer: ConfigValue<bool>,
 }
 
 /// Configuration keys for [`AmazonS3Builder`]
@@ -330,6 +332,13 @@ pub enum AmazonS3ConfigKey {
     /// - `s3_express`
     S3Express,
 
+    /// Enable Support for S3 Requester Pays
+    ///
+    /// Supported keys:
+    /// - `aws_request_payer`
+    /// - `request_payer`
+    RequestPayer,
+
     /// Client options
     Client(ClientConfigKey),
 
@@ -358,6 +367,7 @@ impl AsRef<str> for AmazonS3ConfigKey {
             Self::CopyIfNotExists => "aws_copy_if_not_exists",
             Self::ConditionalPut => "aws_conditional_put",
             Self::DisableTagging => "aws_disable_tagging",
+            Self::RequestPayer => "aws_request_payer",
             Self::Client(opt) => opt.as_ref(),
             Self::Encryption(opt) => opt.as_ref(),
         }
@@ -389,6 +399,7 @@ impl FromStr for AmazonS3ConfigKey {
             "aws_copy_if_not_exists" | "copy_if_not_exists" => 
Ok(Self::CopyIfNotExists),
             "aws_conditional_put" | "conditional_put" => 
Ok(Self::ConditionalPut),
             "aws_disable_tagging" | "disable_tagging" => 
Ok(Self::DisableTagging),
+            "aws_request_payer" | "request_payer" => Ok(Self::RequestPayer),
             // Backwards compatibility
             "aws_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)),
             "aws_server_side_encryption" => Ok(Self::Encryption(
@@ -510,6 +521,9 @@ impl AmazonS3Builder {
             AmazonS3ConfigKey::ConditionalPut => {
                 self.conditional_put = 
Some(ConfigValue::Deferred(value.into()))
             }
+            AmazonS3ConfigKey::RequestPayer => {
+                self.request_payer = ConfigValue::Deferred(value.into())
+            }
             AmazonS3ConfigKey::Encryption(key) => match key {
                 S3EncryptionConfigKey::ServerSideEncryption => {
                     self.encryption_type = 
Some(ConfigValue::Deferred(value.into()))
@@ -567,6 +581,7 @@ impl AmazonS3Builder {
                 self.conditional_put.as_ref().map(ToString::to_string)
             }
             AmazonS3ConfigKey::DisableTagging => 
Some(self.disable_tagging.to_string()),
+            AmazonS3ConfigKey::RequestPayer => 
Some(self.request_payer.to_string()),
             AmazonS3ConfigKey::Encryption(key) => match key {
                 S3EncryptionConfigKey::ServerSideEncryption => {
                     self.encryption_type.as_ref().map(ToString::to_string)
@@ -845,6 +860,14 @@ impl AmazonS3Builder {
         self
     }
 
+    /// Set whether to charge requester for bucket operations.
+    ///
+    /// 
<https://docs.aws.amazon.com/AmazonS3/latest/userguide/RequesterPaysBuckets.html>
+    pub fn with_request_payer(mut self, enabled: bool) -> Self {
+        self.request_payer = ConfigValue::Parsed(enabled);
+        self
+    }
+
     /// Create a [`AmazonS3`] instance from the provided values,
     /// consuming `self`.
     pub fn build(mut self) -> Result<AmazonS3> {
@@ -996,6 +1019,7 @@ impl AmazonS3Builder {
             copy_if_not_exists,
             conditional_put: put_precondition,
             encryption_headers,
+            request_payer: self.request_payer.get()?,
         };
 
         let client = Arc::new(S3Client::new(config)?);
diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index 895308f58..b19e0e2ab 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -202,6 +202,7 @@ pub(crate) struct S3Config {
     pub checksum: Option<Checksum>,
     pub copy_if_not_exists: Option<S3CopyIfNotExists>,
     pub conditional_put: Option<S3ConditionalPut>,
+    pub request_payer: bool,
     pub(super) encryption_headers: S3EncryptionHeaders,
 }
 
@@ -249,7 +250,8 @@ impl<'a> SessionCredential<'a> {
     fn authorizer(&self) -> Option<AwsAuthorizer<'_>> {
         let mut authorizer =
             AwsAuthorizer::new(self.credential.as_deref()?, "s3", 
&self.config.region)
-                .with_sign_payload(self.config.sign_payload);
+                .with_sign_payload(self.config.sign_payload)
+                .with_request_payer(self.config.request_payer);
 
         if self.session_token {
             let token = HeaderName::from_static("x-amz-s3session-token");
diff --git a/object_store/src/aws/credential.rs 
b/object_store/src/aws/credential.rs
index 33972c6fa..ee2f8e2ec 100644
--- a/object_store/src/aws/credential.rs
+++ b/object_store/src/aws/credential.rs
@@ -101,11 +101,14 @@ pub struct AwsAuthorizer<'a> {
     region: &'a str,
     token_header: Option<HeaderName>,
     sign_payload: bool,
+    request_payer: bool,
 }
 
 static DATE_HEADER: HeaderName = HeaderName::from_static("x-amz-date");
 static HASH_HEADER: HeaderName = 
HeaderName::from_static("x-amz-content-sha256");
 static TOKEN_HEADER: HeaderName = 
HeaderName::from_static("x-amz-security-token");
+static REQUEST_PAYER_HEADER: HeaderName = 
HeaderName::from_static("x-amz-request-payer");
+static REQUEST_PAYER_HEADER_VALUE: HeaderValue = 
HeaderValue::from_static("requester");
 const ALGORITHM: &str = "AWS4-HMAC-SHA256";
 
 impl<'a> AwsAuthorizer<'a> {
@@ -118,6 +121,7 @@ impl<'a> AwsAuthorizer<'a> {
             date: None,
             sign_payload: true,
             token_header: None,
+            request_payer: false,
         }
     }
 
@@ -134,6 +138,14 @@ impl<'a> AwsAuthorizer<'a> {
         self
     }
 
+    /// Set whether to include requester pays headers
+    ///
+    /// 
<https://docs.aws.amazon.com/AmazonS3/latest/userguide/ObjectsinRequesterPaysBuckets.html>
+    pub fn with_request_payer(mut self, request_payer: bool) -> Self {
+        self.request_payer = request_payer;
+        self
+    }
+
     /// Authorize `request` with an optional pre-calculated SHA256 digest by 
attaching
     /// the relevant [AWS SigV4] headers
     ///
@@ -180,6 +192,15 @@ impl<'a> AwsAuthorizer<'a> {
         let header_digest = HeaderValue::from_str(&digest).unwrap();
         request.headers_mut().insert(&HASH_HEADER, header_digest);
 
+        if self.request_payer {
+            // For DELETE, GET, HEAD, POST, and PUT requests, include 
x-amz-request-payer :
+            // requester in the header
+            // 
https://docs.aws.amazon.com/AmazonS3/latest/userguide/ObjectsinRequesterPaysBuckets.html
+            request
+                .headers_mut()
+                .insert(&REQUEST_PAYER_HEADER, 
REQUEST_PAYER_HEADER_VALUE.clone());
+        }
+
         let (signed_headers, canonical_headers) = 
canonicalize_headers(request.headers());
 
         let scope = self.scope(date);
@@ -226,6 +247,13 @@ impl<'a> AwsAuthorizer<'a> {
             .append_pair("X-Amz-Expires", &expires_in.as_secs().to_string())
             .append_pair("X-Amz-SignedHeaders", "host");
 
+        if self.request_payer {
+            // For signed URLs, include x-amz-request-payer=requester in the 
request
+            // 
https://docs.aws.amazon.com/AmazonS3/latest/userguide/ObjectsinRequesterPaysBuckets.html
+            url.query_pairs_mut()
+                .append_pair("x-amz-request-payer", "requester");
+        }
+
         // For S3, you must include the X-Amz-Security-Token query parameter 
in the URL if
         // using credentials sourced from the STS service.
         if let Some(ref token) = self.credential.token {
@@ -763,12 +791,53 @@ mod tests {
             region: "us-east-1",
             sign_payload: true,
             token_header: None,
+            request_payer: false,
         };
 
         signer.authorize(&mut request, None);
         assert_eq!(request.headers().get(&AUTHORIZATION).unwrap(), 
"AWS4-HMAC-SHA256 
Credential=AKIAIOSFODNN7EXAMPLE/20220806/us-east-1/ec2/aws4_request, 
SignedHeaders=host;x-amz-content-sha256;x-amz-date, 
Signature=a3c787a7ed37f7fdfbfd2d7056a3d7c9d85e6d52a2bfbec73793c0be6e7862d4")
     }
 
+    #[test]
+    fn test_sign_with_signed_payload_request_payer() {
+        let client = Client::new();
+
+        // Test credentials from 
https://docs.aws.amazon.com/AmazonS3/latest/userguide/RESTAuthentication.html
+        let credential = AwsCredential {
+            key_id: "AKIAIOSFODNN7EXAMPLE".to_string(),
+            secret_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY".to_string(),
+            token: None,
+        };
+
+        // method = 'GET'
+        // service = 'ec2'
+        // host = 'ec2.amazonaws.com'
+        // region = 'us-east-1'
+        // endpoint = 'https://ec2.amazonaws.com'
+        // request_parameters = ''
+        let date = DateTime::parse_from_rfc3339("2022-08-06T18:01:34Z")
+            .unwrap()
+            .with_timezone(&Utc);
+
+        let mut request = client
+            .request(Method::GET, "https://ec2.amazon.com/")
+            .build()
+            .unwrap();
+
+        let signer = AwsAuthorizer {
+            date: Some(date),
+            credential: &credential,
+            service: "ec2",
+            region: "us-east-1",
+            sign_payload: true,
+            token_header: None,
+            request_payer: true,
+        };
+
+        signer.authorize(&mut request, None);
+        assert_eq!(request.headers().get(&AUTHORIZATION).unwrap(), 
"AWS4-HMAC-SHA256 
Credential=AKIAIOSFODNN7EXAMPLE/20220806/us-east-1/ec2/aws4_request, 
SignedHeaders=host;x-amz-content-sha256;x-amz-date;x-amz-request-payer, 
Signature=7030625a9e9b57ed2a40e63d749f4a4b7714b6e15004cab026152f870dd8565d")
+    }
+
     #[test]
     fn test_sign_with_unsigned_payload() {
         let client = Client::new();
@@ -802,6 +871,7 @@ mod tests {
             region: "us-east-1",
             token_header: None,
             sign_payload: false,
+            request_payer: false,
         };
 
         authorizer.authorize(&mut request, None);
@@ -828,6 +898,7 @@ mod tests {
             region: "us-east-1",
             token_header: None,
             sign_payload: false,
+            request_payer: false,
         };
 
         let mut url = 
Url::parse("https://examplebucket.s3.amazonaws.com/test.txt").unwrap();
@@ -848,6 +919,48 @@ mod tests {
         );
     }
 
+    #[test]
+    fn signed_get_url_request_payer() {
+        // Values from 
https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-query-string-auth.html
+        let credential = AwsCredential {
+            key_id: "AKIAIOSFODNN7EXAMPLE".to_string(),
+            secret_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY".to_string(),
+            token: None,
+        };
+
+        let date = DateTime::parse_from_rfc3339("2013-05-24T00:00:00Z")
+            .unwrap()
+            .with_timezone(&Utc);
+
+        let authorizer = AwsAuthorizer {
+            date: Some(date),
+            credential: &credential,
+            service: "s3",
+            region: "us-east-1",
+            token_header: None,
+            sign_payload: false,
+            request_payer: true,
+        };
+
+        let mut url = 
Url::parse("https://examplebucket.s3.amazonaws.com/test.txt").unwrap();
+        authorizer.sign(Method::GET, &mut url, Duration::from_secs(86400));
+
+        assert_eq!(
+            url,
+            Url::parse(
+                "https://examplebucket.s3.amazonaws.com/test.txt?\
+                X-Amz-Algorithm=AWS4-HMAC-SHA256&\
+                
X-Amz-Credential=AKIAIOSFODNN7EXAMPLE%2F20130524%2Fus-east-1%2Fs3%2Faws4_request&\
+                X-Amz-Date=20130524T000000Z&\
+                X-Amz-Expires=86400&\
+                X-Amz-SignedHeaders=host&\
+                x-amz-request-payer=requester&\
+                
X-Amz-Signature=9ad7c781cc30121f199b47d35ed3528473e4375b63c5d91cd87c927803e4e00a"
+            )
+            .unwrap()
+        );
+    }
+
     #[test]
     fn test_sign_port() {
         let client = Client::new();
@@ -880,6 +993,7 @@ mod tests {
             region: "us-east-1",
             token_header: None,
             sign_payload: true,
+            request_payer: false,
         };
 
         authorizer.authorize(&mut request, None);
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index b238d90eb..81511bad7 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -136,7 +136,8 @@ impl Signer for AmazonS3 {
     /// ```
     async fn signed_url(&self, method: Method, path: &Path, expires_in: 
Duration) -> Result<Url> {
         let credential = self.credentials().get_credential().await?;
-        let authorizer = AwsAuthorizer::new(&credential, "s3", 
&self.client.config.region);
+        let authorizer = AwsAuthorizer::new(&credential, "s3", 
&self.client.config.region)
+            .with_request_payer(self.client.config.request_payer);
 
         let path_url = self.path_url(path);
         let mut url = Url::parse(&path_url).map_err(|e| crate::Error::Generic {

Reply via email to