This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f383e764a feat(object_store): Azure url signing (#5259)
2f383e764a is described below

commit 2f383e764aa2b79e52d562e24eb0d1dce41f5ce7
Author: Robert Pack <[email protected]>
AuthorDate: Thu Jan 4 17:40:51 2024 +0100

    feat(object_store): Azure url signing (#5259)
    
    * refactor: move current signing to new AzureAuthorizer
    
    * feat: generate signed urls with master key
    
    * feat: sign with user delegated keys
    
    * chore: clippy
    
    * pr feedback
    
    * chore: clippy
    
    * pr feedback II
    
    * fix: move signing test
---
 .gitignore                           |   1 +
 object_store/src/aws/mod.rs          |   1 +
 object_store/src/azure/client.rs     | 117 +++++++++++++-
 object_store/src/azure/credential.rs | 295 +++++++++++++++++++++++++++++++----
 object_store/src/azure/mod.rs        | 103 +++++++++++-
 object_store/src/lib.rs              |  23 +++
 object_store/src/signer.rs           |  16 ++
 7 files changed, 518 insertions(+), 38 deletions(-)

diff --git a/.gitignore b/.gitignore
index efdd9de5fb..0788daea01 100644
--- a/.gitignore
+++ b/.gitignore
@@ -15,6 +15,7 @@ parquet/data.parquet
 justfile
 .prettierignore
 .env
+.editorconfig
 # local azurite file
 __azurite*
 __blobstorage__
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index 75b43d448b..20e7b032ab 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -372,6 +372,7 @@ mod tests {
         rename_and_copy(&integration).await;
         stream_get(&integration).await;
         multipart(&integration, &integration).await;
+        signing(&integration).await;
 
         tagging(&integration, !config.disable_tagging, |p| {
             let client = Arc::clone(&integration.client);
diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs
index 3c71e69da0..865e0a1a93 100644
--- a/object_store/src/azure/client.rs
+++ b/object_store/src/azure/client.rs
@@ -46,6 +46,7 @@ use serde::{Deserialize, Serialize};
 use snafu::{OptionExt, ResultExt, Snafu};
 use std::collections::HashMap;
 use std::sync::Arc;
+use std::time::Duration;
 use url::Url;
 
 const VERSION_HEADER: &str = "x-ms-version-id";
@@ -101,6 +102,18 @@ pub(crate) enum Error {
 
     #[snafu(display("ETag required for conditional update"))]
     MissingETag,
+
+    #[snafu(display("Error requesting user delegation key: {}", source))]
+    DelegationKeyRequest { source: crate::client::retry::Error },
+
+    #[snafu(display("Error getting user delegation key response body: {}", 
source))]
+    DelegationKeyResponseBody { source: reqwest::Error },
+
+    #[snafu(display("Got invalid user delegation key response: {}", source))]
+    DelegationKeyResponse { source: quick_xml::de::DeError },
+
+    #[snafu(display("Generating SAS keys with SAS tokens auth is not 
supported"))]
+    SASforSASNotSupported,
 }
 
 impl From<Error> for crate::Error {
@@ -324,6 +337,78 @@ impl AzureClient {
         Ok(())
     }
 
+    /// Make a Get User Delegation Key request
+    /// 
<https://docs.microsoft.com/en-us/rest/api/storageservices/get-user-delegation-key>
+    async fn get_user_delegation_key(
+        &self,
+        start: &DateTime<Utc>,
+        end: &DateTime<Utc>,
+    ) -> Result<UserDelegationKey> {
+        let credential = self.get_credential().await?;
+        let url = self.config.service.clone();
+
+        let start = start.to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
+        let expiry = end.to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
+
+        let mut body = String::new();
+        body.push_str("<?xml version=\"1.0\" 
encoding=\"utf-8\"?>\n<KeyInfo>\n");
+        body.push_str(&format!(
+            "\t<Start>{start}</Start>\n\t<Expiry>{expiry}</Expiry>\n"
+        ));
+        body.push_str("</KeyInfo>");
+
+        let response = self
+            .client
+            .request(Method::POST, url)
+            .body(body)
+            .query(&[("restype", "service"), ("comp", "userdelegationkey")])
+            .with_azure_authorization(&credential, &self.config.account)
+            .send_retry(&self.config.retry_config)
+            .await
+            .context(DelegationKeyRequestSnafu)?
+            .bytes()
+            .await
+            .context(DelegationKeyResponseBodySnafu)?;
+
+        let response: UserDelegationKey =
+            
quick_xml::de::from_reader(response.reader()).context(DelegationKeyResponseSnafu)?;
+
+        Ok(response)
+    }
+
+    /// Create an AzureSigner for generating SAS tokens (pre-signed urls).
+    ///
+    /// Depending on the type of credential, this will either use the account 
key or a user delegation key.
+    /// Since delegation keys are acquired ad-hoc, the signer allows for 
signing multiple urls with the same key.
+    pub async fn signer(&self, expires_in: Duration) -> Result<AzureSigner> {
+        let credential = self.get_credential().await?;
+        let signed_start = chrono::Utc::now();
+        let signed_expiry = signed_start + expires_in;
+        match credential.as_ref() {
+            AzureCredential::BearerToken(_) => {
+                let key = self
+                    .get_user_delegation_key(&signed_start, &signed_expiry)
+                    .await?;
+                let signing_key = AzureAccessKey::try_new(&key.value)?;
+                Ok(AzureSigner::new(
+                    signing_key,
+                    self.config.account.clone(),
+                    signed_start,
+                    signed_expiry,
+                    Some(key),
+                ))
+            }
+            AzureCredential::AccessKey(key) => Ok(AzureSigner::new(
+                key.to_owned(),
+                self.config.account.clone(),
+                signed_start,
+                signed_expiry,
+                None,
+            )),
+            _ => Err(Error::SASforSASNotSupported.into()),
+        }
+    }
+
     #[cfg(test)]
     pub async fn get_blob_tagging(&self, path: &Path) -> Result<Response> {
         let credential = self.get_credential().await?;
@@ -600,6 +685,18 @@ impl BlockList {
     }
 }
 
+#[derive(Debug, Clone, PartialEq, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub(crate) struct UserDelegationKey {
+    pub signed_oid: String,
+    pub signed_tid: String,
+    pub signed_start: String,
+    pub signed_expiry: String,
+    pub signed_service: String,
+    pub signed_version: String,
+    pub value: String,
+}
+
 #[cfg(test)]
 mod tests {
     use bytes::Bytes;
@@ -757,8 +854,7 @@ mod tests {
     <NextMarker/>
 </EnumerationResults>";
 
-        let mut _list_blobs_response_internal: ListResultInternal =
-            quick_xml::de::from_str(S).unwrap();
+        let _list_blobs_response_internal: ListResultInternal = 
quick_xml::de::from_str(S).unwrap();
     }
 
     #[test]
@@ -778,4 +874,21 @@ mod tests {
 
         assert_eq!(res, S)
     }
+
+    #[test]
+    fn test_delegated_key_response() {
+        const S: &str = r#"<?xml version="1.0" encoding="utf-8"?>
+<UserDelegationKey>
+    <SignedOid>String containing a GUID value</SignedOid>
+    <SignedTid>String containing a GUID value</SignedTid>
+    <SignedStart>String formatted as ISO date</SignedStart>
+    <SignedExpiry>String formatted as ISO date</SignedExpiry>
+    <SignedService>b</SignedService>
+    <SignedVersion>String specifying REST api version to use to create the 
user delegation key</SignedVersion>
+    <Value>String containing the user delegation key</Value>
+</UserDelegationKey>"#;
+
+        let _delegated_key_response_internal: UserDelegationKey =
+            quick_xml::de::from_str(S).unwrap();
+    }
 }
diff --git a/object_store/src/azure/credential.rs 
b/object_store/src/azure/credential.rs
index 2b8788d333..bfbbde8260 100644
--- a/object_store/src/azure/credential.rs
+++ b/object_store/src/azure/credential.rs
@@ -24,26 +24,27 @@ use crate::RetryConfig;
 use async_trait::async_trait;
 use base64::prelude::BASE64_STANDARD;
 use base64::Engine;
-use chrono::{DateTime, Utc};
-use reqwest::header::ACCEPT;
-use reqwest::{
-    header::{
-        HeaderMap, HeaderName, HeaderValue, AUTHORIZATION, CONTENT_ENCODING, 
CONTENT_LANGUAGE,
-        CONTENT_LENGTH, CONTENT_TYPE, DATE, IF_MATCH, IF_MODIFIED_SINCE, 
IF_NONE_MATCH,
-        IF_UNMODIFIED_SINCE, RANGE,
-    },
-    Client, Method, RequestBuilder,
+use chrono::{DateTime, SecondsFormat, Utc};
+use reqwest::header::{
+    HeaderMap, HeaderName, HeaderValue, ACCEPT, AUTHORIZATION, 
CONTENT_ENCODING, CONTENT_LANGUAGE,
+    CONTENT_LENGTH, CONTENT_TYPE, DATE, IF_MATCH, IF_MODIFIED_SINCE, 
IF_NONE_MATCH,
+    IF_UNMODIFIED_SINCE, RANGE,
 };
+use reqwest::{Client, Method, Request, RequestBuilder};
 use serde::Deserialize;
 use snafu::{ResultExt, Snafu};
 use std::borrow::Cow;
+use std::collections::HashMap;
+use std::fmt::Debug;
 use std::process::Command;
 use std::str;
 use std::sync::Arc;
 use std::time::{Duration, Instant, SystemTime};
 use url::Url;
 
-static AZURE_VERSION: HeaderValue = HeaderValue::from_static("2021-08-06");
+use super::client::UserDelegationKey;
+
+static AZURE_VERSION: HeaderValue = HeaderValue::from_static("2023-11-03");
 static VERSION: HeaderName = HeaderName::from_static("x-ms-version");
 pub(crate) static BLOB_TYPE: HeaderName = 
HeaderName::from_static("x-ms-blob-type");
 pub(crate) static DELETE_SNAPSHOTS: HeaderName = 
HeaderName::from_static("x-ms-delete-snapshots");
@@ -83,6 +84,9 @@ pub enum Error {
 
     #[snafu(display("Failed to parse azure cli response: {source}"))]
     AzureCliResponse { source: serde_json::Error },
+
+    #[snafu(display("Generating SAS keys with SAS tokens auth is not 
supported"))]
+    SASforSASNotSupported,
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -97,7 +101,7 @@ impl From<Error> for crate::Error {
 }
 
 /// A shared Azure Storage Account Key
-#[derive(Debug, Eq, PartialEq)]
+#[derive(Debug, Clone, Eq, PartialEq)]
 pub struct AzureAccessKey(Vec<u8>);
 
 impl AzureAccessKey {
@@ -137,33 +141,86 @@ pub mod authority_hosts {
     pub const AZURE_PUBLIC_CLOUD: &str = "https://login.microsoftonline.com";
 }
 
-pub(crate) trait CredentialExt {
-    /// Apply authorization to requests against azure storage accounts
-    /// 
<https://docs.microsoft.com/en-us/rest/api/storageservices/authorize-requests-to-azure-storage>
-    fn with_azure_authorization(self, credential: &AzureCredential, account: 
&str) -> Self;
+pub(crate) struct AzureSigner {
+    signing_key: AzureAccessKey,
+    start: DateTime<Utc>,
+    end: DateTime<Utc>,
+    account: String,
+    delegation_key: Option<UserDelegationKey>,
 }
 
-impl CredentialExt for RequestBuilder {
-    fn with_azure_authorization(mut self, credential: &AzureCredential, 
account: &str) -> Self {
+impl AzureSigner {
+    pub fn new(
+        signing_key: AzureAccessKey,
+        account: String,
+        start: DateTime<Utc>,
+        end: DateTime<Utc>,
+        delegation_key: Option<UserDelegationKey>,
+    ) -> Self {
+        Self {
+            signing_key,
+            account,
+            start,
+            end,
+            delegation_key,
+        }
+    }
+
+    pub fn sign(&self, method: &Method, url: &mut Url) -> Result<()> {
+        let (str_to_sign, query_pairs) = match &self.delegation_key {
+            Some(delegation_key) => string_to_sign_user_delegation_sas(
+                url,
+                method,
+                &self.account,
+                &self.start,
+                &self.end,
+                delegation_key,
+            ),
+            None => string_to_sign_service_sas(url, method, &self.account, 
&self.start, &self.end),
+        };
+        let auth = hmac_sha256(&self.signing_key.0, str_to_sign);
+        url.query_pairs_mut().extend_pairs(query_pairs);
+        url.query_pairs_mut()
+            .append_pair("sig", BASE64_STANDARD.encode(auth).as_str());
+        Ok(())
+    }
+}
+
+/// Authorize a [`Request`] with an [`AzureAuthorizer`]
+#[derive(Debug)]
+pub struct AzureAuthorizer<'a> {
+    credential: &'a AzureCredential,
+    account: &'a str,
+}
+
+impl<'a> AzureAuthorizer<'a> {
+    /// Create a new [`AzureAuthorizer`]
+    pub fn new(credential: &'a AzureCredential, account: &'a str) -> Self {
+        AzureAuthorizer {
+            credential,
+            account,
+        }
+    }
+
+    /// Authorize `request`
+    pub fn authorize(&self, request: &mut Request) {
         // rfc2822 string should never contain illegal characters
         let date = Utc::now();
         let date_str = date.format(RFC1123_FMT).to_string();
         // we formatted the data string ourselves, so unwrapping should be fine
         let date_val = HeaderValue::from_str(&date_str).unwrap();
-        self = self
-            .header(DATE, &date_val)
-            .header(&VERSION, &AZURE_VERSION);
+        request.headers_mut().insert(DATE, date_val);
+        request
+            .headers_mut()
+            .insert(&VERSION, AZURE_VERSION.clone());
 
-        match credential {
+        match self.credential {
             AzureCredential::AccessKey(key) => {
-                let (client, request) = self.build_split();
-                let mut request = request.expect("request valid");
-
                 let signature = generate_authorization(
                     request.headers(),
                     request.url(),
                     request.method(),
-                    account,
+                    self.account,
                     key,
                 );
 
@@ -173,15 +230,40 @@ impl CredentialExt for RequestBuilder {
                     AUTHORIZATION,
                     HeaderValue::from_str(signature.as_str()).unwrap(),
                 );
-
-                Self::from_parts(client, request)
             }
-            AzureCredential::BearerToken(token) => self.bearer_auth(token),
-            AzureCredential::SASToken(query_pairs) => self.query(&query_pairs),
+            AzureCredential::BearerToken(token) => {
+                request.headers_mut().append(
+                    AUTHORIZATION,
+                    HeaderValue::from_str(format!("Bearer {}", 
token).as_str()).unwrap(),
+                );
+            }
+            AzureCredential::SASToken(query_pairs) => {
+                request
+                    .url_mut()
+                    .query_pairs_mut()
+                    .extend_pairs(query_pairs);
+            }
         }
     }
 }
 
+pub(crate) trait CredentialExt {
+    /// Apply authorization to requests against azure storage accounts
+    /// 
<https://docs.microsoft.com/en-us/rest/api/storageservices/authorize-requests-to-azure-storage>
+    fn with_azure_authorization(self, credential: &AzureCredential, account: 
&str) -> Self;
+}
+
+impl CredentialExt for RequestBuilder {
+    fn with_azure_authorization(self, credential: &AzureCredential, account: 
&str) -> Self {
+        let (client, request) = self.build_split();
+        let mut request = request.expect("request valid");
+
+        AzureAuthorizer::new(credential, account).authorize(&mut request);
+
+        Self::from_parts(client, request)
+    }
+}
+
 /// Generate signed key for authorization via access keys
 /// 
<https://docs.microsoft.com/en-us/rest/api/storageservices/authorize-with-shared-key>
 fn generate_authorization(
@@ -205,6 +287,152 @@ fn add_if_exists<'a>(h: &'a HeaderMap, key: &HeaderName) 
-> &'a str {
         .unwrap_or_default()
 }
 
+fn string_to_sign_sas(
+    u: &Url,
+    method: &Method,
+    account: &str,
+    start: &DateTime<Utc>,
+    end: &DateTime<Utc>,
+) -> (String, String, String, String, String) {
+    // NOTE: for now only blob signing is supported.
+    let signed_resource = "b".to_string();
+
+    // 
https://learn.microsoft.com/en-us/rest/api/storageservices/create-service-sas#permissions-for-a-directory-container-or-blob
+    let signed_permissions = match *method {
+        // read and list permissions
+        Method::GET => match signed_resource.as_str() {
+            "c" => "rl",
+            "b" => "r",
+            _ => unreachable!(),
+        },
+        // write permissions (also allows creating a new blob in a sub-key)
+        Method::PUT => "w",
+        // delete permissions
+        Method::DELETE => "d",
+        // other methods are not used in any of the current operations
+        _ => "",
+    }
+    .to_string();
+    let signed_start = start.to_rfc3339_opts(SecondsFormat::Secs, true);
+    let signed_expiry = end.to_rfc3339_opts(SecondsFormat::Secs, true);
+    let canonicalized_resource = if 
u.host_str().unwrap_or_default().contains(account) {
+        format!("/blob/{}{}", account, u.path())
+    } else {
+        // NOTE: in case of the emulator, the account name is not part of the 
host
+        //      but the path starts with the account name
+        format!("/blob{}", u.path())
+    };
+
+    (
+        signed_resource,
+        signed_permissions,
+        signed_start,
+        signed_expiry,
+        canonicalized_resource,
+    )
+}
+
+/// Create a string to be signed for authorization via [service sas].
+///
+/// [service sas]: 
https://learn.microsoft.com/en-us/rest/api/storageservices/create-service-sas#version-2020-12-06-and-later
+fn string_to_sign_service_sas(
+    u: &Url,
+    method: &Method,
+    account: &str,
+    start: &DateTime<Utc>,
+    end: &DateTime<Utc>,
+) -> (String, HashMap<&'static str, String>) {
+    let (signed_resource, signed_permissions, signed_start, signed_expiry, 
canonicalized_resource) =
+        string_to_sign_sas(u, method, account, start, end);
+
+    let string_to_sign = format!(
+        "{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}",
+        signed_permissions,
+        signed_start,
+        signed_expiry,
+        canonicalized_resource,
+        "",                               // signed identifier
+        "",                               // signed ip
+        "",                               // signed protocol
+        &AZURE_VERSION.to_str().unwrap(), // signed version
+        signed_resource,                  // signed resource
+        "",                               // signed snapshot time
+        "",                               // signed encryption scope
+        "",                               // rscc - response header: 
Cache-Control
+        "",                               // rscd - response header: 
Content-Disposition
+        "",                               // rsce - response header: 
Content-Encoding
+        "",                               // rscl - response header: 
Content-Language
+        "",                               // rsct - response header: 
Content-Type
+    );
+
+    let mut pairs = HashMap::new();
+    pairs.insert("sv", AZURE_VERSION.to_str().unwrap().to_string());
+    pairs.insert("sp", signed_permissions);
+    pairs.insert("st", signed_start);
+    pairs.insert("se", signed_expiry);
+    pairs.insert("sr", signed_resource);
+
+    (string_to_sign, pairs)
+}
+
+/// Create a string to be signed for authorization via [user delegation sas].
+///
+/// [user delegation sas]: 
https://learn.microsoft.com/en-us/rest/api/storageservices/create-user-delegation-sas#version-2020-12-06-and-later
+fn string_to_sign_user_delegation_sas(
+    u: &Url,
+    method: &Method,
+    account: &str,
+    start: &DateTime<Utc>,
+    end: &DateTime<Utc>,
+    delegation_key: &UserDelegationKey,
+) -> (String, HashMap<&'static str, String>) {
+    let (signed_resource, signed_permissions, signed_start, signed_expiry, 
canonicalized_resource) =
+        string_to_sign_sas(u, method, account, start, end);
+
+    let string_to_sign = format!(
+        
"{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}\n{}",
+        signed_permissions,
+        signed_start,
+        signed_expiry,
+        canonicalized_resource,
+        delegation_key.signed_oid,        // signed key object id
+        delegation_key.signed_tid,        // signed key tenant id
+        delegation_key.signed_start,      // signed key start
+        delegation_key.signed_expiry,     // signed key expiry
+        delegation_key.signed_service,    // signed key service
+        delegation_key.signed_version,    // signed key version
+        "",                               // signed authorized user object id
+        "",                               // signed unauthorized user object id
+        "",                               // signed correlation id
+        "",                               // signed ip
+        "",                               // signed protocol
+        &AZURE_VERSION.to_str().unwrap(), // signed version
+        signed_resource,                  // signed resource
+        "",                               // signed snapshot time
+        "",                               // signed encryption scope
+        "",                               // rscc - response header: 
Cache-Control
+        "",                               // rscd - response header: 
Content-Disposition
+        "",                               // rsce - response header: 
Content-Encoding
+        "",                               // rscl - response header: 
Content-Language
+        "",                               // rsct - response header: 
Content-Type
+    );
+
+    let mut pairs = HashMap::new();
+    pairs.insert("sv", AZURE_VERSION.to_str().unwrap().to_string());
+    pairs.insert("sp", signed_permissions);
+    pairs.insert("st", signed_start);
+    pairs.insert("se", signed_expiry);
+    pairs.insert("sr", signed_resource);
+    pairs.insert("skoid", delegation_key.signed_oid.clone());
+    pairs.insert("sktid", delegation_key.signed_tid.clone());
+    pairs.insert("skt", delegation_key.signed_start.clone());
+    pairs.insert("ske", delegation_key.signed_expiry.clone());
+    pairs.insert("sks", delegation_key.signed_service.clone());
+    pairs.insert("skv", delegation_key.signed_version.clone());
+
+    (string_to_sign, pairs)
+}
+
 /// 
<https://docs.microsoft.com/en-us/rest/api/storageservices/authorize-with-shared-key#constructing-the-signature-string>
 fn string_to_sign(h: &HeaderMap, u: &Url, method: &Method, account: &str) -> 
String {
     // content length must only be specified if != 0
@@ -232,7 +460,7 @@ fn string_to_sign(h: &HeaderMap, u: &Url, method: &Method, 
account: &str) -> Str
         add_if_exists(h, &IF_UNMODIFIED_SINCE),
         add_if_exists(h, &RANGE),
         canonicalize_header(h),
-        canonicalized_resource(account, u)
+        canonicalize_resource(account, u)
     )
 }
 
@@ -257,7 +485,7 @@ fn canonicalize_header(headers: &HeaderMap) -> String {
 }
 
 /// 
<https://docs.microsoft.com/en-us/rest/api/storageservices/authorize-with-shared-key#constructing-the-canonicalized-resource-string>
-fn canonicalized_resource(account: &str, uri: &Url) -> String {
+fn canonicalize_resource(account: &str, uri: &Url) -> String {
     let mut can_res: String = String::new();
     can_res.push('/');
     can_res.push_str(account);
@@ -681,14 +909,15 @@ impl CredentialProvider for AzureCliCredential {
 
 #[cfg(test)]
 mod tests {
-    use super::*;
-    use crate::client::mock_server::MockServer;
     use futures::executor::block_on;
     use hyper::body::to_bytes;
     use hyper::{Body, Response};
     use reqwest::{Client, Method};
     use tempfile::NamedTempFile;
 
+    use super::*;
+    use crate::client::mock_server::MockServer;
+
     #[tokio::test]
     async fn test_managed_identity() {
         let server = MockServer::new();
diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs
index af0a4cefa1..712b7a36c5 100644
--- a/object_store/src/azure/mod.rs
+++ b/object_store/src/azure/mod.rs
@@ -27,22 +27,26 @@
 //! a way to drop old blocks. Instead unused blocks are automatically cleaned 
up
 //! after 7 days.
 use crate::{
-    multipart::{PartId, PutPart, WriteMultiPart},
+    multipart::{MultiPartStore, PartId, PutPart, WriteMultiPart},
     path::Path,
+    signer::Signer,
     GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, 
PutOptions, PutResult,
     Result,
 };
 use async_trait::async_trait;
 use bytes::Bytes;
 use futures::stream::BoxStream;
+use reqwest::Method;
 use std::fmt::Debug;
 use std::sync::Arc;
+use std::time::Duration;
 use tokio::io::AsyncWrite;
+use url::Url;
 
 use crate::client::get::GetClientExt;
 use crate::client::list::ListClientExt;
 use crate::client::CredentialProvider;
-pub use credential::authority_hosts;
+pub use credential::{authority_hosts, AzureAccessKey, AzureAuthorizer};
 
 mod builder;
 mod client;
@@ -50,7 +54,6 @@ mod credential;
 
 /// [`CredentialProvider`] for [`MicrosoftAzure`]
 pub type AzureCredentialProvider = Arc<dyn CredentialProvider<Credential = 
AzureCredential>>;
-use crate::multipart::MultiPartStore;
 pub use builder::{AzureConfigKey, MicrosoftAzureBuilder};
 pub use credential::AzureCredential;
 
@@ -67,6 +70,11 @@ impl MicrosoftAzure {
     pub fn credentials(&self) -> &AzureCredentialProvider {
         &self.client.config().credentials
     }
+
+    /// Create a full URL to the resource specified by `path` with this 
instance's configuration.
+    fn path_url(&self, path: &Path) -> url::Url {
+        self.client.config().path_url(path)
+    }
 }
 
 impl std::fmt::Display for MicrosoftAzure {
@@ -128,6 +136,62 @@ impl ObjectStore for MicrosoftAzure {
     }
 }
 
+#[async_trait]
+impl Signer for MicrosoftAzure {
+    /// Create a URL containing the relevant [Service SAS] query parameters 
that authorize a request
+    /// via `method` to the resource at `path` valid for the duration 
specified in `expires_in`.
+    ///
+    /// [Service SAS]: 
https://learn.microsoft.com/en-us/rest/api/storageservices/create-service-sas
+    ///
+    /// # Example
+    ///
+    /// This example returns a URL that will enable a user to upload a file to
+    /// "some-folder/some-file.txt" in the next hour.
+    ///
+    /// ```
+    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
+    /// # use object_store::{azure::MicrosoftAzureBuilder, path::Path, 
signer::Signer};
+    /// # use reqwest::Method;
+    /// # use std::time::Duration;
+    /// #
+    /// let azure = MicrosoftAzureBuilder::new()
+    ///     .with_account("my-account")
+    ///     .with_access_key("my-access-key")
+    ///     .with_container_name("my-container")
+    ///     .build()?;
+    ///
+    /// let url = azure.signed_url(
+    ///     Method::PUT,
+    ///     &Path::from("some-folder/some-file.txt"),
+    ///     Duration::from_secs(60 * 60)
+    /// ).await?;
+    /// #     Ok(())
+    /// # }
+    /// ```
+    async fn signed_url(&self, method: Method, path: &Path, expires_in: 
Duration) -> Result<Url> {
+        let mut url = self.path_url(path);
+        let signer = self.client.signer(expires_in).await?;
+        signer.sign(&method, &mut url)?;
+        Ok(url)
+    }
+
+    async fn signed_urls(
+        &self,
+        method: Method,
+        paths: &[Path],
+        expires_in: Duration,
+    ) -> Result<Vec<Url>> {
+        let mut urls = Vec::with_capacity(paths.len());
+        let signer = self.client.signer(expires_in).await?;
+        for path in paths {
+            let mut url = self.path_url(path);
+            signer.sign(&method, &mut url)?;
+            urls.push(url);
+        }
+        Ok(urls)
+    }
+}
+
 /// Relevant docs: 
<https://azure.github.io/Storage/docs/application-and-user-data/basics/azure-blob-storage-upload-apis/>
 /// In Azure Blob Store, parts are "blocks"
 /// put_multipart_part -> PUT block
@@ -202,6 +266,7 @@ mod tests {
         stream_get(&integration).await;
         put_opts(&integration, true).await;
         multipart(&integration, &integration).await;
+        signing(&integration).await;
 
         let validate = !integration.client.config().disable_tagging;
         tagging(&integration, validate, |p| {
@@ -211,6 +276,38 @@ mod tests {
         .await
     }
 
+    #[ignore = "Used for manual testing against a real storage account."]
+    #[tokio::test]
+    async fn test_user_delegation_key() {
+        let account = std::env::var("AZURE_ACCOUNT_NAME").unwrap();
+        let container = std::env::var("AZURE_CONTAINER_NAME").unwrap();
+        let client_id = std::env::var("AZURE_CLIENT_ID").unwrap();
+        let client_secret = std::env::var("AZURE_CLIENT_SECRET").unwrap();
+        let tenant_id = std::env::var("AZURE_TENANT_ID").unwrap();
+        let integration = MicrosoftAzureBuilder::new()
+            .with_account(account)
+            .with_container_name(container)
+            .with_client_id(client_id)
+            .with_client_secret(client_secret)
+            .with_tenant_id(&tenant_id)
+            .build()
+            .unwrap();
+
+        let data = Bytes::from("hello world");
+        let path = Path::from("file.txt");
+        integration.put(&path, data.clone()).await.unwrap();
+
+        let signed = integration
+            .signed_url(Method::GET, &path, Duration::from_secs(60))
+            .await
+            .unwrap();
+
+        let resp = reqwest::get(signed).await.unwrap();
+        let loaded = resp.bytes().await.unwrap();
+
+        assert_eq!(data, loaded);
+    }
+
     #[test]
     fn azure_test_config_get_value() {
         let azure_client_id = "object_store:fake_access_key_id".to_string();
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index b438254bdd..ab462cc156 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -2126,6 +2126,29 @@ mod tests {
         assert_eq!(meta.size, chunk_size * 2);
     }
 
+    #[cfg(any(feature = "azure", feature = "aws"))]
+    pub(crate) async fn signing<T>(integration: &T)
+    where
+        T: ObjectStore + crate::signer::Signer,
+    {
+        use reqwest::Method;
+        use std::time::Duration;
+
+        let data = Bytes::from("hello world");
+        let path = Path::from("file.txt");
+        integration.put(&path, data.clone()).await.unwrap();
+
+        let signed = integration
+            .signed_url(Method::GET, &path, Duration::from_secs(60))
+            .await
+            .unwrap();
+
+        let resp = reqwest::get(signed).await.unwrap();
+        let loaded = resp.bytes().await.unwrap();
+
+        assert_eq!(data, loaded);
+    }
+
     #[cfg(any(feature = "aws", feature = "azure"))]
     pub(crate) async fn tagging<F, Fut>(storage: &dyn ObjectStore, validate: 
bool, get_tags: F)
     where
diff --git a/object_store/src/signer.rs b/object_store/src/signer.rs
index ed92e28799..da55c689ae 100644
--- a/object_store/src/signer.rs
+++ b/object_store/src/signer.rs
@@ -31,4 +31,20 @@ pub trait Signer: Send + Sync + fmt::Debug + 'static {
     /// implementation's credentials such that the URL can be handed to 
something that doesn't have
     /// access to the object store's credentials, to allow limited access to 
the object store.
     async fn signed_url(&self, method: Method, path: &Path, expires_in: 
Duration) -> Result<Url>;
+
+    /// Generate signed urls for multiple paths.
+    ///
+    /// See [`Signer::signed_url`] for more details.
+    async fn signed_urls(
+        &self,
+        method: Method,
+        paths: &[Path],
+        expires_in: Duration,
+    ) -> Result<Vec<Url>> {
+        let mut urls = Vec::with_capacity(paths.len());
+        for path in paths {
+            urls.push(self.signed_url(method.clone(), path, 
expires_in).await?);
+        }
+        Ok(urls)
+    }
 }

Reply via email to