tustvold commented on code in PR #2509:
URL: https://github.com/apache/arrow-rs/pull/2509#discussion_r953792823


##########
object_store/src/azure/client.rs:
##########
@@ -0,0 +1,746 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::credential::{AzureCredential, CredentialProvider};
+use crate::azure::credential::*;
+use crate::client::pagination::stream_paginated;
+use crate::client::retry::RetryExt;
+use crate::path::DELIMITER;
+use crate::util::{encode_path, format_http_range, format_prefix};
+use crate::{BoxStream, ListResult, ObjectMeta, Path, Result, RetryConfig, 
StreamExt};
+use bytes::{Buf, Bytes};
+use chrono::{DateTime, TimeZone, Utc};
+use itertools::Itertools;
+use reqwest::{
+    header::{HeaderValue, CONTENT_LENGTH, IF_NONE_MATCH, RANGE},
+    Client as ReqwestClient, Method, Response, StatusCode,
+};
+use serde::{Deserialize, Deserializer, Serialize};
+use snafu::{ResultExt, Snafu};
+use std::collections::HashMap;
+use std::ops::Range;
+
+/// A specialized `Error` for object store-related errors
+///
+/// Request variants carry the underlying `reqwest::Error`; the
+/// `From<Error> for crate::Error` impl inspects its HTTP status to map some
+/// failures onto more specific crate-level errors.
+#[derive(Debug, Snafu)]
+#[allow(missing_docs)]
+pub(crate) enum Error {
+    /// A GET or HEAD request issued by `get_request` failed, either while
+    /// sending or because the response carried an error status.
+    #[snafu(display("Error performing get request {}: {}", path, source))]
+    GetRequest {
+        source: reqwest::Error,
+        path: String,
+    },
+
+    /// A PUT request issued by `put_request` failed, either while sending or
+    /// because the response carried an error status.
+    #[snafu(display("Error performing put request {}: {}", path, source))]
+    PutRequest {
+        source: reqwest::Error,
+        path: String,
+    },
+
+    /// A DELETE request issued by `delete_request` failed, either while
+    /// sending or because the response carried an error status.
+    #[snafu(display("Error performing delete request {}: {}", path, source))]
+    DeleteRequest {
+        source: reqwest::Error,
+        path: String,
+    },
+
+    /// A copy request failed (presumably raised by `copy_request`, whose body
+    /// is not fully visible here — confirm).
+    #[snafu(display("Error performing copy request {}: {}", path, source))]
+    CopyRequest {
+        source: reqwest::Error,
+        path: String,
+    },
+
+    /// A list request failed.
+    #[snafu(display("Error performing list request: {}", source))]
+    ListRequest { source: reqwest::Error },
+
+    /// A request to start a multipart upload failed.
+    #[snafu(display("Error performing create multipart request: {}", source))]
+    CreateMultipartRequest { source: reqwest::Error },
+
+    /// A request to finish a multipart upload failed.
+    #[snafu(display("Error performing complete multipart request: {}", 
source))]
+    CompleteMultipartRequest { source: reqwest::Error },
+
+    /// A list response body could not be deserialized by `quick_xml`.
+    #[snafu(display("Got invalid list response: {}", source))]
+    InvalidListResponse { source: quick_xml::de::DeError },
+
+    /// A multipart response body could not be deserialized by `quick_xml`.
+    #[snafu(display("Got invalid multipart response: {}", source))]
+    InvalidMultipartResponse { source: quick_xml::de::DeError },
+
+    /// Fetching an OAuth token for client-secret credentials failed
+    /// (see `get_credential`).
+    #[snafu(display("Error authorizing request: {}", source))]
+    Authorization { source: crate::client::oauth::Error },
+}
+
+impl From<Error> for crate::Error {
+    /// Converts a client [`Error`] into the crate-wide error type.
+    ///
+    /// * HTTP 404 on a get, put, delete or copy request becomes
+    ///   [`crate::Error::NotFound`].
+    /// * HTTP 409 on a copy request becomes [`crate::Error::AlreadyExists`].
+    /// * Anything else is wrapped in [`crate::Error::Generic`], tagged with the
+    ///   `"MicrosoftAzure"` store name.
+    fn from(err: Error) -> Self {
+        /// Returns `true` when `source` stems from a response with status `code`.
+        fn has_status(source: &reqwest::Error, code: StatusCode) -> bool {
+            source.status() == Some(code)
+        }
+
+        match err {
+            Error::GetRequest { source, path }
+            | Error::PutRequest { source, path }
+            | Error::DeleteRequest { source, path }
+            | Error::CopyRequest { source, path }
+                if has_status(&source, StatusCode::NOT_FOUND) =>
+            {
+                Self::NotFound {
+                    path,
+                    source: Box::new(source),
+                }
+            }
+            Error::CopyRequest { source, path }
+                if has_status(&source, StatusCode::CONFLICT) =>
+            {
+                Self::AlreadyExists {
+                    path,
+                    source: Box::new(source),
+                }
+            }
+            other => Self::Generic {
+                store: "MicrosoftAzure",
+                source: Box::new(other),
+            },
+        }
+    }
+}
+
+/// Configuration for [AzureClient]
+///
+/// NOTE(review): `Debug` is derived, so `{:?}` output includes `credentials`;
+/// whether secrets are redacted depends on `CredentialProvider`'s `Debug`
+/// impl — confirm before logging this type.
+#[derive(Debug)]
+pub struct AzureConfig {
+    /// Storage account name; passed to `with_azure_authorization` on every
+    /// request and, for the emulator, included in object URLs.
+    pub account: String,
+    /// Container that object paths are resolved against.
+    pub container: String,
+    /// Source of the credential attached to each request.
+    pub credentials: CredentialProvider,
+    /// Retry policy passed to `send_retry` and to OAuth token fetches.
+    pub retry_config: RetryConfig,
+    /// When `false`, the HTTP client is built with `https_only(true)`.
+    pub allow_http: bool,
+    /// URL prefix that every object URL starts with.
+    pub service: String,
+    /// Whether the target is the storage emulator, which changes the URL
+    /// layout produced by `path_url`.
+    pub is_emulator: bool,
+}
+
+impl AzureConfig {
+    /// Builds the full URL of the object at `path`.
+    ///
+    /// Normally this is `{service}/{container}/{path}`; when targeting the
+    /// emulator the account name is inserted as an extra segment, giving
+    /// `{service}/{account}/{container}/{path}`. `path` is encoded with
+    /// `encode_path` first.
+    fn path_url(&self, path: &Path) -> String {
+        let encoded = encode_path(path);
+        if !self.is_emulator {
+            return format!("{}/{}/{}", self.service, self.container, encoded);
+        }
+        format!(
+            "{}/{}/{}/{}",
+            self.service, self.account, self.container, encoded
+        )
+    }
+}
+
+/// Client for a single Azure Blob Storage container, pairing an
+/// [`AzureConfig`] with the `reqwest` client used to issue requests.
+#[derive(Debug)]
+pub(crate) struct AzureClient {
+    /// Account, container, credential and retry settings.
+    config: AzureConfig,
+    /// HTTP client built in `new` with `https_only(!config.allow_http)`.
+    client: ReqwestClient,
+}
+
+impl AzureClient {
+    /// Create a new [`AzureClient`] from `config`.
+    ///
+    /// Unless `config.allow_http` is set, the underlying HTTP client is
+    /// restricted to HTTPS requests.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the `reqwest` client cannot be constructed (for example when
+    /// the TLS backend fails to initialize).
+    pub fn new(config: AzureConfig) -> Self {
+        let client = reqwest::ClientBuilder::new()
+            .https_only(!config.allow_http)
+            .build()
+            .expect("failed to build reqwest client for Azure");
+
+        Self { config, client }
+    }
+
+    /// Returns the config
+    pub fn config(&self) -> &AzureConfig {
+        &self.config
+    }
+
+    /// Resolves the configured [`CredentialProvider`] into a concrete
+    /// [`AzureCredential`] for one request.
+    ///
+    /// Access keys and SAS tokens are copied out of the config. Client-secret
+    /// credentials fetch an OAuth token (using this client and the configured
+    /// retry policy) and turn it into a `Bearer` header value.
+    async fn get_credential(&self) -> Result<AzureCredential> {
+        let credential = match &self.config.credentials {
+            CredentialProvider::AccessKey(key) => AzureCredential::AccessKey(key.to_owned()),
+            CredentialProvider::SASToken(sas) => AzureCredential::SASToken(sas.clone()),
+            CredentialProvider::ClientSecret(cred) => {
+                let token = cred
+                    .fetch_token(&self.client, &self.config.retry_config)
+                    .await
+                    .context(AuthorizationSnafu)?;
+                // Building a HeaderValue is fallible, so it is done here where the
+                // error can be propagated; the code that later attaches the
+                // header can then stay infallible.
+                let bearer = format!("Bearer {}", token);
+                let header = HeaderValue::from_str(&bearer).map_err(|err| {
+                    crate::Error::Generic {
+                        store: "MicrosoftAzure",
+                        source: Box::new(err),
+                    }
+                })?;
+                AzureCredential::AuthorizationToken(header)
+            }
+        };
+        Ok(credential)
+    }
+
+    /// Make an Azure PUT request
+    /// <https://docs.microsoft.com/en-us/rest/api/storageservices/put-blob>
+    ///
+    /// * `bytes` - request body; when `None` no body is attached and an
+    ///   explicit `Content-Length: 0` header is sent.
+    /// * `is_block_op` - when `false` the `BLOB_TYPE` header is set to
+    ///   `BlockBlob`; when `true` it is omitted (presumably for block-level
+    ///   operations such as Put Block / Put Block List — confirm with callers).
+    /// * `query` - serialized into the request's query string.
+    ///
+    /// Error statuses are turned into errors; a 404 surfaces as
+    /// [`crate::Error::NotFound`].
+    pub async fn put_request<T: Serialize + ?Sized + Sync>(
+        &self,
+        path: &Path,
+        bytes: Option<Bytes>,
+        is_block_op: bool,
+        query: &T,
+    ) -> Result<Response> {
+        let credential = self.get_credential().await?;
+        let url = self.config.path_url(path);
+
+        // Every PUT carries the query string; only non-block operations also
+        // declare the blob type.
+        let mut builder = self.client.request(Method::PUT, url).query(query);
+        if !is_block_op {
+            builder = builder.header(&BLOB_TYPE, "BlockBlob");
+        }
+
+        // Content-Length is always set explicitly, even for an empty body.
+        builder = match bytes {
+            Some(bytes) => builder
+                .header(CONTENT_LENGTH, HeaderValue::from(bytes.len()))
+                .body(bytes),
+            None => builder.header(CONTENT_LENGTH, HeaderValue::from_static("0")),
+        };
+
+        let response = builder
+            .with_azure_authorization(&credential, &self.config.account)
+            .send_retry(&self.config.retry_config)
+            .await
+            .context(PutRequestSnafu {
+                path: path.as_ref(),
+            })?
+            .error_for_status()
+            .context(PutRequestSnafu {
+                path: path.as_ref(),
+            })?;
+
+        Ok(response)
+    }
+
+    /// Make an Azure GET request
+    /// <https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob>
+    /// <https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties>
+    ///
+    /// Sends `HEAD` instead of `GET` when `head` is `true`, and adds a `Range`
+    /// header when `range` is given. Error statuses are turned into errors;
+    /// a 404 surfaces as [`crate::Error::NotFound`].
+    pub async fn get_request(
+        &self,
+        path: &Path,
+        range: Option<Range<usize>>,
+        head: bool,
+    ) -> Result<Response> {
+        let credential = self.get_credential().await?;
+        let url = self.config.path_url(path);
+        let method = if head { Method::HEAD } else { Method::GET };
+
+        // An empty body with an explicit zero Content-Length is always sent.
+        let builder = self
+            .client
+            .request(method, url)
+            .header(CONTENT_LENGTH, HeaderValue::from_static("0"))
+            .body(Bytes::new());
+        let builder = match range {
+            Some(span) => builder.header(RANGE, format_http_range(span)),
+            None => builder,
+        };
+
+        let response = builder
+            .with_azure_authorization(&credential, &self.config.account)
+            .send_retry(&self.config.retry_config)
+            .await
+            .context(GetRequestSnafu {
+                path: path.as_ref(),
+            })?;
+        let response = response.error_for_status().context(GetRequestSnafu {
+            path: path.as_ref(),
+        })?;
+
+        Ok(response)
+    }
+
+    /// Make an Azure Delete request
+    /// <https://docs.microsoft.com/en-us/rest/api/storageservices/delete-blob>
+    ///
+    /// Sends `DELETE_SNAPSHOTS: include` so that the blob's snapshots are
+    /// removed along with it. Error statuses are turned into errors; a 404
+    /// surfaces as [`crate::Error::NotFound`].
+    pub async fn delete_request<T: Serialize + ?Sized + Sync>(
+        &self,
+        path: &Path,
+        query: &T,
+    ) -> Result<()> {
+        let credential = self.get_credential().await?;
+        let url = self.config.path_url(path);
+
+        let request = self
+            .client
+            .request(Method::DELETE, url)
+            .query(query)
+            .header(&DELETE_SNAPSHOTS, "include")
+            .with_azure_authorization(&credential, &self.config.account);
+
+        let response = request
+            .send_retry(&self.config.retry_config)
+            .await
+            .context(DeleteRequestSnafu {
+                path: path.as_ref(),
+            })?;
+        response.error_for_status().context(DeleteRequestSnafu {
+            path: path.as_ref(),
+        })?;
+
+        Ok(())
+    }
+
+    /// Make an Azure Copy request 
<https://docs.microsoft.com/en-us/rest/api/storageservices/copy-blob>
+    pub async fn copy_request(
+        &self,
+        from: &Path,
+        to: &Path,
+        overwrite: bool,
+    ) -> Result<()> {
+        let credential = self.get_credential().await?;
+        let url = self.config.path_url(to);
+        let mut source = self.config.path_url(from);
+
+        if let AzureCredential::SASToken(pairs) = self.get_credential().await? 
{
+            let query = pairs
+                .iter()
+                .map(|pair| format!("{}={}", pair.0, pair.1))
+                .join("&");
+            source = format!("{}?{}", source, query);

Review Comment:
   I can confirm that without this change SAS credentials don't work correctly



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to