This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new def94a839 object_store: Add support for requester pays buckets (#6768)
def94a839 is described below
commit def94a839236f3b04727a07c378668c9ada807f0
Author: Kyle Barron <[email protected]>
AuthorDate: Fri Nov 22 14:55:12 2024 +0000
object_store: Add support for requester pays buckets (#6768)
* Add support for requester pays buckets
* Add tests
* fix rustdoc
---
object_store/src/aws/builder.rs | 24 ++++++++
object_store/src/aws/client.rs | 4 +-
object_store/src/aws/credential.rs | 114 +++++++++++++++++++++++++++++++++++++
object_store/src/aws/mod.rs | 3 +-
4 files changed, 143 insertions(+), 2 deletions(-)
diff --git a/object_store/src/aws/builder.rs b/object_store/src/aws/builder.rs
index eb79f5e6d..840245a7b 100644
--- a/object_store/src/aws/builder.rs
+++ b/object_store/src/aws/builder.rs
@@ -170,6 +170,8 @@ pub struct AmazonS3Builder {
encryption_bucket_key_enabled: Option<ConfigValue<bool>>,
/// base64-encoded 256-bit customer encryption key for SSE-C.
encryption_customer_key_base64: Option<String>,
+ /// When set to true, charge requester for bucket operations
+ request_payer: ConfigValue<bool>,
}
/// Configuration keys for [`AmazonS3Builder`]
@@ -330,6 +332,13 @@ pub enum AmazonS3ConfigKey {
/// - `s3_express`
S3Express,
+ /// Enable Support for S3 Requester Pays
+ ///
+ /// Supported keys:
+ /// - `aws_request_payer`
+ /// - `request_payer`
+ RequestPayer,
+
/// Client options
Client(ClientConfigKey),
@@ -358,6 +367,7 @@ impl AsRef<str> for AmazonS3ConfigKey {
Self::CopyIfNotExists => "aws_copy_if_not_exists",
Self::ConditionalPut => "aws_conditional_put",
Self::DisableTagging => "aws_disable_tagging",
+ Self::RequestPayer => "aws_request_payer",
Self::Client(opt) => opt.as_ref(),
Self::Encryption(opt) => opt.as_ref(),
}
@@ -389,6 +399,7 @@ impl FromStr for AmazonS3ConfigKey {
"aws_copy_if_not_exists" | "copy_if_not_exists" =>
Ok(Self::CopyIfNotExists),
"aws_conditional_put" | "conditional_put" =>
Ok(Self::ConditionalPut),
"aws_disable_tagging" | "disable_tagging" =>
Ok(Self::DisableTagging),
+ "aws_request_payer" | "request_payer" => Ok(Self::RequestPayer),
// Backwards compatibility
"aws_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)),
"aws_server_side_encryption" => Ok(Self::Encryption(
@@ -510,6 +521,9 @@ impl AmazonS3Builder {
AmazonS3ConfigKey::ConditionalPut => {
self.conditional_put =
Some(ConfigValue::Deferred(value.into()))
}
+ AmazonS3ConfigKey::RequestPayer => {
+ self.request_payer = ConfigValue::Deferred(value.into())
+ }
AmazonS3ConfigKey::Encryption(key) => match key {
S3EncryptionConfigKey::ServerSideEncryption => {
self.encryption_type =
Some(ConfigValue::Deferred(value.into()))
@@ -567,6 +581,7 @@ impl AmazonS3Builder {
self.conditional_put.as_ref().map(ToString::to_string)
}
AmazonS3ConfigKey::DisableTagging =>
Some(self.disable_tagging.to_string()),
+ AmazonS3ConfigKey::RequestPayer =>
Some(self.request_payer.to_string()),
AmazonS3ConfigKey::Encryption(key) => match key {
S3EncryptionConfigKey::ServerSideEncryption => {
self.encryption_type.as_ref().map(ToString::to_string)
@@ -845,6 +860,14 @@ impl AmazonS3Builder {
self
}
+ /// Set whether to charge requester for bucket operations.
+ ///
+ /// <https://docs.aws.amazon.com/AmazonS3/latest/userguide/RequesterPaysBuckets.html>
+ pub fn with_request_payer(mut self, enabled: bool) -> Self {
+ self.request_payer = ConfigValue::Parsed(enabled);
+ self
+ }
+
/// Create a [`AmazonS3`] instance from the provided values,
/// consuming `self`.
pub fn build(mut self) -> Result<AmazonS3> {
@@ -996,6 +1019,7 @@ impl AmazonS3Builder {
copy_if_not_exists,
conditional_put: put_precondition,
encryption_headers,
+ request_payer: self.request_payer.get()?,
};
let client = Arc::new(S3Client::new(config)?);
diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index 895308f58..b19e0e2ab 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -202,6 +202,7 @@ pub(crate) struct S3Config {
pub checksum: Option<Checksum>,
pub copy_if_not_exists: Option<S3CopyIfNotExists>,
pub conditional_put: Option<S3ConditionalPut>,
+ pub request_payer: bool,
pub(super) encryption_headers: S3EncryptionHeaders,
}
@@ -249,7 +250,8 @@ impl<'a> SessionCredential<'a> {
fn authorizer(&self) -> Option<AwsAuthorizer<'_>> {
let mut authorizer =
AwsAuthorizer::new(self.credential.as_deref()?, "s3",
&self.config.region)
- .with_sign_payload(self.config.sign_payload);
+ .with_sign_payload(self.config.sign_payload)
+ .with_request_payer(self.config.request_payer);
if self.session_token {
let token = HeaderName::from_static("x-amz-s3session-token");
diff --git a/object_store/src/aws/credential.rs
b/object_store/src/aws/credential.rs
index 33972c6fa..ee2f8e2ec 100644
--- a/object_store/src/aws/credential.rs
+++ b/object_store/src/aws/credential.rs
@@ -101,11 +101,14 @@ pub struct AwsAuthorizer<'a> {
region: &'a str,
token_header: Option<HeaderName>,
sign_payload: bool,
+ request_payer: bool,
}
static DATE_HEADER: HeaderName = HeaderName::from_static("x-amz-date");
static HASH_HEADER: HeaderName =
HeaderName::from_static("x-amz-content-sha256");
static TOKEN_HEADER: HeaderName =
HeaderName::from_static("x-amz-security-token");
+static REQUEST_PAYER_HEADER: HeaderName =
HeaderName::from_static("x-amz-request-payer");
+static REQUEST_PAYER_HEADER_VALUE: HeaderValue =
HeaderValue::from_static("requester");
const ALGORITHM: &str = "AWS4-HMAC-SHA256";
impl<'a> AwsAuthorizer<'a> {
@@ -118,6 +121,7 @@ impl<'a> AwsAuthorizer<'a> {
date: None,
sign_payload: true,
token_header: None,
+ request_payer: false,
}
}
@@ -134,6 +138,14 @@ impl<'a> AwsAuthorizer<'a> {
self
}
+ /// Set whether to include requester pays headers
+ ///
+ /// <https://docs.aws.amazon.com/AmazonS3/latest/userguide/ObjectsinRequesterPaysBuckets.html>
+ pub fn with_request_payer(mut self, request_payer: bool) -> Self {
+ self.request_payer = request_payer;
+ self
+ }
+
/// Authorize `request` with an optional pre-calculated SHA256 digest by attaching
/// the relevant [AWS SigV4] headers
///
@@ -180,6 +192,15 @@ impl<'a> AwsAuthorizer<'a> {
let header_digest = HeaderValue::from_str(&digest).unwrap();
request.headers_mut().insert(&HASH_HEADER, header_digest);
+ if self.request_payer {
+ // For DELETE, GET, HEAD, POST, and PUT requests, include x-amz-request-payer :
+ // requester in the header
+ // https://docs.aws.amazon.com/AmazonS3/latest/userguide/ObjectsinRequesterPaysBuckets.html
+ request
+ .headers_mut()
+ .insert(&REQUEST_PAYER_HEADER,
REQUEST_PAYER_HEADER_VALUE.clone());
+ }
+
let (signed_headers, canonical_headers) =
canonicalize_headers(request.headers());
let scope = self.scope(date);
@@ -226,6 +247,13 @@ impl<'a> AwsAuthorizer<'a> {
.append_pair("X-Amz-Expires", &expires_in.as_secs().to_string())
.append_pair("X-Amz-SignedHeaders", "host");
+ if self.request_payer {
+ // For signed URLs, include x-amz-request-payer=requester in the request
+ // https://docs.aws.amazon.com/AmazonS3/latest/userguide/ObjectsinRequesterPaysBuckets.html
+ url.query_pairs_mut()
+ .append_pair("x-amz-request-payer", "requester");
+ }
+
// For S3, you must include the X-Amz-Security-Token query parameter in the URL if
// using credentials sourced from the STS service.
if let Some(ref token) = self.credential.token {
@@ -763,12 +791,53 @@ mod tests {
region: "us-east-1",
sign_payload: true,
token_header: None,
+ request_payer: false,
};
signer.authorize(&mut request, None);
assert_eq!(request.headers().get(&AUTHORIZATION).unwrap(),
"AWS4-HMAC-SHA256
Credential=AKIAIOSFODNN7EXAMPLE/20220806/us-east-1/ec2/aws4_request,
SignedHeaders=host;x-amz-content-sha256;x-amz-date,
Signature=a3c787a7ed37f7fdfbfd2d7056a3d7c9d85e6d52a2bfbec73793c0be6e7862d4")
}
+ #[test]
+ fn test_sign_with_signed_payload_request_payer() {
+ let client = Client::new();
+
+ // Test credentials from https://docs.aws.amazon.com/AmazonS3/latest/userguide/RESTAuthentication.html
+ let credential = AwsCredential {
+ key_id: "AKIAIOSFODNN7EXAMPLE".to_string(),
+ secret_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY".to_string(),
+ token: None,
+ };
+
+ // method = 'GET'
+ // service = 'ec2'
+ // host = 'ec2.amazonaws.com'
+ // region = 'us-east-1'
+ // endpoint = 'https://ec2.amazonaws.com'
+ // request_parameters = ''
+ let date = DateTime::parse_from_rfc3339("2022-08-06T18:01:34Z")
+ .unwrap()
+ .with_timezone(&Utc);
+
+ let mut request = client
+ .request(Method::GET, "https://ec2.amazon.com/")
+ .build()
+ .unwrap();
+
+ let signer = AwsAuthorizer {
+ date: Some(date),
+ credential: &credential,
+ service: "ec2",
+ region: "us-east-1",
+ sign_payload: true,
+ token_header: None,
+ request_payer: true,
+ };
+
+ signer.authorize(&mut request, None);
+ assert_eq!(request.headers().get(&AUTHORIZATION).unwrap(),
"AWS4-HMAC-SHA256
Credential=AKIAIOSFODNN7EXAMPLE/20220806/us-east-1/ec2/aws4_request,
SignedHeaders=host;x-amz-content-sha256;x-amz-date;x-amz-request-payer,
Signature=7030625a9e9b57ed2a40e63d749f4a4b7714b6e15004cab026152f870dd8565d")
+ }
+
#[test]
fn test_sign_with_unsigned_payload() {
let client = Client::new();
@@ -802,6 +871,7 @@ mod tests {
region: "us-east-1",
token_header: None,
sign_payload: false,
+ request_payer: false,
};
authorizer.authorize(&mut request, None);
@@ -828,6 +898,7 @@ mod tests {
region: "us-east-1",
token_header: None,
sign_payload: false,
+ request_payer: false,
};
let mut url =
Url::parse("https://examplebucket.s3.amazonaws.com/test.txt").unwrap();
@@ -848,6 +919,48 @@ mod tests {
);
}
+ #[test]
+ fn signed_get_url_request_payer() {
+ // Values from https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-query-string-auth.html
+ let credential = AwsCredential {
+ key_id: "AKIAIOSFODNN7EXAMPLE".to_string(),
+ secret_key: "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY".to_string(),
+ token: None,
+ };
+
+ let date = DateTime::parse_from_rfc3339("2013-05-24T00:00:00Z")
+ .unwrap()
+ .with_timezone(&Utc);
+
+ let authorizer = AwsAuthorizer {
+ date: Some(date),
+ credential: &credential,
+ service: "s3",
+ region: "us-east-1",
+ token_header: None,
+ sign_payload: false,
+ request_payer: true,
+ };
+
+ let mut url =
Url::parse("https://examplebucket.s3.amazonaws.com/test.txt").unwrap();
+ authorizer.sign(Method::GET, &mut url, Duration::from_secs(86400));
+
+ assert_eq!(
+ url,
+ Url::parse(
+ "https://examplebucket.s3.amazonaws.com/test.txt?\
+ X-Amz-Algorithm=AWS4-HMAC-SHA256&\
+
X-Amz-Credential=AKIAIOSFODNN7EXAMPLE%2F20130524%2Fus-east-1%2Fs3%2Faws4_request&\
+ X-Amz-Date=20130524T000000Z&\
+ X-Amz-Expires=86400&\
+ X-Amz-SignedHeaders=host&\
+ x-amz-request-payer=requester&\
+
X-Amz-Signature=9ad7c781cc30121f199b47d35ed3528473e4375b63c5d91cd87c927803e4e00a"
+ )
+ .unwrap()
+ );
+ }
+
#[test]
fn test_sign_port() {
let client = Client::new();
@@ -880,6 +993,7 @@ mod tests {
region: "us-east-1",
token_header: None,
sign_payload: true,
+ request_payer: false,
};
authorizer.authorize(&mut request, None);
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index b238d90eb..81511bad7 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -136,7 +136,8 @@ impl Signer for AmazonS3 {
/// ```
async fn signed_url(&self, method: Method, path: &Path, expires_in:
Duration) -> Result<Url> {
let credential = self.credentials().get_credential().await?;
- let authorizer = AwsAuthorizer::new(&credential, "s3",
&self.client.config.region);
+ let authorizer = AwsAuthorizer::new(&credential, "s3",
&self.client.config.region)
+ .with_request_payer(self.client.config.request_payer);
let path_url = self.path_url(path);
let mut url = Url::parse(&path_url).map_err(|e| crate::Error::Generic {