This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 2f383e764a feat(object_store): Azure url signing (#5259)
2f383e764a is described below
commit 2f383e764aa2b79e52d562e24eb0d1dce41f5ce7
Author: Robert Pack <[email protected]>
AuthorDate: Thu Jan 4 17:40:51 2024 +0100
feat(object_store): Azure url signing (#5259)
* refactor: move current signing to new AzureAuthorizer
* feat: generate signed urls with master key
* feat: sign with user delegated keys
* chore: clippy
* pr feedback
* chore: clippy
* pr feedback II
* fix: move signing test
---
.gitignore | 1 +
object_store/src/aws/mod.rs | 1 +
object_store/src/azure/client.rs | 117 +++++++++++++-
object_store/src/azure/credential.rs | 295 +++++++++++++++++++++++++++++++----
object_store/src/azure/mod.rs | 103 +++++++++++-
object_store/src/lib.rs | 23 +++
object_store/src/signer.rs | 16 ++
7 files changed, 518 insertions(+), 38 deletions(-)
diff --git a/.gitignore b/.gitignore
index efdd9de5fb..0788daea01 100644
--- a/.gitignore
+++ b/.gitignore
@@ -15,6 +15,7 @@ parquet/data.parquet
justfile
.prettierignore
.env
+.editorconfig
# local azurite file
__azurite*
__blobstorage__
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index 75b43d448b..20e7b032ab 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -372,6 +372,7 @@ mod tests {
rename_and_copy(&integration).await;
stream_get(&integration).await;
multipart(&integration, &integration).await;
+ signing(&integration).await;
tagging(&integration, !config.disable_tagging, |p| {
let client = Arc::clone(&integration.client);
diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs
index 3c71e69da0..865e0a1a93 100644
--- a/object_store/src/azure/client.rs
+++ b/object_store/src/azure/client.rs
@@ -46,6 +46,7 @@ use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, Snafu};
use std::collections::HashMap;
use std::sync::Arc;
+use std::time::Duration;
use url::Url;
const VERSION_HEADER: &str = "x-ms-version-id";
@@ -101,6 +102,18 @@ pub(crate) enum Error {
#[snafu(display("ETag required for conditional update"))]
MissingETag,
+
+ #[snafu(display("Error requesting user delegation key: {}", source))]
+ DelegationKeyRequest { source: crate::client::retry::Error },
+
+ #[snafu(display("Error getting user delegation key response body: {}",
source))]
+ DelegationKeyResponseBody { source: reqwest::Error },
+
+ #[snafu(display("Got invalid user delegation key response: {}", source))]
+ DelegationKeyResponse { source: quick_xml::de::DeError },
+
+ #[snafu(display("Generating SAS keys with SAS tokens auth is not
supported"))]
+ SASforSASNotSupported,
}
impl From<Error> for crate::Error {
@@ -324,6 +337,78 @@ impl AzureClient {
Ok(())
}
+ /// Make a Get User Delegation Key request
+ ///
<https://docs.microsoft.com/en-us/rest/api/storageservices/get-user-delegation-key>
+ async fn get_user_delegation_key(
+ &self,
+ start: &DateTime<Utc>,
+ end: &DateTime<Utc>,
+ ) -> Result<UserDelegationKey> {
+ let credential = self.get_credential().await?;
+ let url = self.config.service.clone();
+
+ let start = start.to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
+ let expiry = end.to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
+
+ let mut body = String::new();
+ body.push_str("<?xml version=\"1.0\"
encoding=\"utf-8\"?>\n<KeyInfo>\n");
+ body.push_str(&format!(
+ "\t<Start>{start}</Start>\n\t<Expiry>{expiry}</Expiry>\n"
+ ));
+ body.push_str("</KeyInfo>");
+
+ let response = self
+ .client
+ .request(Method::POST, url)
+ .body(body)
+ .query(&[("restype", "service"), ("comp", "userdelegationkey")])
+ .with_azure_authorization(&credential, &self.config.account)
+ .send_retry(&self.config.retry_config)
+ .await
+ .context(DelegationKeyRequestSnafu)?
+ .bytes()
+ .await
+ .context(DelegationKeyResponseBodySnafu)?;
+
+ let response: UserDelegationKey =
+
quick_xml::de::from_reader(response.reader()).context(DelegationKeyResponseSnafu)?;
+
+ Ok(response)
+ }
+
+ /// Create an AzureSigner for generating SAS tokens (pre-signed urls).
+ ///
+ /// Depending on the type of credential, this will either use the account
key or a user delegation key.
+ /// Since delegation keys are acquired ad-hoc, the signer allows for
signing multiple urls with the same key.
+ pub async fn signer(&self, expires_in: Duration) -> Result<AzureSigner> {
+ let credential = self.get_credential().await?;
+ let signed_start = chrono::Utc::now();
+ let signed_expiry = signed_start + expires_in;
+ match credential.as_ref() {
+ AzureCredential::BearerToken(_) => {
+ let key = self
+ .get_user_delegation_key(&signed_start, &signed_expiry)
+ .await?;
+ let signing_key = AzureAccessKey::try_new(&key.value)?;
+ Ok(AzureSigner::new(
+ signing_key,
+ self.config.account.clone(),
+ signed_start,
+ signed_expiry,
+ Some(key),
+ ))
+ }
+ AzureCredential::AccessKey(key) => Ok(AzureSigner::new(
+ key.to_owned(),
+ self.config.account.clone(),
+ signed_start,
+ signed_expiry,
+ None,
+ )),
+ _ => Err(Error::SASforSASNotSupported.into()),
+ }
+ }
+
#[cfg(test)]
pub async fn get_blob_tagging(&self, path: &Path) -> Result<Response> {
let credential = self.get_credential().await?;
@@ -600,6 +685,18 @@ impl BlockList {
}
}
+#[derive(Debug, Clone, PartialEq, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub(crate) struct UserDelegationKey {
+ pub signed_oid: String,
+ pub signed_tid: String,
+ pub signed_start: String,
+ pub signed_expiry: String,
+ pub signed_service: String,
+ pub signed_version: String,
+ pub value: String,
+}
+
#[cfg(test)]
mod tests {
use bytes::Bytes;
@@ -757,8 +854,7 @@ mod tests {
<NextMarker/>
</EnumerationResults>";
- let mut _list_blobs_response_internal: ListResultInternal =
- quick_xml::de::from_str(S).unwrap();
+ let _list_blobs_response_internal: ListResultInternal =
quick_xml::de::from_str(S).unwrap();
}
#[test]
@@ -778,4 +874,21 @@ mod tests {
assert_eq!(res, S)
}
+
+ #[test]
+ fn test_delegated_key_response() {
+ const S: &str = r#"<?xml version="1.0" encoding="utf-8"?>
+<UserDelegationKey>
+ <SignedOid>String containing a GUID value</SignedOid>
+ <SignedTid>String containing a GUID value</SignedTid>
+ <SignedStart>String formatted as ISO date</SignedStart>
+ <SignedExpiry>String formatted as ISO date</SignedExpiry>
+ <SignedService>b</SignedService>
+ <SignedVersion>String specifying REST api version to use to create the
user delegation key</SignedVersion>
+ <Value>String containing the user delegation key</Value>
+</UserDelegationKey>"#;
+
+ let _delegated_key_response_internal: UserDelegationKey =
+ quick_xml::de::from_str(S).unwrap();
+ }
}
diff --git a/object_store/src/azure/credential.rs
b/object_store/src/azure/credential.rs
index 2b8788d333..bfbbde8260 100644
--- a/object_store/src/azure/credential.rs
+++ b/object_store/src/azure/credential.rs
@@ -24,26 +24,27 @@ use crate::RetryConfig;
use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
-use chrono::{DateTime, Utc};
-use reqwest::header::ACCEPT;
-use reqwest::{
- header::{
- HeaderMap, HeaderName, HeaderValue, AUTHORIZATION, CONTENT_ENCODING,
CONTENT_LANGUAGE,
- CONTENT_LENGTH, CONTENT_TYPE, DATE, IF_MATCH, IF_MODIFIED_SINCE,
IF_NONE_MATCH,
- IF_UNMODIFIED_SINCE, RANGE,
- },
- Client, Method, RequestBuilder,
+use chrono::{DateTime, SecondsFormat, Utc};
+use reqwest::header::{
+ HeaderMap, HeaderName, HeaderValue, ACCEPT, AUTHORIZATION,
CONTENT_ENCODING, CONTENT_LANGUAGE,
+ CONTENT_LENGTH, CONTENT_TYPE, DATE, IF_MATCH, IF_MODIFIED_SINCE,
IF_NONE_MATCH,
+ IF_UNMODIFIED_SINCE, RANGE,
};
+use reqwest::{Client, Method, Request, RequestBuilder};
use serde::Deserialize;
use snafu::{ResultExt, Snafu};
use std::borrow::Cow;
+use std::collections::HashMap;
+use std::fmt::Debug;
use std::process::Command;
use std::str;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use url::Url;
-static AZURE_VERSION: HeaderValue = HeaderValue::from_static("2021-08-06");
+use super::client::UserDelegationKey;
+
+static AZURE_VERSION: HeaderValue = HeaderValue::from_static("2023-11-03");
static VERSION: HeaderName = HeaderName::from_static("x-ms-version");
pub(crate) static BLOB_TYPE: HeaderName =
HeaderName::from_static("x-ms-blob-type");
pub(crate) static DELETE_SNAPSHOTS: HeaderName =
HeaderName::from_static("x-ms-delete-snapshots");
@@ -83,6 +84,9 @@ pub enum Error {
#[snafu(display("Failed to parse azure cli response: {source}"))]
AzureCliResponse { source: serde_json::Error },
+
+ #[snafu(display("Generating SAS keys with SAS tokens auth is not
supported"))]
+ SASforSASNotSupported,
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -97,7 +101,7 @@ impl From<Error> for crate::Error {
}
/// A shared Azure Storage Account Key
-#[derive(Debug, Eq, PartialEq)]
+#[derive(Debug, Clone, Eq, PartialEq)]
pub struct AzureAccessKey(Vec<u8>);
impl AzureAccessKey {
@@ -137,33 +141,86 @@ pub mod authority_hosts {
pub const AZURE_PUBLIC_CLOUD: &str = "https://login.microsoftonline.com";
}
-pub(crate) trait CredentialExt {
- /// Apply authorization to requests against azure storage accounts
- ///
<https://docs.microsoft.com/en-us/rest/api/storageservices/authorize-requests-to-azure-storage>
- fn with_azure_authorization(self, credential: &AzureCredential, account:
&str) -> Self;
+pub(crate) struct AzureSigner {
+ signing_key: AzureAccessKey,
+ start: DateTime<Utc>,
+ end: DateTime<Utc>,
+ account: String,
+ delegation_key: Option<UserDelegationKey>,
}
-impl CredentialExt for RequestBuilder {
- fn with_azure_authorization(mut self, credential: &AzureCredential,
account: &str) -> Self {
+impl AzureSigner {
+ pub fn new(
+ signing_key: AzureAccessKey,
+ account: String,
+ start: DateTime<Utc>,
+ end: DateTime<Utc>,
+ delegation_key: Option<UserDelegationKey>,
+ ) -> Self {
+ Self {
+ signing_key,
+ account,
+ start,
+ end,
+ delegation_key,
+ }
+ }
+
+ pub fn sign(&self, method: &Method, url: &mut Url) -> Result<()> {
+ let (str_to_sign, query_pairs) = match &self.delegation_key {
+ Some(delegation_key) => string_to_sign_user_delegation_sas(
+ url,
+ method,
+ &self.account,
+ &self.start,
+ &self.end,
+ delegation_key,
+ ),
+ None => string_to_sign_service_sas(url, method, &self.account,
&self.start, &self.end),
+ };
+ let auth = hmac_sha256(&self.signing_key.0, str_to_sign);
+ url.query_pairs_mut().extend_pairs(query_pairs);
+ url.query_pairs_mut()
+ .append_pair("sig", BASE64_STANDARD.encode(auth).as_str());
+ Ok(())
+ }
+}
+
+/// Authorize a [`Request`] with an [`AzureAuthorizer`]
+#[derive(Debug)]
+pub struct AzureAuthorizer<'a> {
+ credential: &'a AzureCredential,
+ account: &'a str,
+}
+
+impl<'a> AzureAuthorizer<'a> {
+ /// Create a new [`AzureAuthorizer`]
+ pub fn new(credential: &'a AzureCredential, account: &'a str) -> Self {
+ AzureAuthorizer {
+ credential,
+ account,
+ }
+ }
+
+ /// Authorize `request`
+ pub fn authorize(&self, request: &mut Request) {
// rfc2822 string should never contain illegal characters
let date = Utc::now();
let date_str = date.format(RFC1123_FMT).to_string();
// we formatted the data string ourselves, so unwrapping should be fine
let date_val = HeaderValue::from_str(&date_str).unwrap();
- self = self
- .header(DATE, &date_val)
- .header(&VERSION, &AZURE_VERSION);
+ request.headers_mut().insert(DATE, date_val);
+ request
+ .headers_mut()
+ .insert(&VERSION, AZURE_VERSION.clone());
- match credential {
+ match self.credential {
AzureCredential::AccessKey(key) => {
- let (client, request) = self.build_split();
- let mut request = request.expect("request valid");
-
let signature = generate_authorization(
request.headers(),
request.url(),
request.method(),
- account,
+ self.account,
key,
);
@@ -173,15 +230,40 @@ impl CredentialExt for RequestBuilder {
AUTHORIZATION,
HeaderValue::from_str(signature.as_str()).unwrap(),
);
-
- Self::from_parts(client, request)
}
- AzureCredential::BearerToken(token) => self.bearer_auth(token),
- AzureCredential::SASToken(query_pairs) => self.query(&query_pairs),
+ AzureCredential::BearerToken(token) => {
+ request.headers_mut().append(
+ AUTHORIZATION,
+ HeaderValue::from_str(format!("Bearer {}",
token).as_str()).unwrap(),
+ );
+ }
+ AzureCredential::SASToken(query_pairs) => {
+ request
+ .url_mut()
+ .query_pairs_mut()
+ .extend_pairs(query_pairs);
+ }
}
}
}
+pub(crate) trait CredentialExt {
+ /// Apply authorization to requests against azure storage accounts
+ ///
<https://docs.microsoft.com/en-us/rest/api/storageservices/authorize-requests-to-azure-storage>
+ fn with_azure_authorization(self, credential: &AzureCredential, account:
&str) -> Self;
+}
+
+impl CredentialExt for RequestBuilder {
+ fn with_azure_authorization(self, credential: &AzureCredential, account:
&str) -> Self {
+ let (client, request) = self.build_split();
+ let mut request = request.expect("request valid");
+
+ AzureAuthorizer::new(credential, account).authorize(&mut request);
+
+ Self::from_parts(client, request)
+ }
+}
+
/// Generate signed key for authorization via access keys
///
<https://docs.microsoft.com/en-us/rest/api/storageservices/authorize-with-shared-key>
fn generate_authorization(
@@ -205,6 +287,152 @@ fn add_if_exists<'a>(h: &'a HeaderMap, key: &HeaderName)
-> &'a str {
.unwrap_or_default()
}
+fn string_to_sign_sas(
+ u: &Url,
+ method: &Method,
+ account: &str,
+ start: &DateTime<Utc>,
+ end: &DateTime<Utc>,
+) -> (String, String, String, String, String) {
+ // NOTE: for now only blob signing is supported.
+ let signed_resource = "b".to_string();
+
+ //
https://learn.microsoft.com/en-us/rest/api/storageservices/create-service-sas#permissions-for-a-directory-container-or-blob
+ let signed_permissions = match *method {
+ // read and list permissions
+ Method::GET => match signed_resource.as_str() {
+ "c" => "rl",
+ "b" => "r",
+ _ => unreachable!(),
+ },
+ // write permissions (also allows creating a new blob in a sub-key)
+ Method::PUT => "w",
+ // delete permissions
+ Method::DELETE => "d",
+ // other methods are not used in any of the current operations
+ _ => "",
+ }
+ .to_string();
+ let signed_start = start.to_rfc3339_opts(SecondsFormat::Secs, true);
+ let signed_expiry = end.to_rfc3339_opts(SecondsFormat::Secs, true);
+ let canonicalized_resource = if
u.host_str().unwrap_or_default().contains(account) {
+ format!("/blob/{}{}", account, u.path())
+ } else {
+ // NOTE: in case of the emulator, the account name is not part of the
host
+ // but the path starts with the account name
+ format!("/blob{}", u.path())
+ };
+
+ (
+ signed_resource,
+ signed_permissions,
+ signed_start,
+ signed_expiry,
+ canonicalized_resource,
+ )
+}
+
+/// Create a string to be signed for authorization via [service sas].
+///
+/// [service sas]:
https://learn.microsoft.com/en-us/rest/api/storageservices/create-service-sas#version-2020-12-06-and-later
+fn string_to_sign_service_sas(
+ u: &Url,
+ method: &Method,
+ account: &str,
+ start: &DateTime<Utc>,
+ end: &DateTime<Utc>,
+) -> (String, HashMap<&'static str, String>) {
+ let (signed_resource, signed_permissions, signed_start, signed_expiry,
canonicalized_resource) =
+ string_to_sign_sas(u, method, account, start, end);
+
+ let string_to_sign = format!(
+ "{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}",
+ signed_permissions,
+ signed_start,
+ signed_expiry,
+ canonicalized_resource,
+ "", // signed identifier
+ "", // signed ip
+ "", // signed protocol
+ &AZURE_VERSION.to_str().unwrap(), // signed version
+ signed_resource, // signed resource
+ "", // signed snapshot time
+ "", // signed encryption scope
+ "", // rscc - response header:
Cache-Control
+ "", // rscd - response header:
Content-Disposition
+ "", // rsce - response header:
Content-Encoding
+ "", // rscl - response header:
Content-Language
+ "", // rsct - response header:
Content-Type
+ );
+
+ let mut pairs = HashMap::new();
+ pairs.insert("sv", AZURE_VERSION.to_str().unwrap().to_string());
+ pairs.insert("sp", signed_permissions);
+ pairs.insert("st", signed_start);
+ pairs.insert("se", signed_expiry);
+ pairs.insert("sr", signed_resource);
+
+ (string_to_sign, pairs)
+}
+
+/// Create a string to be signed for authorization via [user delegation sas].
+///
+/// [user delegation sas]:
https://learn.microsoft.com/en-us/rest/api/storageservices/create-user-delegation-sas#version-2020-12-06-and-later
+fn string_to_sign_user_delegation_sas(
+ u: &Url,
+ method: &Method,
+ account: &str,
+ start: &DateTime<Utc>,
+ end: &DateTime<Utc>,
+ delegation_key: &UserDelegationKey,
+) -> (String, HashMap<&'static str, String>) {
+ let (signed_resource, signed_permissions, signed_start, signed_expiry,
canonicalized_resource) =
+ string_to_sign_sas(u, method, account, start, end);
+
+ let string_to_sign = format!(
+
"{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}",
+ signed_permissions,
+ signed_start,
+ signed_expiry,
+ canonicalized_resource,
+ delegation_key.signed_oid, // signed key object id
+ delegation_key.signed_tid, // signed key tenant id
+ delegation_key.signed_start, // signed key start
+ delegation_key.signed_expiry, // signed key expiry
+ delegation_key.signed_service, // signed key service
+ delegation_key.signed_version, // signed key version
+ "", // signed authorized user object id
+ "", // signed unauthorized user object id
+ "", // signed correlation id
+ "", // signed ip
+ "", // signed protocol
+ &AZURE_VERSION.to_str().unwrap(), // signed version
+ signed_resource, // signed resource
+ "", // signed snapshot time
+ "", // signed encryption scope
+ "", // rscc - response header:
Cache-Control
+ "", // rscd - response header:
Content-Disposition
+ "", // rsce - response header:
Content-Encoding
+ "", // rscl - response header:
Content-Language
+ "", // rsct - response header:
Content-Type
+ );
+
+ let mut pairs = HashMap::new();
+ pairs.insert("sv", AZURE_VERSION.to_str().unwrap().to_string());
+ pairs.insert("sp", signed_permissions);
+ pairs.insert("st", signed_start);
+ pairs.insert("se", signed_expiry);
+ pairs.insert("sr", signed_resource);
+ pairs.insert("skoid", delegation_key.signed_oid.clone());
+ pairs.insert("sktid", delegation_key.signed_tid.clone());
+ pairs.insert("skt", delegation_key.signed_start.clone());
+ pairs.insert("ske", delegation_key.signed_expiry.clone());
+ pairs.insert("sks", delegation_key.signed_service.clone());
+ pairs.insert("skv", delegation_key.signed_version.clone());
+
+ (string_to_sign, pairs)
+}
+
///
<https://docs.microsoft.com/en-us/rest/api/storageservices/authorize-with-shared-key#constructing-the-signature-string>
fn string_to_sign(h: &HeaderMap, u: &Url, method: &Method, account: &str) ->
String {
// content length must only be specified if != 0
@@ -232,7 +460,7 @@ fn string_to_sign(h: &HeaderMap, u: &Url, method: &Method,
account: &str) -> Str
add_if_exists(h, &IF_UNMODIFIED_SINCE),
add_if_exists(h, &RANGE),
canonicalize_header(h),
- canonicalized_resource(account, u)
+ canonicalize_resource(account, u)
)
}
@@ -257,7 +485,7 @@ fn canonicalize_header(headers: &HeaderMap) -> String {
}
///
<https://docs.microsoft.com/en-us/rest/api/storageservices/authorize-with-shared-key#constructing-the-canonicalized-resource-string>
-fn canonicalized_resource(account: &str, uri: &Url) -> String {
+fn canonicalize_resource(account: &str, uri: &Url) -> String {
let mut can_res: String = String::new();
can_res.push('/');
can_res.push_str(account);
@@ -681,14 +909,15 @@ impl CredentialProvider for AzureCliCredential {
#[cfg(test)]
mod tests {
- use super::*;
- use crate::client::mock_server::MockServer;
use futures::executor::block_on;
use hyper::body::to_bytes;
use hyper::{Body, Response};
use reqwest::{Client, Method};
use tempfile::NamedTempFile;
+ use super::*;
+ use crate::client::mock_server::MockServer;
+
#[tokio::test]
async fn test_managed_identity() {
let server = MockServer::new();
diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs
index af0a4cefa1..712b7a36c5 100644
--- a/object_store/src/azure/mod.rs
+++ b/object_store/src/azure/mod.rs
@@ -27,22 +27,26 @@
//! a way to drop old blocks. Instead unused blocks are automatically cleaned
up
//! after 7 days.
use crate::{
- multipart::{PartId, PutPart, WriteMultiPart},
+ multipart::{MultiPartStore, PartId, PutPart, WriteMultiPart},
path::Path,
+ signer::Signer,
GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
PutOptions, PutResult,
Result,
};
use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
+use reqwest::Method;
use std::fmt::Debug;
use std::sync::Arc;
+use std::time::Duration;
use tokio::io::AsyncWrite;
+use url::Url;
use crate::client::get::GetClientExt;
use crate::client::list::ListClientExt;
use crate::client::CredentialProvider;
-pub use credential::authority_hosts;
+pub use credential::{authority_hosts, AzureAccessKey, AzureAuthorizer};
mod builder;
mod client;
@@ -50,7 +54,6 @@ mod credential;
/// [`CredentialProvider`] for [`MicrosoftAzure`]
pub type AzureCredentialProvider = Arc<dyn CredentialProvider<Credential =
AzureCredential>>;
-use crate::multipart::MultiPartStore;
pub use builder::{AzureConfigKey, MicrosoftAzureBuilder};
pub use credential::AzureCredential;
@@ -67,6 +70,11 @@ impl MicrosoftAzure {
pub fn credentials(&self) -> &AzureCredentialProvider {
&self.client.config().credentials
}
+
+ /// Create a full URL to the resource specified by `path` with this
instance's configuration.
+ fn path_url(&self, path: &Path) -> url::Url {
+ self.client.config().path_url(path)
+ }
}
impl std::fmt::Display for MicrosoftAzure {
@@ -128,6 +136,62 @@ impl ObjectStore for MicrosoftAzure {
}
}
+#[async_trait]
+impl Signer for MicrosoftAzure {
+ /// Create a URL containing the relevant [Service SAS] query parameters
that authorize a request
+ /// via `method` to the resource at `path` valid for the duration
specified in `expires_in`.
+ ///
+ /// [Service SAS]:
https://learn.microsoft.com/en-us/rest/api/storageservices/create-service-sas
+ ///
+ /// # Example
+ ///
+ /// This example returns a URL that will enable a user to upload a file to
+ /// "some-folder/some-file.txt" in the next hour.
+ ///
+ /// ```
+ /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
+ /// # use object_store::{azure::MicrosoftAzureBuilder, path::Path,
signer::Signer};
+ /// # use reqwest::Method;
+ /// # use std::time::Duration;
+ /// #
+ /// let azure = MicrosoftAzureBuilder::new()
+ /// .with_account("my-account")
+ /// .with_access_key("my-access-key")
+ /// .with_container_name("my-container")
+ /// .build()?;
+ ///
+ /// let url = azure.signed_url(
+ /// Method::PUT,
+ /// &Path::from("some-folder/some-file.txt"),
+ /// Duration::from_secs(60 * 60)
+ /// ).await?;
+ /// # Ok(())
+ /// # }
+ /// ```
+ async fn signed_url(&self, method: Method, path: &Path, expires_in:
Duration) -> Result<Url> {
+ let mut url = self.path_url(path);
+ let signer = self.client.signer(expires_in).await?;
+ signer.sign(&method, &mut url)?;
+ Ok(url)
+ }
+
+ async fn signed_urls(
+ &self,
+ method: Method,
+ paths: &[Path],
+ expires_in: Duration,
+ ) -> Result<Vec<Url>> {
+ let mut urls = Vec::with_capacity(paths.len());
+ let signer = self.client.signer(expires_in).await?;
+ for path in paths {
+ let mut url = self.path_url(path);
+ signer.sign(&method, &mut url)?;
+ urls.push(url);
+ }
+ Ok(urls)
+ }
+}
+
/// Relevant docs:
<https://azure.github.io/Storage/docs/application-and-user-data/basics/azure-blob-storage-upload-apis/>
/// In Azure Blob Store, parts are "blocks"
/// put_multipart_part -> PUT block
@@ -202,6 +266,7 @@ mod tests {
stream_get(&integration).await;
put_opts(&integration, true).await;
multipart(&integration, &integration).await;
+ signing(&integration).await;
let validate = !integration.client.config().disable_tagging;
tagging(&integration, validate, |p| {
@@ -211,6 +276,38 @@ mod tests {
.await
}
+ #[ignore = "Used for manual testing against a real storage account."]
+ #[tokio::test]
+ async fn test_user_delegation_key() {
+ let account = std::env::var("AZURE_ACCOUNT_NAME").unwrap();
+ let container = std::env::var("AZURE_CONTAINER_NAME").unwrap();
+ let client_id = std::env::var("AZURE_CLIENT_ID").unwrap();
+ let client_secret = std::env::var("AZURE_CLIENT_SECRET").unwrap();
+ let tenant_id = std::env::var("AZURE_TENANT_ID").unwrap();
+ let integration = MicrosoftAzureBuilder::new()
+ .with_account(account)
+ .with_container_name(container)
+ .with_client_id(client_id)
+ .with_client_secret(client_secret)
+ .with_tenant_id(&tenant_id)
+ .build()
+ .unwrap();
+
+ let data = Bytes::from("hello world");
+ let path = Path::from("file.txt");
+ integration.put(&path, data.clone()).await.unwrap();
+
+ let signed = integration
+ .signed_url(Method::GET, &path, Duration::from_secs(60))
+ .await
+ .unwrap();
+
+ let resp = reqwest::get(signed).await.unwrap();
+ let loaded = resp.bytes().await.unwrap();
+
+ assert_eq!(data, loaded);
+ }
+
#[test]
fn azure_test_config_get_value() {
let azure_client_id = "object_store:fake_access_key_id".to_string();
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index b438254bdd..ab462cc156 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -2126,6 +2126,29 @@ mod tests {
assert_eq!(meta.size, chunk_size * 2);
}
+ #[cfg(any(feature = "azure", feature = "aws"))]
+ pub(crate) async fn signing<T>(integration: &T)
+ where
+ T: ObjectStore + crate::signer::Signer,
+ {
+ use reqwest::Method;
+ use std::time::Duration;
+
+ let data = Bytes::from("hello world");
+ let path = Path::from("file.txt");
+ integration.put(&path, data.clone()).await.unwrap();
+
+ let signed = integration
+ .signed_url(Method::GET, &path, Duration::from_secs(60))
+ .await
+ .unwrap();
+
+ let resp = reqwest::get(signed).await.unwrap();
+ let loaded = resp.bytes().await.unwrap();
+
+ assert_eq!(data, loaded);
+ }
+
#[cfg(any(feature = "aws", feature = "azure"))]
pub(crate) async fn tagging<F, Fut>(storage: &dyn ObjectStore, validate:
bool, get_tags: F)
where
diff --git a/object_store/src/signer.rs b/object_store/src/signer.rs
index ed92e28799..da55c689ae 100644
--- a/object_store/src/signer.rs
+++ b/object_store/src/signer.rs
@@ -31,4 +31,20 @@ pub trait Signer: Send + Sync + fmt::Debug + 'static {
/// implementation's credentials such that the URL can be handed to
something that doesn't have
/// access to the object store's credentials, to allow limited access to
the object store.
async fn signed_url(&self, method: Method, path: &Path, expires_in:
Duration) -> Result<Url>;
+
+ /// Generate signed urls for multiple paths.
+ ///
+ /// See [`Signer::signed_url`] for more details.
+ async fn signed_urls(
+ &self,
+ method: Method,
+ paths: &[Path],
+ expires_in: Duration,
+ ) -> Result<Vec<Url>> {
+ let mut urls = Vec::with_capacity(paths.len());
+ for path in paths {
+ urls.push(self.signed_url(method.clone(), path,
expires_in).await?);
+ }
+ Ok(urls)
+ }
}